Estoy tratando de usar la ThreadPoolExecutorclase de Java para ejecutar una gran cantidad de tareas pesadas con un número fijo de subprocesos. Cada una de las tareas tiene muchos lugares durante los cuales puede fallar debido a excepciones.
He subclasificado ThreadPoolExecutory he anulado el afterExecutemétodo que se supone que proporciona las excepciones no detectadas que se encuentran al ejecutar una tarea. Sin embargo, parece que no puedo hacer que funcione.
Por ejemplo:
public class ThreadPoolErrors extends ThreadPoolExecutor {
public ThreadPoolErrors() {
super( 1, // core threads
1, // max threads
1, // timeout
TimeUnit.MINUTES, // timeout units
new LinkedBlockingQueue<Runnable>() // work queue
);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if(t != null) {
System.out.println("Got an error: " + t);
} else {
System.out.println("Everything's fine--situation normal!");
}
}
public static void main( String [] args) {
ThreadPoolErrors threadPool = new ThreadPoolErrors();
threadPool.submit(
new Runnable() {
public void run() {
throw new RuntimeException("Ouch! Got an error.");
}
}
);
threadPool.shutdown();
}
}
El resultado de este programa es "Todo está bien, ¡situación normal!" aunque el único Runnable enviado al grupo de subprocesos arroja una excepción. ¿Alguna pista de lo que está pasando aquí?
¡Gracias!

Respuestas:
De los documentos :
Cuando envíe un Runnable, quedará envuelto en un Futuro.
Su afterExecute debería ser algo como esto:
fuente
future.isDone()? ComoafterExecutese ejecuta después de queRunnablese completa, supongo quefuture.isDone()siempre regresatrue.ADVERTENCIA : Debe tenerse en cuenta que esta solución bloqueará el hilo de llamada.
Si desea excepciones de procesos lanzados por la tarea, a continuación, por lo general es mejor utilizar
Callableen lugar deRunnable.Callable.call()se le permite lanzar excepciones marcadas, y estas se propagan nuevamente al hilo de llamada:Si
Callable.call()arroja una excepción, esta será envuelta en unaExecutionExceptiony arrojada porFuture.get().Es probable que esto sea mucho más preferible que la subclase
ThreadPoolExecutor. También le brinda la oportunidad de volver a enviar la tarea si la excepción es recuperable.fuente
future.get()se llama a su versión sobrecargada.La explicación de este comportamiento está en el javadoc para afterExecute :
fuente
Lo solucioné envolviendo el ejecutable proporcionado enviado al ejecutor.
fuente
whenComplete()método deCompletableFuture.Estoy usando la
VerboseRunnableclase de jcabi-log , que se traga todas las excepciones y las registra. Muy conveniente, por ejemplo:fuente
Otra solución sería utilizar ManagedTask y ManagedTaskListener .
Necesita un Callable o Runnable que implemente la interfaz ManagedTask .
El método
getManagedTaskListenerdevuelve la instancia que desea.E implementa en ManagedTaskListener el
taskDonemétodo:Más detalles sobre el ciclo de vida de la tarea administrada y el oyente .
fuente
Esto funciona
Creará un ejecutor con un solo hilo, que puede realizar muchas tareas; y esperará a que el actual finalice la ejecución para comenzar con el siguiente
En caso de error o excepción de uncaugth, uncaughtExceptionHandler lo detectará
clase final pública SingleThreadExecutorWithExceptions { public estático ExecutorService newSingleThreadExecutorWithExceptions (final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { ThreadFactory factory = (Runnable runnable) -> { Hilo final newThread = nuevo hilo (ejecutable, "SingleThreadExecutorWithExceptions"); newThread.setUncaughtExceptionHandler ((final Thread caugthThread, final Throwable throwable) -> { uncaughtExceptionHandler.uncaughtException (caugthThread, throwable); }); return newThread; }; volver nuevo FinalizableDelegatedExecutorService (nuevo ThreadPoolExecutor (1, 1, 0L, TimeUnit.MILLISECONDS, nuevo LinkedBlockingQueue (), fábrica){ vacío protegido afterExecute (Runnable runnable, Throwable throwable) { super.afterExecute (ejecutable, arrojable); if (arrojable == nulo && instancia ejecutable de futuro) { tratar { Futuro futuro = (Futuro) ejecutable; if (future.isDone ()) { future.get (); } } catch (CancellationException ce) { arrojable = ce; } catch (ExecutionException ee) { arrojable = ee.getCause (); } catch (InterruptedException es decir) { Thread.currentThread (). Interrupt (); // ignorar / restablecer } } if (arrojable! = nulo) { uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), arrojable); } } }); } clase estática privada FinalizableDelegatedExecutorService extiende DelegatedExecutorService { FinalizableDelegatedExecutorService (ExecutorService ejecutor) { super (ejecutor); } vacío protegido finalizar () { super.shutdown (); } } / ** * Una clase de contenedor que expone solo los métodos ExecutorService * de una implementación de ExecutorService. * / clase estática privada DelegatedExecutorService extiende AbstractExecutorService { Servicio de ejecutor final privado e; DelegatedExecutorService (ExecutorService ejecutor) {e = executeor; } public void execute (comando ejecutable) {e.execute (comando); } apagado público vacío () {e.shutdown (); } Lista pública shutdownNow () {return e.shutdownNow (); } public boolean isShutdown () {return e.isShutdown (); } public boolean isTerminated () {return e.isTerminated (); } public boolean awaitTermination (tiempo de espera prolongado, unidad TimeUnit) lanza InterruptedException { return e.awaitTermination (tiempo de espera, unidad); } Envío futuro público (tarea ejecutable) { return e.submit (tarea); } Envío futuro público (tarea invocable) { return e.submit (tarea); } Envío futuro público (tarea ejecutable, resultado T) { return e.submit (tarea, resultado); } Lista pública> invokeAll (Colección> tareas) lanza InterruptedException { return e.invokeAll (tareas); } Lista pública> invokeAll (Colección> tareas, tiempo de espera prolongado, unidad de unidad de tiempo) lanza InterruptedException { return e.invokeAll (tareas, tiempo de espera, unidad); } public T invokeAny (Colección> tareas) lanza InterruptedException, ExecutionException { return e.invokeAny (tareas); } public T invokeAny (Colección> tareas, tiempo de espera prolongado, unidad de unidad de tiempo) lanza InterruptedException, ExecutionException, TimeoutException { return e.invokeAny (tareas, tiempo de espera, unidad); } } private SingleThreadExecutorWithExceptions () {} }fuente
Si desea monitorear la ejecución de la tarea, puede girar 1 o 2 subprocesos (tal vez más dependiendo de la carga) y usarlos para tomar tareas de un contenedor ExecutionCompletionService.
fuente
Si
ExecutorServiceproviene de una fuente externa (es decir, no es posible subclasificarThreadPoolExecutory anularafterExecute()), puede usar un proxy dinámico para lograr el comportamiento deseado:fuente
Esto es debido a
AbstractExecutorService :: submitque se envolver surunnableenRunnableFuture(nada másFutureTask), como a continuaciónLuego
executelo pasaráWorkeryWorker.run()llamará a la siguiente.fuente
Esto es similar a la solución de mmm, pero un poco más comprensible. Haga que sus tareas extiendan una clase abstracta que envuelve el método run ().
fuente
En lugar de subclasificar ThreadPoolExecutor, le proporcionaría una instancia de ThreadFactory que crea nuevos Threads y les proporciona un UncaughtExceptionHandler
fuente