Python multiprocessing pool.map para múltiples argumentos

536

En la biblioteca de multiprocesamiento de Python, ¿hay una variante de pool.map que admita múltiples argumentos?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()
usuario642897
fuente
44
Para mi sorpresa, no pude hacer partialni lambdahacer esto. Creo que tiene que ver con la extraña forma en que las funciones se pasan a los subprocesos (vía pickle).
senderle
10
@senderle: Este es un error en Python 2.6, pero se corrigió a partir de 2.7: bugs.python.org/issue5228
unutbu
1
Simplemente reemplácelo pool.map(harvester(text,case),case, 1) por: pool.apply_async(harvester(text,case),case, 1)
Tung Nguyen
3
@Syrtis_Major, por favor no edite preguntas de OP que efectivamente sesguen las respuestas que se han dado previamente. Añadiendo returna harvester()la respuesta @senderie 's convertido en ser inexacta. Eso no ayuda a los futuros lectores.
Ricalsin
1
Yo diría que una solución fácil sería empacar todos los argumentos en una tupla y desempaquetarlos en la función de ejecución. Hice esto cuando necesitaba enviar complicados argumentos múltiples a un func ejecutado por un grupo de procesos.
HS Rathore

Respuestas:

358

La respuesta a esto depende de la versión y la situación. La respuesta más general para las versiones recientes de Python (desde 3.3) fue descrita por primera vez a continuación por JF Sebastian . 1 Utiliza el Pool.starmapmétodo, que acepta una secuencia de tuplas de argumentos. Luego, desempaqueta automáticamente los argumentos de cada tupla y los pasa a la función dada:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

Para versiones anteriores de Python, necesitará escribir una función auxiliar para desempaquetar los argumentos explícitamente. Si desea usar with, también necesitará escribir un contenedor para convertirse Poolen un administrador de contexto. (Gracias a muon por señalar esto).

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

En casos más simples, con un segundo argumento fijo, también puede usar partial, pero solo en Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Gran parte de esto fue inspirado por su respuesta, que probablemente debería haber sido aceptada en su lugar. Pero dado que este está atascado en la parte superior, parecía mejor mejorarlo para futuros lectores.

senderle
fuente
¿Me parece que RAW_DATASET en este caso debería ser una variable global? Si bien quiero que parcial_harvester cambie el valor de mayúsculas y minúsculas en cada llamada de harvester (). ¿Cómo lograr eso?
xgdgsc
Lo más importante aquí es asignar un =RAW_DATASETvalor predeterminado a case. De pool.maplo contrario, se confundirá con los múltiples argumentos.
Emerson Xu
1
Estoy confundido, ¿qué pasó con la textvariable en tu ejemplo? Por qué RAW_DATASETaparentemente se pasa dos veces. Creo que podrías tener un error tipográfico?
Dave
no está seguro de por qué el uso with .. as .. me da AttributeError: __exit__, pero funciona bien si acabo de llamar pool = Pool();a continuación, cierre manualmente pool.close()(python2.7)
muón
1
@muon, buena captura. Parece que los Poolobjetos no se convierten en gestores de contexto hasta Python 3.3. He agregado una función de contenedor simple que devuelve un Pooladministrador de contexto.
senderle
501

¿Hay una variante de pool.map que soporte múltiples argumentos?

Python 3.3 incluye el pool.starmap()método :

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

Para versiones anteriores:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

Salida

1 1
2 1
3 1

Observe cómo itertools.izip()y itertools.repeat()se usan aquí.

Debido al error mencionado por @unutbu, no puede usar functools.partial()capacidades similares en Python 2.6, por lo que la función de contenedor simple func_star()debe definirse explícitamente. Consulte también la solución sugerida poruptimebox .

jfs
fuente
1
M .: Usted puede descomprimir el argumento tupla en la firma de func_starla siguiente manera: def func_star((a, b)). Por supuesto, esto solo funciona para un número fijo de argumentos, pero si ese es el único caso que tiene, es más legible.
Björn Pollex
1
@ Space_C0wb0y: la f((a,b))sintaxis está en desuso y se elimina en py3k. Y es innecesario aquí.
jfs
quizás más pitónico: en func = lambda x: func(*x)lugar de definir una función de envoltura
dylam
1
@ zthomas.nc esta pregunta trata sobre cómo admitir múltiples argumentos para multiprocesamiento pool.map. Si desea saber cómo llamar a un método en lugar de una función en un proceso Python diferente mediante multiprocesamiento, haga una pregunta por separado (si todo lo demás falla, siempre puede crear una función global que envuelva la llamada al método similar a la func_star()anterior)
jfs
1
Desearía que hubiera starstarmap.
Константин Ван
141

Creo que lo de abajo será mejor

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

salida

[3, 5, 7]
imotai
fuente
16
La solución más fácil. Hay una pequeña optimización; eliminar la función de envoltura y descomprimir argsdirectamente add, funciona para cualquier número de argumentos:def add(args): (x,y) = args
Ahmed
1
también podría usar una lambdafunción en lugar de definirmulti_run_wrapper(..)
Andre Holzner,
2
hm ... de hecho, usar a lambdano funciona porque pool.map(..)intenta encurtir la función dada
Andre Holzner
¿Cómo se usa esto si desea almacenar el resultado adden una lista?
Vivek Subramanian
@Ahmed Me gusta cómo es, porque en mi humilde opinión, la llamada al método debería fallar, siempre que el número de parámetro no sea correcto.
Michael Dorner
56

Usando Python 3.3+ conpool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

Resultado:

1 --- 4
2 --- 5
3 --- 6

También puede comprimir () más argumentos si lo desea: zip(a,b,c,d,e)

En caso de que desee que se pase un valor constante como argumento, debe usarlo import itertoolsy, zip(itertools.repeat(constant), a)por ejemplo,

usuario136036
fuente
2
Esta es una respuesta duplicada casi exacta como la de @JFSebastian en 2011 (con más de 60 votos).
Mike McKerns
29
No. En primer lugar, eliminó muchas cosas innecesarias y claramente indica que es para Python 3.3+ y está destinado a principiantes que buscan una respuesta simple y limpia. Como principiante, me tomó un tiempo descubrirlo de esa manera (sí, con las publicaciones de JFSebastians) y es por eso que escribí mi publicación para ayudar a otros principiantes, porque su publicación simplemente decía "hay un mapa estelar" pero no lo explicó. es lo que pretende mi publicación. Así que no hay absolutamente ninguna razón para criticarme con dos votos negativos.
user136036
En 2011, no había "+" en Python 3.3 + ... obviamente.
Mike McKerns
27

Después de aprender sobre itertools en la respuesta de JF Sebastian , decidí ir un paso más allá y escribir un parmappaquete que se ocupara de la paralelización, la oferta mapy las starmapfunciones en python-2.7 y python-3.2 (y más tarde también) que pueden llevar cualquier cantidad de argumentos posicionales .

Instalación

pip install parmap

Cómo paralelizar:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

He subido parmap a PyPI y a un repositorio de github .

Como ejemplo, la pregunta se puede responder de la siguiente manera:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)
zeehio
fuente
20

# "Cómo tomar múltiples argumentos".

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)
Dane Lee
fuente
2
Aseado y elegante.
Prav001
1
No entiendo por qué tengo que desplazarme hasta aquí para encontrar la mejor respuesta.
toti
12

Hay una bifurcación de pathosmultiprocessing llamado ( nota: use la versión en github ) que no necesita : las funciones de mapa reflejan la API para el mapa de Python, por lo tanto, el mapa puede tomar múltiples argumentos. Con , generalmente también puede hacer multiprocesamiento en el intérprete, en lugar de quedarse atrapado en el bloque. Pathos se lanzará después de una actualización leve, principalmente la conversión a python 3.x.starmappathos__main__

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

pathostiene varias formas de obtener el comportamiento exacto starmap.

>>> def add(*x):
...   return sum(x)
... 
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>> 
Mike McKerns
fuente
Quiero señalar que esto no aborda la estructura en la pregunta original. [[1,2,3], [4,5,6]] se descomprimiría con el mapa estelar en [pow (1,2,3), pow (4,5,6)], no [pow (1,4) , pow (2,5), pow (3, 6)]. Si no tiene un buen control sobre las entradas que se pasan a su función, es posible que deba reestructurarlas primero.
Scott,
@ Scott: ah, no me di cuenta de eso ... hace más de 5 años. Haré una pequeña actualización. Gracias.
Mike McKerns
8

Puede usar las siguientes dos funciones para evitar escribir un contenedor para cada nueva función:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Use la función functioncon las listas de argumentos arg_0, arg_1y de la arg_2siguiente manera:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
M. Toya
fuente
8

Una mejor solución para python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

fuera[]:

[3, 5, 7]

xmduhan
fuente
7

Otra alternativa simple es envolver sus parámetros de función en una tupla y luego envolver los parámetros que también deberían pasarse en tuplas. Quizás esto no sea ideal cuando se trata de grandes cantidades de datos. Creo que haría copias para cada tupla.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

Da la salida en un orden aleatorio:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
Alex Klibisz
fuente
De hecho, sigue buscando una mejor manera :(
Fábio Dias
6

Una mejor manera es usar decorador en lugar de escribir la función de envoltura a mano. Especialmente cuando tiene muchas funciones para mapear, el decorador le ahorrará tiempo al evitar el envoltorio de escritura para cada función. Por lo general, una función decorada no es seleccionable, sin embargo, podemos usarla functoolspara evitarla . Más discursos se pueden encontrar aquí .

Aquí el ejemplo

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

Luego puede mapearlo con argumentos comprimidos

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

Por supuesto, siempre puede usar Pool.starmapen Python 3 (> = 3.3) como se menciona en otras respuestas.

Syrtis Major
fuente
Los resultados no son los esperados: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] Yo esperaría: [0,1,2,3,4,5,6,7,8, 9,1,2,3,4,5,6,7,8,9,10,2,3,4,5,6,7,8,9,10,11, ...
Tedo Vrbanec
@TedoVrbanec Los resultados deberían ser [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]. Si desea la posterior, puede usarla en itertools.productlugar de zip.
Syrtis Major
4

Otra forma es pasar una lista de listas a una rutina de un argumento:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

Uno puede construir una lista de listas de argumentos con su método favorito.

Adobe
fuente
Esta es una manera fácil, pero necesita cambiar sus funciones originales. Además, algunas veces recuerdan las funciones de otras que no pueden modificarse.
WeizhongTu
Diré que esto se adhiere al zen de Python. Debe haber una y solo una forma obvia de hacerlo. Si por casualidad usted es el autor de la función de llamada, debe usar este método, en otros casos podemos usar el método de imotai.
nehem
Mi elección es usar una tupla, y luego desenvolverlos inmediatamente como lo primero en la primera línea.
nehem
3

Aquí hay otra forma de hacerlo, en mi humilde opinión es más simple y elegante que cualquiera de las otras respuestas proporcionadas.

Este programa tiene una función que toma dos parámetros, los imprime y también imprime la suma:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

la salida es:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

Vea los documentos de Python para más información:

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

En particular, asegúrese de revisar la starmapfunción.

Estoy usando Python 3.6, no estoy seguro de si esto funcionará con versiones anteriores de Python

Por qué no hay un ejemplo tan sencillo como este en los documentos, no estoy seguro.

cdahms
fuente
2

Desde python 3.4.4, puede usar multiprocessing.get_context () para obtener un objeto de contexto para usar múltiples métodos de inicio:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

O simplemente reemplazas

pool.map(harvester(text,case),case, 1)

por:

pool.apply_async(harvester(text,case),case, 1)
Tung Nguyen
fuente
2

Aquí hay muchas respuestas, pero ninguna parece proporcionar un código compatible con Python 2/3 que funcione en cualquier versión. Si desea que su código simplemente funcione , esto funcionará para cualquier versión de Python:

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

Después de eso, puede usar el multiprocesamiento de la manera normal de Python 3, como quiera. Por ejemplo:

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

funcionará en Python 2 o Python 3.

cgnorthcutt
fuente
1

En la documentación oficial se afirma que solo admite un argumento iterable. Me gusta usar apply_async en tales casos. En tu caso yo haría:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()
roj4s
fuente
1
text = "test"

def unpack(args):
    return args[0](*args[1:])

def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples 
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one
    pool.map(unpack, args)
    pool.close()
    pool.join()
Jaime
fuente
1

Este es un ejemplo de la rutina que uso para pasar múltiples argumentos a una función de un argumento utilizada en una bifurcación pool.imap :

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)

# Close the pool
pool.close()
A. Nodar
fuente
-3

para python2, puedes usar este truco

def fun(a,b):
    return a+b

pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))
Hz Shang
fuente
por qué b = 233. derrota el propósito de la pregunta
como si el