Cuando ejecuto algo como:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
funciona bien. Sin embargo, poniendo esto en función de una clase:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
Me da el siguiente error:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
He visto una publicación de Alex Martelli que trata el mismo tipo de problema, pero no fue lo suficientemente explícito.
python
multiprocessing
pickle
Mermoz
fuente
fuente

IPython.Parallel, pero allí podría solucionar el problema empujando los objetos a los nodos. Parece bastante molesto solucionar este problema con el multiprocesamiento.calculatees estibables, por lo que parece que esto puede ser resuelto por 1) la creación de un objeto de función con un constructor que las copias más de unacalculateinstancia y luego 2) hacer pasar una instancia de este objeto de función dePool'smapmétodo. ¿No?multiprocessingmódulo se deben a su objetivo de ser una implementación multiplataforma y a la falta de unafork(2)llamada de sistema similar en Windows. Si no le importa el soporte de Win32, puede haber una solución alternativa más simple basada en procesos. O si está preparado para usar hilos en lugar de procesos, puede sustituirlosfrom multiprocessing import Poolporfrom multiprocessing.pool import ThreadPool as Pool.Respuestas:
También me molestaron las restricciones sobre qué tipo de funciones pool.map podría aceptar. Escribí lo siguiente para evitar esto. Parece funcionar, incluso para el uso recursivo de parmap.
fuente
No pude usar los códigos publicados hasta ahora porque los códigos que usan "multiprocesamiento.Pool" no funcionan con expresiones lambda y los códigos que no usan "multiprocesamiento.Pool" generan tantos procesos como elementos de trabajo.
Adapté el código st que genera una cantidad predefinida de trabajadores y solo itera por la lista de entrada si existe un trabajador inactivo. También habilité el modo "daemon" para los trabajadores st ctrl-c funciona como se esperaba.
fuente
parmapfunción?(None, None)como el último elemento indicafunque ha llegado al final de la secuencia de elementos para cada proceso.El multiprocesamiento y el decapado están rotos y limitados a menos que salte fuera de la biblioteca estándar.
Si usa una bifurcación de
multiprocessingllamadaspathos.multiprocesssing, puede usar directamente clases y métodos de clase en lasmapfunciones de multiprocesamiento . Esto se debe a quedillse usa en lugar depickleocPickle, ydillpuede serializar casi cualquier cosa en Python.pathos.multiprocessingtambién proporciona una función de mapa asíncrono ... y puedemapfuncionar con múltiples argumentos (por ejemplomap(math.pow, [1,2,3], [4,5,6]))Ver discusiones: ¿Qué pueden hacer juntos el multiprocesamiento y el eneldo?
y: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
Incluso maneja el código que escribió inicialmente, sin modificaciones, y del intérprete. ¿Por qué hacer algo más que sea más frágil y específico para un solo caso?
Obtenga el código aquí: https://github.com/uqfoundation/pathos
Y, solo para mostrar un poco más de lo que puede hacer:
fuente
amap) que permite el uso de barras de progreso y otra programación asincrónica.Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)multiprocesses compatible con 2/3. Consulte stackoverflow.com/questions/27873093/… y pypi.python.org/pypi/multiprocess .pathosha tenido una nueva versión estable y también es compatible con 2.xy 3.x.Actualmente, hasta donde yo sé, no hay una solución a su problema: la función a la que
map()debe acceder debe ser accesible mediante la importación de su módulo. Por eso funciona el código de Robert: la funciónf()se puede obtener importando el siguiente código:De hecho, agregué una sección "principal", porque esto sigue las recomendaciones para la plataforma Windows ("Asegúrese de que el módulo principal pueda ser importado de manera segura por un nuevo intérprete de Python sin causar efectos secundarios no deseados").
También agregué una letra mayúscula delante
Calculatepara seguir PEP 8 . :)fuente
La solución de mrule es correcta pero tiene un error: si el niño envía una gran cantidad de datos, puede llenar el búfer de la tubería, bloqueando al niño
pipe.send(), mientras el padre espera a que el niño salgapipe.join(). La solución es leer los datos del niño antesjoin()de hacerlo . Además, el niño debe cerrar el extremo del tubo de los padres para evitar un punto muerto. El siguiente código soluciona eso. También tenga en cuenta que estoparmapcrea un proceso por elemento enX. Una solución más avanzada es usarmultiprocessing.cpu_count()para dividirXen varios trozos y luego combinar los resultados antes de regresar. Lo dejo como un ejercicio para el lector para no estropear la concisión de la buena respuesta de mrule. ;)fuente
OSError: [Errno 24] Too many open files. Creo que debe haber algún tipo de límite en el número de procesos para que funcione correctamente ...También he luchado con esto. Tenía funciones como miembros de datos de una clase, como un ejemplo simplificado:
Necesitaba usar la función self.f en una llamada Pool.map () desde la misma clase y self.f no tomó una tupla como argumento. Como esta función estaba integrada en una clase, no tenía claro cómo escribir el tipo de contenedor que otras respuestas sugerían.
Resolví este problema usando un contenedor diferente que toma una tupla / lista, donde el primer elemento es la función, y los elementos restantes son los argumentos de esa función, llamada eval_func_tuple (f_args). Con esto, la línea problemática se puede reemplazar por return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Aquí está el código completo:
Archivo: util.py
Archivo: main.py
Ejecutar main.py dará [11, 22, 33]. Siéntase libre de mejorar esto, por ejemplo eval_func_tuple también podría modificarse para tomar argumentos de palabras clave.
En otra nota, en otras respuestas, la función "parmap" puede hacerse más eficiente para el caso de más Procesos que el número de CPU disponibles. Estoy copiando una versión editada a continuación. Esta es mi primera publicación y no estaba seguro de si debería editar directamente la respuesta original. También cambié el nombre de algunas variables.
fuente
Tomé la respuesta de klaus se y aganders3 e hice un módulo documentado que es más legible y se mantiene en un solo archivo. Simplemente puede agregarlo a su proyecto. ¡Incluso tiene una barra de progreso opcional!
EDITAR : Se agregó la sugerencia @ alexander-mcfarlane y una función de prueba
fuente
join()al mismo tiempo y obtendrá un destello100%completo en latqdmpantalla. El único momento en que será útil es si cada procesador tiene una carga de trabajo sesgadatqdm()para cerrar la línea:result = [q_out.get() for _ in tqdm(sent)]y funciona mucho mejor - gran esfuerzo, aunque realmente aprecio esto, así que +1Sé que esto se hizo hace más de 6 años, pero solo quería agregar mi solución, ya que algunas de las sugerencias anteriores parecen terriblemente complicadas, pero mi solución fue realmente muy simple.
Todo lo que tenía que hacer era ajustar la llamada pool.map () a una función auxiliar. Pasar el objeto de clase junto con args para el método como una tupla, que se parecía un poco a esto.
fuente
Las funciones definidas en las clases (incluso dentro de las funciones dentro de las clases) realmente no se enredan. Sin embargo, esto funciona:
fuente
Sé que esta pregunta se hizo hace 8 años y 10 meses, pero quiero presentarles mi solución:
Solo necesita hacer que su clase funcione en un método estático. Pero también es posible con un método de clase:
Probado en Python 3.7.3
fuente
Modifiqué el método de klaus se porque mientras funcionaba para mí con listas pequeñas, se bloqueaba cuando el número de elementos era ~ 1000 o mayor. En lugar de empujar los trabajos uno a la vez con la
Nonecondición de detención, cargo la cola de entrada de una vez y solo dejo que los procesos la procesen hasta que esté vacía.Editar: desafortunadamente, ahora me encuentro con este error en mi sistema: el límite de tamaño máximo de la cola de multiprocesamiento es 32767 , espero que las soluciones allí ayuden.
fuente
Puede ejecutar su código sin ningún problema si de alguna manera ignora manualmente el
Poolobjeto de la lista de objetos en la clase porque nopicklepuede, como dice el error. Puedes hacer esto con la__getstate__función (mira aquí también) como sigue. ElPoolobjetivo será tratar de encontrar los__getstate__y las__setstate__funciones y ejecutarlas si lo encuentra cuando se ejecutamap,map_asyncetc:Entonces hazlo:
te dará el resultado:
He probado el código anterior en Python 3.xy funciona.
fuente
No estoy seguro de si se ha tomado este enfoque, pero una solución que estoy usando es:
La salida debe ser:
fuente
Existe la posibilidad de que desee aplicar esta función para cada instancia diferente de la clase. Entonces aquí está la solución para eso también
fuente
Aquí está mi solución, que creo que es un poco menos agresiva que la mayoría de los demás. Es similar a la respuesta de Nightowl.
fuente
De http://www.rueckstiess.net/research/snippets/show/ca1d7d90 y http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
Podemos hacer una función externa y sembrarla con el objeto de clase:
O sin bolsa de trabajo:
fuente
Puede que esta no sea una muy buena solución, pero en mi caso, la resuelvo así.
Tuve que pasar
selfa mi función ya que tengo que acceder a los atributos y funciones de mi clase a través de esa función. Esto es trabajo para mí. Las correcciones y sugerencias son siempre bienvenidas.fuente
Aquí hay una plantilla que escribí para usar Pool de multiprocesamiento en python3, específicamente python3.7.7 se usó para ejecutar las pruebas. Obtuve mis carreras más rápidas usando
imap_unordered. Simplemente conecte su escenario y pruébelo. Puede usartimeito simplementetime.time()para descubrir cuál funciona mejor para usted.En el escenario anterior, en
imap_unorderedrealidad parece ser el peor para mí. Pruebe su estuche y compárelo con la máquina en la que planea ejecutarlo. Lea también sobre agrupaciones de procesos . ¡Salud!fuente