Python multiproceso espere hasta que todos los hilos terminen

119

Es posible que esto se haya preguntado en un contexto similar, pero no pude encontrar una respuesta después de unos 20 minutos de búsqueda, así que preguntaré.

He escrito un script de Python (digamos: scriptA.py) y un script (digamos scriptB.py)

En scriptB quiero llamar a scriptA varias veces con diferentes argumentos, cada vez tarda aproximadamente una hora en ejecutarse (es un script enorme, hace muchas cosas ... no se preocupe) y quiero poder ejecutar el scriptA con todos los diferentes argumentos simultáneamente, pero necesito esperar hasta que TODOS estén listos antes de continuar; mi código:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

Quiero ejecutar todos subprocess.call()al mismo tiempo, y luego esperar hasta que terminen, ¿cómo debo hacer esto?

Traté de usar subprocesos como el ejemplo aquí :

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

Pero no creo que esto sea correcto.

¿Cómo sé que han terminado de correr antes de ir a mi do_finish()?

Inbar Rose
fuente

Respuestas:

150

Debe utilizar el método de unión del Threadobjeto al final del script.

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Así, el hilo principal esperará hasta t1, t2y t3terminar la ejecución.

Maksim Skurydzin
fuente
5
hmmm - si tiene problemas para entender algo, ¿no ejecutará primero t1, esperará hasta que termine, luego vaya a t2, etc., etc.? ¿Cómo hacer que todo suceda a la vez? No veo cómo esto los ejecutaría al mismo tiempo.
Inbar Rose
25
La llamada a los joinbloques hasta que el hilo finalice la ejecución. De todos modos, tendrá que esperar todos los hilos. Si t1termina primero, comenzará a esperar t2(que puede que ya haya terminado e inmediatamente procederá a esperar t3). Si t1tomó más tiempo para ejecutar, cuando regrese de que tanto t1y t2volverá inmediatamente sin bloquear.
Maksim Skurydzin
1
no entiendes mi pregunta, si copio el código anterior en mi código, ¿funcionará? ¿O me estoy perdiendo algo?
Inbar Rose
2
bien, ya veo. ahora lo entiendo, estaba un poco confundido al respecto, pero creo que lo entiendo, joinde alguna manera adjunta el proceso actual al hilo y espera hasta que esté listo, y si t2 termina antes de t1, cuando t1 esté listo, verificará que t2 esté listo. que es, y luego marque t3..etc..etc .. y luego sólo cuando todo esté hecho continuará. increíble.
Inbar Rose
3
digamos que t1 tarda más, pero t2 tiene una excepción. ¿Qué pasa entonces? ¿Puede detectar esa excepción o verificar si t2 terminó bien o no?
Ciprian Tomoiagă
174

Coloque los hilos en una lista y luego use el método Join

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()
Aaron Digulla
fuente
1
Sí, eso funcionaría, pero es más difícil de entender. Siempre debe intentar encontrar un equilibrio entre el código compacto y la "legibilidad". Recuerde: el código se escribe una vez pero se lee muchas veces. Por eso es más importante que sea fácil de entender.
Aaron Digulla
2
El "patrón de fábrica" ​​no es algo que pueda explicar en una oración. Busque en Google y busque en stackoverflow.com. Hay muchos ejemplos y explicaciones. En pocas palabras: escribes código que crea algo complejo para ti. Como una fábrica real: entregas un pedido y recibes un producto terminado.
Aaron Digulla
18
No me gusta la idea de usar la comprensión de listas por sus efectos secundarios y no hacer nada útil con la lista de resultados. Un bucle for simple sería más limpio incluso si se extiende en dos filas ...
Ioan Alexandru Cucu
1
@Aaron DIgull Lo entiendo. Lo que quiero decir es que simplemente haría una for x in threads: x.join()comprensión de la lista en lugar de usarla
Ioan Alexandru Cucu
1
@IoanAlexandruCucu: Todavía me pregunto si hay una solución más legible y eficiente: stackoverflow.com/questions/21428602/…
Aaron Digulla
29

En Python3, desde Python 3.2 hay un nuevo enfoque para alcanzar el mismo resultado, que personalmente prefiero al paquete tradicional de creación / inicio / unión de subprocesos concurrent.futures: https://docs.python.org/3/library/concurrent.futures .html

Usar un ThreadPoolExecutorcódigo sería:

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

La salida del código anterior es algo como:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

Una de las ventajas es que puede controlar el rendimiento estableciendo el máximo de trabajadores simultáneos.

Roberto
fuente
pero, ¿cómo puede saber cuándo han terminado todos los hilos del grupo de hilos?
Prime By Design
1
Como puede ver en el ejemplo, el código después de la withdeclaración se ejecuta cuando todas las tareas han finalizado.
Roberto
esto no funciona. Intente hacer algo realmente largo en hilos. Su declaración de impresión se ejecutará antes de que finalice el hilo
Pranalee
@Pranalee, ese código funciona, he actualizado el código para agregar las líneas de salida. No puede ver "Todas las tareas ..." antes de que todos los subprocesos estén terminados. Así es como funciona la withinstrucción por diseño en este caso. De todos modos, siempre puede abrir una nueva pregunta en SO y publicar su código para que podamos ayudarlo a averiguar qué está sucediendo en su caso.
Roberto
@PrimeByDesign puede usar la concurrent.futures.waitfunción, puede ver un ejemplo real aquí Documentos oficiales: docs.python.org/3/library/…
Alexander Fortin
28

Prefiero usar la comprensión de la lista basada en una lista de entrada:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
Adam Matan
fuente
La respuesta marcada explica bien, pero esta es más corta y no requiere repeticiones feas. Solo una buena respuesta. :)
tleb
La comprensión de listas solo para efectos secundarios generalmente se deprecia *. Pero en este caso de uso, parece una buena idea. * stackoverflow.com/questions/5753597/…
Vinayak Kaniyarakkal
3
@VinayakKaniyarakkal for t in threads:t.start()¿no es mejor?
SmartManoj
5

Puede tener una clase como la siguiente desde la que puede agregar 'n' número de funciones o scripts de consola que desea ejecutar en paralelo y comenzar la ejecución y esperar a que se completen todos los trabajos.

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()
PBD
fuente
Esto es multiprocesamiento. La pregunta era sobre docs.python.org/3/library/threading.html
Rustam A.
3

De la threading documentación del módulo

Hay un objeto "hilo principal"; esto corresponde al hilo de control inicial en el programa Python. No es un hilo de demonio.

Existe la posibilidad de que se creen "objetos de hilo ficticio". Estos son objetos de subprocesos correspondientes a "subprocesos ajenos", que son subprocesos de control iniciados fuera del módulo de subprocesos, como directamente desde el código C. Los objetos de hilo ficticio tienen una funcionalidad limitada; siempre se consideran vivos y demoníacos, y no se pueden join()editar. Nunca se eliminan, ya que es imposible detectar la terminación de hilos extraños.

Entonces, para detectar esos dos casos en los que no está interesado en mantener una lista de los hilos que crea:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

Después de lo cual:

>>> print(data)
[0, 4, 12, 40]
berna1111
fuente
2

Tal vez algo como

for t in threading.enumerate():
    if t.daemon:
        t.join()
jno
fuente
Probé este código pero no estoy seguro de que funcione porque se imprimió la última instrucción de mi código, que fue después de este ciclo for y aún así el proceso no se terminó.
Omkar
1

Me encontré con el mismo problema en el que necesitaba esperar todos los subprocesos que se crearon usando el bucle for. Acabo de probar el siguiente código. Puede que no sea la solución perfecta, pero pensé que sería una solución simple. Probar:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise
Omkar
fuente