Manejo de excepciones de tareas de Java ExecutorService

213

Estoy tratando de usar la ThreadPoolExecutorclase de Java para ejecutar una gran cantidad de tareas pesadas con un número fijo de subprocesos. Cada una de las tareas tiene muchos lugares durante los cuales puede fallar debido a excepciones.

He subclasificado ThreadPoolExecutory he anulado el afterExecutemétodo que se supone que proporciona las excepciones no detectadas que se encuentran al ejecutar una tarea. Sin embargo, parece que no puedo hacer que funcione.

Por ejemplo:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

El resultado de este programa es "Todo está bien, ¡situación normal!" aunque el único Runnable enviado al grupo de subprocesos arroja una excepción. ¿Alguna pista de lo que está pasando aquí?

¡Gracias!

Tom
fuente
nunca preguntaste el futuro de la tarea, qué pasó allí. No se bloqueará todo el programa o ejecutor de servicios. La excepción se detecta y se envuelve en ExecutionException. Y lo volverá a lanzar si llamas a future.get (). PD: The future.isDone () [Lea el nombre real de la API] volverá verdadero, incluso cuando el ejecutable haya terminado erróneamente. Porque la tarea se hace de verdad.
Jai Pandit

Respuestas:

156

De los documentos :

Nota: Cuando las acciones están encerradas en tareas (como FutureTask) ya sea explícitamente o mediante métodos como el envío, estos objetos de tarea capturan y mantienen excepciones computacionales, por lo que no causan una terminación abrupta, y las excepciones internas no se pasan a este método .

Cuando envíe un Runnable, quedará envuelto en un Futuro.

Su afterExecute debería ser algo como esto:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}
nos
fuente
77
Gracias, terminé usando esta solución. Además, en caso de que alguien esté interesado: otros han sugerido no subclasificar el Servicio de Ejecutor, pero lo hice de todos modos porque quería monitorear las tareas a medida que se completaban en lugar de esperar a que finalizaran y luego llamar a get () en todos los Futuros devueltos .
Tom
1
Otro enfoque para subclasificar al ejecutor es subclasificar FutureTask y anular su método 'hecho'
nos
1
Tom >> ¿Puede publicar su código de fragmento de muestra donde subclasificó ExecutorService para supervisar las tareas a medida que se completan ...
jagamot
1
Esta respuesta no funcionará si está utilizando ComplableFuture.runAsync, ya que afterExecute contendrá un objeto que es un paquete privado y no hay forma de acceder a los elementos arrojables. Lo solucioné envolviendo la llamada. Vea mi respuesta a continuación.
mmm
2
¿Tenemos que verificar si el futuro se completa usando future.isDone()? Como afterExecutese ejecuta después de que Runnablese completa, supongo que future.isDone()siempre regresa true.
Searene
248

ADVERTENCIA : Debe tenerse en cuenta que esta solución bloqueará el hilo de llamada.


Si desea excepciones de procesos lanzados por la tarea, a continuación, por lo general es mejor utilizar Callableen lugar de Runnable.

Callable.call() se le permite lanzar excepciones marcadas, y estas se propagan nuevamente al hilo de llamada:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

Si Callable.call()arroja una excepción, esta será envuelta en una ExecutionExceptiony arrojada por Future.get().

Es probable que esto sea mucho más preferible que la subclase ThreadPoolExecutor. También le brinda la oportunidad de volver a enviar la tarea si la excepción es recuperable.

skaffman
fuente
55
> Callable.call () puede lanzar excepciones marcadas, y éstas se propagan de nuevo al hilo de llamada: Tenga en cuenta que la excepción lanzada se propagará al hilo de llamada solo si future.get()se llama a su versión sobrecargada.
nhylated
16
Es perfecto, pero ¿qué hacer si ejecuto tareas en paralelo y no quiero bloquear la ejecución?
Grigory Kislin
43
No use esta solución, ya que interrumpe el propósito de usar ExecutorService. Un ExecutorService es un mecanismo de ejecución asíncrono que es capaz de ejecutar tareas en segundo plano. Si llama a future.get () justo después de ejecutarlo, bloqueará el hilo de llamada hasta que finalice la tarea.
user1801374
2
Esta solución no debe tener una calificación tan alta. Future.get () funciona sincrónicamente y actuará como un bloqueador hasta que Runnable o Callable se hayan ejecutado y, como se indicó anteriormente, derrote el propósito de usar el Servicio de Ejecutor
Super Hans
2
Como señaló #nhylated, esto merece un ERROR jdk. Si no se llama a Future.get (), cualquier excepción no detectada de Callable se ignora en silencio. Muy mal diseño ... acabo de pasar más de 1 día para descubrir que una biblioteca usó esto y jdk ignoró silenciosamente las excepciones. Y, esto todavía existe en jdk12.
Ben Jiang
18

La explicación de este comportamiento está en el javadoc para afterExecute :

Nota: Cuando las acciones están encerradas en tareas (como FutureTask) ya sea explícitamente o mediante métodos como el envío, estos objetos de tarea capturan y mantienen excepciones computacionales, por lo que no causan una terminación abrupta, y las excepciones internas no se pasan a este método .

Drew Wills
fuente
10

Lo solucioné envolviendo el ejecutable proporcionado enviado al ejecutor.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);
mmm
fuente
3
Puede mejorar la legibilidad utilizando el whenComplete()método de CompletableFuture.
Eduard Wirch
@EduardWirch esto funciona pero no puede devolver una excepción de whenComplete ()
Akshat
7

Estoy usando la VerboseRunnableclase de jcabi-log , que se traga todas las excepciones y las registra. Muy conveniente, por ejemplo:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);
yegor256
fuente
3

Otra solución sería utilizar ManagedTask y ManagedTaskListener .

Necesita un Callable o Runnable que implemente la interfaz ManagedTask .

El método getManagedTaskListenerdevuelve la instancia que desea.

public ManagedTaskListener getManagedTaskListener() {

E implementa en ManagedTaskListener el taskDonemétodo:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

Más detalles sobre el ciclo de vida de la tarea administrada y el oyente .

CSchulz
fuente
2

Esto funciona

  • Se deriva de SingleThreadExecutor, pero puede adaptarlo fácilmente
  • Código Java 8 lamdas, pero fácil de arreglar

Creará un ejecutor con un solo hilo, que puede realizar muchas tareas; y esperará a que el actual finalice la ejecución para comenzar con el siguiente

En caso de error o excepción de uncaugth, uncaughtExceptionHandler lo detectará

clase final pública SingleThreadExecutorWithExceptions {

    public estático ExecutorService newSingleThreadExecutorWithExceptions (final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory factory = (Runnable runnable) -> {
            Hilo final newThread = nuevo hilo (ejecutable, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler ((final Thread caugthThread, final Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException (caugthThread, throwable);
            });
            return newThread;
        };
        volver nuevo FinalizableDelegatedExecutorService
                (nuevo ThreadPoolExecutor (1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        nuevo LinkedBlockingQueue (),
                        fábrica){


                    vacío protegido afterExecute (Runnable runnable, Throwable throwable) {
                        super.afterExecute (ejecutable, arrojable);
                        if (arrojable == nulo && instancia ejecutable de futuro) {
                            tratar {
                                Futuro futuro = (Futuro) ejecutable;
                                if (future.isDone ()) {
                                    future.get ();
                                }
                            } catch (CancellationException ce) {
                                arrojable = ce;
                            } catch (ExecutionException ee) {
                                arrojable = ee.getCause ();
                            } catch (InterruptedException es decir) {
                                Thread.currentThread (). Interrupt (); // ignorar / restablecer
                            }
                        }
                        if (arrojable! = nulo) {
                            uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), arrojable);
                        }
                    }
                });
    }



    clase estática privada FinalizableDelegatedExecutorService
            extiende DelegatedExecutorService {
        FinalizableDelegatedExecutorService (ExecutorService ejecutor) {
            super (ejecutor);
        }
        vacío protegido finalizar () {
            super.shutdown ();
        }
    }

    / **
     * Una clase de contenedor que expone solo los métodos ExecutorService
     * de una implementación de ExecutorService.
     * /
    clase estática privada DelegatedExecutorService extiende AbstractExecutorService {
        Servicio de ejecutor final privado e;
        DelegatedExecutorService (ExecutorService ejecutor) {e = executeor; }
        public void execute (comando ejecutable) {e.execute (comando); }
        apagado público vacío () {e.shutdown (); }
        Lista pública shutdownNow () {return e.shutdownNow (); }
        public boolean isShutdown () {return e.isShutdown (); }
        public boolean isTerminated () {return e.isTerminated (); }
        public boolean awaitTermination (tiempo de espera prolongado, unidad TimeUnit)
                lanza InterruptedException {
            return e.awaitTermination (tiempo de espera, unidad);
        }
        Envío futuro público (tarea ejecutable) {
            return e.submit (tarea);
        }
        Envío futuro público (tarea invocable) {
            return e.submit (tarea);
        }
        Envío futuro público (tarea ejecutable, resultado T) {
            return e.submit (tarea, resultado);
        }
        Lista pública> invokeAll (Colección> tareas)
                lanza InterruptedException {
            return e.invokeAll (tareas);
        }
        Lista pública> invokeAll (Colección> tareas,
                                             tiempo de espera prolongado, unidad de unidad de tiempo)
                lanza InterruptedException {
            return e.invokeAll (tareas, tiempo de espera, unidad);
        }
        public T invokeAny (Colección> tareas)
                lanza InterruptedException, ExecutionException {
            return e.invokeAny (tareas);
        }
        public T invokeAny (Colección> tareas,
                               tiempo de espera prolongado, unidad de unidad de tiempo)
                lanza InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny (tareas, tiempo de espera, unidad);
        }
    }



    private SingleThreadExecutorWithExceptions () {}
}
obesga_tirant
fuente
Desafortunadamente, el uso de finalizar es un poco inestable, ya que solo se llamará "más tarde cuando el recolector de basura lo recoja" (o tal vez no en el caso de un Hilo, no sé) ...
rogerdpack
1

Si desea monitorear la ejecución de la tarea, puede girar 1 o 2 subprocesos (tal vez más dependiendo de la carga) y usarlos para tomar tareas de un contenedor ExecutionCompletionService.

Cristian Botiza
fuente
0

Si ExecutorServiceproviene de una fuente externa (es decir, no es posible subclasificar ThreadPoolExecutory anular afterExecute()), puede usar un proxy dinámico para lograr el comportamiento deseado:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}
Bajo
fuente
0

Esto es debido a AbstractExecutorService :: submitque se envolver su runnableen RunnableFuture(nada más FutureTask), como a continuación

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Luego executelo pasará Workery Worker.run()llamará a la siguiente.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Finalmente task.run();en el código anterior se llamará call FutureTask.run(). Aquí está el código del controlador de excepciones, debido a esto NO está obteniendo la excepción esperada.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
Kanagavelu Sugumar
fuente
0

Esto es similar a la solución de mmm, pero un poco más comprensible. Haga que sus tareas extiendan una clase abstracta que envuelve el método run ().

public abstract Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}
ccleve
fuente
-4

En lugar de subclasificar ThreadPoolExecutor, le proporcionaría una instancia de ThreadFactory que crea nuevos Threads y les proporciona un UncaughtExceptionHandler

Kevin
fuente
3
Intenté esto también, pero parece que nunca se llama al método uncaughtException. Creo que esto se debe a que un hilo de trabajo en la clase ThreadPoolExecutor está capturando las excepciones.
Tom
55
El método uncaughtException no se llama porque el método de envío del ExecutorService está envolviendo el Callable / Runnable en un futuro; La excepción está siendo capturada allí.
Emil Sit