¿Cómo hacer programación paralela en Python?

141

Para C ++, podemos usar OpenMP para hacer programación paralela; sin embargo, OpenMP no funcionará para Python. ¿Qué debo hacer si quiero poner en paralelo algunas partes de mi programa de Python?

La estructura del código puede considerarse como:

solve1(A)
solve2(B)

Donde solve1y solve2son dos funciones independientes. ¿Cómo ejecutar este tipo de código en paralelo en lugar de en secuencia para reducir el tiempo de ejecución? Espero que alguien pueda ayudarme. Muchas gracias de antemano. El codigo es:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print "Maximum iteration reaches"
    print inneropt

Donde setinner y setouter son dos funciones independientes. Ahí es donde quiero paralelo ...

ilovecp3
fuente
31
Echa un vistazo al multiprocesamiento . Nota: los subprocesos de Python no son adecuados para tareas vinculadas a la CPU, solo para las vinculadas a E / S.
9000
44
@ 9000 +100 Internet para mencionar las tareas dependientes de CPU vs E / S.
Hyperboreus
@ 9000 En realidad, los subprocesos no son adecuados para tareas vinculadas a la CPU, que yo sepa. Procesos es el camino a seguir cuando se realizan tareas reales vinculadas a la CPU.
Omar Al-Ithawi
66
@OmarIthawi: por qué, los subprocesos funcionan bien si tiene muchos núcleos de CPU (como de costumbre ahora). Luego, su proceso puede ejecutar varios subprocesos cargando todos estos núcleos en paralelo y compartiendo datos comunes entre ellos implícitamente (es decir, sin tener un área explícita de memoria compartida o mensajes entre procesos).
9000
1
@ user2134774: Bueno, sí, mi segundo comentario tiene poco sentido. Probablemente las únicas extensiones C que liberan el GIL pueden beneficiarse de eso; por ejemplo, partes de NumPy y Pandas hacen eso. En otros casos, está mal (pero no puedo editarlo ahora).
9000

Respuestas:

162

Puede usar el módulo de multiprocesamiento . Para este caso, podría usar un grupo de procesamiento:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

Esto generará procesos que pueden hacer un trabajo genérico para usted. Como no lo aprobamos processes, generará un proceso para cada núcleo de CPU en su máquina. Cada núcleo de CPU puede ejecutar un proceso simultáneamente.

Si desea asignar una lista a una sola función, debe hacer esto:

args = [A, B]
results = pool.map(solve1, args)

No use hilos porque el GIL bloquea cualquier operación en objetos python.

Matt Williamson
fuente
1
¿ pool.maptambién acepta diccionarios como args? ¿O solo listas simples?
The Bndr
Solo listas, creo. Pero puede pasar dict.items (), que será una lista de tuplas de valores clave
Matt Williamson
Desafortunadamente, esto termina en un error de tipo 'no compartible:' lista ''
The Bndr
Además de mi último comentario: `dict.items ()` work. El error surge, porque tuve que cambiar el manejo de la información variable de la función de proceso. Lamentablemente, el mensaje de error no fue muy útil ... Entonces: gracias por su sugerencia. :-)
The Bndr
2
¿Qué es el tiempo de espera aquí?
gamma
26

Esto se puede hacer de manera muy elegante con Ray .

Para paralelizar su ejemplo, necesitaría definir sus funciones con el @ray.remotedecorador y luego invocarlas con .remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

Hay varias ventajas de esto sobre el módulo de multiprocesamiento .

  1. El mismo código se ejecutará en una máquina multinúcleo, así como en un grupo de máquinas.
  2. Los procesos comparten datos de manera eficiente a través de la memoria compartida y la serialización de copia cero .
  3. Los mensajes de error se propagan muy bien.
  4. Estas llamadas a funciones se pueden componer juntas, por ejemplo,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
  5. Además de invocar funciones de forma remota, las clases se pueden instanciar de forma remota como actores .

Tenga en cuenta que Ray es un marco que he estado ayudando a desarrollar.

Robert Nishihara
fuente
sigo recibiendo un error que dice "No se pudo encontrar una versión que satisfaga el requisito ray (de las versiones
:)
2
Por lo general, este tipo de error significa que necesita actualizar pip. Sugeriría intentarlo pip install --upgrade pip. Si necesita usarlo sudo, es posible que la versión pipque está utilizando para instalar rayno sea la misma que se está actualizando. Puedes consultar con pip --version. Además, Windows no es compatible actualmente, por lo que si está en Windows, ese es probablemente el problema.
Robert Nishihara
1
Solo una nota, esto es principalmente para distribuir trabajos concurrentes en múltiples máquinas.
Matt Williamson
2
En realidad, está optimizado tanto para el caso de una sola máquina como para la configuración del clúster. Muchas de las decisiones de diseño (p. Ej., Memoria compartida, serialización de copia cero) están destinadas a admitir bien máquinas individuales.
Robert Nishihara
2
Sería genial si los documentos lo señalaran más. Al leer sobre los documentos, tuve la sensación de que en realidad no estaba destinado al caso de una sola máquina.
Sledge
4

La solución, como han dicho otros, es utilizar múltiples procesos. Sin embargo, qué marco es más apropiado depende de muchos factores. Además de los ya mencionados, también hay charm4py y mpi4py (soy el desarrollador de charm4py).

Hay una forma más eficiente de implementar el ejemplo anterior que usar la abstracción del grupo de trabajo. El bucle principal envía los mismos parámetros (incluido el gráfico completoG ) una y otra vez a los trabajadores en cada una de las 1000 iteraciones. Como al menos un trabajador residirá en un proceso diferente, esto implica copiar y enviar los argumentos a los otros procesos. Esto podría ser muy costoso dependiendo del tamaño de los objetos. En cambio, tiene sentido que los trabajadores almacenen el estado y simplemente envíen la información actualizada.

Por ejemplo, en charm4py esto se puede hacer así:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Tenga en cuenta que para este ejemplo realmente solo necesitamos un trabajador. El bucle principal podría ejecutar una de las funciones y hacer que el trabajador ejecute la otra. Pero mi código ayuda a ilustrar un par de cosas:

  1. El trabajador A se ejecuta en el proceso 0 (igual que el bucle principal). Mientras result_a.get()está bloqueado esperando el resultado, el trabajador A realiza el cálculo en el mismo proceso.
  2. Los argumentos se pasan automáticamente por referencia al trabajador A, ya que está en el mismo proceso (no hay copia involucrada).
Juan galvez
fuente
2

En algunos casos, es posible paralelizar automáticamente los bucles usando Numba , aunque solo funciona con un pequeño subconjunto de Python:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

Desafortunadamente, parece que Numba solo funciona con matrices Numpy, pero no con otros objetos de Python. En teoría, también podría ser posible compilar Python a C ++ y luego paralelizarlo automáticamente usando el compilador Intel C ++ , aunque todavía no lo he intentado.

Anderson Green
fuente
2

Puede usar la joblibbiblioteca para hacer cómputo paralelo y multiprocesamiento.

from joblib import Parallel, delayed

Simplemente puede crear una función fooque desee que se ejecute en paralelo y en función del siguiente código: implementar el procesamiento en paralelo:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

Donde num_coresse puede obtener de la multiprocessingbiblioteca de la siguiente manera:

import multiprocessing

num_cores = multiprocessing.cpu_count()

Si tiene una función con más de un argumento de entrada y solo desea iterar sobre uno de los argumentos por una lista, puede usar la partialfunción de la functoolsbiblioteca de la siguiente manera:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

Puede encontrar una explicación completa del multiprocesamiento python y R con un par de ejemplos aquí .

vahab najari
fuente