Me encuentro con un problema en el que si intento cambiar el ThreadPoolExecutor
tamaño del grupo principal de un grupo a un número diferente después de que se haya creado el grupo, entonces de forma intermitente, algunas tareas se rechazan RejectedExecutionException
aunque nunca envíe más que un queueSize + maxPoolSize
número de tareas.
El problema que estoy tratando de resolver es extender ThreadPoolExecutor
el tamaño de sus hilos principales en función de las ejecuciones pendientes que se encuentran en la cola del grupo de hilos. Necesito esto porque por defecto a ThreadPoolExecutor
creará uno nuevo Thread
solo si la cola está llena.
Aquí hay un pequeño programa autónomo Pure Java 8 que demuestra el problema.
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolResizeTest {
public static void main(String[] args) throws Exception {
// increase the number of iterations if unable to reproduce
// for me 100 iterations have been enough
int numberOfExecutions = 100;
for (int i = 1; i <= numberOfExecutions; i++) {
executeOnce();
}
}
private static void executeOnce() throws Exception {
int minThreads = 1;
int maxThreads = 5;
int queueCapacity = 10;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolExecutor.AbortPolicy()
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
0, 10, TimeUnit.MILLISECONDS);
CompletableFuture<Void> taskBlocker = new CompletableFuture<>();
try {
int totalTasksToSubmit = queueCapacity + maxThreads;
for (int i = 1; i <= totalTasksToSubmit; i++) {
// following line sometimes throws a RejectedExecutionException
pool.submit(() -> {
// block the thread and prevent it from completing the task
taskBlocker.join();
});
// Thread.sleep(10); //enabling even a small sleep makes the problem go away
}
} finally {
taskBlocker.complete(null);
scheduler.shutdown();
pool.shutdown();
}
}
/**
* Resize the thread pool if the number of pending tasks are non-zero.
*/
private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
int pendingExecutions = pool.getQueue().size();
int approximateRunningExecutions = pool.getActiveCount();
/*
* New core thread count should be the sum of pending and currently executing tasks
* with an upper bound of maxThreads and a lower bound of minThreads.
*/
int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));
pool.setCorePoolSize(newThreadCount);
pool.prestartAllCoreThreads();
}
}
¿Por qué el grupo debería lanzar una RejectedExecutionException si nunca envío más que queueCapacity + maxThreads? Nunca estoy cambiando los subprocesos máximos, por lo que según la definición de ThreadPoolExecutor, debe acomodar la tarea en un subproceso o en la cola.
Por supuesto, si nunca cambio el tamaño del grupo, el grupo de subprocesos nunca rechaza ningún envío. Esto también es difícil de depurar ya que agregar cualquier tipo de demora en los envíos hace que el problema desaparezca.
¿Algún indicador sobre cómo solucionar la RejectedExecutionException?
fuente
ExecutorService
envolviendo una existente, que vuelve a enviar tareas que no se pudieron enviar debido al cambio de tamaño?ThreadPoolExecutor
es muy probablemente una mala idea, ¿y no necesitaría cambiar también el código existente en este caso? Sería mejor que proporcione algún ejemplo de cómo su código real accede al ejecutor. Me sorprendería si utilizara muchos métodos específicos paraThreadPoolExecutor
(es decir, no enExecutorService
).Respuestas:
Aquí hay un escenario por el que esto está sucediendo:
En mi ejemplo, uso minThreads = 0, maxThreads = 2 y queueCapacity = 2 para acortarlo. Se envía el primer comando, esto se hace en el método ejecutar:
para este comando se ejecuta workQueue.offer (comando) que addWorker (nulo, falso). El subproceso de trabajo primero saca este comando de la cola en el método de ejecución del subproceso, por lo que en este momento la cola todavía tiene un comando,
El segundo comando se envía esta vez se ejecuta workQueue.offer (comando). Ahora la cola esta llena
Ahora ScheduledExecutorService ejecuta el método resizeThreadPool que llama a setCorePoolSize con maxThreads. Aquí está el método setCorePoolSize:
Este método agrega un trabajador usando addWorker (nulo, verdadero). No, hay 2 colas de trabajo en ejecución, la máxima y la cola está llena.
El tercer comando se envía y falla porque workQueue.offer (comando) y addWorker (comando, falso) fallan, lo que lleva a la excepción:
Creo que para resolver este problema, debe establecer la capacidad de la cola al máximo de comandos que desea ejecutar.
fuente
No estoy seguro si esto califica como error. Este es el comportamiento cuando se crean subprocesos de trabajo adicionales después de que la cola está llena, pero esto se ha notado en los documentos de Java que la persona que llama tiene que lidiar con las tareas rechazadas.
Documentos de Java
Cuando cambia el tamaño del núcleo de la agrupación, digamos aumentar, se crean los trabajadores adicionales (
addWorker
método ensetCorePoolSize
) y la llamada para crear trabajo adicional (addWorker
método desdeexecute
) se rechaza cuandoaddWorker
devuelve falso (add Worker
último fragmento de código) ya que ya hay suficientes trabajadores adicionales creado por,setCorePoolSize
pero aún no se ejecuta, para reflejar la actualización en la cola .Partes relevantes
Comparar
Utilice un controlador de ejecución de rechazo de reintento personalizado (Esto debería funcionar para su caso, ya que tiene el límite superior como tamaño máximo de grupo). Por favor, ajuste según sea necesario.
También tenga en cuenta que su uso del apagado no es correcto, ya que esto no esperará a que la tarea enviada complete la ejecución, sino que se usará con
awaitTermination
.fuente