No se puede encurtir <tipo 'instanciametodo'> cuando se utiliza el multiprocesamiento Pool.map ()

218

Estoy tratando de usar multiprocessingla Pool.map()función para dividir el trabajo simultáneamente. Cuando uso el siguiente código, funciona bien:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

Sin embargo, cuando lo uso en un enfoque más orientado a objetos, no funciona. El mensaje de error que da es:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Esto ocurre cuando el siguiente es mi programa principal:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

y la siguiente es mi someClassclase:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

¿Alguien sabe cuál podría ser el problema, o una forma fácil de solucionarlo?

ventolin
fuente
44
si f es una función anidada, hay un error similarPicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
ggg

Respuestas:

122

El problema es que el multiprocesamiento debe enredar las cosas para distribuirlas entre los procesos, y los métodos enlazados no son seleccionables. La solución (ya sea que lo consideres "fácil" o no ;-) es agregar la infraestructura a tu programa para permitir que se enreden tales métodos, registrándolos con el método de biblioteca estándar copy_reg .

Por ejemplo, la contribución de Steven Bethard a este hilo (hacia el final del hilo) muestra un enfoque perfectamente viable para permitir el método de decapado / desescamado vía copy_reg.

Alex Martelli
fuente
Eso es genial, Gracias. Parece haber progresado de alguna manera, de todos modos: Usando el código en pastebin.ca/1693348 ahora obtengo un RuntimeError: excedió la profundidad máxima de recursión. Miré a mi alrededor y una publicación en el foro recomendó aumentar la profundidad máxima a 1500 (desde la predeterminada 1000), pero no tuve alegría allí. Para ser honesto, no puedo ver qué parte (de mi código, al menos) podría estar recurriendo fuera de control, a menos que, por alguna razón, el código se enrede y desenrolle en un bucle, debido a los pequeños cambios que hice para hacer Código de Steven OO'd?
ventolin
1
Sus _pickle_methoddevoluciones self._unpickle_method, un método encuadernado; así que, por supuesto, pepinillo ahora trata de encurtir ESO, y hace lo que le pediste: llamando _pickle_method, recursivamente. Es decir, al OOusar el código de esta manera, inevitablemente ha introducido una recursión infinita. Sugiero volver al código de Steven (y no adorar en el altar de OO cuando no sea apropiado: muchas cosas en Python se hacen mejor de una manera más funcional, y esta es una).
Alex Martelli, el
15
Para el súper súper vago , vea la única respuesta que se molestó en publicar el código real no destrozado ...
Cerin
2
Otra forma de solucionar / eludir el problema del decapado es usando eneldo, mira mi respuesta stackoverflow.com/questions/8804830/…
rocksportrocker
74

Todas estas soluciones son feas porque el multiprocesamiento y el decapado están rotos y limitados a menos que salte fuera de la biblioteca estándar.

Si usa una bifurcación de multiprocessingllamadas pathos.multiprocesssing, puede usar directamente clases y métodos de clase en las mapfunciones de multiprocesamiento . Esto se debe a que dillse usa en lugar de pickleo cPickle, y dillpuede serializar casi cualquier cosa en Python.

pathos.multiprocessingtambién proporciona una función de mapa asíncrono ... y puede mapfuncionar con múltiples argumentos (por ejemplo map(math.pow, [1,2,3], [4,5,6]))

Ver: ¿Qué pueden hacer juntos el multiprocesamiento y el eneldo?

y: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

Y para ser explícito, puede hacer exactamente lo que quería hacer en primer lugar, y puede hacerlo desde el intérprete, si así lo desea.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Obtenga el código aquí: https://github.com/uqfoundation/pathos

Mike McKerns
fuente
3
¿Puede actualizar esta respuesta basada en pathos.pp porque pathos.multiprocessing ya no existe?
Saheel Godhane
10
Soy el pathosautor La versión a la que te refieres tiene varios años. Pruebe la versión en github, puede usar pathos.ppo github.com/uqfoundation/ppft .
Mike McKerns
1
o github.com/uqfoundation/pathos . @SaheelGodhane: Hace mucho tiempo que se lanzará una nueva versión, pero debería salir pronto.
Mike McKerns
3
Primero pip install setuptools, entonces pip install git+https://github.com/uqfoundation/pathos.git@master. Esto obtendrá las dependencias apropiadas. Una nueva versión está casi lista ... ahora casi todo pathostambién se ejecuta en Windows, y es 3.xcompatible.
Mike McKerns
1
@Rika: Sí Los mapas de bloqueo, iterativos y asíncronos están disponibles.
Mike McKerns
35

También puede definir un __call__()método dentro de su someClass(), que llama someClass.go()y luego pasa una instancia someClass()al grupo. Este objeto es pickleable y funciona bien (para mí) ...

dorvak
fuente
3
Esto es mucho más fácil que la técnica propuesta por Alex Martelli, pero está limitado a enviar solo un método por clase a su grupo de multiprocesamiento.
desuso el
66
Otro detalle a tener en cuenta es que solo se encuadra el objeto (instancia de clase), no la clase en sí. Por lo tanto, si ha cambiado los atributos de clase de sus valores predeterminados, estos cambios no se propagarán a los diferentes procesos. La solución alternativa es asegurarse de que todo lo que necesita su función se almacene como un atributo de instancia.
desaprobado el
2
@dorvak, ¿podría mostrar un ejemplo simple con __call__()? Creo que su respuesta podría ser la más limpia: estoy luchando por comprender este error, y la primera vez que vengo a ver la llamada. Por cierto, también esta respuesta ayuda a aclarar qué hace el multiprocesamiento: [ stackoverflow.com/a/20789937/305883]
user305883
1
¿Puedes dar un ejemplo de esto?
frmsaul
1
Hay una nueva respuesta publicada (actualmente debajo de esta) con un código de ejemplo para esto.
Aaron
22

Algunas limitaciones a la solución de Steven Bethard:

Cuando registra su método de clase como una función, sorprendentemente se llama al destructor de su clase cada vez que finaliza el procesamiento de su método. Entonces, si tiene 1 instancia de su clase que llama n veces su método, los miembros pueden desaparecer entre 2 ejecuciones y puede obtener un mensaje malloc: *** error for object 0x...: pointer being freed was not allocated(por ejemplo, un archivo de miembro abierto) o pure virtual method called, terminate called without an active exception(lo que significa que la vida útil de un objeto miembro que usé fue más corta que lo que pensé). Obtuve esto cuando trato con n mayor que el tamaño del grupo. Aquí hay un breve ejemplo:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Salida:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

El __call__método no es tan equivalente, porque [Ninguno, ...] se lee de los resultados:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

Entonces ninguno de los dos métodos es satisfactorio ...

Eric H.
fuente
77
Se obtiene Nonede vuelta porque su definición __call__no se encuentra el return: debe ser return self.process_obj(i).
torek
1
@Eric Recibía el mismo error y probé esta solución, sin embargo, comencé a recibir un nuevo error como "cPickle.PicklingError: No se puede encurtir <tipo 'función'>: búsqueda de atributos incorporada .función fallida". ¿Sabes cuál puede ser una razón probable detrás de esto?
Naman
15

Hay otro atajo que puede usar, aunque puede ser ineficiente dependiendo de lo que haya en las instancias de su clase.

Como todos han dicho, el problema es que el multiprocessingcódigo tiene que seleccionar las cosas que envía a los subprocesos que ha iniciado, y el selector no hace métodos de instancia.

Sin embargo, en lugar de enviar el método de instancia, puede enviar la instancia de clase real, más el nombre de la función a llamar, a una función ordinaria que luego usa getattrpara llamar al método de instancia, creando así el método enlazado en el Poolsubproceso. Esto es similar a definir un __call__método, excepto que puede llamar a más de una función miembro.

Robando el código de @ EricH. De su respuesta y anotándolo un poco (lo reescribí, por lo tanto, todos los cambios de nombre y demás, por alguna razón, esto parecía más fácil que cortar y pegar :-)) para ilustrar toda la magia:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

El resultado muestra que, de hecho, el constructor se llama una vez (en el pid original) y el destructor se llama 9 veces (una vez por cada copia realizada = 2 o 3 veces por proceso de grupo-trabajador según sea necesario, más una vez en el original proceso). Esto a menudo está bien, como en este caso, ya que el selector predeterminado hace una copia de toda la instancia y la rellena (semi) en secreto, en este caso, haciendo:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

Por eso, aunque el destructor se llama ocho veces en los tres procesos de trabajo, cuenta de 1 a 0 cada vez, pero, por supuesto, aún puede meterse en problemas de esta manera. Si es necesario, puede proporcionar su propio __setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

en este caso por ejemplo.

torek
fuente
1
Esta es, con mucho, la mejor respuesta para este problema, ya que es la más fácil de aplicar al comportamiento predeterminado no apto para encurtidos
Matt Taylor
12

También puede definir un __call__()método dentro de su someClass(), que llama someClass.go()y luego pasa una instancia someClass()al grupo. Este objeto es pickleable y funciona bien (para mí) ...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()
Parisjohn
fuente
3

La solución de parisjohn anterior funciona bien conmigo. Además, el código se ve limpio y fácil de entender. En mi caso, hay algunas funciones para llamar usando Pool, por lo que modifiqué el código de parisjohn un poco más abajo. Hice una llamada para poder llamar a varias funciones, y los nombres de las funciones se pasan en el argumento dict de go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()
neobot
fuente
1

Una solución potencialmente trivial para esto es cambiar a usar multiprocessing.dummy . Esta es una implementación basada en hilos de la interfaz de multiprocesamiento que no parece tener este problema en Python 2.7. No tengo mucha experiencia aquí, pero este cambio rápido de importación me permitió llamar a apply_async en un método de clase.

Algunos buenos recursos sobre multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

David Parks
fuente
1

En este caso simple, donde someClass.fno se hereda ningún dato de la clase y no se adjunta nada a la clase, una posible solución sería separarlo f, para que se pueda encurtir:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))
mhh
fuente
1

¿Por qué no utilizar funciones separadas?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)
0script0
fuente
1

Me encontré con este mismo problema, pero descubrí que hay un codificador JSON que se puede usar para mover estos objetos entre procesos.

from pyVmomi.VmomiSupport import VmomiJSONEncoder

Use esto para crear su lista:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

Luego, en la función asignada, use esto para recuperar el objeto:

pfVmomiObj = json.loads(jsonSerialized)
Jorge
fuente
0

Actualización: a partir del día de este escrito, las Tuplas nombradas son seleccionables (comenzando con Python 2.7)

El problema aquí es que los procesos secundarios no pueden importar la clase del objeto -en este caso, la clase P-, en el caso de un proyecto de modelos múltiples, la Clase P debería ser importable en cualquier lugar donde se use el proceso secundario

una solución rápida es hacer que sea importante al afectarlo a los globales ()

globals()["P"] = P
rachid el kedmiri
fuente