Java detiene el servicio del ejecutor una vez que una de sus tareas asignadas falla por cualquier motivo

12

Necesito algún tipo de servicio que ejecute algunas tareas simultáneamente y en un intervalo de 1 segundo durante 1 minuto.

Si una de las tareas falla, quiero detener el servicio y cada tarea que se ejecutó con algún tipo de indicador de que algo salió mal, de lo contrario, si después de un minuto todo salió bien, el servicio se detendrá con un indicador de que todo salió bien.

Por ejemplo, tengo 2 funciones:

Runnable task1 = ()->{
      int num = Math.rand(1,100);
      if (num < 5){
          throw new Exception("something went wrong with this task,terminate");
      }
}

Runnable task2 = ()->{
      int num = Math.rand(1,100)
      return num < 50;
}



ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
task1schedule = scheduledExecutorService.scheduleAtFixedRate(task1, 1, 60, TimeUnit.SECONDS);
task2schedule = scheduledExecutorService.scheduleAtFixedRate(task2, 1, 60, TimeUnit.SECONDS);

if (!task1schedule || !task2schedule) scheduledExecutorService.shutdown();

¿Alguna idea sobre cómo debería abordar esto y hacer que las cosas sean lo más genéricas posible?

totothegreat
fuente
1
Pocas cosas aparte de la pregunta real, Math.randno es una API incorporada. Una implementación de Runnabledebe tener una void rundefinición. Tipo de task1/2scheduleestaría ScheduledFuture<?>en el contexto proporcionado. Pasando a la pregunta real, ¿cómo se utiliza awaitTermination? Podrías hacer eso como scheduledExecutorService.awaitTermination(1,TimeUnit.MINUTES);. Alternativamente, ¿qué pasa con verificar si alguna de las tareas se canceló antes de su finalización normal if (task1schedule.isCancelled() || task2schedule.isCancelled()) scheduledExecutorService.shutdown();:?
Naman el
2
No tiene sentido que las tareas de programación se repitan cada minuto, pero luego diga que desea detener las tareas "si después de un minuto todo salió bien". Como está deteniendo al ejecutor en cualquier caso, programar una tarea que apague al ejecutor después de un minuto es trivial. Y los futuros ya indican si algo salió mal o no. No dijiste qué otro tipo de indicador quieres.
Holger

Respuestas:

8

La idea es que las tareas estén empujando a un objeto común TaskCompleteEvent. Si generan un error, el planificador se detiene y todas las tareas se detendrán.

Puede verificar los resultados de cada iteración de tareas en los mapas "errores" y "éxito".

public class SchedulerTest {

    @Test
    public void scheduler() throws InterruptedException {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        TaskCompleteEvent taskCompleteEvent = new TaskCompleteEvent(scheduledExecutorService);
        Runnable task1 = () -> {
            int num = new Random().nextInt(100);
            if (num < 5) {
                taskCompleteEvent.message("task1-"+UUID.randomUUID().toString(), "Num "+num+" was obatined. Breaking all the executions.", true);
            }
        };
        Runnable task2 = () -> {
            int num = new Random().nextInt(100);
            taskCompleteEvent.message("task2-"+UUID.randomUUID().toString(), num < 50, false);
        };
        scheduledExecutorService.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
        scheduledExecutorService.scheduleAtFixedRate(task2, 0, 1, TimeUnit.SECONDS);
        scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
        System.out.println("Success: "+taskCompleteEvent.getSuccess());
        System.out.println("Errors: "+taskCompleteEvent.getErrors());
        System.out.println("Went well?: "+taskCompleteEvent.getErrors().isEmpty());
    }

    public static class TaskCompleteEvent {

        private final ScheduledExecutorService scheduledExecutorService;
        private final Map<String, Object> errors = new LinkedHashMap<>();
        private final Map<String, Object> success = new LinkedHashMap<>();

        public TaskCompleteEvent(ScheduledExecutorService scheduledExecutorService) {
            this.scheduledExecutorService = scheduledExecutorService;
        }

        public synchronized void message(String id, Object response, boolean error) {
            if (error) {
                errors.put(id, response);
                scheduledExecutorService.shutdown();
            } else {
                success.put(id, response);
            }
        }

        public synchronized Map<String, Object> getErrors() {
            return errors;
        }

        public synchronized Map<String, Object> getSuccess() {
            return success;
        }

    }

}
Ravenskater
fuente
2

Solo necesita agregar una tarea adicional cuyo trabajo es monitorear todas las demás tareas en ejecución, y cuando cualquiera de las tareas monitoreadas falla, deben establecer un semáforo (indicador) que el asesino pueda inspeccionar.

    ScheduledExecutorService executor = (ScheduledExecutorService) Executors.newScheduledThreadPool(2);

    // INSTANTIATE THE REMOTE-FILE-MONITOR:
    RemoteFileMonitor monitor = new RemoteFileMonitor(remotesource, localtarget);

    // THIS TimerTask PERIODICALLY TRIGGERS THE RemoteFileMonitor: 
    TimerTask remote = new TimerTask() {

        // RUN FORREST... RUN !
        public void run() {

            try { 

                kae.trace("TimerTask::run() --> Calling RemoteFileMonitor.check()");
                monitor.check();

            } catch (Exception ex) {

                // NULL TRAP: ALLOWS US TO CONTINUE AND RETRY:

            }

        }

    };

    // THIS TimerTask PERIODICALLY TRIES TO KILL THE REMOTE-FILE-MONITOR:
    TimerTask assassin = new TimerTask() {

        // WHERE DO BAD FOLKS GO WHEN THEY DIE ? 
        private final LocalDateTime death = LocalDateTime.now().plus(ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);

        // RUN FORREST... RUN !
        public void run() {

            // IS THERE LIFE AFTER DEATH ???
            if (LocalDateTime.now().isAfter(death)) {

                // THEY GO TO A LAKE OF FIRE AND FRY:
                kae.error(ReturnCode.MONITOR_POLLING_CYCLE_EXCEEDED);                   

            }

        }

    };

    // SCHEDULE THE PERIODIC EXECUTION OF THE RemoteFileMonitor: (remote --> run() monitor --> check())
    executor.scheduleAtFixedRate(remote, delay, interval, TimeUnit.MINUTES);

    // SCHEDULE PERIODIC ASSASSINATION ATTEMPTS AGAINST THE RemoteFileMonitor: (assassin --> run() --> after death --> die())
    executor.scheduleAtFixedRate(assassin, delay, 60L, TimeUnit.SECONDS);

    // LOOP UNTIL THE MONITOR COMPLETES:
    do {

        try {

            // I THINK I NEED A NAP:
            Thread.sleep(interval * 10);                

        } catch (InterruptedException e) {

            // FAIL && THEN cleanexit();
            kae.error(ReturnCode.MONITORING_ERROR, "Monitoring of the XXXXXX-Ingestion site was interrupted");

        }

        // NOTE: THE MONITOR IS SET TO 'FINISHED' WHEN THE DONE-File IS DELIVERED AND RETRIEVED:
    } while (monitor.isNotFinished());

    // SHUTDOWN THE MONITOR TASK:
    executor.shutdown();
Greg Patnude
fuente
2
La clase no TimerTasktiene relación alguna con ella ScheduledExecutorService; Simplemente sucede a implementar Runnable. Además, no tiene sentido programar una tarea periódica, solo para verificar si ConfigurationOptions.getPollingCycleTime()se ha alcanzado un tiempo particular ( ). Tiene un ScheduledExecutorService, por lo que puede indicarle que programe la tarea correctamente para el tiempo deseado.
Holger
La implementación en el ejemplo que usé fue matar una tarea de ejecución después de un cierto período de tiempo si la tarea no se había completado. El caso de uso fue: Si el servidor remoto no ha soltado un archivo en 2 horas, elimine la tarea. esto es lo que pidió el OP.
Greg Patnude el
¿Leíste y entendiste mi comentario? No importa lo que hace el código, se utiliza una clase desanimado por ninguna razón, simplemente reemplazar TimerTaskcon Runnabley que ha solucionado el problema, sin cambiar lo que hace el código. Además, solo use executor.schedule(assassin, ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);y se ejecutará una vez en el momento deseado, por lo tanto, la if(LocalDateTime.now().isAfter(death))verificación es obsoleta. Una vez más, no cambia lo que hace el código, además de hacerlo sustancialmente más simple y más eficiente.
Holger