Multiprocesamiento: use tqdm para mostrar una barra de progreso

103

Para hacer que mi código sea más "pitónico" y más rápido, utilizo "multiprocesamiento" y una función de mapa para enviarlo a) la función yb) el rango de iteraciones.

La solución implantada (es decir, llamar a tqdm directamente en el rango tqdm.tqdm (rango (0, 30)) no funciona con multiprocesamiento (como se formula en el código a continuación).

La barra de progreso se muestra de 0 a 100% (¿cuando Python lee el código?) Pero no indica el progreso real de la función del mapa.

¿Cómo mostrar una barra de progreso que indique en qué paso se encuentra la función 'mapa'?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

Cualquier ayuda o sugerencia es bienvenida ...

Ciencia
fuente
¿Puedes publicar el fragmento de código de la barra de progreso?
Alex
2
Para las personas que buscan una solución con .starmap(): Aquí hay un parche para Poolagregar .istarmap(), que también funcionará con tqdm.
Darkonaut

Respuestas:

136

Utilice imap en lugar de map, que devuelve un iterador de valores procesados.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
hkyi
fuente
14
Una declaración adjunta list () espera a que finalice el iterador. total = también se requiere ya que tqdm no sabe cuánto tiempo durará la iteración,
hkyi
16
¿Existe una solución similar para starmap()?
tarashypka
2
for i in tqdm.tqdm(...): pass puede ser más sencillo, esolist(tqdm.tqdm)
savfod
1
Esto funciona, pero ¿alguien más ha tenido que imprimir continuamente la barra de progreso en una nueva línea para cada iteración?
Dennis Subachev
3
El comportamiento está conectado cuando es específico chunk_sizede p.imap. ¿Puede tqdmactualizar cada iteración en lugar de cada fragmento?
huangbiubiu
56

Solución encontrada: ¡Tenga cuidado! Debido al multiprocesamiento, el tiempo de estimación (iteración por ciclo, tiempo total, etc.) podría ser inestable, pero la barra de progreso funciona perfectamente.

Nota: El administrador de contexto para Pool solo está disponible a partir de la versión 3.3 de Python

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()
Ciencia
fuente
2
pbar.close()no es obligatorio, se cerrará automáticamente al finalizarwith
Sagar Kar
5
¿Es tqdmnecesaria aquí la segunda llamada interior ?
Shadowtalker
7
¿qué pasa con la salida del _foo (my_number) que se devuelve como "r" en cuestión?
Likak
4
¿Existe una solución similar para starmap()?
tarashypka
2
@shadowtalker - parece funcionar sin;). De todos modos, imap_unorderedaquí es clave, ofrece el mejor rendimiento y las mejores estimaciones de la barra de progreso.
Tomasz Gandor
23

Perdón por llegar tarde, pero si todo lo que necesita es un mapa concurrente, agregué esta funcionalidad en tqdm>=4.42.0:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

Referencias: https://tqdm.github.io/docs/contrib.concurrent/ y https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

casper.dcl
fuente
1
Gracias por esto. Funciona fácilmente, mucho mejor que cualquier otra solución que haya probado.
user3340499
Genial (+1), pero lanza HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))Jupyter
Ébe Isaac
@ Ébe-Isaac ver github.com/tqdm/tqdm/issues/937
casper.dcl
Veo un problema con la discusión para hackear tqdm_notebook, sin embargo, no puedo encontrar una solución para resolver tqdm.contrib.concurrent.
Ébe Isaac
Esto es increíble. Funciona directamente desde el primer momento.
Lars Larsson
21

Puedes usar p_tqdm en lugar.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))
Victor Quach
fuente
1
Esto funciona muy bien y fue muy fácil pip install. Esto está reemplazando tqdm para la mayoría de mis necesidades
crypdick
Merci Victor;)
Gabriel Romon
p_tqdmestá limitado a multiprocessing.Pool, no disponible para hilos
pateheo
8

basándome en la respuesta de Xavi Martínez escribí la función imap_unordered_bar. Se puede utilizar de la misma forma que imap_unorderedcon la única diferencia de que se muestra una barra de procesamiento.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
Oliver Wilken
fuente
3
Esto volverá a dibujar la barra en cada paso de una nueva línea. ¿Cómo actualizar la misma línea?
misantroop
Solución en mi caso (Windows / Powershell): Colorama.
misantroop
'pbar.close () no es obligatorio, se cerrará automáticamente al finalizar con' como el comentario que hizo Sagar en la respuesta de @ scipy
Tejas Shetty
1

Aquí está mi opinión para cuando necesite obtener resultados de sus funciones de ejecución paralela. Esta función hace algunas cosas (hay otra publicación mía que lo explica más a fondo) pero el punto clave es que hay una cola de tareas pendientes y una cola de tareas completadas. A medida que los trabajadores terminan con cada tarea en la cola pendiente, agregan los resultados en la cola de tareas completadas. Puede ajustar el cheque a la cola de tareas completadas con la barra de progreso tqdm. No estoy poniendo la implementación de la función do_work () aquí, no es relevante, ya que el mensaje aquí es monitorear la cola de tareas completadas y actualizar la barra de progreso cada vez que aparece un resultado.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results
Nick B.
fuente
0
import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
hombre oscuro
fuente
-2

Este enfoque es simple y funciona.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()
Vijayabhaskar J
fuente