¿Cómo hacer el bloque de método submit () de ThreadPoolExecutor si está saturado?

102

Quiero crear un método ThreadPoolExecutortal que cuando haya alcanzado su tamaño máximo y la cola esté llena, el submit()método se bloquee al intentar agregar nuevas tareas. ¿Necesito implementar una costumbre RejectedExecutionHandlerpara eso o existe una forma existente de hacerlo utilizando una biblioteca estándar de Java?

Punto fijo
fuente
2
¿Lo que desea es algo parecido al método offer () de la cola de bloqueo de matriz?
Extraneon
2
@bacar no estoy de acuerdo. Esta sesión de preguntas y respuestas parece más valiosa (además de ser más antigua).
JasonMArcher

Respuestas:

47

Una de las posibles soluciones que acabo de encontrar:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

¿Existen otras soluciones? Preferiría algo basado en, RejectedExecutionHandlerya que parece una forma estándar de manejar tales situaciones.

Punto fijo
fuente
2
¿Existe una condición de carrera aquí entre el punto en que se libera el semáforo en la cláusula finalmente y se adquiere el semáforo?
Volni
2
Como se mencionó anteriormente, esta implementación tiene fallas porque el semáforo se libera antes de que se complete la tarea. Sería mejor usar el método java.util.concurrent.ThreadPoolExecutor # afterExecute (Runnable, Throwable)
FelixM
2
@FelixM: usar java.util.concurrent.ThreadPoolExecutor # afterExecute (Runnable, Throwable) no resolverá el problema porque afterExecute se llama inmediatamente después de task.run () en java.util.concurrent.ThreadPoolExecutor # runWorker (Worker w), antes tomando el siguiente elemento de la cola (mirando el código fuente de openjdk 1.7.0.6).
Jaan
1
Esta respuesta es del libro Java Concurrency in Practice de Brian Goetz
orangepips
11
Esta respuesta no es del todo correcta, también lo son los comentarios. Este código proviene de Java Concurrency in Practice, y es correcto si se tiene en cuenta su contexto . El libro dice claramente, literalmente: "En tal enfoque, use una cola ilimitada (...) y establezca el límite en el semáforo para que sea igual al tamaño del grupo más el número de tareas en cola que desea permitir". Con una cola ilimitada, las tareas nunca serán rechazadas, por lo que volver a lanzar la excepción es completamente inútil. Creo que es también la razón por la throw e;que NO está en el libro. ¡JCIP es correcto!
Timmos
30

Puede usar ThreadPoolExecutor y un blockQueue:

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}
DiTime4Tea
fuente
¡Solo me gustaría decir que esta fue una solución increíblemente rápida y fácil de implementar que funcionó muy bien!
Ivan
58
Esto ejecuta tareas rechazadas en el hilo de envío. Que funcionalmente no cumple con los requisitos del OP.
Percepción
4
Esto ejecuta la tarea "en el subproceso que realiza la llamada" en lugar de bloquear para ponerla en la cola, lo que puede tener algunos efectos adversos, como si varios subprocesos lo llaman de esta manera, se ejecutarán más trabajos del "tamaño de la cola" y si si la tarea tarda más de lo esperado, es posible que su hilo de "producción" no mantenga ocupado al ejecutor. ¡Pero funcionó muy bien aquí!
rogerdpack
4
Voto negativo: esto no bloquea cuando el TPE está saturado. Esta es solo una alternativa, no una solución.
Timmos
1
Voto a favor: esto se ajusta bastante al 'diseño de TPE' y 'bloquea naturalmente' los hilos del cliente al darles gustos de desbordamiento para hacer. Esto debería cubrir la mayoría de los casos de uso, pero no todos, por supuesto, y debería comprender lo que está sucediendo bajo el capó.
Mike
12

Debe usar CallerRunsPolicy, que ejecuta la tarea rechazada en el hilo de llamada. De esta manera, no puede enviar ninguna tarea nueva al ejecutor hasta que esa tarea esté terminada, momento en el que habrá algunos subprocesos de grupo libres o el proceso se repetirá.

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

De los documentos:

Tareas rechazadas

Las nuevas tareas enviadas en el método de ejecución (java.lang.Runnable) serán rechazadas cuando el Ejecutor se haya apagado, y también cuando el Ejecutor use límites finitos tanto para los subprocesos máximos como para la capacidad de la cola de trabajo, y esté saturado. En cualquier caso, el método de ejecución invoca el método RejectedExecutionHandler.rejectedExecution (java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) de su RejectedExecutionHandler. Se proporcionan cuatro políticas de controlador predefinidas:

  1. En el ThreadPoolExecutor.AbortPolicy predeterminado, el controlador lanza una RejectedExecutionException en tiempo de ejecución al ser rechazada.
  2. En ThreadPoolExecutor.CallerRunsPolicy, el hilo que invoca a ejecutarse ejecuta la tarea. Esto proporciona un mecanismo de control de retroalimentación simple que ralentizará la tasa de envío de nuevas tareas.
  3. En ThreadPoolExecutor.DiscardPolicy, una tarea que no se puede ejecutar simplemente se descarta.
  4. En ThreadPoolExecutor.DiscardOldestPolicy, si el ejecutor no se apaga, la tarea en la cabeza de la cola de trabajo se descarta y luego se reintenta la ejecución (lo que puede fallar nuevamente y hacer que esto se repita).

Además, asegúrese de usar una cola limitada, como ArrayBlockingQueue, cuando llame al ThreadPoolExecutorconstructor. De lo contrario, no se rechazará nada.

Editar: en respuesta a su comentario, establezca el tamaño de ArrayBlockingQueue para que sea igual al tamaño máximo del grupo de subprocesos y use AbortPolicy.

Edición 2: Ok, veo a dónde quieres llegar. ¿Qué pasa con esto: anule el beforeExecute()método para verificar que getActiveCount()no exceda getMaximumPoolSize(), y si lo hace, duerma y vuelva a intentarlo?

danben
fuente
3
Quiero que la cantidad de tareas ejecutadas simultáneamente estén estrictamente limitadas (por la cantidad de subprocesos en el Ejecutor), es por eso que no puedo permitir que los subprocesos de las personas que llaman ejecuten estas tareas por sí mismos.
Fixpoint
1
AbortPolicy haría que el hilo de la persona que llama reciba una excepción RejectedExecutionException, mientras que solo necesito bloquearla.
Fixpoint
2
Estoy pidiendo bloqueo, no suspensión y sondeo;)
Fixpoint
@danben: ¿No te refieres a CallerRunsPolicy ?
user359996
7
El problema con CallerRunPolicy es que si tiene un único productor de subprocesos, a menudo habrá subprocesos que no se utilizarán si una tarea de ejecución prolongada resulta rechazada (porque las otras tareas en el grupo de subprocesos se terminarán mientras la tarea de ejecución prolongada aún está en funcionamiento). corriendo).
Adam Gent
6

Hibernate tiene una función BlockPolicyque es simple y puede hacer lo que quieras:

Ver: Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}
Nate Murray
fuente
4
Pensándolo bien, esta es una muy mala idea. No te recomiendo que lo uses. Por buenas razones, consulte aquí: stackoverflow.com/questions/3446011/…
Nate Murray
Además, esto no está utilizando la "biblioteca estándar de Java", según la solicitud del OP. ¿Eliminar?
user359996
1
Woah, eso es tan feo. Básicamente, esta solución interfiere con los componentes internos de TPE. El javadoc para ThreadPoolExecutorincluso dice literalmente: "El método getQueue () permite el acceso a la cola de trabajos con el propósito de monitorear y depurar. Se desaconseja enfáticamente el uso de este método para cualquier otro propósito". Que esto esté disponible en una biblioteca que es tan ampliamente conocida, es absolutamente triste de ver.
Timmos
1
com.amazonaws.services.simpleworkflow.flow.worker.BlockCallerPolicy es similar.
Adrian Baker
6

La BoundedExecutorrespuesta citada anteriormente de Java Concurrency in Practice solo funciona correctamente si usa una cola ilimitada para el Ejecutor, o si el límite del semáforo no es mayor que el tamaño de la cola. El estado del semáforo se comparte entre el subproceso que envía y los subprocesos en el grupo, lo que hace posible saturar el ejecutor incluso si el tamaño de la cola <límite <= (tamaño de la cola + tamaño del grupo).

El uso CallerRunsPolicysolo es válido si sus tareas no se ejecutan para siempre, en cuyo caso su hilo de envío permanecerá para rejectedExecutionsiempre, y una mala idea si sus tareas tardan mucho en ejecutarse, porque el hilo de envío no puede enviar ninguna tarea nueva o hacer cualquier otra cosa si está ejecutando una tarea.

Si eso no es aceptable, sugiero verificar el tamaño de la cola limitada del ejecutor antes de enviar una tarea. Si la cola está llena, espere un poco antes de intentar enviar de nuevo. El rendimiento se verá afectado, pero sugiero que es una solución más simple que muchas de las otras soluciones propuestas y tiene la garantía de que ninguna tarea será rechazada.

stephan f
fuente
No estoy seguro de cómo comprobar la longitud de la cola antes de enviar garantiza que no haya tareas rechazadas en un entorno de subprocesos múltiples con varios productores de tareas. Eso no suena seguro para subprocesos.
Tim
5

Lo sé, es un truco, pero en mi opinión el truco más limpio entre los que se ofrecen aquí ;-)

Debido a que ThreadPoolExecutor usa la cola de bloqueo "oferta" en lugar de "poner", anulemos el comportamiento de la "oferta" de la cola de bloqueo:

class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }
}

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));

Lo probé y parece funcionar. La implementación de alguna política de tiempo de espera se deja como ejercicio del lector.

Jakub
fuente
Consulte stackoverflow.com/a/4522411/2601671 para obtener una versión limpia de esto. Estoy de acuerdo, es la forma más limpia de hacerlo.
Trenton
3

La siguiente clase envuelve un ThreadPoolExecutor y usa un semáforo para bloquear, luego la cola de trabajo está llena:

public final class BlockingExecutor { 

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl (final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute (Runnable command) throws InterruptedException {
        execImpl(command);
    }
}

Esta clase contenedora se basa en una solución dada en el libro Java Concurrency in Practice de Brian Goetz. La solución en el libro solo toma dos parámetros de constructor: un Executory un límite usado para el semáforo. Esto se muestra en la respuesta dada por Fixpoint. Hay un problema con ese enfoque: puede llegar a un estado en el que los subprocesos del grupo están ocupados, la cola está llena, pero el semáforo acaba de liberar un permiso. ( semaphore.release()en el bloque finalmente). En este estado, una nueva tarea puede obtener el permiso recién publicado, pero se rechaza porque la cola de tareas está llena. Por supuesto, esto no es algo que desee; desea bloquear en este caso.

Para solucionar esto, debemos utilizar una cola ilimitada , como claramente menciona JCiP. El semáforo actúa como un protector, dando el efecto de un tamaño de cola virtual. Esto tiene el efecto secundario de que es posible que la unidad pueda contener maxPoolSize + virtualQueueSize + maxPoolSizetareas. ¿Porqué es eso? Debido al semaphore.release()bloque en el finalmente. Si todos los subprocesos del grupo llaman a esta declaración al mismo tiempo, maxPoolSizese liberan los permisos, lo que permite que ingresen a la unidad el mismo número de tareas. Si estuviéramos usando una cola limitada, aún estaría llena, lo que resultaría en una tarea rechazada. Ahora, como sabemos que esto solo ocurre cuando un subproceso de grupo está casi terminado, esto no es un problema. Sabemos que el subproceso del grupo no se bloqueará, por lo que pronto se tomará una tarea de la cola.

Sin embargo, puedes usar una cola limitada. Solo asegúrate de que su tamaño sea igual virtualQueueSize + maxPoolSize. Los tamaños más grandes son inútiles, el semáforo evitará que entren más elementos. Los tamaños más pequeños darán como resultado tareas rechazadas. La posibilidad de que las tareas sean rechazadas aumenta a medida que disminuye el tamaño. Por ejemplo, digamos que desea un ejecutor acotado con maxPoolSize = 2 y virtualQueueSize = 5. Luego, tome un semáforo con 5 + 2 = 7 permisos y un tamaño de cola real de 5 + 2 = 7. El número real de tareas que pueden estar en la unidad es entonces 2 + 5 + 2 = 9. Cuando el ejecutor está lleno (5 tareas en cola, 2 en el grupo de subprocesos, por lo que hay 0 permisos disponibles) y TODOS los subprocesos del grupo liberan sus permisos, las tareas que ingresan pueden tomar exactamente 2 permisos.

Ahora, la solución de JCiP es algo engorrosa de usar ya que no hace cumplir todas estas restricciones (cola ilimitada o limitada con esas restricciones matemáticas, etc.). Creo que esto solo sirve como un buen ejemplo para demostrar cómo se pueden construir nuevas clases seguras para subprocesos basadas en las partes que ya están disponibles, pero no como una clase completamente desarrollada y reutilizable. No creo que esto último fuera la intención del autor.

Timmos
fuente
2

puede usar un RejectedExecutionHandler personalizado como este

ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
                max_handlers, // max size 
                timeout_in_seconds, // idle timeout 
                TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // This will block if the queue is full
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            System.err.println(e.getMessage());
                        }

                    }
                });
Amir David Nissan Cohen
fuente
1
Los documentos de getQueue () mencionan explícitamente que el acceso a la cola de tareas está destinado principalmente a la depuración y la supervisión.
Chadi
0

Cree su propia cola de bloqueo para ser utilizada por el Ejecutor, con el comportamiento de bloqueo que está buscando, mientras siempre devuelve la capacidad restante disponible (asegurándose de que el ejecutor no intentará crear más subprocesos que su grupo principal, o activará el controlador de rechazo).

Creo que esto le dará el comportamiento de bloqueo que está buscando. Un manejador de rechazos nunca cumplirá con los requisitos, ya que eso indica que el ejecutor no puede realizar la tarea. Lo que podría imaginar es que obtienes alguna forma de 'espera ocupada' en el controlador. Eso no es lo que quieres, quieres una cola para el ejecutor que bloquea al llamador ...

Hoeben frito
fuente
2
ThreadPoolExecutorusa el offermétodo para agregar tareas a la cola. Si creara una costumbre BlockingQueueque se bloquea offer, eso rompería BlockingQueueel contrato.
Fixpoint
@Shooshpanchick, eso rompería el contrato de BlockingQueues. ¿Y qué? si está tan interesado, puede habilitar explícitamente el comportamiento solo durante submit () (tomará un ThreadLocal)
bestsss
Vea también esta respuesta a otra pregunta que detalla esta alternativa.
Robert Tupelo-Schneck
¿Hay alguna razón por la que ThreadPoolExecutorse implementó para usar offery no put(la versión de bloqueo)? Además, si hubiera una forma de que el código del cliente dijera cuál usar y cuándo, muchas personas que intentan implementar soluciones personalizadas se habrían aliviado
asgs
0

Para evitar problemas con la solución @FixPoint. Se podría usar ListeningExecutorService y liberar el semáforo onSuccess y onFailure dentro de FutureCallback.

vamsu
fuente
Eso tiene los mismos problemas inherentes que simplemente envolver el, Runnableya que esos métodos todavía se llaman antes de la limpieza del trabajador en el estado normal ThreadPoolExecutor. Es decir, aún necesitará manejar las excepciones de rechazo.
Adam Gent
0

Recientemente encontré que esta pregunta tiene el mismo problema. El OP no lo dice explícitamente, pero no queremos usar el RejectedExecutionHandlerque ejecuta una tarea en el hilo del remitente, porque esto subutilizará los hilos de trabajo si esta tarea es de larga duración.

Al leer todas las respuestas y comentarios, en particular la solución defectuosa con el semáforo o usar, afterExecuteeché un vistazo más de cerca al código del ThreadPoolExecutor para ver si hay alguna salida. Me sorprendió ver que hay más de 2000 líneas de código (comentado), algunas de las cuales me marean . Dado el requisito bastante simple que realmente tengo --- un productor, varios consumidores, dejar que el productor bloquee cuando ningún consumidor pueda aceptar el trabajo --- decidí desarrollar mi propia solución. No es un ExecutorServicesino solo un Executor. Y no adapta el número de subprocesos a la carga de trabajo, sino que tiene solo un número fijo de subprocesos, lo que también se ajusta a mis requisitos. Aquí está el código. Siéntete libre de despotricar sobre ello :-)

package x;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;

/**
 * distributes {@code Runnable}s to a fixed number of threads. To keep the
 * code lean, this is not an {@code ExecutorService}. In particular there is
 * only very simple support to shut this executor down.
 */
public class ParallelExecutor implements Executor {
  // other bounded queues work as well and are useful to buffer peak loads
  private final BlockingQueue<Runnable> workQueue =
      new SynchronousQueue<Runnable>();
  private final Thread[] threads;

  /*+**********************************************************************/
  /**
   * creates the requested number of threads and starts them to wait for
   * incoming work
   */
  public ParallelExecutor(int numThreads) {
    this.threads = new Thread[numThreads];
    for(int i=0; i<numThreads; i++) {
      // could reuse the same Runner all over, but keep it simple
      Thread t = new Thread(new Runner());
      this.threads[i] = t;
      t.start();
    }
  }
  /*+**********************************************************************/
  /**
   * returns immediately without waiting for the task to be finished, but may
   * block if all worker threads are busy.
   * 
   * @throws RejectedExecutionException if we got interrupted while waiting
   *         for a free worker
   */
  @Override
  public void execute(Runnable task)  {
    try {
      workQueue.put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("interrupt while waiting for a free "
          + "worker.", e);
    }
  }
  /*+**********************************************************************/
  /**
   * Interrupts all workers and joins them. Tasks susceptible to an interrupt
   * will preempt their work. Blocks until the last thread surrendered.
   */
  public void interruptAndJoinAll() throws InterruptedException {
    for(Thread t : threads) {
      t.interrupt();
    }
    for(Thread t : threads) {
      t.join();
    }
  }
  /*+**********************************************************************/
  private final class Runner implements Runnable {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        Runnable task;
        try {
          task = workQueue.take();
        } catch (InterruptedException e) {
          // canonical handling despite exiting right away
          Thread.currentThread().interrupt(); 
          return;
        }
        try {
          task.run();
        } catch (RuntimeException e) {
          // production code to use a logging framework
          e.printStackTrace();
        }
      }
    }
  }
}
Harald
fuente
0

Creo que hay una forma bastante elegante de resolver este problema utilizando java.util.concurrent.Semaphorey delegando el comportamiento de Executor.newFixedThreadPool. El nuevo servicio ejecutor solo ejecutará una nueva tarea cuando haya un hilo para hacerlo. Semaphore gestiona el bloqueo con un número de permisos igual al número de subprocesos. Cuando termina una tarea, devuelve un permiso.

public class FixedThreadBlockingExecutorService extends AbstractExecutorService {

private final ExecutorService executor;
private final Semaphore blockExecution;

public FixedThreadBlockingExecutorService(int nTreads) {
    this.executor = Executors.newFixedThreadPool(nTreads);
    blockExecution = new Semaphore(nTreads);
}

@Override
public void shutdown() {
    executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
    return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
    return executor.isShutdown();
}

@Override
public boolean isTerminated() {
    return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
    blockExecution.acquireUninterruptibly();
    executor.execute(() -> {
        try {
            command.run();
        } finally {
            blockExecution.release();
        }
    });
}
Radoslaw Kachel
fuente
Implementé el BoundedExecutor descrito en Concurrencia de Java en la práctica y descubrí que el semáforo debe inicializarse con el indicador de equidad establecido en verdadero para garantizar que los permisos de semáforo se ofrezcan en el orden en que se realizan las solicitudes. Consulte docs.oracle.com/javase/7/docs/api/java/util/concurrent/… . para más detalles
Prahalad Deshpande
0

Tenía la misma necesidad en el pasado: una especie de cola de bloqueo con un tamaño fijo para cada cliente respaldado por un grupo de subprocesos compartidos. Terminé escribiendo mi propio tipo de ThreadPoolExecutor:

UserThreadPoolExecutor (cola de bloqueo (por cliente) + grupo de subprocesos (compartido entre todos los clientes))

Ver: https://github.com/d4rxh4wx/UserThreadPoolExecutor

Cada UserThreadPoolExecutor recibe un número máximo de subprocesos de un ThreadPoolExecutor compartido

Cada UserThreadPoolExecutor puede:

  • envíe una tarea al ejecutor del grupo de subprocesos compartidos si no se alcanza su cuota. Si se alcanza su cuota, el trabajo se pone en cola (bloqueo no consuntivo en espera de CPU). Una vez que se completa una de sus tareas enviadas, la cuota se reduce, lo que permite enviar otra tarea en espera al ThreadPoolExecutor
  • esperar a que se completen las tareas restantes
d4rxh4wx
fuente
0

Encontré esta política de rechazo en el cliente de búsqueda elástica. Bloquea el hilo de la persona que llama en la cola de bloqueo. Código abajo

 static class ForceQueuePolicy implements XRejectedExecutionHandler 
 {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
        {
            try 
            {
                executor.getQueue().put(r);
            } 
            catch (InterruptedException e) 
            {
                //should never happen since we never wait
                throw new EsRejectedExecutionException(e);
            }
        }

        @Override
        public long rejected() 
        {
            return 0;
        }
}
Vladislav
fuente
0

Recientemente tuve la necesidad de lograr algo similar, pero en un ScheduledExecutorService.

También tuve que asegurarme de manejar el retraso que se pasa en el método y asegurarme de que la tarea se envíe para ejecutarse en el momento en que la persona que llama espera o simplemente falla y arroja un RejectedExecutionException.

Otros métodos de ScheduledThreadPoolExecutorpara ejecutar o enviar una tarea de llamada internamente #scheduleque, a su vez, invocarán los métodos anulados.

import java.util.concurrent.*;

public class BlockingScheduler extends ScheduledThreadPoolExecutor {
    private final Semaphore maxQueueSize;

    public BlockingScheduler(int corePoolSize,
                             ThreadFactory threadFactory,
                             int maxQueueSize) {
        super(corePoolSize, threadFactory, new AbortPolicy());
        this.maxQueueSize = new Semaphore(maxQueueSize);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
        return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
        return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long period,
                                                     TimeUnit unit) {
        final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
        return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable t) {
        super.afterExecute(runnable, t);
        try {
            if (t == null && runnable instanceof Future<?>) {
                try {
                    ((Future<?>) runnable).get();
                } catch (CancellationException | ExecutionException e) {
                    t = e;
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
            if (t != null) {
                System.err.println(t);
            }
        } finally {
            releaseQueueUsage();
        }
    }

    private long beforeSchedule(Runnable runnable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(runnable, this);
            return 0;
        }
    }

    private long beforeSchedule(Callable callable, long delay) {
        try {
            return getQueuePermitAndModifiedDelay(delay);
        } catch (InterruptedException e) {
            getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
            return 0;
        }
    }

    private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
        final long beforeAcquireTimeStamp = System.currentTimeMillis();
        maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS);
        final long afterAcquireTimeStamp = System.currentTimeMillis();
        return afterAcquireTimeStamp - beforeAcquireTimeStamp;
    }

    private void releaseQueueUsage() {
        maxQueueSize.release();
    }
}

Tengo el código aquí, agradeceré cualquier comentario. https://github.com/AmitabhAwasthi/BlockingScheduler

Dev Amitabh
fuente
Esta respuesta se basa completamente en el contenido de los enlaces externos. Si alguna vez se vuelven inválidos, su respuesta sería inútil. Así que edite su respuesta y agregue al menos un resumen de lo que se puede encontrar allí. ¡Gracias!
Fabio dice Reincorporar a Monica
@fabio: gracias por señalarlo. Agregué el código allí para que ahora tenga más sentido para los lectores. Apreciamos tu comentario :)
Dev Amitabh
0

No siempre me gusta CallerRunsPolicy, especialmente porque permite que la tarea rechazada 'salte la cola' y se ejecute antes que las tareas que se enviaron antes. Además, la ejecución de la tarea en el subproceso que realiza la llamada puede llevar mucho más tiempo que esperar a que el primer espacio esté disponible.

Resolví este problema usando un RejectedExecutionHandler personalizado, que simplemente bloquea el hilo de llamada por un tiempo y luego intenta enviar la tarea nuevamente:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

Esta clase solo se puede usar en el ejecutor del grupo de subprocesos como un RejectedExecutinHandler como cualquier otro, por ejemplo:

executorPool = new ThreadPoolExecutor(1, 1, 10,
                                      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                                      new BlockWhenQueueFull());

El único inconveniente que veo es que el hilo de llamada puede bloquearse un poco más de lo estrictamente necesario (hasta 250 ms). Además, dado que este ejecutor se llama de manera recursiva, las esperas muy largas para que un hilo esté disponible (horas) pueden resultar en un desbordamiento de pila.

Sin embargo, personalmente me gusta este método. Es compacto, fácil de entender y funciona bien.

TinkerTank
fuente
1
Como usted mismo dice: esto puede crear un desbordamiento de pila. No es algo que me gustaría tener en el código de producción.
Harald
Todos deberían tomar sus propias decisiones. Para mi carga de trabajo, esto no es un problema. Las tareas se ejecutan en segundos en lugar de las horas que serían necesarias para volar la pila. Además, lo mismo puede decirse de prácticamente cualquier algoritmo recursivo. ¿Es esa una razón para no utilizar nunca ningún algoritmo recursivo en producción?
TinkerTank