Estoy tratando de codificar una solución en la que un solo subproceso produce tareas intensivas de E / S que se pueden realizar en paralelo. Cada tarea tiene datos importantes en memoria. Por eso quiero poder limitar la cantidad de tareas pendientes en un momento.
Si creo ThreadPoolExecutor así:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
new LinkedBlockingQueue<Runnable>(maxQueue));
Luego, los executor.submit(callable)
tiros RejectedExecutionException
cuando la cola se llena y todos los hilos ya están ocupados.
¿Qué puedo hacer para executor.submit(callable)
bloquear cuando la cola está llena y todos los subprocesos están ocupados?
EDITAR : Intenté esto :
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
Y de alguna manera logra el efecto que quiero lograr, pero de una manera poco elegante (básicamente, los subprocesos rechazados se ejecutan en el subproceso de llamada, por lo que esto bloquea el subproceso de llamada para que no envíe más).
EDITAR: (5 años después de hacer la pregunta)
Para cualquiera que lea esta pregunta y sus respuestas, no tome la respuesta aceptada como una solución correcta. Lea todas las respuestas y comentarios.
cuando el hilo de llamada también está ejecutando una tarea. Pero, los problemas más importantes es que si el subproceso de la persona que llama obtiene una tarea de larga duración, los otros subprocesos pueden permanecer inactivos esperando la siguiente tarea.Respuestas:
Yo he hecho lo mismo. El truco consiste en crear un BlockingQueue donde el método offer () es realmente un put (). (puede usar cualquier implícita BlockingQueue base que desee).
public class LimitedQueue<E> extends LinkedBlockingQueue<E> { public LimitedQueue(int maxSize) { super(maxSize); } @Override public boolean offer(E e) { // turn offer() and add() into a blocking calls (unless interrupted) try { put(e); return true; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } }
Tenga en cuenta que esto solo funciona para el grupo de subprocesos,
así que tenga cuidado (ver comentarios).fuente
. Sin eso, ya no permite que ThreadPoolExecutor tenga el comportamiento diseñado. Estaba buscando una solución a este problema que tomó que no tuviera esa restricción; vea mi respuesta alternativa a continuación para el enfoque que terminamos adoptando.Así es como resolví esto por mi parte:
(nota: esta solución bloquea el hilo que envía el Callable, por lo que evita que se lance la excepción RejectedExecutionException)
public class BoundedExecutor extends ThreadPoolExecutor{ private final Semaphore semaphore; public BoundedExecutor(int bound) { super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); semaphore = new Semaphore(bound); } /**Submits task to execution pool, but blocks while number of running threads * has reached the bound limit */ public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{ semaphore.acquire(); return submit(task); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); semaphore.release(); } }
corePoolSize < maxPoolSize
...: |corePoolSize < maxPoolSize
. En esos casos, el semáforo estará disponible, pero no habrá un hilo ySynchronousQueue
devolverá falso. ElThreadPoolExecutor
entonces esperará un nuevo hilo. El problema de esta solución es que tiene una condición de carrera . Despuéssemaphore.release()
, pero antes de que termine el hiloexecute
, submit () obtendrá el permiso de semáforo. SI super.submit () se ejecuta antes de queexecute()
finalice, el trabajo será rechazado.La respuesta actualmente aceptada tiene un problema potencialmente significativo: cambia el comportamiento de ThreadPoolExecutor.execute de modo que, si tiene un
corePoolSize < maxPoolSize
, la lógica de ThreadPoolExecutor nunca agregará trabajadores adicionales más allá del núcleo.Desde ThreadPoolExecutor .execute (Ejecutable):
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);
Específicamente, ese último bloque 'else' nunca será alcanzado.
Una mejor alternativa es hacer algo similar a lo que OP ya está haciendo: use un RejectedExecutionHandler para hacer la misma
lógica:public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e); } }
Hay algunas cosas a tener en cuenta con este enfoque, como se señala en los comentarios (en referencia a esta respuesta ):
, entonces hay una condición de carrera en la que todos los subprocesos del grupo pueden morir antes de que la tarea sea visibleThreadPoolExecutor
) dará lugar a problemas a menos que el controlador también las envuelva de la misma manera.Teniendo en cuenta esos errores, esta solución funcionará para la mayoría de los ThreadPoolExecutors típicos y manejará adecuadamente el caso en el que
corePoolSize < maxPoolSize
Sé que esta es una pregunta anterior, pero tenía un problema similar que la creación de nuevas tareas era muy rápida y si había demasiados, se producía un OutOfMemoryError porque la tarea existente no se completaba lo suficientemente rápido.
En mi caso,
se envían y necesito el resultado, por lo tanto, necesito almacenar todo loFutures
. Mi solución fue poner elFutures
en unBlockingQueue
tamaño máximo. Una vez que la cola está llena, no se generan más tareas hasta que se completan algunas (elementos eliminados de la cola). En pseudocódigo:final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads); final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize); try { Thread taskGenerator = new Thread() { @Override public void run() { while (reader.hasNext) { Callable task = generateTask(; Future future = executor.submit(task); try { // if queue is full blocks until a task // is completed and hence no future tasks are submitted. futures.put(compoundFuture); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } executor.shutdown(); } } taskGenerator.start(); // read from queue as long as task are being generated // or while Queue has elements in it while (taskGenerator.isAlive() || !futures.isEmpty()) { Future compoundFuture = futures.take(); // do something } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } catch (ExecutionException ex) { throw new MyException(ex); } finally { executor.shutdownNow(); }
Tuve un problema similar y lo implementé usando
ganchos deThreadPoolExecutor
:import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Blocks current task execution if there is not enough resources for it. * Maximum task count usage controlled by maxTaskCount property. */ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final ReentrantLock taskLock = new ReentrantLock(); private final Condition unpaused = taskLock.newCondition(); private final int maxTaskCount; private volatile int currentTaskCount; public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int maxTaskCount) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.maxTaskCount = maxTaskCount; } /** * Executes task if there is enough system resources for it. Otherwise * waits. */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); taskLock.lock(); try { // Spin while we will not have enough capacity for this job while (maxTaskCount < currentTaskCount) { try { unpaused.await(); } catch (InterruptedException e) { t.interrupt(); } } currentTaskCount++; } finally { taskLock.unlock(); } } /** * Signalling that one more task is welcome */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); taskLock.lock(); try { currentTaskCount--; unpaused.signalAll(); } finally { taskLock.unlock(); } } }
Esto debería ser suficiente para ti. Por cierto, la implementación original se basó en el tamaño de la tarea porque una tarea podía ser más grande 100 veces que otra y enviar dos tareas enormes estaba matando la caja, pero ejecutar una grande y muchas pequeñas estaba bien. Si sus tareas intensivas de E / S son aproximadamente del mismo tamaño, puede usar esta clase; de lo contrario, hágamelo saber y publicaré la implementación basada en el tamaño.
PD: querrás comprobar
javadoc. Es una guía de usuario realmente agradable de Doug Lea sobre cómo se puede personalizar fácilmente.fuente
maxTaskCount < currentTaskCount
y comience a esperar en launpaused
condición. Al mismo tiempo, otro hilo intenta adquirir el bloqueo en afterExecute () para señalar la finalización de una tarea. ¿No será un punto muerto?RejectedExecutionException
todavía es posible.Implementé una solución siguiendo el patrón del decorador y usando un semáforo para controlar el número de tareas ejecutadas. Puedes usarlo con cualquiera
se lanza un)import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.time.Duration; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; public class BlockingOnFullQueueExecutorDecorator implements Executor { private static final class PermitReleasingDecorator implements Runnable { @Nonnull private final Runnable delegate; @Nonnull private final Semaphore semaphore; private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) { this.delegate = task; this.semaphore = semaphoreToRelease; } @Override public void run() { try {; } finally { // however execution goes, release permit for next task this.semaphore.release(); } } @Override public final String toString() { return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate); } } @Nonnull private final Semaphore taskLimit; @Nonnull private final Duration timeout; @Nonnull private final Executor delegate; public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) { this.delegate = Objects.requireNonNull(executor, "'executor' must not be null"); if (maximumTaskNumber < 1) { throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber)); } this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null"); if (this.timeout.isNegative()) { throw new IllegalArgumentException("'maximumTimeout' must not be negative"); } this.taskLimit = new Semaphore(maximumTaskNumber); } @Override public final void execute(final Runnable command) { Objects.requireNonNull(command, "'command' must not be null"); try { // attempt to acquire permit for task execution if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate)); } } catch (final InterruptedException e) { // restore interrupt status Thread.currentThread().interrupt(); throw new IllegalStateException(e); } this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit)); } @Override public final String toString() { return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(), this.timeout, this.delegate); } }
Creo que es tan simple como usar a en
lugar de aaLinkedBlockingQueue
.Ignórame ... eso está totalmente mal.
que tendrían el efecto que necesita.Puede ampliar
y proporcionar una implementación deexecute(Runnable)
esas llamadasput
en lugar deoffer
.Me temo que no parece una respuesta completamente satisfactoria.