Multiprocesamiento: ¿Cómo usar Pool.map en una función definida en una clase?

179

Cuando ejecuto algo como:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

funciona bien. Sin embargo, poniendo esto en función de una clase:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

Me da el siguiente error:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

He visto una publicación de Alex Martelli que trata el mismo tipo de problema, pero no fue lo suficientemente explícito.

Mermoz
fuente
1
"esto como una función de una clase"? ¿Puedes publicar el código que realmente obtiene el error real? Sin el código real, solo podemos adivinar lo que estás haciendo mal.
S.Lott
Como observación general, existen módulos de decapado más potentes que el módulo de decapado estándar de Python (como el módulo de picloud mencionado en esta respuesta ).
Klaus se
1
Tuve un problema similar con los cierres IPython.Parallel, pero allí podría solucionar el problema empujando los objetos a los nodos. Parece bastante molesto solucionar este problema con el multiprocesamiento.
Alex S
Aquí calculatees estibables, por lo que parece que esto puede ser resuelto por 1) la creación de un objeto de función con un constructor que las copias más de una calculateinstancia y luego 2) hacer pasar una instancia de este objeto de función de Pool's mapmétodo. ¿No?
rd11
1
@math No creo que ninguno de los "cambios recientes" de Python sean de ninguna ayuda. Algunas limitaciones del multiprocessingmódulo se deben a su objetivo de ser una implementación multiplataforma y a la falta de una fork(2)llamada de sistema similar en Windows. Si no le importa el soporte de Win32, puede haber una solución alternativa más simple basada en procesos. O si está preparado para usar hilos en lugar de procesos, puede sustituirlos from multiprocessing import Poolpor from multiprocessing.pool import ThreadPool as Pool.
Aya

Respuestas:

69

También me molestaron las restricciones sobre qué tipo de funciones pool.map podría aceptar. Escribí lo siguiente para evitar esto. Parece funcionar, incluso para el uso recursivo de parmap.

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
mrule
fuente
1
Esto me ha funcionado muy bien, gracias. He encontrado una debilidad: intenté usar parmap en algunas funciones que pasaron por alto un fallo predeterminado y obtuve el PicklingError nuevamente. No encontré una solución para esto, simplemente modifiqué mi código para no usar el defaultdict.
sin
2
Esto no funciona en Python 2.7.2 (predeterminado, 12 de junio de 2011, 15:08:59) [MSC v.1500 32 bit (Intel)] en win32
ubershmekel
3
Esto funciona en Python 2.7.3 agosto 1,2012, 05:14:39. Esto no funciona en iterables gigantes -> causa un error de OSE: [Errno 24] Demasiados archivos abiertos debido a la cantidad de tuberías que abre.
Eiyrioü von Kauyf
Esta solución genera un proceso para cada elemento de trabajo. La solución de "klaus se" a continuación es más eficiente.
ypnos
85

No pude usar los códigos publicados hasta ahora porque los códigos que usan "multiprocesamiento.Pool" no funcionan con expresiones lambda y los códigos que no usan "multiprocesamiento.Pool" generan tantos procesos como elementos de trabajo.

Adapté el código st que genera una cantidad predefinida de trabajadores y solo itera por la lista de entrada si existe un trabajador inactivo. También habilité el modo "daemon" para los trabajadores st ctrl-c funciona como se esperaba.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
klaus se
fuente
2
¿Cómo obtendrías una barra de progreso para que funcione correctamente con esta parmapfunción?
shockburner
2
Una pregunta: utilicé esta solución pero noté que los procesos de Python que generé permanecieron activos en la memoria. ¿Alguna idea rápida sobre cómo matarlos cuando sale tu parmap?
CompEcon
1
@ klaus-se Sé que estamos desanimados de decir gracias en los comentarios, pero su respuesta es demasiado valiosa para mí, no pude resistirme. Ojalá pudiera darte más de una reputación ...
deshtop
2
@greole pasando (None, None)como el último elemento indica funque ha llegado al final de la secuencia de elementos para cada proceso.
aganders3
44
@deshtop: puedes con una recompensa, si tienes suficiente reputación tú mismo :-)
Mark
57

El multiprocesamiento y el decapado están rotos y limitados a menos que salte fuera de la biblioteca estándar.

Si usa una bifurcación de multiprocessingllamadas pathos.multiprocesssing, puede usar directamente clases y métodos de clase en las mapfunciones de multiprocesamiento . Esto se debe a que dillse usa en lugar de pickleo cPickle, y dillpuede serializar casi cualquier cosa en Python.

pathos.multiprocessingtambién proporciona una función de mapa asíncrono ... y puede mapfuncionar con múltiples argumentos (por ejemplo map(math.pow, [1,2,3], [4,5,6]))

Ver discusiones: ¿Qué pueden hacer juntos el multiprocesamiento y el eneldo?

y: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

Incluso maneja el código que escribió inicialmente, sin modificaciones, y del intérprete. ¿Por qué hacer algo más que sea más frágil y específico para un solo caso?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

Obtenga el código aquí: https://github.com/uqfoundation/pathos

Y, solo para mostrar un poco más de lo que puede hacer:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
Mike McKerns
fuente
1
pathos.multiprocessing también tiene un mapa asincrónico ( amap) que permite el uso de barras de progreso y otra programación asincrónica.
Mike McKerns
Me gusta pathos.multiprocessing, que puede servir casi como un reemplazo directo del mapa no paralelo mientras disfruta del multiprocesamiento. Tengo un contenedor simple de pathos.multiprocessing.map, de modo que es más eficiente en la memoria al procesar una estructura de datos grandes de solo lectura en múltiples núcleos, consulte este repositorio de git .
Fashandge
Parece interesante, pero no se instala. Este es el mensaje que da pip:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
xApple
1
Si. No lo he lanzado en mucho tiempo ya que he estado dividiendo la funcionalidad en paquetes separados y también convirtiendo a 2/3 código compatible. Gran parte de lo anterior se ha modularizado y multiprocesses compatible con 2/3. Consulte stackoverflow.com/questions/27873093/… y pypi.python.org/pypi/multiprocess .
Mike McKerns
3
@xApple: solo como seguimiento, pathosha tenido una nueva versión estable y también es compatible con 2.xy 3.x.
Mike McKerns
40

Actualmente, hasta donde yo sé, no hay una solución a su problema: la función a la que map()debe acceder debe ser accesible mediante la importación de su módulo. Por eso funciona el código de Robert: la función f()se puede obtener importando el siguiente código:

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

De hecho, agregué una sección "principal", porque esto sigue las recomendaciones para la plataforma Windows ("Asegúrese de que el módulo principal pueda ser importado de manera segura por un nuevo intérprete de Python sin causar efectos secundarios no deseados").

También agregué una letra mayúscula delante Calculatepara seguir PEP 8 . :)

Eric O Lebigot
fuente
18

La solución de mrule es correcta pero tiene un error: si el niño envía una gran cantidad de datos, puede llenar el búfer de la tubería, bloqueando al niño pipe.send(), mientras el padre espera a que el niño salga pipe.join(). La solución es leer los datos del niño antes join()de hacerlo . Además, el niño debe cerrar el extremo del tubo de los padres para evitar un punto muerto. El siguiente código soluciona eso. También tenga en cuenta que esto parmapcrea un proceso por elemento en X. Una solución más avanzada es usar multiprocessing.cpu_count()para dividir Xen varios trozos y luego combinar los resultados antes de regresar. Lo dejo como un ejercicio para el lector para no estropear la concisión de la buena respuesta de mrule. ;)

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))
Bob McElrath
fuente
¿Cómo eliges el número de procesos?
patapouf_ai
Sin embargo, muere bastante rápido debido al error OSError: [Errno 24] Too many open files. Creo que debe haber algún tipo de límite en el número de procesos para que funcione correctamente ...
patapouf_ai
13

También he luchado con esto. Tenía funciones como miembros de datos de una clase, como un ejemplo simplificado:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

Necesitaba usar la función self.f en una llamada Pool.map () desde la misma clase y self.f no tomó una tupla como argumento. Como esta función estaba integrada en una clase, no tenía claro cómo escribir el tipo de contenedor que otras respuestas sugerían.

Resolví este problema usando un contenedor diferente que toma una tupla / lista, donde el primer elemento es la función, y los elementos restantes son los argumentos de esa función, llamada eval_func_tuple (f_args). Con esto, la línea problemática se puede reemplazar por return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Aquí está el código completo:

Archivo: util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

Archivo: main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

Ejecutar main.py dará [11, 22, 33]. Siéntase libre de mejorar esto, por ejemplo eval_func_tuple también podría modificarse para tomar argumentos de palabras clave.

En otra nota, en otras respuestas, la función "parmap" puede hacerse más eficiente para el caso de más Procesos que el número de CPU disponibles. Estoy copiando una versión editada a continuación. Esta es mi primera publicación y no estaba seguro de si debería editar directamente la respuesta original. También cambié el nombre de algunas variables.

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         
Brandt
fuente
8

Tomé la respuesta de klaus se y aganders3 e hice un módulo documentado que es más legible y se mantiene en un solo archivo. Simplemente puede agregarlo a su proyecto. ¡Incluso tiene una barra de progreso opcional!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from http://stackoverflow.com/a/16071616/287297

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

EDITAR : Se agregó la sugerencia @ alexander-mcfarlane y una función de prueba

xApple
fuente
un problema con su barra de progreso ... La barra solo mide cuán ineficientemente se dividió la carga de trabajo entre los procesadores. Si la carga de trabajo se divide perfectamente, todos los procesadores lo harán join()al mismo tiempo y obtendrá un destello 100%completo en la tqdmpantalla. El único momento en que será útil es si cada procesador tiene una carga de trabajo sesgada
Alexander McFarlane
1
mover tqdm()para cerrar la línea: result = [q_out.get() for _ in tqdm(sent)]y funciona mucho mejor - gran esfuerzo, aunque realmente aprecio esto, así que +1
Alexander McFarlane
Gracias por ese consejo, lo intentaré y luego actualizaré la respuesta.
xApple
¡La respuesta se actualiza y la barra de progreso funciona mucho mejor!
xApple
8

Sé que esto se hizo hace más de 6 años, pero solo quería agregar mi solución, ya que algunas de las sugerencias anteriores parecen terriblemente complicadas, pero mi solución fue realmente muy simple.

Todo lo que tenía que hacer era ajustar la llamada pool.map () a una función auxiliar. Pasar el objeto de clase junto con args para el método como una tupla, que se parecía un poco a esto.

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)
noctámbulo
fuente
7

Las funciones definidas en las clases (incluso dentro de las funciones dentro de las clases) realmente no se enredan. Sin embargo, esto funciona:

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
    return p.map(f, [1,2,3])

cl = calculate()
print cl.run()
robert
fuente
15
gracias, pero me parece un poco sucio definir la función fuera de la clase. La clase debe agrupar todo lo que necesita para lograr una tarea determinada.
Mermoz
3
@Memoz: "La clase debería agrupar todo lo que necesita" ¿En serio? No puedo encontrar muchos ejemplos de esto. La mayoría de las clases dependen de otras clases o funciones. ¿Por qué llamar a una dependencia de clase "sucia"? ¿Qué tiene de malo una dependencia?
S.Lott
Bueno, la función no debería modificar los datos de clase existentes, porque modificaría la versión en el otro proceso, por lo que podría ser un método estático. Puede elegir un método estático: stackoverflow.com/questions/1914261/… O, para algo tan trivial, podría usar un lambda.
robert
6

Sé que esta pregunta se hizo hace 8 años y 10 meses, pero quiero presentarles mi solución:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @staticmethod
    def methodForMultiprocessing(x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

Solo necesita hacer que su clase funcione en un método estático. Pero también es posible con un método de clase:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @classmethod
    def methodForMultiprocessing(cls, x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

Probado en Python 3.7.3

TornaxO7
fuente
3

Modifiqué el método de klaus se porque mientras funcionaba para mí con listas pequeñas, se bloqueaba cuando el número de elementos era ~ 1000 o mayor. En lugar de empujar los trabajos uno a la vez con la Nonecondición de detención, cargo la cola de entrada de una vez y solo dejo que los procesos la procesen hasta que esté vacía.

from multiprocessing import cpu_count, Queue, Process

def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))

# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
    q_in, q_out   = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]

    return [x for i,x in sorted(res)]

Editar: desafortunadamente, ahora me encuentro con este error en mi sistema: el límite de tamaño máximo de la cola de multiprocesamiento es 32767 , espero que las soluciones allí ayuden.

aganders3
fuente
1

Puede ejecutar su código sin ningún problema si de alguna manera ignora manualmente el Poolobjeto de la lista de objetos en la clase porque no picklepuede, como dice el error. Puedes hacer esto con la __getstate__función (mira aquí también) como sigue. El Poolobjetivo será tratar de encontrar los __getstate__y las __setstate__funciones y ejecutarlas si lo encuentra cuando se ejecuta map, map_asyncetc:

class calculate(object):
    def __init__(self):
        self.p = Pool()
    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['p']
        return self_dict
    def __setstate__(self, state):
        self.__dict__.update(state)

    def f(self, x):
        return x*x
    def run(self):
        return self.p.map(self.f, [1,2,3])

Entonces hazlo:

cl = calculate()
cl.run()

te dará el resultado:

[1, 4, 9]

He probado el código anterior en Python 3.xy funciona.

Amir
fuente
0

No estoy seguro de si se ha tomado este enfoque, pero una solución que estoy usando es:

from multiprocessing import Pool

t = None

def run(n):
    return t.f(n)

class Test(object):
    def __init__(self, number):
        self.number = number

    def f(self, x):
        print x * self.number

    def pool(self):
        pool = Pool(2)
        pool.map(run, range(10))

if __name__ == '__main__':
    t = Test(9)
    t.pool()
    pool = Pool(2)
    pool.map(run, range(10))

La salida debe ser:

0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81
CpILL
fuente
0
class Calculate(object):
  # Your instance method to be executed
  def f(self, x, y):
    return x*y

if __name__ == '__main__':
  inp_list = [1,2,3]
  y = 2
  cal_obj = Calculate()
  pool = Pool(2)
  results = pool.map(lambda x: cal_obj.f(x, y), inp_list)

Existe la posibilidad de que desee aplicar esta función para cada instancia diferente de la clase. Entonces aquí está la solución para eso también

class Calculate(object):
  # Your instance method to be executed
  def __init__(self, x):
    self.x = x

  def f(self, y):
    return self.x*y

if __name__ == '__main__':
  inp_list = [Calculate(i) for i in range(3)]
  y = 2
  pool = Pool(2)
  results = pool.map(lambda x: x.f(y), inp_list)
ShikharDua
fuente
0

Aquí está mi solución, que creo que es un poco menos agresiva que la mayoría de los demás. Es similar a la respuesta de Nightowl.

someclasses = [MyClass(), MyClass(), MyClass()]

def method_caller(some_object, some_method='the method'):
    return getattr(some_object, some_method)()

othermethod = partial(method_caller, some_method='othermethod')

with Pool(6) as pool:
    result = pool.map(othermethod, someclasses)
Erlend Aune
fuente
0

De http://www.rueckstiess.net/research/snippets/show/ca1d7d90 y http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html

Podemos hacer una función externa y sembrarla con el objeto de clase:

from joblib import Parallel, delayed
def unwrap_self(arg, **kwarg):
    return square_class.square_int(*arg, **kwarg)

class square_class:
    def square_int(self, i):
        return i * i

    def run(self, num):
        results = []
        results = Parallel(n_jobs= -1, backend="threading")\
            (delayed(unwrap_self)(i) for i in zip([self]*len(num), num))
        print(results)

O sin bolsa de trabajo:

from multiprocessing import Pool
import time

def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)

class C:
    def f(self, name):
        print 'hello %s,'%name
        time.sleep(5)
        print 'nice to meet you.'

    def run(self):
        pool = Pool(processes=2)
        names = ('frank', 'justin', 'osi', 'thomas')
        pool.map(unwrap_self_f, zip([self]*len(names), names))

if __name__ == '__main__':
    c = C()
    c.run()
Bob Baxley
fuente
0

Puede que esta no sea una muy buena solución, pero en mi caso, la resuelvo así.

from multiprocessing import Pool

def foo1(data):
    self = data.get('slf')
    lst = data.get('lst')
    return sum(lst) + self.foo2()

class Foo(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def foo2(self):
        return self.a**self.b   

    def foo(self):
        p = Pool(5)
        lst = [1, 2, 3]
        result = p.map(foo1, (dict(slf=self, lst=lst),))
        return result

if __name__ == '__main__':
    print(Foo(2, 4).foo())

Tuve que pasar selfa mi función ya que tengo que acceder a los atributos y funciones de mi clase a través de esa función. Esto es trabajo para mí. Las correcciones y sugerencias son siempre bienvenidas.

Muhammad Hassan
fuente
0

Aquí hay una plantilla que escribí para usar Pool de multiprocesamiento en python3, específicamente python3.7.7 se usó para ejecutar las pruebas. Obtuve mis carreras más rápidas usando imap_unordered. Simplemente conecte su escenario y pruébelo. Puede usar timeito simplemente time.time()para descubrir cuál funciona mejor para usted.

import multiprocessing
import time

NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'starmap'  # 'imap_unordered' or 'starmap' or 'apply_async'

def process_chunk(a_chunk):
    print(f"processig mp chunk {a_chunk}")
    return a_chunk


map_jobs = [1, 2, 3, 4]

result_sum = 0

s = time.time()
if MP_FUNCTION == 'imap_unordered':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    for i in pool.imap_unordered(process_chunk, map_jobs):
        result_sum += i
elif MP_FUNCTION == 'starmap':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    try:
        map_jobs = [(i, ) for i in map_jobs]
        result_sum = pool.starmap(process_chunk, map_jobs)
        result_sum = sum(result_sum)
    finally:
        pool.close()
        pool.join()
elif MP_FUNCTION == 'apply_async':
    with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
        result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
    result_sum = sum(result_sum)
print(f"result_sum is {result_sum}, took {time.time() - s}s")

En el escenario anterior, en imap_unorderedrealidad parece ser el peor para mí. Pruebe su estuche y compárelo con la máquina en la que planea ejecutarlo. Lea también sobre agrupaciones de procesos . ¡Salud!

radtek
fuente