Para hacer que mi código sea más "pitónico" y más rápido, utilizo "multiprocesamiento" y una función de mapa para enviarlo a) la función yb) el rango de iteraciones.
La solución implantada (es decir, llamar a tqdm directamente en el rango tqdm.tqdm (rango (0, 30)) no funciona con multiprocesamiento (como se formula en el código a continuación).
La barra de progreso se muestra de 0 a 100% (¿cuando Python lee el código?) Pero no indica el progreso real de la función del mapa.
¿Cómo mostrar una barra de progreso que indique en qué paso se encuentra la función 'mapa'?
from multiprocessing import Pool
import tqdm
import time
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
p = Pool(2)
r = p.map(_foo, tqdm.tqdm(range(0, 30)))
p.close()
p.join()
Cualquier ayuda o sugerencia es bienvenida ...
python
multiprocessing
progress-bar
tqdm
Ciencia
fuente
fuente
.starmap()
: Aquí hay un parche paraPool
agregar.istarmap()
, que también funcionará contqdm
.Respuestas:
Utilice imap en lugar de map, que devuelve un iterador de valores procesados.
from multiprocessing import Pool import tqdm import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': with Pool(2) as p: r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
fuente
starmap()
?for i in tqdm.tqdm(...): pass
puede ser más sencillo, esolist(tqdm.tqdm)
chunk_size
dep.imap
. ¿Puedetqdm
actualizar cada iteración en lugar de cada fragmento?Solución encontrada: ¡Tenga cuidado! Debido al multiprocesamiento, el tiempo de estimación (iteración por ciclo, tiempo total, etc.) podría ser inestable, pero la barra de progreso funciona perfectamente.
Nota: El administrador de contexto para Pool solo está disponible a partir de la versión 3.3 de Python
from multiprocessing import Pool import time from tqdm import * def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': with Pool(processes=2) as p: max_ = 30 with tqdm(total=max_) as pbar: for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))): pbar.update()
fuente
pbar.close()
no es obligatorio, se cerrará automáticamente al finalizarwith
tqdm
necesaria aquí la segunda llamada interior ?starmap()
?imap_unordered
aquí es clave, ofrece el mejor rendimiento y las mejores estimaciones de la barra de progreso.Perdón por llegar tarde, pero si todo lo que necesita es un mapa concurrente, agregué esta funcionalidad en
tqdm>=4.42.0
:from tqdm.contrib.concurrent import process_map # or thread_map import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': r = process_map(_foo, range(0, 30), max_workers=2)
Referencias: https://tqdm.github.io/docs/contrib.concurrent/ y https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py
fuente
HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))
JupyterPuedes usar
p_tqdm
en lugar.https://github.com/swansonk14/p_tqdm
from p_tqdm import p_map import time def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': r = p_map(_foo, list(range(0, 30)))
fuente
pip install
. Esto está reemplazando tqdm para la mayoría de mis necesidadesp_tqdm
está limitado amultiprocessing.Pool
, no disponible para hilosbasándome en la respuesta de Xavi Martínez escribí la función
imap_unordered_bar
. Se puede utilizar de la misma forma queimap_unordered
con la única diferencia de que se muestra una barra de procesamiento.from multiprocessing import Pool import time from tqdm import * def imap_unordered_bar(func, args, n_processes = 2): p = Pool(n_processes) res_list = [] with tqdm(total = len(args)) as pbar: for i, res in tqdm(enumerate(p.imap_unordered(func, args))): pbar.update() res_list.append(res) pbar.close() p.close() p.join() return res_list def _foo(my_number): square = my_number * my_number time.sleep(1) return square if __name__ == '__main__': result = imap_unordered_bar(_foo, range(5))
fuente
Aquí está mi opinión para cuando necesite obtener resultados de sus funciones de ejecución paralela. Esta función hace algunas cosas (hay otra publicación mía que lo explica más a fondo) pero el punto clave es que hay una cola de tareas pendientes y una cola de tareas completadas. A medida que los trabajadores terminan con cada tarea en la cola pendiente, agregan los resultados en la cola de tareas completadas. Puede ajustar el cheque a la cola de tareas completadas con la barra de progreso tqdm. No estoy poniendo la implementación de la función do_work () aquí, no es relevante, ya que el mensaje aquí es monitorear la cola de tareas completadas y actualizar la barra de progreso cada vez que aparece un resultado.
def par_proc(job_list, num_cpus=None, verbose=False): # Get the number of cores if not num_cpus: num_cpus = psutil.cpu_count(logical=False) print('* Parallel processing') print('* Running on {} cores'.format(num_cpus)) # Set-up the queues for sending and receiving data to/from the workers tasks_pending = mp.Queue() tasks_completed = mp.Queue() # Gather processes and results here processes = [] results = [] # Count tasks num_tasks = 0 # Add the tasks to the queue for job in job_list: for task in job['tasks']: expanded_job = {} num_tasks = num_tasks + 1 expanded_job.update({'func': pickle.dumps(job['func'])}) expanded_job.update({'task': task}) tasks_pending.put(expanded_job) # Set the number of workers here num_workers = min(num_cpus, num_tasks) # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more # work left to be done. for c in range(num_workers): tasks_pending.put(SENTINEL) print('* Number of tasks: {}'.format(num_tasks)) # Set-up and start the workers for c in range(num_workers): p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose)) p.name = 'worker' + str(c) processes.append(p) p.start() # Gather the results completed_tasks_counter = 0 with tqdm(total=num_tasks) as bar: while completed_tasks_counter < num_tasks: results.append(tasks_completed.get()) completed_tasks_counter = completed_tasks_counter + 1 bar.update(completed_tasks_counter) for p in processes: p.join() return results
fuente
import multiprocessing as mp import tqdm some_iterable = ... def some_func(): # your logic ... if __name__ == '__main__': with mp.Pool(mp.cpu_count()-2) as p: list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
fuente
Este enfoque es simple y funciona.
from multiprocessing.pool import ThreadPool import time from tqdm import tqdm def job(): time.sleep(1) pbar.update() pool = ThreadPool(5) with tqdm(total=100) as pbar: for i in range(100): pool.apply_async(job) pool.close() pool.join()
fuente