Python multiprocessing PicklingError: Can't pickle <type 'function'>

243

Lamento no poder reproducir el error con un ejemplo más simple, y mi código es demasiado complicado para publicar. Si ejecuto el programa en el shell de IPython en lugar del Python normal, las cosas funcionan bien.

Busqué algunas notas anteriores sobre este problema. Todos fueron causados ​​por el uso de grupo para llamar a la función definida dentro de una función de clase. Pero este no es el caso para mí.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Apreciaría cualquier ayuda.

Actualización : la función I pickle se define en el nivel superior del módulo. Aunque llama a una función que contiene una función anidada. es decir, f()llama a g()llamadas h()que tienen una función anidada i(), y estoy llamando pool.apply_async(f). f(), g(), h()Están todos definidos en el nivel superior. Intenté un ejemplo más simple con este patrón y funciona.

Vendetta
fuente
3
La respuesta de nivel superior / aceptada es buena, pero podría significar que necesita reestructurar su código, lo que puede ser doloroso. Recomendaría a cualquiera que tenga este problema que lea también las respuestas adicionales que utilizan dilly pathos. Sin embargo, no tuve suerte con ninguna de las soluciones cuando trabajo con vtkobjects :( ¿Alguien ha logrado ejecutar código python en procesamiento paralelo vtkPolyData?
Chris

Respuestas:

305

Aquí hay una lista de lo que se puede encurtir . En particular, las funciones solo son seleccionables si se definen en el nivel superior de un módulo.

Esta pieza de código:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

produce un error casi idéntico al publicado:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

El problema es que pooltodos los métodos usan a mp.SimpleQueuepara pasar tareas a los procesos de trabajo. Todo lo que pasa por el mp.SimpleQueuedebe ser seleccionable y foo.workno es seleccionable ya que no está definido en el nivel superior del módulo.

Se puede solucionar definiendo una función en el nivel superior, que llama foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Tenga en cuenta que fooes seleccionable, ya que Foose define en el nivel superior y foo.__dict__es seleccionable.

unutbu
fuente
2
Gracias por su respuesta. Actualicé mi pregunta. Sin embargo, no creo que esa sea la causa
Vendetta
77
Para obtener un PicklingError, se debe colocar algo en la Cola que no se puede seleccionar. Podría ser la función o sus argumentos. Para obtener más información sobre el problema, sugiero hacer una copia de su programa y comenzar a reducirlo, haciéndolo más y más simple, cada vez que vuelva a ejecutar el programa para ver si el problema persiste. Cuando se vuelva realmente simple, habrá descubierto el problema usted mismo o tendrá algo que puede publicar aquí.
unutbu
3
Además: si define una función en el nivel superior de un módulo, pero está decorada, entonces la referencia será a la salida del decorador, y obtendrá este error de todos modos.
bobpoekert
55
Solo tarde 5 años, pero me acabo de encontrar con esto. Resulta que el "nivel superior" debe tomarse más literalmente de lo habitual: me parece que la definición de la función tiene que preceder a la inicialización del grupo (es decir, la pool = Pool()línea aquí ). No esperaba eso, y esta podría ser la razón por la cual el problema de OP persistió.
Andras Deak
44
En particular, las funciones solo son seleccionables si se definen en el nivel superior de un módulo. Parece que el resultado de aplicar functool.partiala una función de nivel superior también se puede seleccionar, incluso si se define dentro de otra función.
user1071847
96

Yo usaría pathos.multiprocesssing, en lugar de multiprocessing. pathos.multiprocessingEs una bifurcación de multiprocessingque utiliza dill. dillpuede serializar casi cualquier cosa en python, por lo que puede enviar mucho más en paralelo. La pathosbifurcación también tiene la capacidad de trabajar directamente con múltiples funciones de argumento, como lo necesita para los métodos de clase.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Obtenga pathos(y si lo desea dill) aquí: https://github.com/uqfoundation

Mike McKerns
fuente
55
trabajó una delicia. Para cualquier otra persona, instalé ambas bibliotecas a través de: sudo pip install git+https://github.com/uqfoundation/dill.git@masterysudo pip install git+https://github.com/uqfoundation/pathos.git@master
Alexander McFarlane
55
@AlexanderMcFarlane No instalaría paquetes de python sudo(especialmente de fuentes externas como github). En cambio, recomendaría ejecutar:pip install --user git+...
Chris
Usar simplemente pip install pathosno funciona tristemente y da este mensaje:Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
xApple
11
pip install pathosahora funciona y pathoses compatible con python 3.
Mike McKerns
3
@DanielGoldfarb: multiprocesses una bifurcación de multiprocessingdonde se dillha reemplazado pickleen varios lugares en el código ... pero esencialmente, eso es todo. pathosproporciona algunas capas API adicionales multiprocessy también tiene backends adicionales. Pero, eso es lo esencial.
Mike McKerns
29

Como han dicho otros, multiprocessingsolo se pueden transferir objetos de Python a procesos de trabajo que se pueden encurtir. Si no puede reorganizar su código como lo describe unutbu, puede usar dilllas capacidades extendidas de decapado / desempaquetado para transferir datos (especialmente datos de código) como se muestra a continuación.

Esta solución requiere solo la instalación de dilly no otras bibliotecas como pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()
Rockportrocker
fuente
66
Soy el autor dilly pathos... y si bien tienes razón, ¿no es mucho más agradable, más limpio y más flexible de usar pathosque en mi respuesta? O tal vez soy un poco parcial ...
Mike McKerns
44
No estaba al tanto del estado pathosen el momento de escribir y quería presentar una solución que esté muy cerca de la respuesta. Ahora que he visto su solución, estoy de acuerdo en que este es el camino a seguir.
Rocksportrocker
Leí tu solución y pensé, Doh… I didn't even think of doing it like that. así que fue genial.
Mike McKerns
44
Gracias por publicar, utilicé este enfoque para argumentos de dilling / undilling que no se pudieron encurtir: stackoverflow.com/questions/27883574/…
jazzblue
@rocksportrocker. Estoy leyendo este ejemplo y no puedo entender por qué hay un forbucle explícito . Normalmente vería que la rutina paralela toma una lista y devuelve una lista sin bucle.
user1700890
20

He descubierto que también puedo generar exactamente esa salida de error en un fragmento de código que funciona perfectamente al intentar usar el generador de perfiles en él.

Tenga en cuenta que esto fue en Windows (donde la bifurcación es un poco menos elegante).

Yo estaba corriendo:

python -m profile -o output.pstats <script> 

Y descubrió que al eliminar el perfil se eliminó el error y al colocar el perfil se restableció. También me estaba volviendo loco porque sabía que el código solía funcionar. Estaba comprobando si algo había actualizado pool.py ... luego tuve una sensación de hundimiento y eliminé el perfil y eso fue todo.

Publicar aquí para los archivos en caso de que alguien más se encuentre con él.

Ezekiel Kruglick
fuente
3
WOW, gracias por mencionar! Me volvió loco durante la última hora más o menos; Intenté todo con un ejemplo muy simple: nada parecía funcionar. Pero también tenía el perfilador ejecutándose en mi archivo por lotes :(
tim
1
Oh, no puedo agradecerte lo suficiente. Sin embargo, esto suena tan tonto, ya que es tan inesperado. Creo que debería mencionarse en los documentos. Todo lo que tenía era una declaración pdb de importación, y una simple función de nivel superior con solo una passno era 'pickle'.
0xc0de
10

Cuando surge este problema, multiprocessinguna solución simple es cambiar de Poola ThreadPool. Esto se puede hacer sin cambiar el código que no sea la importación

from multiprocessing.pool import ThreadPool as Pool

Esto funciona porque ThreadPool comparte memoria con el hilo principal, en lugar de crear un nuevo proceso, esto significa que no es necesario el decapado.

La desventaja de este método es que Python no es el mejor lenguaje para el manejo de subprocesos: utiliza algo llamado Bloqueo global del intérprete para mantenerse seguro, lo que puede ralentizar algunos casos de uso aquí. Sin embargo, si está interactuando principalmente con otros sistemas (ejecutando comandos HTTP, hablando con una base de datos, escribiendo en sistemas de archivos), es probable que su código no esté vinculado por la CPU y no reciba mucho impacto. De hecho, cuando escribí puntos de referencia HTTP / HTTPS, descubrí que el modelo de subprocesos utilizado aquí tiene menos sobrecarga y demoras, ya que la sobrecarga de crear nuevos procesos es mucho mayor que la sobrecarga para crear nuevos subprocesos.

Entonces, si está procesando un montón de cosas en el espacio de usuario de Python, este podría no ser el mejor método.

tedivm
fuente
2
Pero entonces solo estás usando una CPU (al menos con versiones regulares de Python que usan el GIL ), lo que de alguna manera frustra el propósito.
Endre Ambos
Eso realmente depende de cuál es el propósito. El Global Interpreter Lock significa que solo una instancia a la vez puede ejecutar código python, pero para acciones que bloquean fuertemente (acceso al sistema de archivos, descarga de archivos grandes o múltiples, ejecución de código externo), el GIL termina siendo un problema. En algunos casos, la sobrecarga de abrir nuevos procesos (en lugar de subprocesos) supera la sobrecarga de GIL.
tedivm
Eso es verdad, gracias. Aún así, es posible que desee incluir una advertencia en la respuesta. En estos días, cuando el aumento de la potencia de procesamiento se presenta principalmente en forma de núcleos de CPU más potentes que en núcleos más potentes, el cambio de ejecución multinúcleo a ejecución de núcleo único es un efecto secundario bastante significativo.
Endre Ambos
Buen punto: he actualizado la respuesta con más detalles. Sin embargo, quiero señalar que cambiar a multiprocesamiento roscado no hace que Python solo funcione en un solo núcleo.
tedivm
4

Esta solución requiere solo la instalación de eneldo y no otras bibliotecas como pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

También funciona para matrices numpy.

Ilia w495 Nikitin
fuente
2
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Este error también se producirá si tiene alguna función incorporada dentro del objeto modelo que se pasó al trabajo asíncrono.

Así que asegúrese de verificar que los objetos del modelo que se pasan no tienen funciones incorporadas. (En nuestro caso, estábamos usando la FieldTracker()función de django-model-utils dentro del modelo para rastrear cierto campo). Aquí está el enlace a la cuestión relevante de GitHub.

Penkey Suresh
fuente
0

Basándose en la solución @rocksportrocker, tendría sentido hacer eneldo al enviar y RECUPERAR los resultados.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)
debería ver
fuente