Quiero crear un método ThreadPoolExecutor
tal que cuando haya alcanzado su tamaño máximo y la cola esté llena, el submit()
método se bloquee al intentar agregar nuevas tareas. ¿Necesito implementar una costumbre RejectedExecutionHandler
para eso o existe una forma existente de hacerlo utilizando una biblioteca estándar de Java?
java
concurrency
executor
Punto fijo
fuente
fuente
Respuestas:
Una de las posibles soluciones que acabo de encontrar:
¿Existen otras soluciones? Preferiría algo basado en,
RejectedExecutionHandler
ya que parece una forma estándar de manejar tales situaciones.fuente
throw e;
que NO está en el libro. ¡JCIP es correcto!Puede usar ThreadPoolExecutor y un blockQueue:
fuente
Debe usar
CallerRunsPolicy
, que ejecuta la tarea rechazada en el hilo de llamada. De esta manera, no puede enviar ninguna tarea nueva al ejecutor hasta que esa tarea esté terminada, momento en el que habrá algunos subprocesos de grupo libres o el proceso se repetirá.http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html
De los documentos:
Además, asegúrese de usar una cola limitada, como ArrayBlockingQueue, cuando llame al
ThreadPoolExecutor
constructor. De lo contrario, no se rechazará nada.Editar: en respuesta a su comentario, establezca el tamaño de ArrayBlockingQueue para que sea igual al tamaño máximo del grupo de subprocesos y use AbortPolicy.
Edición 2: Ok, veo a dónde quieres llegar. ¿Qué pasa con esto: anule el
beforeExecute()
método para verificar quegetActiveCount()
no excedagetMaximumPoolSize()
, y si lo hace, duerma y vuelva a intentarlo?fuente
Hibernate tiene una función
BlockPolicy
que es simple y puede hacer lo que quieras:Ver: Executors.java
fuente
ThreadPoolExecutor
incluso dice literalmente: "El método getQueue () permite el acceso a la cola de trabajos con el propósito de monitorear y depurar. Se desaconseja enfáticamente el uso de este método para cualquier otro propósito". Que esto esté disponible en una biblioteca que es tan ampliamente conocida, es absolutamente triste de ver.La
BoundedExecutor
respuesta citada anteriormente de Java Concurrency in Practice solo funciona correctamente si usa una cola ilimitada para el Ejecutor, o si el límite del semáforo no es mayor que el tamaño de la cola. El estado del semáforo se comparte entre el subproceso que envía y los subprocesos en el grupo, lo que hace posible saturar el ejecutor incluso si el tamaño de la cola <límite <= (tamaño de la cola + tamaño del grupo).El uso
CallerRunsPolicy
solo es válido si sus tareas no se ejecutan para siempre, en cuyo caso su hilo de envío permanecerá pararejectedExecution
siempre, y una mala idea si sus tareas tardan mucho en ejecutarse, porque el hilo de envío no puede enviar ninguna tarea nueva o hacer cualquier otra cosa si está ejecutando una tarea.Si eso no es aceptable, sugiero verificar el tamaño de la cola limitada del ejecutor antes de enviar una tarea. Si la cola está llena, espere un poco antes de intentar enviar de nuevo. El rendimiento se verá afectado, pero sugiero que es una solución más simple que muchas de las otras soluciones propuestas y tiene la garantía de que ninguna tarea será rechazada.
fuente
Lo sé, es un truco, pero en mi opinión el truco más limpio entre los que se ofrecen aquí ;-)
Debido a que ThreadPoolExecutor usa la cola de bloqueo "oferta" en lugar de "poner", anulemos el comportamiento de la "oferta" de la cola de bloqueo:
Lo probé y parece funcionar. La implementación de alguna política de tiempo de espera se deja como ejercicio del lector.
fuente
La siguiente clase envuelve un ThreadPoolExecutor y usa un semáforo para bloquear, luego la cola de trabajo está llena:
Esta clase contenedora se basa en una solución dada en el libro Java Concurrency in Practice de Brian Goetz. La solución en el libro solo toma dos parámetros de constructor: un
Executor
y un límite usado para el semáforo. Esto se muestra en la respuesta dada por Fixpoint. Hay un problema con ese enfoque: puede llegar a un estado en el que los subprocesos del grupo están ocupados, la cola está llena, pero el semáforo acaba de liberar un permiso. (semaphore.release()
en el bloque finalmente). En este estado, una nueva tarea puede obtener el permiso recién publicado, pero se rechaza porque la cola de tareas está llena. Por supuesto, esto no es algo que desee; desea bloquear en este caso.Para solucionar esto, debemos utilizar una cola ilimitada , como claramente menciona JCiP. El semáforo actúa como un protector, dando el efecto de un tamaño de cola virtual. Esto tiene el efecto secundario de que es posible que la unidad pueda contener
maxPoolSize + virtualQueueSize + maxPoolSize
tareas. ¿Porqué es eso? Debido alsemaphore.release()
bloque en el finalmente. Si todos los subprocesos del grupo llaman a esta declaración al mismo tiempo,maxPoolSize
se liberan los permisos, lo que permite que ingresen a la unidad el mismo número de tareas. Si estuviéramos usando una cola limitada, aún estaría llena, lo que resultaría en una tarea rechazada. Ahora, como sabemos que esto solo ocurre cuando un subproceso de grupo está casi terminado, esto no es un problema. Sabemos que el subproceso del grupo no se bloqueará, por lo que pronto se tomará una tarea de la cola.Sin embargo, puedes usar una cola limitada. Solo asegúrate de que su tamaño sea igual
virtualQueueSize + maxPoolSize
. Los tamaños más grandes son inútiles, el semáforo evitará que entren más elementos. Los tamaños más pequeños darán como resultado tareas rechazadas. La posibilidad de que las tareas sean rechazadas aumenta a medida que disminuye el tamaño. Por ejemplo, digamos que desea un ejecutor acotado con maxPoolSize = 2 y virtualQueueSize = 5. Luego, tome un semáforo con 5 + 2 = 7 permisos y un tamaño de cola real de 5 + 2 = 7. El número real de tareas que pueden estar en la unidad es entonces 2 + 5 + 2 = 9. Cuando el ejecutor está lleno (5 tareas en cola, 2 en el grupo de subprocesos, por lo que hay 0 permisos disponibles) y TODOS los subprocesos del grupo liberan sus permisos, las tareas que ingresan pueden tomar exactamente 2 permisos.Ahora, la solución de JCiP es algo engorrosa de usar ya que no hace cumplir todas estas restricciones (cola ilimitada o limitada con esas restricciones matemáticas, etc.). Creo que esto solo sirve como un buen ejemplo para demostrar cómo se pueden construir nuevas clases seguras para subprocesos basadas en las partes que ya están disponibles, pero no como una clase completamente desarrollada y reutilizable. No creo que esto último fuera la intención del autor.
fuente
puede usar un RejectedExecutionHandler personalizado como este
fuente
Cree su propia cola de bloqueo para ser utilizada por el Ejecutor, con el comportamiento de bloqueo que está buscando, mientras siempre devuelve la capacidad restante disponible (asegurándose de que el ejecutor no intentará crear más subprocesos que su grupo principal, o activará el controlador de rechazo).
Creo que esto le dará el comportamiento de bloqueo que está buscando. Un manejador de rechazos nunca cumplirá con los requisitos, ya que eso indica que el ejecutor no puede realizar la tarea. Lo que podría imaginar es que obtienes alguna forma de 'espera ocupada' en el controlador. Eso no es lo que quieres, quieres una cola para el ejecutor que bloquea al llamador ...
fuente
ThreadPoolExecutor
usa eloffer
método para agregar tareas a la cola. Si creara una costumbreBlockingQueue
que se bloqueaoffer
, eso romperíaBlockingQueue
el contrato.ThreadPoolExecutor
se implementó para usaroffer
y noput
(la versión de bloqueo)? Además, si hubiera una forma de que el código del cliente dijera cuál usar y cuándo, muchas personas que intentan implementar soluciones personalizadas se habrían aliviadoPara evitar problemas con la solución @FixPoint. Se podría usar ListeningExecutorService y liberar el semáforo onSuccess y onFailure dentro de FutureCallback.
fuente
Runnable
ya que esos métodos todavía se llaman antes de la limpieza del trabajador en el estado normalThreadPoolExecutor
. Es decir, aún necesitará manejar las excepciones de rechazo.Recientemente encontré que esta pregunta tiene el mismo problema. El OP no lo dice explícitamente, pero no queremos usar el
RejectedExecutionHandler
que ejecuta una tarea en el hilo del remitente, porque esto subutilizará los hilos de trabajo si esta tarea es de larga duración.Al leer todas las respuestas y comentarios, en particular la solución defectuosa con el semáforo o usar,
afterExecute
eché un vistazo más de cerca al código del ThreadPoolExecutor para ver si hay alguna salida. Me sorprendió ver que hay más de 2000 líneas de código (comentado), algunas de las cuales me marean . Dado el requisito bastante simple que realmente tengo --- un productor, varios consumidores, dejar que el productor bloquee cuando ningún consumidor pueda aceptar el trabajo --- decidí desarrollar mi propia solución. No es unExecutorService
sino solo unExecutor
. Y no adapta el número de subprocesos a la carga de trabajo, sino que tiene solo un número fijo de subprocesos, lo que también se ajusta a mis requisitos. Aquí está el código. Siéntete libre de despotricar sobre ello :-)fuente
Creo que hay una forma bastante elegante de resolver este problema utilizando
java.util.concurrent.Semaphore
y delegando el comportamiento deExecutor.newFixedThreadPool
. El nuevo servicio ejecutor solo ejecutará una nueva tarea cuando haya un hilo para hacerlo. Semaphore gestiona el bloqueo con un número de permisos igual al número de subprocesos. Cuando termina una tarea, devuelve un permiso.fuente
Tenía la misma necesidad en el pasado: una especie de cola de bloqueo con un tamaño fijo para cada cliente respaldado por un grupo de subprocesos compartidos. Terminé escribiendo mi propio tipo de ThreadPoolExecutor:
UserThreadPoolExecutor (cola de bloqueo (por cliente) + grupo de subprocesos (compartido entre todos los clientes))
Ver: https://github.com/d4rxh4wx/UserThreadPoolExecutor
Cada UserThreadPoolExecutor recibe un número máximo de subprocesos de un ThreadPoolExecutor compartido
Cada UserThreadPoolExecutor puede:
fuente
Encontré esta política de rechazo en el cliente de búsqueda elástica. Bloquea el hilo de la persona que llama en la cola de bloqueo. Código abajo
fuente
Recientemente tuve la necesidad de lograr algo similar, pero en un
ScheduledExecutorService
.También tuve que asegurarme de manejar el retraso que se pasa en el método y asegurarme de que la tarea se envíe para ejecutarse en el momento en que la persona que llama espera o simplemente falla y arroja un
RejectedExecutionException
.Otros métodos de
ScheduledThreadPoolExecutor
para ejecutar o enviar una tarea de llamada internamente#schedule
que, a su vez, invocarán los métodos anulados.Tengo el código aquí, agradeceré cualquier comentario. https://github.com/AmitabhAwasthi/BlockingScheduler
fuente
Aquí está la solución que parece funcionar muy bien. Se llama NotifyingBlockingThreadPoolExecutor .
Programa de demostración.
Editar: hay un problema con este código, el método await () tiene errores. Llamar a shutdown () + awaitTermination () parece funcionar bien.
fuente
No siempre me gusta CallerRunsPolicy, especialmente porque permite que la tarea rechazada 'salte la cola' y se ejecute antes que las tareas que se enviaron antes. Además, la ejecución de la tarea en el subproceso que realiza la llamada puede llevar mucho más tiempo que esperar a que el primer espacio esté disponible.
Resolví este problema usando un RejectedExecutionHandler personalizado, que simplemente bloquea el hilo de llamada por un tiempo y luego intenta enviar la tarea nuevamente:
Esta clase solo se puede usar en el ejecutor del grupo de subprocesos como un RejectedExecutinHandler como cualquier otro, por ejemplo:
El único inconveniente que veo es que el hilo de llamada puede bloquearse un poco más de lo estrictamente necesario (hasta 250 ms). Además, dado que este ejecutor se llama de manera recursiva, las esperas muy largas para que un hilo esté disponible (horas) pueden resultar en un desbordamiento de pila.
Sin embargo, personalmente me gusta este método. Es compacto, fácil de entender y funciona bien.
fuente