¿Mostrar el progreso de una llamada imap_unordered del grupo de multiprocesamiento de Python?

95

Tengo un script que está realizando con éxito un conjunto de tareas de grupo de multiprocesamiento con una imap_unordered()llamada:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

Sin embargo, mi num_taskses alrededor de 250,000, por lo que join()bloquea el hilo principal durante 10 segundos más o menos, y me gustaría poder hacer eco en la línea de comando de forma incremental para mostrar que el proceso principal no está bloqueado. Algo como:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2)

¿Existe un método para el objeto de resultado o el grupo en sí que indique el número de tareas restantes? Intenté usar un multiprocessing.Valueobjeto como contador ( do_workllama a una counter.value += 1acción después de realizar su tarea), pero el contador solo llega a ~ 85% del valor total antes de dejar de incrementar.

MedianocheRayo
fuente

Respuestas:

80

No es necesario acceder a los atributos privados del conjunto de resultados:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
jfs
fuente
7
Veo la impresión solo después de la salida del código (no en todas las iteraciones). ¿Tienes una sugerencia?
Hanan Shteingart
@HananShteingart: Funciona bien en mi sistema (Ubuntu) con Python 2 y 3. Lo he usado def do_word(*a): time.sleep(.1)como ejemplo. Si no funciona para usted, cree un ejemplo de código mínimo completo que demuestre su problema: describa con palabras qué espera que suceda y qué sucede en su lugar, mencione cómo ejecuta su script de Python, cuál es su sistema operativo, versión de Python y publíquelo como una nueva pregunta .
jfs
13
Tuve el mismo problema que @HananShteingart: es porque estaba tratando de usar Pool.map(). No me di cuenta de eso solo imap() y imap_unordered()trabajo de esta manera: la documentación solo dice "Una versión más perezosa de map ()" pero en realidad significa "el iterador subyacente devuelve resultados a medida que ingresan".
simonmacmullen
@simonmacmullen: tanto la pregunta como mi respuesta usan imap_unordered(). El problema de Hanan probablemente se deba a sys.stderr.write('\r..')(sobrescribir la misma línea para mostrar el progreso).
jfs
2
¡También es posible! Principalmente quería documentar una suposición estúpida que había hecho, en caso de que alguien más leyendo esto también lo hiciera.
Simonmacmullen
94

Mi favorito personal: le brinda una pequeña barra de progreso y finalización ETA mientras las cosas se ejecutan y se comprometen en paralelo.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass
Tim
fuente
64
¿Qué pasa si la piscina devuelve un valor?
Nickpick
11
Creé una lista vacía llamada resultado antes del ciclo y luego dentro del ciclo simplemente haz resultado.append (x). Probé esto con 2 procesos y usé imap en lugar de map y todo funcionó como quería @nickpick
bs7280
2
entonces mi barra de progreso está iterando a nuevas líneas en lugar de progresar en el lugar, ¿alguna idea de por qué podría ser esto?
Austin
2
no te olvides depip install tqdm
Mr. T
3
@ bs7280 Por result.append (x) ¿quiso decir result.append (_)? ¿Qué es x?
jason
27

Encontré que el trabajo ya estaba hecho cuando intenté verificar su progreso. Esto es lo que funcionó para mí usando tqdm .

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

Esto debería funcionar con todos los tipos de multiprocesamiento, ya sea que bloqueen o no.

reubano
fuente
4
Creo que crea un montón de hilos, y cada hilo cuenta de forma independiente
nburn42
1
Tengo funciones dentro de funciones que resultan en un error de decapado.
ojunk
21

Encontrado una respuesta a mí mismo con un poco más de la excavación: Echando un vistazo a la __dict__del imap_unorderedobjeto de resultado, descubrí que tiene un _indexatributo que incrementos con cada finalización de la tarea. Entonces esto funciona para el registro, envuelto en el whileciclo:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

Sin embargo, encontré que cambiar el imap_unorderedpor a map_asyncresultó en una ejecución mucho más rápida, aunque el objeto de resultado es un poco diferente. En cambio, el objeto de resultado de map_asynctiene un _number_leftatributo y un ready()método:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)
MedianocheRayo
fuente
3
Probé esto para Python 2.7.6 y rs._number_left parece ser el número de fragmentos restantes. Entonces, si rs._chunksize no es 1, entonces rs._number_left no será el número de elementos de lista restantes.
Allen
¿Dónde debo poner este código? Quiero decir que esto no se ejecuta hasta que rsse conoce el contenido y es un poco tarde o no.
Wakan Tanka
@WakanTanka: Se incluye en el script principal después de que salga de los hilos adicionales. En mi ejemplo original, va en el ciclo "while", donde rsya se han lanzado los otros hilos.
MidnightLightning
1
¿Podría editar su pregunta y / o respuesta para mostrar un ejemplo de trabajo mínimo? No veo rsen ningún bucle, soy novato multiprocesador y esto ayudaría. Muchas gracias.
Wakan Tanka
1
Al menos en python 3.5, la solución que utiliza _number_leftno funciona. _number_leftrepresenta los trozos que quedan por procesar. Por ejemplo, si quiero que se pasen 50 elementos a mi función en paralelo, entonces, para un grupo de subprocesos con 3 procesos, _map_async()crea 10 fragmentos con 5 elementos cada uno. _number_leftluego representa cuántos de estos fragmentos se han completado.
mSSM
9

Sé que esta es una pregunta bastante antigua, pero esto es lo que hago cuando quiero realizar un seguimiento de la progresión de un grupo de tareas en Python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

Básicamente, usa apply_async con un callbak (en este caso, es para agregar el valor devuelto a una lista), por lo que no tiene que esperar para hacer otra cosa. Luego, dentro de un ciclo while, verifica la progresión del trabajo. En este caso, agregué un widget para que se vea mejor.

La salida:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

Espero eso ayude.

Julien Tourille
fuente
tengo que cambiar: [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]para(pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
David Przybilla
Eso no es cierto. Un objeto generador no funcionará aquí. Comprobado.
Swagatam
9

Como lo sugirió Tim, puede usar tqdmy imappara resolver este problema. Me encontré con este problema y modifiqué la imap_unorderedsolución para poder acceder a los resultados del mapeo. Así es como funciona:

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

En caso de que no le importen los valores devueltos de sus trabajos, no necesita asignar la lista a ninguna variable.

mrapacz
fuente
4

para cualquiera que busque una solución sencilla que trabaje con Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.5)
    return x**2

n = 10

p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(
    i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [p.get() for p in res]
Zewoas
fuente
3

Creé una clase personalizada para crear una impresión de progreso. Maby esto ayuda:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results
Aronstef
fuente
1

Pruebe este sencillo enfoque basado en colas, que también se puede utilizar con la agrupación. Tenga en cuenta que imprimir cualquier cosa después del inicio de la barra de progreso hará que se mueva, al menos para esta barra de progreso en particular. (Progreso de PyPI 1.5)

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()
Mott la tupla
fuente