Me he sentido frustrado durante algún tiempo con el comportamiento predeterminado ThreadPoolExecutor
que respalda los ExecutorService
grupos de subprocesos que muchos de nosotros usamos. Para citar de los Javadocs:
Si hay más subprocesos que corePoolSize pero menos que maximumPoolSize en ejecución, se creará un nuevo subproceso solo si la cola está llena .
Lo que esto significa es que si define un grupo de subprocesos con el siguiente código, nunca iniciará el segundo subproceso porque LinkedBlockingQueue
no tiene límites.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Solo si tiene una cola limitada y la cola está llena, se iniciarán los subprocesos por encima del número principal. Sospecho que un gran número de programadores junior de Java multiproceso desconocen este comportamiento de ThreadPoolExecutor
.
Ahora tengo un caso de uso específico donde esto no es óptimo. Estoy buscando formas, sin escribir mi propia clase de TPE, para solucionarlo.
Mis requisitos son para un servicio web que realice devoluciones de llamadas a un tercero posiblemente no confiable.
- No quiero hacer la devolución de llamada sincrónicamente con la solicitud web, así que quiero usar un grupo de subprocesos.
- Por lo general, obtengo un par de estos por minuto, por lo que no quiero tener
newFixedThreadPool(...)
una gran cantidad de subprocesos que en su mayoría están inactivos. - De vez en cuando obtengo una ráfaga de este tráfico y quiero escalar el número de subprocesos a un valor máximo (digamos 50).
- Necesito hacer un mejor intento para hacer todas las devoluciones de llamada, así que quiero poner en cola las adicionales por encima de 50. No quiero abrumar al resto de mi servidor web usando un
newCachedThreadPool()
.
¿Cómo puedo evitar esta limitación en la ThreadPoolExecutor
que la cola debe estar limitada y llena antes de que se inicien más subprocesos? ¿Cómo puedo hacer que inicie más subprocesos antes de poner en cola las tareas?
Editar:
@Flavio hace un buen punto sobre el uso de ThreadPoolExecutor.allowCoreThreadTimeOut(true)
para que los hilos principales se agoten y salgan. Lo consideré, pero todavía quería la función de hilos centrales. No quería que la cantidad de subprocesos en el grupo cayera por debajo del tamaño del núcleo si es posible.
Respuestas:
Creo que finalmente he encontrado una solución algo elegante (tal vez un poco hacky) para esta limitación con
ThreadPoolExecutor
. Implica extenderloLinkedBlockingQueue
para que vuelvafalse
paraqueue.offer(...)
cuando ya hay algunas tareas en cola. Si los subprocesos actuales no se mantienen al día con las tareas en cola, el TPE agregará subprocesos adicionales. Si el grupo ya está en el número máximo de subprocesos,RejectedExecutionHandler
se llamará al. Es el controlador el que luego ingresaput(...)
a la cola.Ciertamente es extraño escribir una cola en la que
offer(...)
pueda regresarfalse
yput()
nunca se bloquee, así que esa es la parte del truco. Pero esto funciona bien con el uso de la cola por parte de TPE, por lo que no veo ningún problema al hacer esto.Aquí está el código:
Con este mecanismo, cuando envío tareas a la cola, la
ThreadPoolExecutor
voluntad:offer(...)
devolverá falso.RejectedExecutionHandler
RejectedExecutionHandler
pone entonces la tarea en la cola para ser procesados por el primer hilo disponible en orden FIFO.Aunque en mi código de ejemplo anterior, la cola no está delimitada, también puede definirla como una cola delimitada. Por ejemplo, si agrega una capacidad de 1000 al,
LinkedBlockingQueue
entonces:Además, si necesita usar
offer(...)
en elRejectedExecutionHandler
, puede usar eloffer(E, long, TimeUnit)
método en su lugar conLong.MAX_VALUE
el tiempo de espera.Advertencia:
Si espera que las tareas se agreguen al ejecutor después de que se haya cerrado, entonces es posible que desee ser más inteligente al
RejectedExecutionException
eliminar nuestra costumbreRejectedExecutionHandler
cuando el servicio del ejecutor se haya cerrado. Gracias a @RaduToader por señalar esto.Editar:
Otro ajuste a esta respuesta podría ser preguntar al TPE si hay subprocesos inactivos y solo poner el elemento en cola si es así. Tendría que hacer una clase verdadera para esto y agregarle un
ourQueue.setThreadPoolExecutor(tpe);
método.Entonces su
offer(...)
método podría verse así:tpe.getPoolSize() == tpe.getMaximumPoolSize()
en cuyo caso simplemente llamesuper.offer(...)
.tpe.getPoolSize() > tpe.getActiveCount()
, llamesuper.offer(...)
porque parece que hay hilos inactivos.false
a bifurcar otro hilo.Tal vez esto:
Tenga en cuenta que los métodos de obtención en TPE son costosos ya que acceden a
volatile
campos o (en el caso degetActiveCount()
) bloquean el TPE y recorren la lista de subprocesos. Además, aquí hay condiciones de carrera que pueden hacer que una tarea se ponga en cola de forma incorrecta o que se bifurque otro hilo cuando había un hilo inactivo.fuente
Queue
para lograr esto, ciertamente no estás solo en tu idea: groovy-programming.com/post/26923146865execute(runnable)
, entoncesrunnable
simplemente se agrega a la cola. Si llamaexecute(secondRunnable)
,secondRunnable
se agrega a la cola. Pero ahora, si llamaexecute(thirdRunnable)
,thirdRunnable
se ejecutará en un nuevo hilo. Larunnable
ysecondRunnable
solo se ejecutan una vezthirdRunnable
(o la tarea original de larga duración) han finalizado.Set tamaño del núcleo y el tamaño máximo para el mismo valor, y permiten hilos de núcleo para ser retirados de la piscina con
allowCoreThreadTimeOut(true)
.fuente
Ya tengo otras dos respuestas a esta pregunta, pero sospecho que esta es la mejor.
Se basa en la técnica de la respuesta actualmente aceptada , a saber:
offer()
método de la cola para (a veces) devolver falso,ThreadPoolExecutor
que genere un nuevo hilo o rechace la tarea, yRejectedExecutionHandler
para poner en cola la tarea en caso de rechazo.El problema es cuándo
offer()
debe devolver falso. La respuesta actualmente aceptada devuelve falso cuando la cola tiene un par de tareas, pero como señalé en mi comentario allí, esto causa efectos indeseables. Alternativamente, si siempre devuelve falso, seguirá generando nuevos hilos incluso cuando tenga hilos esperando en la cola.La solución es usar Java 7
LinkedTransferQueue
y teneroffer()
calltryTransfer()
. Cuando hay un hilo consumidor en espera, la tarea simplemente se pasará a ese hilo. De lo contrario,offer()
devolverá falso yThreadPoolExecutor
generará un nuevo hilo.fuente
queue.offer()
, debido a que realmente está llamandoLinkedTransferQueue.tryTransfer()
, devolverá falso y no pondrá en cola la tarea. Sin embargo, lasRejectedExecutionHandler
llamadasqueue.put()
, que no fallan y ponen en cola la tarea.Nota: ahora prefiero y recomiendo mi otra respuesta .
Aquí hay una versión que me parece mucho más sencilla: aumente el corePoolSize (hasta el límite de maximumPoolSize) cada vez que se ejecute una nueva tarea, luego disminuya el corePoolSize (hasta el límite del "tamaño de grupo central" especificado por el usuario) siempre que un la tarea se completa.
Para decirlo de otra manera, realice un seguimiento del número de tareas en ejecución o en cola, y asegúrese de que corePoolSize sea igual al número de tareas siempre que se encuentre entre el "tamaño del grupo de núcleos" especificado por el usuario y el maximumPoolSize.
Tal como está escrito, la clase no admite cambiar el corePoolSize o maximumPoolSize especificado por el usuario después de la construcción, y no admite la manipulación de la cola de trabajo directamente o mediante
remove()
opurge()
.fuente
synchronized
bloques. ¿Puede llamar a la cola para obtener el número de tareas? ¿O tal vez usar unAtomicInteger
?execute()
llamadas en subprocesos separados, cada una va a (1) calcular cuántos subprocesos se necesitan, (2)setCorePoolSize
a ese número y (3) llamarsuper.execute()
. Si los pasos (1) y (2) no están sincronizados, no estoy seguro de cómo evitar un pedido desafortunado en el que establece el tamaño del grupo principal en un número menor después de un número mayor. Con acceso directo al campo de superclase, esto se podría hacer usando comparar y establecer en su lugar, pero no veo una forma limpia de hacerlo en una subclase sin sincronización.taskCount
campo sea válido (es decir, aAtomicInteger
). Si dos subprocesos recalculan el tamaño del grupo inmediatamente uno después del otro, debería obtener los valores adecuados. Si el segundo encoge los hilos centrales, entonces debe haber visto una caída en la cola o algo así.execute()
. Cada uno llamaráatomicTaskCount.incrementAndGet()
y obtendrán 10 y 11 respectivamente. Pero sin sincronización (por encima de obtener el recuento de tareas y establecer el tamaño del grupo central), podría obtener (1) la tarea 11 establece el tamaño del grupo central en 11, (2) la tarea 10 establece el tamaño del grupo central en 10, (3) la tarea 10 llamasuper.execute()
, (4) la tarea 11 llamasuper.execute()
y se pone en cola.Tenemos una subclase
ThreadPoolExecutor
que toma un adicionalcreationThreshold
y anulaexecute
.tal vez eso también ayude, pero el tuyo se ve más artístico, por supuesto ...
fuente
offer(...)
método solo regresefalse
condicionalmente. ¡Gracias!La respuesta recomendada resuelve solo uno (1) del problema con el grupo de subprocesos JDK:
Los grupos de subprocesos de JDK están predispuestos a las colas. Entonces, en lugar de generar un nuevo hilo, pondrán la tarea en cola. Solo si la cola alcanza su límite, el grupo de subprocesos generará un nuevo subproceso.
El retiro del hilo no ocurre cuando la carga se aligera. Por ejemplo, si tenemos una ráfaga de trabajos que llegan al grupo que hace que el grupo llegue al máximo, seguido de una carga ligera de un máximo de 2 tareas a la vez, el grupo usará todos los subprocesos para atender la carga ligera y evitar el retiro de subprocesos. (solo se necesitarían 2 hilos…)
Insatisfecho con el comportamiento anterior, seguí adelante e implementé un grupo para superar las deficiencias anteriores.
Para resolver 2) El uso de la programación de Lifo resuelve el problema. Esta idea fue presentada por Ben Maurer en la conferencia de aplicación ACM 2015: escala Systems @ Facebook
Entonces nació una nueva implementación:
LifoThreadPoolExecutorSQP
Hasta ahora, esta implementación mejora el rendimiento de ejecución asíncrona para ZEL .
La implementación es capaz de reducir la sobrecarga de cambio de contexto, lo que produce un rendimiento superior para ciertos casos de uso.
Espero eso ayude...
PD: JDK Fork Join Pool implementa ExecutorService y funciona como un grupo de subprocesos "normal", la implementación es eficaz, utiliza la programación de subprocesos LIFO, sin embargo, no hay control sobre el tamaño de la cola interna, el tiempo de espera de retiro ... y, lo más importante, las tareas no pueden ser interrumpido al cancelarlos
fuente
Nota: ahora prefiero y recomiendo mi otra respuesta .
Tengo otra propuesta, siguiendo la idea original de cambiar la cola para devolver falso. En este, todas las tareas pueden ingresar a la cola, pero siempre que una tarea se pone en cola después
execute()
, la seguimos con una tarea centinela sin operación que la cola rechaza, lo que hace que se genere un nuevo hilo, que ejecutará la no operación seguida inmediatamente por algo de la cola.Debido a que los subprocesos de trabajo pueden estar sondeando
LinkedBlockingQueue
para una nueva tarea, es posible que una tarea se ponga en cola incluso cuando hay un subproceso disponible. Para evitar generar nuevos subprocesos incluso cuando hay subprocesos disponibles, necesitamos realizar un seguimiento de cuántos subprocesos están esperando nuevas tareas en la cola y solo generar un nuevo subproceso cuando hay más tareas en la cola que subprocesos en espera.fuente
La mejor solución que se me ocurre es ampliar.
ThreadPoolExecutor
ofrece algunos métodos de gancho:beforeExecute
yafterExecute
. En su extensión, podría mantener el uso de una cola limitada para alimentar tareas y una segunda cola ilimitada para manejar el desbordamiento. Cuando alguien llamasubmit
, puede intentar colocar la solicitud en la cola limitada. Si se encuentra con una excepción, simplemente coloque la tarea en su cola de desbordamiento. A continuación, puede utilizar elafterExecute
gancho para ver si hay algo en la cola de desbordamiento después de finalizar una tarea. De esta manera, el ejecutor se ocupará primero de las cosas en su cola limitada y automáticamente sacará de esta cola ilimitada cuando el tiempo lo permita.Parece más trabajo que tu solución, pero al menos no implica dar a las colas comportamientos inesperados. También imagino que hay una mejor manera de verificar el estado de la cola y los subprocesos en lugar de confiar en las excepciones, que son bastante lentas de lanzar.
fuente
Nota: Para JDK ThreadPoolExecutor, cuando tiene una cola limitada, solo está creando nuevos hilos cuando la oferta devuelve falsa. Puede obtener algo útil con CallerRunsPolicy que crea un poco de BackPressure y las llamadas se ejecutan directamente en el hilo de la llamada.
Necesito que las tareas se ejecuten a partir de subprocesos creados por el grupo y tener una cola ubounded para la programación, mientras que la cantidad de subprocesos dentro del grupo puede crecer o reducirse entre corePoolSize y maximumPoolSize, así que ...
Terminé haciendo una copia completa pasta de ThreadPoolExecutor y cambiar un poco el método ejecuta debido a que por desgracia esto no se podía hacer por extensión (se llama a los métodos privados).
No quería generar nuevos hilos inmediatamente cuando llega una nueva solicitud y todos los hilos están ocupados (porque en general tengo tareas de corta duración). Agregué un umbral, pero siéntase libre de cambiarlo según sus necesidades (tal vez para la mayoría de los IO es mejor eliminar este umbral)
fuente