Java: ExecutorService que bloquea el envío después de un cierto tamaño de cola

82

Estoy tratando de codificar una solución en la que un solo subproceso produce tareas intensivas de E / S que se pueden realizar en paralelo. Cada tarea tiene datos importantes en memoria. Por eso quiero poder limitar la cantidad de tareas pendientes en un momento.

Si creo ThreadPoolExecutor así:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

Luego, los executor.submit(callable)tiros RejectedExecutionExceptioncuando la cola se llena y todos los hilos ya están ocupados.

¿Qué puedo hacer para executor.submit(callable)bloquear cuando la cola está llena y todos los subprocesos están ocupados?

EDITAR : Intenté esto :

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Y de alguna manera logra el efecto que quiero lograr, pero de una manera poco elegante (básicamente, los subprocesos rechazados se ejecutan en el subproceso de llamada, por lo que esto bloquea el subproceso de llamada para que no envíe más).

EDITAR: (5 años después de hacer la pregunta)

Para cualquiera que lea esta pregunta y sus respuestas, no tome la respuesta aceptada como una solución correcta. Lea todas las respuestas y comentarios.

Tahir Akhtar
fuente
1
He usado un semáforo antes para hacer exactamente eso, al igual que en la respuesta a la pregunta muy similar a la que @axtavt está vinculada.
Stephen Denne
2
@TomWolk Por un lado, obtienes una tarea más ejecutándose en paralelo que numWorkerThreadscuando el hilo de llamada también está ejecutando una tarea. Pero, los problemas más importantes es que si el subproceso de la persona que llama obtiene una tarea de larga duración, los otros subprocesos pueden permanecer inactivos esperando la siguiente tarea.
Tahir Akhtar
2
@TahirAkhtar, cierto; la cola debe ser lo suficientemente larga para que no se seque cuando la persona que llama tiene que ejecutar la tarea por sí misma. Pero creo que es una ventaja si se puede usar un hilo más, el hilo de la persona que llama, para ejecutar tareas. Si la persona que llama simplemente se bloquea, el hilo de la persona que llama estaría inactivo. Utilizo CallerRunsPolicy con una cola tres veces la capacidad del threadpool y funciona bien y sin problemas. En comparación con esta solución, consideraría templar el marco de ingeniería excesiva.
TomWolk
1
@TomWalk +1 Buenos puntos. Parece que otra diferencia es que si la tarea fue rechazada de la cola y fue ejecutada por el hilo de la persona que llama, entonces el hilo de la persona que llama comenzaría a procesar una solicitud fuera de servicio ya que no esperó su turno en la cola. Ciertamente, si ya ha optado por utilizar subprocesos, debe manejar las dependencias correctamente, pero es solo algo a tener en cuenta.
rimsky

Respuestas:

64

Yo he hecho lo mismo. El truco consiste en crear un BlockingQueue donde el método offer () es realmente un put (). (puede usar cualquier implícita BlockingQueue base que desee).

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

Tenga en cuenta que esto solo funciona para el grupo de subprocesos, corePoolSize==maxPoolSizeasí que tenga cuidado (ver comentarios).

jtahlborn
fuente
2
alternativamente, puede extender SynchronousQueue para evitar el almacenamiento en búfer, permitiendo solo transferencias directas.
Brendon
Elegante y aborda directamente el problema. oferta () se convierte en put (), y put () significa "... esperando si es necesario que haya espacio disponible"
Trenton
5
No creo que sea una buena idea porque cambia el protocolo del método de oferta. El método de oferta debe ser una llamada sin bloqueo.
Mingjiang Shi
6
No estoy de acuerdo: esto cambia el comportamiento de ThreadPoolExecutor.execute de modo que si tiene un corePoolSize <maxPoolSize, la lógica de ThreadPoolExecutor nunca agregará trabajadores adicionales más allá del núcleo.
Krease
5
Para aclarar, su solución funciona solo mientras mantenga la restricción donde corePoolSize==maxPoolSize. Sin eso, ya no permite que ThreadPoolExecutor tenga el comportamiento diseñado. Estaba buscando una solución a este problema que tomó que no tuviera esa restricción; vea mi respuesta alternativa a continuación para el enfoque que terminamos adoptando.
Krease
15

Así es como resolví esto por mi parte:

(nota: esta solución bloquea el hilo que envía el Callable, por lo que evita que se lance la excepción RejectedExecutionException)

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}
cvacca
fuente
1
Supongo que esto no funciona bien para los casos en los que corePoolSize < maxPoolSize...: |
rogerdpack
1
Funciona para el caso donde corePoolSize < maxPoolSize. En esos casos, el semáforo estará disponible, pero no habrá un hilo y SynchronousQueuedevolverá falso. El ThreadPoolExecutorentonces esperará un nuevo hilo. El problema de esta solución es que tiene una condición de carrera . Después semaphore.release(), pero antes de que termine el hilo execute, submit () obtendrá el permiso de semáforo. SI super.submit () se ejecuta antes de que execute()finalice, el trabajo será rechazado.
Luís Guilherme
@ LuísGuilherme Pero semaphore.release () nunca se llamará antes de que el hilo finalice la ejecución. Porque esta llamada se realiza en el método after Execute (...). ¿Me falta algo en el escenario que estás describiendo?
cvacca
1
afterExecute es llamado por el mismo hilo que ejecuta la tarea, por lo que aún no ha terminado. Haz la prueba tú mismo. Implemente esa solución y arroje grandes cantidades de trabajo al ejecutor, arrojándolo si el trabajo es rechazado. Notarás que sí, esto tiene una condición de carrera y no es difícil reproducirlo.
Luís Guilherme
1
Vaya a ThreadPoolExecutor y verifique el método runWorker (Worker w). Verá que suceden cosas después de que finalice afterExecute, incluido el desbloqueo del trabajador y el aumento del número de tareas completadas. Entonces, permitió que las tareas ingresaran (liberando el semáforo) sin tener ancho de banda para procesarlas (llamando a processWorkerSalir).
Luís Guilherme
14

La respuesta actualmente aceptada tiene un problema potencialmente significativo: cambia el comportamiento de ThreadPoolExecutor.execute de modo que, si tiene un corePoolSize < maxPoolSize, la lógica de ThreadPoolExecutor nunca agregará trabajadores adicionales más allá del núcleo.

Desde ThreadPoolExecutor .execute (Ejecutable):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

Específicamente, ese último bloque 'else' nunca será alcanzado.

Una mejor alternativa es hacer algo similar a lo que OP ya está haciendo: use un RejectedExecutionHandler para hacer la misma putlógica:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

Hay algunas cosas a tener en cuenta con este enfoque, como se señala en los comentarios (en referencia a esta respuesta ):

  1. Si corePoolSize==0, entonces hay una condición de carrera en la que todos los subprocesos del grupo pueden morir antes de que la tarea sea visible
  2. El uso de una implementación que envuelve las tareas de la cola (no se aplica a ThreadPoolExecutor) dará lugar a problemas a menos que el controlador también las envuelva de la misma manera.

Teniendo en cuenta esos errores, esta solución funcionará para la mayoría de los ThreadPoolExecutors típicos y manejará adecuadamente el caso en el que corePoolSize < maxPoolSize.

Krease
fuente
A quien votó en contra, ¿puede darnos alguna información? ¿Hay algo incorrecto / engañoso / peligroso en esta respuesta? Me gustaría tener la oportunidad de abordar sus inquietudes.
Krease
2
No voté en contra, pero parece ser una muy mala idea
vanOekel
@vanOekel - gracias por el enlace - esa respuesta plantea algunos casos válidos que deberían conocerse si se usa este enfoque, pero IMO no lo convierte en una "muy mala idea" - aún resuelve un problema presente en la respuesta actualmente aceptada. Actualicé mi respuesta con esas advertencias.
Krease
Si el tamaño del grupo principal es 0, y si la tarea se envía al ejecutor, el ejecutor comenzará a crear subprocesos si la cola está llena para manejar la tarea. Entonces, ¿por qué es propenso al estancamiento? No entendí tu punto. ¿Podría darnos más detalles?
Shirgill Farhan
@ShirgillFarhanAnsari: es el caso planteado en el comentario anterior. Puede suceder porque agregar directamente a la cola no desencadena la creación de subprocesos / inicio de trabajadores. Es un caso extremo / condición de carrera que se puede mitigar al tener un tamaño de grupo de núcleo distinto de cero
Krease
4

Sé que esta es una pregunta anterior, pero tenía un problema similar que la creación de nuevas tareas era muy rápida y si había demasiados, se producía un OutOfMemoryError porque la tarea existente no se completaba lo suficientemente rápido.

En mi caso, Callablesse envían y necesito el resultado, por lo tanto, necesito almacenar todo lo Futuresdevuelto executor.submit(). Mi solución fue poner el Futuresen un BlockingQueuetamaño máximo. Una vez que la cola está llena, no se generan más tareas hasta que se completan algunas (elementos eliminados de la cola). En pseudocódigo:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {   
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            while (reader.hasNext) {
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try {
                    // if queue is full blocks until a task
                    // is completed and hence no future tasks are submitted.
                    futures.put(compoundFuture);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();         
                }
            }
        executor.shutdown();
        }
    }
    taskGenerator.start();

    // read from queue as long as task are being generated
    // or while Queue has elements in it
    while (taskGenerator.isAlive()
                    || !futures.isEmpty()) {
        Future compoundFuture = futures.take();
        // do something
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();     
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}
principiante_
fuente
2

Tuve un problema similar y lo implementé usando beforeExecute/afterExecuteganchos de ThreadPoolExecutor:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there is not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

    private final ReentrantLock taskLock = new ReentrantLock();
    private final Condition unpaused = taskLock.newCondition();
    private final int maxTaskCount;

    private volatile int currentTaskCount;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    }

    /**
     * Executes task if there is enough system resources for it. Otherwise
     * waits.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskLock.lock();
        try {
            // Spin while we will not have enough capacity for this job
            while (maxTaskCount < currentTaskCount) {
                try {
                    unpaused.await();
                } catch (InterruptedException e) {
                    t.interrupt();
                }
            }
            currentTaskCount++;
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Signalling that one more task is welcome
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLock.lock();
        try {
            currentTaskCount--;
            unpaused.signalAll();
        } finally {
            taskLock.unlock();
        }
    }
}

Esto debería ser suficiente para ti. Por cierto, la implementación original se basó en el tamaño de la tarea porque una tarea podía ser más grande 100 veces que otra y enviar dos tareas enormes estaba matando la caja, pero ejecutar una grande y muchas pequeñas estaba bien. Si sus tareas intensivas de E / S son aproximadamente del mismo tamaño, puede usar esta clase; de ​​lo contrario, hágamelo saber y publicaré la implementación basada en el tamaño.

PD: querrás comprobar ThreadPoolExecutorjavadoc. Es una guía de usuario realmente agradable de Doug Lea sobre cómo se puede personalizar fácilmente.

Petro Semeniuk
fuente
1
Me pregunto qué sucederá cuando un hilo mantenga el bloqueo antes de Ejecutar () y lo vea maxTaskCount < currentTaskCounty comience a esperar en la unpausedcondición. Al mismo tiempo, otro hilo intenta adquirir el bloqueo en afterExecute () para señalar la finalización de una tarea. ¿No será un punto muerto?
Tahir Akhtar
1
También noté que esta solución no bloqueará el hilo que envía las tareas cuando la cola se llena. Entonces RejectedExecutionExceptiontodavía es posible.
Tahir Akhtar
1
La semántica de las clases ReentrantLock / Condition es similar a lo que ofrece sincronizado y esperar / notificar. Cuando se llaman a los métodos de condición de espera, el bloqueo se libera, por lo que no habrá interbloqueo.
Petro Semeniuk
Correcto, este ExecutorService bloquea las tareas en el envío sin bloquear el hilo de la persona que llama. El trabajo se acaba de enviar y se procesará de forma asincrónica cuando haya suficientes recursos del sistema para ello.
Petro Semeniuk
2

Implementé una solución siguiendo el patrón del decorador y usando un semáforo para controlar el número de tareas ejecutadas. Puedes usarlo con cualquiera Executory:

  • Especificar el máximo de tareas en curso
  • Especifique el tiempo de espera máximo para esperar un permiso de ejecución de la tarea (si el tiempo de espera pasa y no se obtiene ningún permiso, RejectedExecutionExceptionse lanza un)
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

import javax.annotation.Nonnull;

public class BlockingOnFullQueueExecutorDecorator implements Executor {

    private static final class PermitReleasingDecorator implements Runnable {

        @Nonnull
        private final Runnable delegate;

        @Nonnull
        private final Semaphore semaphore;

        private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
            this.delegate = task;
            this.semaphore = semaphoreToRelease;
        }

        @Override
        public void run() {
            try {
                this.delegate.run();
            }
            finally {
                // however execution goes, release permit for next task
                this.semaphore.release();
            }
        }

        @Override
        public final String toString() {
            return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
        }
    }

    @Nonnull
    private final Semaphore taskLimit;

    @Nonnull
    private final Duration timeout;

    @Nonnull
    private final Executor delegate;

    public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
        this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
        if (maximumTaskNumber < 1) {
            throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
        }
        this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
        if (this.timeout.isNegative()) {
            throw new IllegalArgumentException("'maximumTimeout' must not be negative");
        }
        this.taskLimit = new Semaphore(maximumTaskNumber);
    }

    @Override
    public final void execute(final Runnable command) {
        Objects.requireNonNull(command, "'command' must not be null");
        try {
            // attempt to acquire permit for task execution
            if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
                throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
            }
        }
        catch (final InterruptedException e) {
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
    }

    @Override
    public final String toString() {
        return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
                this.timeout, this.delegate);
    }
}
Grzegorz Lehmann
fuente
1

Creo que es tan simple como usar a en ArrayBlockingQueuelugar de aa LinkedBlockingQueue.

Ignórame ... eso está totalmente mal. ThreadPoolExecutorllamadas Queue#offerno putque tendrían el efecto que necesita.

Puede ampliar ThreadPoolExecutory proporcionar una implementación de execute(Runnable)esas llamadas puten lugar de offer.

Me temo que no parece una respuesta completamente satisfactoria.

Gareth Davis
fuente