Tengo un script que está realizando con éxito un conjunto de tareas de grupo de multiprocesamiento con una imap_unordered()
llamada:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
Sin embargo, mi num_tasks
es alrededor de 250,000, por lo que join()
bloquea el hilo principal durante 10 segundos más o menos, y me gustaría poder hacer eco en la línea de comando de forma incremental para mostrar que el proceso principal no está bloqueado. Algo como:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
¿Existe un método para el objeto de resultado o el grupo en sí que indique el número de tareas restantes? Intenté usar un multiprocessing.Value
objeto como contador ( do_work
llama a una counter.value += 1
acción después de realizar su tarea), pero el contador solo llega a ~ 85% del valor total antes de dejar de incrementar.
fuente
def do_word(*a): time.sleep(.1)
como ejemplo. Si no funciona para usted, cree un ejemplo de código mínimo completo que demuestre su problema: describa con palabras qué espera que suceda y qué sucede en su lugar, mencione cómo ejecuta su script de Python, cuál es su sistema operativo, versión de Python y publíquelo como una nueva pregunta .Pool.map()
. No me di cuenta de eso soloimap()
yimap_unordered()
trabajo de esta manera: la documentación solo dice "Una versión más perezosa de map ()" pero en realidad significa "el iterador subyacente devuelve resultados a medida que ingresan".imap_unordered()
. El problema de Hanan probablemente se deba asys.stderr.write('\r..')
(sobrescribir la misma línea para mostrar el progreso).Mi favorito personal: le brinda una pequeña barra de progreso y finalización ETA mientras las cosas se ejecutan y se comprometen en paralelo.
fuente
pip install tqdm
Encontré que el trabajo ya estaba hecho cuando intenté verificar su progreso. Esto es lo que funcionó para mí usando tqdm .
pip install tqdm
Esto debería funcionar con todos los tipos de multiprocesamiento, ya sea que bloqueen o no.
fuente
Encontrado una respuesta a mí mismo con un poco más de la excavación: Echando un vistazo a la
__dict__
delimap_unordered
objeto de resultado, descubrí que tiene un_index
atributo que incrementos con cada finalización de la tarea. Entonces esto funciona para el registro, envuelto en elwhile
ciclo:Sin embargo, encontré que cambiar el
imap_unordered
por amap_async
resultó en una ejecución mucho más rápida, aunque el objeto de resultado es un poco diferente. En cambio, el objeto de resultado demap_async
tiene un_number_left
atributo y unready()
método:fuente
rs
se conoce el contenido y es un poco tarde o no.rs
ya se han lanzado los otros hilos.rs
en ningún bucle, soy novato multiprocesador y esto ayudaría. Muchas gracias.python 3.5
, la solución que utiliza_number_left
no funciona._number_left
representa los trozos que quedan por procesar. Por ejemplo, si quiero que se pasen 50 elementos a mi función en paralelo, entonces, para un grupo de subprocesos con 3 procesos,_map_async()
crea 10 fragmentos con 5 elementos cada uno._number_left
luego representa cuántos de estos fragmentos se han completado.Sé que esta es una pregunta bastante antigua, pero esto es lo que hago cuando quiero realizar un seguimiento de la progresión de un grupo de tareas en Python.
Básicamente, usa apply_async con un callbak (en este caso, es para agregar el valor devuelto a una lista), por lo que no tiene que esperar para hacer otra cosa. Luego, dentro de un ciclo while, verifica la progresión del trabajo. En este caso, agregué un widget para que se vea mejor.
La salida:
Espero eso ayude.
fuente
[pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
para(pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
Como lo sugirió Tim, puede usar
tqdm
yimap
para resolver este problema. Me encontré con este problema y modifiqué laimap_unordered
solución para poder acceder a los resultados del mapeo. Así es como funciona:En caso de que no le importen los valores devueltos de sus trabajos, no necesita asignar la lista a ninguna variable.
fuente
para cualquiera que busque una solución sencilla que trabaje con
Pool.apply_async()
:fuente
Creé una clase personalizada para crear una impresión de progreso. Maby esto ayuda:
fuente
Pruebe este sencillo enfoque basado en colas, que también se puede utilizar con la agrupación. Tenga en cuenta que imprimir cualquier cosa después del inicio de la barra de progreso hará que se mueva, al menos para esta barra de progreso en particular. (Progreso de PyPI 1.5)
fuente