Java ThreadPoolExecutor: la actualización del tamaño del grupo principal rechaza dinámicamente las tareas entrantes de forma intermitente

13

Me encuentro con un problema en el que si intento cambiar el ThreadPoolExecutortamaño del grupo principal de un grupo a un número diferente después de que se haya creado el grupo, entonces de forma intermitente, algunas tareas se rechazan RejectedExecutionExceptionaunque nunca envíe más que un queueSize + maxPoolSizenúmero de tareas.

El problema que estoy tratando de resolver es extender ThreadPoolExecutorel tamaño de sus hilos principales en función de las ejecuciones pendientes que se encuentran en la cola del grupo de hilos. Necesito esto porque por defecto a ThreadPoolExecutorcreará uno nuevo Threadsolo si la cola está llena.

Aquí hay un pequeño programa autónomo Pure Java 8 que demuestra el problema.

import static java.lang.Math.max;
import static java.lang.Math.min;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolResizeTest {

    public static void main(String[] args) throws Exception {
        // increase the number of iterations if unable to reproduce
        // for me 100 iterations have been enough
        int numberOfExecutions = 100;

        for (int i = 1; i <= numberOfExecutions; i++) {
            executeOnce();
        }
    }

    private static void executeOnce() throws Exception {
        int minThreads = 1;
        int maxThreads = 5;
        int queueCapacity = 10;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minThreads, maxThreads,
                0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()
        );

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                0, 10, TimeUnit.MILLISECONDS);
        CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

        try {
            int totalTasksToSubmit = queueCapacity + maxThreads;

            for (int i = 1; i <= totalTasksToSubmit; i++) {
                // following line sometimes throws a RejectedExecutionException
                pool.submit(() -> {
                    // block the thread and prevent it from completing the task
                    taskBlocker.join();
                });
                // Thread.sleep(10); //enabling even a small sleep makes the problem go away
            }
        } finally {
            taskBlocker.complete(null);
            scheduler.shutdown();
            pool.shutdown();
        }
    }

    /**
     * Resize the thread pool if the number of pending tasks are non-zero.
     */
    private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
        int pendingExecutions = pool.getQueue().size();
        int approximateRunningExecutions = pool.getActiveCount();

        /*
         * New core thread count should be the sum of pending and currently executing tasks
         * with an upper bound of maxThreads and a lower bound of minThreads.
         */
        int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

        pool.setCorePoolSize(newThreadCount);
        pool.prestartAllCoreThreads();
    }
}

¿Por qué el grupo debería lanzar una RejectedExecutionException si nunca envío más que queueCapacity + maxThreads? Nunca estoy cambiando los subprocesos máximos, por lo que según la definición de ThreadPoolExecutor, debe acomodar la tarea en un subproceso o en la cola.

Por supuesto, si nunca cambio el tamaño del grupo, el grupo de subprocesos nunca rechaza ningún envío. Esto también es difícil de depurar ya que agregar cualquier tipo de demora en los envíos hace que el problema desaparezca.

¿Algún indicador sobre cómo solucionar la RejectedExecutionException?

Swaranga Sarma
fuente
¿Por qué no proporcionar su propia implementación ExecutorServiceenvolviendo una existente, que vuelve a enviar tareas que no se pudieron enviar debido al cambio de tamaño?
Daniu
@daniu que es una solución alternativa. El punto de las preguntas es por qué el grupo debería lanzar una RejectedExecutionException si nunca envío más que queueCapacity + maxThreads. Nunca estoy cambiando los subprocesos máximos, por lo que según la definición de ThreadPoolExecutor, debe acomodar la tarea en un subproceso o en la cola.
Swaranga Sarma
Ok, parece que he entendido mal tu pregunta. ¿Qué es? ¿Desea saber por qué se produce el comportamiento o cómo lo evita y le causa problemas?
daniu
Sí, cambiar mi implementación a un servicio ejecutor no es factible ya que gran parte del código se refiere al ThreadPoolExecutor. Entonces, si todavía quería tener un ThreadPoolExecutor redimensionable, necesito saber cómo puedo solucionarlo. Puede que la forma correcta de hacer algo como esto sea extender ThreadPoolExecutor y obtener acceso a algunas de sus variables protegidas y actualizar el tamaño del grupo dentro de un bloque sincronizado en un bloqueo compartido por la superclase.
Swaranga Sarma
La extensión ThreadPoolExecutores muy probablemente una mala idea, ¿y no necesitaría cambiar también el código existente en este caso? Sería mejor que proporcione algún ejemplo de cómo su código real accede al ejecutor. Me sorprendería si utilizara muchos métodos específicos para ThreadPoolExecutor(es decir, no en ExecutorService).
Daniu

Respuestas:

5

Aquí hay un escenario por el que esto está sucediendo:

En mi ejemplo, uso minThreads = 0, maxThreads = 2 y queueCapacity = 2 para acortarlo. Se envía el primer comando, esto se hace en el método ejecutar:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

para este comando se ejecuta workQueue.offer (comando) que addWorker (nulo, falso). El subproceso de trabajo primero saca este comando de la cola en el método de ejecución del subproceso, por lo que en este momento la cola todavía tiene un comando,

El segundo comando se envía esta vez se ejecuta workQueue.offer (comando). Ahora la cola esta llena

Ahora ScheduledExecutorService ejecuta el método resizeThreadPool que llama a setCorePoolSize con maxThreads. Aquí está el método setCorePoolSize:

 public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

Este método agrega un trabajador usando addWorker (nulo, verdadero). No, hay 2 colas de trabajo en ejecución, la máxima y la cola está llena.

El tercer comando se envía y falla porque workQueue.offer (comando) y addWorker (comando, falso) fallan, lo que lleva a la excepción:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)

Creo que para resolver este problema, debe establecer la capacidad de la cola al máximo de comandos que desea ejecutar.

Thomas Krieger
fuente
Correcto. Pude reprobar copiando el código a mi propia clase y agregando registradores. Básicamente, cuando la cola está llena y envío una nueva tarea, intentará crear un nuevo trabajador. Mientras tanto, si en ese momento, mi redimensionador también llama a setCorePoolSize a 2, eso también crea un nuevo Worker. En este punto, dos trabajadores compiten para ser agregados, pero ambos no pueden serlo porque violaría la restricción de tamaño máximo de grupo, por lo que se rechaza el envío de la nueva tarea. Creo que esta es una condición de carrera y presenté un informe de error a OpenJDK. Veamos. Pero respondiste mi pregunta para obtener la recompensa. Gracias.
Swaranga Sarma
2

No estoy seguro si esto califica como error. Este es el comportamiento cuando se crean subprocesos de trabajo adicionales después de que la cola está llena, pero esto se ha notado en los documentos de Java que la persona que llama tiene que lidiar con las tareas rechazadas.

Documentos de Java

Fábrica de nuevos hilos. Todos los hilos se crean utilizando esta fábrica (a través del método addWorker). Todas las personas que llaman deben estar preparadas para que addWorker falle, lo que puede reflejar una política del sistema o del usuario que limita el número de subprocesos. Aunque no se trata como un error, la falta de creación de subprocesos puede provocar que se rechacen nuevas tareas o que las existentes permanezcan atascadas en la cola.

Cuando cambia el tamaño del núcleo de la agrupación, digamos aumentar, se crean los trabajadores adicionales ( addWorkermétodo en setCorePoolSize) y la llamada para crear trabajo adicional ( addWorkermétodo desde execute) se rechaza cuando addWorkerdevuelve falso ( add Workerúltimo fragmento de código) ya que ya hay suficientes trabajadores adicionales creado por, setCorePoolSize pero aún no se ejecuta, para reflejar la actualización en la cola .

Partes relevantes

Comparar

public void setCorePoolSize(int corePoolSize) {
    ....
    int k = Math.min(delta, workQueue.size());
    while (k-- > 0 && addWorker(null, true)) {
        if (workQueue.isEmpty())
             break;
    }
}

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
....
   if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
     return false;             
}

Utilice un controlador de ejecución de rechazo de reintento personalizado (Esto debería funcionar para su caso, ya que tiene el límite superior como tamaño máximo de grupo). Por favor, ajuste según sea necesario.

public static class RetryRejectionPolicy implements RejectedExecutionHandler {
    public RetryRejectionPolicy () {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
           while(true)
            if(e.getQueue().offer(r)) break;
        }
    }
}

ThreadPoolExecutor pool = new ThreadPoolExecutor(
      minThreads, maxThreads,
      0, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(queueCapacity),
      new ThreadPoolResizeTest.RetryRejectionPolicy()
 );

También tenga en cuenta que su uso del apagado no es correcto, ya que esto no esperará a que la tarea enviada complete la ejecución, sino que se usará con awaitTermination.

Sagar Veeram
fuente
Creo que el apagado espera las tareas ya enviadas, de acuerdo con JavaDoc: shutdown () Inicia un apagado ordenado en el que se ejecutan las tareas enviadas previamente, pero no se aceptarán nuevas tareas.
Thomas Krieger
@ThomasKrieger - Ejecutará las tareas ya enviadas pero no esperará a que finalicen - de docs docs.oracle.com/javase/7/docs/api/java/util/concurrent/… - Este método no espera a que se envíen previamente tareas para completar la ejecución. Use awaitTermination para hacer eso.
Sagar Veeram