Cuando ejecuto algo como:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
funciona bien. Sin embargo, poniendo esto en función de una clase:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
Me da el siguiente error:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
He visto una publicación de Alex Martelli que trata el mismo tipo de problema, pero no fue lo suficientemente explícito.
python
multiprocessing
pickle
Mermoz
fuente
fuente
IPython.Parallel
, pero allí podría solucionar el problema empujando los objetos a los nodos. Parece bastante molesto solucionar este problema con el multiprocesamiento.calculate
es estibables, por lo que parece que esto puede ser resuelto por 1) la creación de un objeto de función con un constructor que las copias más de unacalculate
instancia y luego 2) hacer pasar una instancia de este objeto de función dePool
'smap
método. ¿No?multiprocessing
módulo se deben a su objetivo de ser una implementación multiplataforma y a la falta de unafork(2)
llamada de sistema similar en Windows. Si no le importa el soporte de Win32, puede haber una solución alternativa más simple basada en procesos. O si está preparado para usar hilos en lugar de procesos, puede sustituirlosfrom multiprocessing import Pool
porfrom multiprocessing.pool import ThreadPool as Pool
.Respuestas:
También me molestaron las restricciones sobre qué tipo de funciones pool.map podría aceptar. Escribí lo siguiente para evitar esto. Parece funcionar, incluso para el uso recursivo de parmap.
fuente
No pude usar los códigos publicados hasta ahora porque los códigos que usan "multiprocesamiento.Pool" no funcionan con expresiones lambda y los códigos que no usan "multiprocesamiento.Pool" generan tantos procesos como elementos de trabajo.
Adapté el código st que genera una cantidad predefinida de trabajadores y solo itera por la lista de entrada si existe un trabajador inactivo. También habilité el modo "daemon" para los trabajadores st ctrl-c funciona como se esperaba.
fuente
parmap
función?(None, None)
como el último elemento indicafun
que ha llegado al final de la secuencia de elementos para cada proceso.El multiprocesamiento y el decapado están rotos y limitados a menos que salte fuera de la biblioteca estándar.
Si usa una bifurcación de
multiprocessing
llamadaspathos.multiprocesssing
, puede usar directamente clases y métodos de clase en lasmap
funciones de multiprocesamiento . Esto se debe a quedill
se usa en lugar depickle
ocPickle
, ydill
puede serializar casi cualquier cosa en Python.pathos.multiprocessing
también proporciona una función de mapa asíncrono ... y puedemap
funcionar con múltiples argumentos (por ejemplomap(math.pow, [1,2,3], [4,5,6])
)Ver discusiones: ¿Qué pueden hacer juntos el multiprocesamiento y el eneldo?
y: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
Incluso maneja el código que escribió inicialmente, sin modificaciones, y del intérprete. ¿Por qué hacer algo más que sea más frágil y específico para un solo caso?
Obtenga el código aquí: https://github.com/uqfoundation/pathos
Y, solo para mostrar un poco más de lo que puede hacer:
fuente
amap
) que permite el uso de barras de progreso y otra programación asincrónica.Could not find a version that satisfies the requirement pp==1.5.7-pathos (from pathos)
multiprocess
es compatible con 2/3. Consulte stackoverflow.com/questions/27873093/… y pypi.python.org/pypi/multiprocess .pathos
ha tenido una nueva versión estable y también es compatible con 2.xy 3.x.Actualmente, hasta donde yo sé, no hay una solución a su problema: la función a la que
map()
debe acceder debe ser accesible mediante la importación de su módulo. Por eso funciona el código de Robert: la funciónf()
se puede obtener importando el siguiente código:De hecho, agregué una sección "principal", porque esto sigue las recomendaciones para la plataforma Windows ("Asegúrese de que el módulo principal pueda ser importado de manera segura por un nuevo intérprete de Python sin causar efectos secundarios no deseados").
También agregué una letra mayúscula delante
Calculate
para seguir PEP 8 . :)fuente
La solución de mrule es correcta pero tiene un error: si el niño envía una gran cantidad de datos, puede llenar el búfer de la tubería, bloqueando al niño
pipe.send()
, mientras el padre espera a que el niño salgapipe.join()
. La solución es leer los datos del niño antesjoin()
de hacerlo . Además, el niño debe cerrar el extremo del tubo de los padres para evitar un punto muerto. El siguiente código soluciona eso. También tenga en cuenta que estoparmap
crea un proceso por elemento enX
. Una solución más avanzada es usarmultiprocessing.cpu_count()
para dividirX
en varios trozos y luego combinar los resultados antes de regresar. Lo dejo como un ejercicio para el lector para no estropear la concisión de la buena respuesta de mrule. ;)fuente
OSError: [Errno 24] Too many open files
. Creo que debe haber algún tipo de límite en el número de procesos para que funcione correctamente ...También he luchado con esto. Tenía funciones como miembros de datos de una clase, como un ejemplo simplificado:
Necesitaba usar la función self.f en una llamada Pool.map () desde la misma clase y self.f no tomó una tupla como argumento. Como esta función estaba integrada en una clase, no tenía claro cómo escribir el tipo de contenedor que otras respuestas sugerían.
Resolví este problema usando un contenedor diferente que toma una tupla / lista, donde el primer elemento es la función, y los elementos restantes son los argumentos de esa función, llamada eval_func_tuple (f_args). Con esto, la línea problemática se puede reemplazar por return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Aquí está el código completo:
Archivo: util.py
Archivo: main.py
Ejecutar main.py dará [11, 22, 33]. Siéntase libre de mejorar esto, por ejemplo eval_func_tuple también podría modificarse para tomar argumentos de palabras clave.
En otra nota, en otras respuestas, la función "parmap" puede hacerse más eficiente para el caso de más Procesos que el número de CPU disponibles. Estoy copiando una versión editada a continuación. Esta es mi primera publicación y no estaba seguro de si debería editar directamente la respuesta original. También cambié el nombre de algunas variables.
fuente
Tomé la respuesta de klaus se y aganders3 e hice un módulo documentado que es más legible y se mantiene en un solo archivo. Simplemente puede agregarlo a su proyecto. ¡Incluso tiene una barra de progreso opcional!
EDITAR : Se agregó la sugerencia @ alexander-mcfarlane y una función de prueba
fuente
join()
al mismo tiempo y obtendrá un destello100%
completo en latqdm
pantalla. El único momento en que será útil es si cada procesador tiene una carga de trabajo sesgadatqdm()
para cerrar la línea:result = [q_out.get() for _ in tqdm(sent)]
y funciona mucho mejor - gran esfuerzo, aunque realmente aprecio esto, así que +1Sé que esto se hizo hace más de 6 años, pero solo quería agregar mi solución, ya que algunas de las sugerencias anteriores parecen terriblemente complicadas, pero mi solución fue realmente muy simple.
Todo lo que tenía que hacer era ajustar la llamada pool.map () a una función auxiliar. Pasar el objeto de clase junto con args para el método como una tupla, que se parecía un poco a esto.
fuente
Las funciones definidas en las clases (incluso dentro de las funciones dentro de las clases) realmente no se enredan. Sin embargo, esto funciona:
fuente
Sé que esta pregunta se hizo hace 8 años y 10 meses, pero quiero presentarles mi solución:
Solo necesita hacer que su clase funcione en un método estático. Pero también es posible con un método de clase:
Probado en Python 3.7.3
fuente
Modifiqué el método de klaus se porque mientras funcionaba para mí con listas pequeñas, se bloqueaba cuando el número de elementos era ~ 1000 o mayor. En lugar de empujar los trabajos uno a la vez con la
None
condición de detención, cargo la cola de entrada de una vez y solo dejo que los procesos la procesen hasta que esté vacía.Editar: desafortunadamente, ahora me encuentro con este error en mi sistema: el límite de tamaño máximo de la cola de multiprocesamiento es 32767 , espero que las soluciones allí ayuden.
fuente
Puede ejecutar su código sin ningún problema si de alguna manera ignora manualmente el
Pool
objeto de la lista de objetos en la clase porque nopickle
puede, como dice el error. Puedes hacer esto con la__getstate__
función (mira aquí también) como sigue. ElPool
objetivo será tratar de encontrar los__getstate__
y las__setstate__
funciones y ejecutarlas si lo encuentra cuando se ejecutamap
,map_async
etc:Entonces hazlo:
te dará el resultado:
He probado el código anterior en Python 3.xy funciona.
fuente
No estoy seguro de si se ha tomado este enfoque, pero una solución que estoy usando es:
La salida debe ser:
fuente
Existe la posibilidad de que desee aplicar esta función para cada instancia diferente de la clase. Entonces aquí está la solución para eso también
fuente
Aquí está mi solución, que creo que es un poco menos agresiva que la mayoría de los demás. Es similar a la respuesta de Nightowl.
fuente
De http://www.rueckstiess.net/research/snippets/show/ca1d7d90 y http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
Podemos hacer una función externa y sembrarla con el objeto de clase:
O sin bolsa de trabajo:
fuente
Puede que esta no sea una muy buena solución, pero en mi caso, la resuelvo así.
Tuve que pasar
self
a mi función ya que tengo que acceder a los atributos y funciones de mi clase a través de esa función. Esto es trabajo para mí. Las correcciones y sugerencias son siempre bienvenidas.fuente
Aquí hay una plantilla que escribí para usar Pool de multiprocesamiento en python3, específicamente python3.7.7 se usó para ejecutar las pruebas. Obtuve mis carreras más rápidas usando
imap_unordered
. Simplemente conecte su escenario y pruébelo. Puede usartimeit
o simplementetime.time()
para descubrir cuál funciona mejor para usted.En el escenario anterior, en
imap_unordered
realidad parece ser el peor para mí. Pruebe su estuche y compárelo con la máquina en la que planea ejecutarlo. Lea también sobre agrupaciones de procesos . ¡Salud!fuente