¿Cómo esperar a que se completen varios hilos?

109

¿Cuál es una forma de simplemente esperar a que finalice todo el proceso enhebrado? Por ejemplo, digamos que tengo:

public class DoSomethingInAThread implements Runnable{

    public static void main(String[] args) {
        for (int n=0; n<1000; n++) {
            Thread t = new Thread(new DoSomethingInAThread());
            t.start();
        }
        // wait for all threads' run() methods to complete before continuing
    }

    public void run() {
        // do something here
    }


}

¿Cómo modifico esto para que el main()método se detenga en el comentario hasta run()que salgan todos los métodos de los hilos ? ¡Gracias!

DivideByHero
fuente

Respuestas:

163

Pones todos los hilos en una matriz, los inicias todos y luego tienes un bucle

for(i = 0; i < threads.length; i++)
  threads[i].join();

Cada unión se bloqueará hasta que se complete el hilo respectivo. Los subprocesos pueden completarse en un orden diferente al que los une, pero eso no es un problema: cuando el bucle sale, todos los subprocesos se completan.

Martin contra Löwis
fuente
1
@Mykola: ¿ cuál es exactamente la ventaja de usar un grupo de hilos? El hecho de que la API esté ahí no significa que tenga que usarla ...
Martin v. Löwis
2
Consulte: "Un grupo de subprocesos representa un conjunto de subprocesos". ¡Esto es semántico correcto para este caso de uso! Y: "Un hilo puede acceder a información sobre su propio grupo de hilos"
Martin K.
4
El libro “Effective Java” recomienda evitar los grupos de hilos (ítem 73).
Bastien Léonard
2
Los errores mencionados en Effective Java deberían haberse solucionado en Java 6. Si las versiones más recientes de Java no son una restricción, es mejor usar Futures para resolver problemas de subprocesos. Martin v. Löwis: Tienes razón. No es relevante para ese problema, pero es bueno obtener más información sobre los subprocesos en ejecución de un objeto (como ExecutorService). Creo que es bueno utilizar determinadas funciones para resolver un problema; tal vez necesite más flexibilidad (información del hilo) en el futuro. También es correcto mencionar las antiguas clases con errores en los JDK más antiguos.
Martin K.
5
ThreadGroup no implementa una unión a nivel de grupo, por lo que es un poco desconcertante por qué la gente está presionando a ThreadGroup. ¿La gente realmente está usando cerraduras de giro y consultando el activeCount del grupo? Sería difícil convencerme de que hacerlo es mejor de alguna manera en comparación con simplemente llamar a join en todos los hilos.
41

Una forma sería hacer una Listde Threads, crear y lanzar cada hilo, mientras lo agrega a la lista. Una vez que todo esté lanzado, recorra la lista y llame join()a cada uno. No importa en qué orden terminen de ejecutarse los subprocesos, todo lo que necesita saber es que para cuando el segundo ciclo termine de ejecutarse, todos los subprocesos se habrán completado.

Un mejor enfoque es utilizar un ExecutorService y sus métodos asociados:

List<Callable> callables = ... // assemble list of Callables here
                               // Like Runnable but can return a value
ExecutorService execSvc = Executors.newCachedThreadPool();
List<Future<?>> results = execSvc.invokeAll(callables);
// Note: You may not care about the return values, in which case don't
//       bother saving them

El uso de un ExecutorService (y todas las novedades de las utilidades de concurrencia de Java 5 ) es increíblemente flexible, y el ejemplo anterior apenas toca la superficie.

Adam Batkin
fuente
¡ThreadGroup es el camino a seguir! Con una lista mutable te meterás en problemas (sincronización)
Martin K.
3
¿Qué? ¿Cómo te meterías en problemas? Solo es mutable (solo lectura) por el subproceso que realiza el lanzamiento, por lo que siempre que no modifique la lista mientras la recorre, está bien.
Adam Batkin
Depende de como lo uses. Si usa la clase de llamada en un hilo, tendrá problemas.
Martin K.
27
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DoSomethingInAThread implements Runnable
{
   public static void main(String[] args) throws ExecutionException, InterruptedException
   {
      //limit the number of actual threads
      int poolSize = 10;
      ExecutorService service = Executors.newFixedThreadPool(poolSize);
      List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();

      for (int n = 0; n < 1000; n++)
      {
         Future f = service.submit(new DoSomethingInAThread());
         futures.add(f);
      }

      // wait for all tasks to complete before continuing
      for (Future<Runnable> f : futures)
      {
         f.get();
      }

      //shut down the executor service so that this thread can exit
      service.shutdownNow();
   }

   public void run()
   {
      // do something here
   }
}
jt.
fuente
funcionó como un encanto ... tengo dos conjuntos de hilos que no deberían ejecutarse simultáneamente debido a problemas en múltiples cookies. Usé su ejemplo para ejecutar un conjunto de subprocesos a la vez ... gracias por compartir su conocimiento ...
arn-arn
@Dantalian: en su clase Runnable (probablemente en el método de ejecución), querrá capturar cualquier excepción que haya ocurrido y almacenarla localmente (o almacenar un mensaje / condición de error). En el ejemplo, f.get () devuelve su objeto que envió al ExecutorService. Su objeto podría tener un método para recuperar cualquier condición de error / excepción. Dependiendo de cómo modifique el ejemplo proporcionado, es posible que deba convertir el objeto convertido por f.get () en su tipo esperado.
jt.
12

en lugar de join(), que es una API antigua, puede usar CountDownLatch . He modificado su código como se muestra a continuación para cumplir con sus requisitos.

import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable{
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch){
        this.latch = latch;
    } 
    public void run() {
        try{
            System.out.println("Do some thing");
            latch.countDown();
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try{
            CountDownLatch latch = new CountDownLatch(1000);
            for (int n=0; n<1000; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of 1000 threads");
        }catch(Exception err){
            err.printStackTrace();
        }
    }
}

Explicacion :

  1. CountDownLatch se ha inicializado con un recuento dado 1000 según sus requisitos.

  2. Cada subproceso de trabajo DoSomethingInAThread disminuirá el CountDownLatch, que se ha pasado en constructor.

  3. Hilo principal CountDownLatchDemo await()hasta que el recuento se vuelva cero. Una vez que el recuento se haya convertido en cero, obtendrá la línea por debajo de la salida.

    In Main thread after completion of 1000 threads

Más información de la página de documentación de Oracle

public void await()
           throws InterruptedException

Hace que el subproceso actual espere hasta que el pestillo haya llegado a cero, a menos que se interrumpa el subproceso.

Consulte la pregunta SE relacionada para conocer otras opciones:

espere hasta que todos los hilos terminen su trabajo en java

Ravindra babu
fuente
8

Evite la clase Thread por completo y en su lugar utilice las abstracciones más altas proporcionadas en java.util.concurrent

La clase ExecutorService proporciona el método invokeAll que parece hacer exactamente lo que quieres.

henrik
fuente
6

Considere usar java.util.concurrent.CountDownLatch. Ejemplos en javadocs

Pablo Cavalieri
fuente
Es un pestillo para hilos, el pestillo funciona con una cuenta atrás. En el método run () de su subproceso declare explícitamente que debe esperar a que CountDownLatch alcance su cuenta regresiva a 0. Puede usar el mismo CountDownLatch en más de un subproceso para liberarlos simultáneamente. No sé si es lo que necesita, solo quería mencionarlo porque es útil cuando se trabaja en un entorno multiproceso.
Pablo Cavalieri
¿Quizás deberías poner esa explicación en el cuerpo de tu respuesta?
Aaron Hall
Los ejemplos en el Javadoc son muy descriptivos, por eso no agregué ninguno. docs.oracle.com/javase/7/docs/api/java/util/concurrent/… . En el primer ejemplo, todos los subprocesos de Workers se liberan simultáneamente porque esperan a que CountdownLatch startSignal llegue a cero, lo que ocurre en startSignal.countDown (). Luego, el hilo mian espera hasta que todos los trabajos terminen usando la instrucción doneSignal.await (). doneSignal disminuye su valor en cada trabajador.
Pablo Cavalieri
6

Como sugirió Martin K, java.util.concurrent.CountDownLatchparece ser una mejor solución para esto. Solo agrego un ejemplo para el mismo

     public class CountDownLatchDemo
{

    public static void main (String[] args)
    {
        int noOfThreads = 5;
        // Declare the count down latch based on the number of threads you need
        // to wait on
        final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
        for (int i = 0; i < noOfThreads; i++)
        {
            new Thread()
            {

                @Override
                public void run ()
                {

                    System.out.println("I am executed by :" + Thread.currentThread().getName());
                    try
                    {
                        // Dummy sleep
                        Thread.sleep(3000);
                        // One thread has completed its job
                        executionCompleted.countDown();
                    }
                    catch (InterruptedException e)
                    {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

            }.start();
        }

        try
        {
            // Wait till the count down latch opens.In the given case till five
            // times countDown method is invoked
            executionCompleted.await();
            System.out.println("All over");
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

}
Thommi extraño
fuente
4

Dependiendo de sus necesidades, es posible que también desee consultar las clases CountDownLatch y CyclicBarrier en el paquete java.util.concurrent. Pueden ser útiles si desea que sus subprocesos se esperen entre sí, o si desea un control más detallado sobre la forma en que se ejecutan sus subprocesos (por ejemplo, esperar en su ejecución interna a que otro subproceso establezca algún estado). También puede usar CountDownLatch para indicar a todos sus hilos que comiencen al mismo tiempo, en lugar de comenzarlos uno por uno mientras recorre su ciclo. Los documentos de la API estándar tienen un ejemplo de esto, además de usar otro CountDownLatch para esperar a que todos los subprocesos completen su ejecución.

Jeff
fuente
1

Crea el objeto hilo dentro del primer bucle for.

for (int i = 0; i < threads.length; i++) {
     threads[i] = new Thread(new Runnable() {
         public void run() {
             // some code to run in parallel
         }
     });
     threads[i].start();
 }

Y luego lo que todos aquí están diciendo.

for(i = 0; i < threads.length; i++)
  threads[i].join();
optimus0127
fuente
0

Puedes hacerlo con el Objeto "ThreadGroup" y su parámetro activeCount :

Martin K.
fuente
No estoy seguro de cómo te propones hacerlo exactamente. Si propone sondear activeCount en un bucle: eso es malo, ya que está ocupado, espere (incluso si duerme entre encuestas, entonces obtiene una compensación entre negocios y capacidad de respuesta).
Martin v. Löwis
@Martin v. Löwis: "Join esperará un solo hilo. Una mejor solución podría ser java.util.concurrent.CountDownLatch. Simplemente inicialice el pestillo con el recuento establecido en el número de hilos de trabajo. Cada hilo de trabajo debe llamar countDown () justo antes de salir, y el hilo principal simplemente llama a await (), que se bloqueará hasta que el contador llegue a cero. El problema con join () también es que no puede empezar a agregar más hilos de forma dinámica. La lista explotará con una modificación concurrente ". Su solución funciona bien para el problema pero no para fines generales.
Martin K.
0

Como alternativa a CountDownLatch , también puede utilizar CyclicBarrier, por ejemplo

public class ThreadWaitEx {
    static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable(){
        public void run(){
            System.out.println("clean up job after all tasks are done.");
        }
    });
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(new MyCallable(barrier));
            t.start();
        }       
    }

}    

class MyCallable implements Runnable{
    private CyclicBarrier b = null;
    public MyCallable(CyclicBarrier b){
        this.b = b;
    }
    @Override
    public void run(){
        try {
            //do something
            System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
            b.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }       
}

Para usar CyclicBarrier en este caso, el valor de la barrera.await () debería ser la última declaración, es decir, cuando el hilo haya terminado con su trabajo. CyclicBarrier se puede usar de nuevo con su método reset (). Para citar javadocs:

Un CyclicBarrier admite un comando Runnable opcional que se ejecuta una vez por punto de barrera, después de que llega el último hilo del grupo, pero antes de que se libere cualquier hilo. Esta acción de barrera es útil para actualizar el estado compartido antes de que cualquiera de las partes continúe.

shailendra1118
fuente
No creo que sea un buen ejemplo de CyclicBarrier. ¿Por qué utiliza una llamada Thread.sleep ()?
Guenther
@Guenther: sí, cambié el código para cumplir con el requisito.
shailendra1118
CyclicBarrier no es una alternativa a CountDownLatch. Cuando los subprocesos deben realizar una cuenta regresiva repetidamente, debe crear un CyclicBarrier, de lo contrario, el valor predeterminado es CountDownLatch (a menos que se requiera una abstracción adicional de Ejecución, momento en el que debe buscar los Servicios de nivel superior).
Elysiumplain
0

El join()no era útil para mí. vea esta muestra en Kotlin:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
        }
    })

El resultado:

Thread-5|a=5
Thread-1|a=1
Thread-3|a=3
Thread-2|a=2
Thread-4|a=4
Thread-2|2*1=2
Thread-3|3*1=3
Thread-1|1*1=1
Thread-5|5*1=5
Thread-4|4*1=4
Thread-1|2*2=2
Thread-5|10*2=10
Thread-3|6*2=6
Thread-4|8*2=8
Thread-2|4*2=4
Thread-3|18*3=18
Thread-1|6*3=6
Thread-5|30*3=30
Thread-2|12*3=12
Thread-4|24*3=24
Thread-4|96*4=96
Thread-2|48*4=48
Thread-5|120*4=120
Thread-1|24*4=24
Thread-3|72*4=72
Thread-5|600*5=600
Thread-4|480*5=480
Thread-3|360*5=360
Thread-1|120*5=120
Thread-2|240*5=240
Thread-1|TaskDurationInMillis = 765
Thread-3|TaskDurationInMillis = 765
Thread-4|TaskDurationInMillis = 765
Thread-5|TaskDurationInMillis = 765
Thread-2|TaskDurationInMillis = 765

Ahora déjame usar el join()for hilos:

    val timeInMillis = System.currentTimeMillis()
    ThreadUtils.startNewThread(Runnable {
        for (i in 1..5) {
            val t = Thread(Runnable {
                Thread.sleep(50)
                var a = i
                kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
                Thread.sleep(200)
                for (j in 1..5) {
                    a *= j
                    Thread.sleep(100)
                    kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
                }
                kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
            })
            t.start()
            t.join()
        }
    })

Y el resultado:

Thread-1|a=1
Thread-1|1*1=1
Thread-1|2*2=2
Thread-1|6*3=6
Thread-1|24*4=24
Thread-1|120*5=120
Thread-1|TaskDurationInMillis = 815
Thread-2|a=2
Thread-2|2*1=2
Thread-2|4*2=4
Thread-2|12*3=12
Thread-2|48*4=48
Thread-2|240*5=240
Thread-2|TaskDurationInMillis = 1568
Thread-3|a=3
Thread-3|3*1=3
Thread-3|6*2=6
Thread-3|18*3=18
Thread-3|72*4=72
Thread-3|360*5=360
Thread-3|TaskDurationInMillis = 2323
Thread-4|a=4
Thread-4|4*1=4
Thread-4|8*2=8
Thread-4|24*3=24
Thread-4|96*4=96
Thread-4|480*5=480
Thread-4|TaskDurationInMillis = 3078
Thread-5|a=5
Thread-5|5*1=5
Thread-5|10*2=10
Thread-5|30*3=30
Thread-5|120*4=120
Thread-5|600*5=600
Thread-5|TaskDurationInMillis = 3833

Como queda claro cuando usamos join:

  1. Los subprocesos se ejecutan secuencialmente.
  2. La primera muestra tarda 765 milisegundos, mientras que la segunda muestra 3833 milisegundos.

Nuestra solución para evitar el bloqueo de otros subprocesos fue crear una ArrayList:

val threads = ArrayList<Thread>()

Ahora, cuando queremos comenzar un nuevo hilo, lo agregamos a ArrayList:

addThreadToArray(
    ThreadUtils.startNewThread(Runnable {
        ...
    })
)

La addThreadToArrayfunción:

@Synchronized
fun addThreadToArray(th: Thread) {
    threads.add(th)
}

La startNewThreadfunción:

fun startNewThread(runnable: Runnable) : Thread {
    val th = Thread(runnable)
    th.isDaemon = false
    th.priority = Thread.MAX_PRIORITY
    th.start()
    return th
}

Verifique la finalización de los hilos como se muestra a continuación en todos los lugares donde sea necesario:

val notAliveThreads = ArrayList<Thread>()
for (t in threads)
    if (!t.isAlive)
        notAliveThreads.add(t)
threads.removeAll(notAliveThreads)
if (threads.size == 0){
    // The size is 0 -> there is no alive threads.
}
Misagh
fuente