Estoy buscando una implementación ExecutorService que se pueda proporcionar con un tiempo de espera. Las tareas que se envían al ExecutorService se interrumpen si tardan más en ejecutarse que el tiempo de espera. Implementar tal bestia no es una tarea tan difícil, pero me pregunto si alguien sabe de una implementación existente.
Esto es lo que se me ocurrió en base a parte de la discusión a continuación. ¿Algún comentario?
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public void shutdown() {
timeoutExecutor.shutdown();
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
}
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
thread.interrupt();
}
}
}
java
multithreading
concurrency
executorservice
Edward Dale
fuente
fuente
protected void beforeExecute(Thread t, Runnable r)
gancho.Respuestas:
Puede utilizar un ScheduledExecutorService para esto. Primero, lo enviaría solo una vez para comenzar de inmediato y conservar el futuro que se crea. Después de eso, puede enviar una nueva tarea que cancelaría el futuro retenido después de un período de tiempo.
Esto ejecutará su controlador (la funcionalidad principal se interrumpirá) durante 10 segundos, luego cancelará (es decir, interrumpirá) esa tarea específica.
fuente
beforeExecute
gancho.Desafortunadamente, la solución es defectuosa. Hay una especie de error con
ScheduledThreadPoolExecutor
, también informado en esta pregunta : cancelar una tarea enviada no libera completamente los recursos de memoria asociados con la tarea; los recursos se liberan solo cuando la tarea expira.Por lo tanto, si crea un
TimeoutThreadPoolExecutor
con un tiempo de vencimiento bastante largo (un uso típico) y envía las tareas lo suficientemente rápido, terminará llenando la memoria, aunque las tareas se completaron con éxito.Puede ver el problema con el siguiente programa de prueba (muy burdo):
El programa agota la memoria disponible, aunque espera
Runnable
a que se completen los mensajes de correo electrónico generados .Pensé en esto por un tiempo, pero desafortunadamente no pude encontrar una buena solución.
EDITAR: descubrí que este problema se informó como error JDK 6602600 , y parece que se ha solucionado muy recientemente.
fuente
Envuelva la tarea en FutureTask y puede especificar el tiempo de espera para FutureTask. Mira el ejemplo en mi respuesta a esta pregunta,
Tiempo de espera del proceso nativo de Java
fuente
java.util.concurrent
clases, pero estoy buscando unaExecutorService
implementación.Después de un montón de tiempo para estudiar,
finalmente, utilizo el
invokeAll
método deExecutorService
para resolver este problema.Eso interrumpirá estrictamente la tarea mientras se ejecuta la tarea.
Aquí hay un ejemplo
La ventaja es que también puede enviar
ListenableFuture
al mismoExecutorService
.Simplemente cambie ligeramente la primera línea de código.
ListeningExecutorService
es la función de escucha delExecutorService
proyecto de google guava ( com.google.guava ))fuente
invokeAll
. Eso funciona muy bien. Solo una advertencia para cualquiera que esté pensando en usar esto: aunqueinvokeAll
devuelve una lista deFuture
objetos, en realidad parece ser una operación de bloqueo.¿Qué tal usar el
ExecutorService.shutDownNow()
método como se describe en http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html ? Parece ser la solución más sencilla.fuente
Parece que el problema no está en el error JDK 6602600 (se resolvió el 22-05-2010), sino en una llamada incorrecta de suspensión (10) en círculo. Además, tenga en cuenta que el hilo principal debe dar directamente Oportunidad a otros hilos para realizar sus tareas invocando SLEEP (0) en CADA rama del círculo exterior. Creo que es mejor usar Thread.yield () en lugar de Thread.sleep (0)
La parte de resultado corregida del código de problema anterior es como esta:
Funciona correctamente con una cantidad de contador exterior hasta 150 000 000 de círculos probados.
fuente
Usando la respuesta de John W, creé una implementación que comienza correctamente el tiempo de espera cuando la tarea comienza su ejecución. Incluso escribo una prueba unitaria para ello :)
Sin embargo, no se adapta a mis necesidades ya que algunas operaciones IO no interrumpen cuando
Future.cancel()
se llama (es decir, cuandoThread.interrupt()
se llama). Algunos ejemplos de operaciones de IO que pueden no interrumpirse cuandoThread.interrupt()
se llama sonSocket.connect
ySocket.read
(y sospecho que la mayoría de las operaciones de IO se implementan enjava.io
). Todas las operaciones de E / S enjava.nio
deberían ser interrumpibles cuandoThread.interrupt()
se llame. Por ejemplo, ese es el caso deSocketChannel.open
ySocketChannel.read
.De todos modos, si alguien está interesado, creé una esencia para un ejecutor de grupo de subprocesos que permite que las tareas se agoten (si están usando operaciones interrumpibles ...): https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf
fuente
Socket.connect
ySocket.read
myThread.interrupted()
no es el método correcto para interrumpir, ya que BORRA la bandera de interrupción. Úselo en sumyThread.interrupt()
lugar, y eso debería con socketsThread.interrupted()
que no permite interrumpir un hilo. Sin embargo,Thread.interrupt()
no interrumpe lasjava.io
operaciones, solojava.nio
funciona en operaciones.interrupt()
durante muchos años y siempre ha interrumpido las operaciones de java.io (así como otros métodos de bloqueo, como la suspensión de subprocesos, las conexiones jdbc, la toma de cola de bloqueo, etc.). Tal vez encontró una clase con errores o alguna JVM que tiene errores¿Qué pasa con esta idea alternativa?
La pequeña muestra está aquí:
fuente
compruebe si esto funciona para usted,
puede restringir el número de usos de subprocesos del programador, así como poner tiempo de espera en la tarea.
fuente