¿Imposible hacer un grupo de subprocesos en caché con un límite de tamaño?

127

Parece imposible hacer un grupo de subprocesos en caché con un límite en el número de subprocesos que puede crear.

Así es como se implementa Executors.newCachedThreadPool estático en la biblioteca estándar de Java:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Entonces, usando esa plantilla para crear un grupo de subprocesos en caché de tamaño fijo:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Ahora, si usa esto y envía 3 tareas, todo estará bien. El envío de cualquier otra tarea dará como resultado excepciones de ejecución rechazadas.

Intentando esto:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

El resultado será que todos los hilos se ejecuten secuencialmente. Es decir, el grupo de subprocesos nunca hará más de un subproceso para manejar sus tareas.

Este es un error en el método de ejecución de ThreadPoolExecutor? O tal vez esto es intencional? ¿O hay alguna otra manera?

Editar: quiero algo exactamente como el grupo de subprocesos en caché (crea subprocesos bajo demanda y luego los mata después de un tiempo de espera) pero con un límite en el número de subprocesos que puede crear y la capacidad de continuar haciendo cola tareas adicionales una vez que ha golpear su límite de hilo. Según la respuesta de sjlee, esto es imposible. Mirando el método execute () de ThreadPoolExecutor, es realmente imposible. Necesitaría subclasificar ThreadPoolExecutor y anular execute () de manera similar a SwingWorker, pero lo que SwingWorker hace en execute () es un hack completo.

Matt Crinklaw-Vogt
fuente
1
¿Cuál es tu pregunta? ¿No es su segundo ejemplo de código la respuesta a su título?
rsp
44
Quiero un grupo de subprocesos que agregará subprocesos a pedido a medida que aumenta el número de tareas, pero nunca agregará más que un número máximo de subprocesos. CachedThreadPool ya hace esto, excepto que agregará un número ilimitado de hilos y no se detendrá en un tamaño predefinido. El tamaño que defino en los ejemplos es 3. El segundo ejemplo agrega 1 subproceso, pero no agrega dos más a medida que llegan nuevas tareas mientras las otras tareas aún no se han completado.
Matt Crinklaw-Vogt el
Comprueba esto, lo resuelve, debuggingisfun.blogspot.com/2012/05/…
ethan

Respuestas:

235

ThreadPoolExecutor tiene los siguientes comportamientos clave, y estos problemas pueden explicarse por estos comportamientos.

Cuando se envían tareas,

  1. Si el grupo de subprocesos no ha alcanzado el tamaño del núcleo, crea nuevos subprocesos.
  2. Si se ha alcanzado el tamaño del núcleo y no hay hilos inactivos, pone en cola las tareas.
  3. Si se ha alcanzado el tamaño del núcleo, no hay subprocesos inactivos y la cola se llena, crea nuevos subprocesos (hasta que alcanza el tamaño máximo).
  4. Si se ha alcanzado el tamaño máximo, no hay hilos inactivos y la cola se llena, la política de rechazo se activa.

En el primer ejemplo, tenga en cuenta que SynchronousQueue tiene esencialmente un tamaño de 0. Por lo tanto, en el momento en que alcanza el tamaño máximo (3), la política de rechazo entra en acción (# 4).

En el segundo ejemplo, la cola de elección es un LinkedBlockingQueue que tiene un tamaño ilimitado. Por lo tanto, te quedas atascado con el comportamiento # 2.

Realmente no puede jugar mucho con el tipo en caché o el tipo fijo, ya que su comportamiento está casi completamente determinado.

Si desea tener un grupo de subprocesos dinámico y acotado, debe usar un tamaño de núcleo positivo y un tamaño máximo combinados con una cola de un tamaño finito. Por ejemplo,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Anexo : esta es una respuesta bastante antigua, y parece que JDK cambió su comportamiento cuando se trata del tamaño del núcleo de 0. Desde JDK 1.6, si el tamaño del núcleo es 0 y el grupo no tiene ningún subproceso, el ThreadPoolExecutor agregará un hilo para ejecutar esa tarea. Por lo tanto, el tamaño del núcleo de 0 es una excepción a la regla anterior. Gracias Steve por llamarme la atención.

sjlee
fuente
44
Debe escribir algunas palabras sobre el método allowCoreThreadTimeOutpara que esta respuesta sea perfecta. Ver la respuesta de @ user1046052
hsestupin
1
¡Gran respuesta! Solo un punto para agregar: también vale la pena mencionar otras políticas de rechazo. Ver la respuesta de @brianegge
Jeff
1
El comportamiento 2 no debería decir 'Si se ha alcanzado el tamaño máximo de subproceso y no hay subprocesos inactivos, pone en cola las tareas'. ?
Zoltán
1
¿Podría explicar qué implica el tamaño de la cola? ¿Significa que solo se pueden poner en cola 20 tareas antes de que sean rechazadas?
Zoltán
1
@ Zoltán Escribí esto hace un tiempo, por lo que existe la posibilidad de que algún comportamiento haya cambiado desde entonces (no seguí las últimas actividades demasiado de cerca), pero suponiendo que estos comportamientos no hayan cambiado, el # 2 es correcto como se indicó, y eso es quizás el punto más importante (y algo sorprendente) de esto. Una vez que se alcanza el tamaño del núcleo, TPE favorece las colas sobre la creación de nuevos subprocesos. El tamaño de la cola es literalmente el tamaño de la cola que se pasa al TPE. Si la cola se llena pero no ha alcanzado el tamaño máximo, creará un nuevo hilo (no rechazar tareas). Ver # 3. Espero que ayude.
sjlee 01 de
60

A menos que me haya perdido algo, la solución a la pregunta original es simple. El siguiente código implementa el comportamiento deseado como se describe en el póster original. Generará hasta 5 hilos para trabajar en una cola ilimitada y los hilos inactivos terminarán después de 60 segundos.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

fuente
1
Estás en lo correcto. Ese método se agregó en jdk 1.6, por lo que no mucha gente lo sabe. Además, no puede tener un tamaño de grupo de núcleos "mínimo", lo cual es lamentable.
jtahlborn
44
Mi única preocupación acerca de esto es (de los documentos de JDK 8): "Cuando se envía una nueva tarea en el método ejecutar (Ejecutable), y se ejecutan menos de los hilos corePoolSize, se crea un nuevo hilo para manejar la solicitud, incluso si otro trabajador los hilos están inactivos ".
veegee
Estoy bastante seguro de que esto realmente no funciona. La última vez que miré hacer lo anterior en realidad solo ejecuta tu trabajo en un hilo a pesar de que engendras 5. Nuevamente, pasaron unos años, pero cuando me sumergí en la implementación de ThreadPoolExecutor solo se envió a nuevos hilos una vez que tu cola estaba llena. El uso de una cola ilimitada hace que esto nunca suceda. Puede probar enviando trabajo y registrando el nombre del hilo y luego durmiendo. Cada ejecutable terminará imprimiendo el mismo nombre / no se ejecutará en ningún otro hilo.
Matt Crinklaw-Vogt
2
Esto funciona, Matt. Establece el tamaño del núcleo en 0, por eso solo tenía 1 hilo. El truco aquí es establecer el tamaño del núcleo al tamaño máximo.
T-Gergely
1
@vegee tiene razón - Esto en realidad no funciona muy bien - ThreadPoolExecutor solo reutilizará hilos cuando esté por encima de corePoolSize. Entonces, cuando corePoolSize es igual a maxPoolSize, solo se beneficiará del almacenamiento en caché de subprocesos cuando su grupo esté lleno (por lo tanto, si tiene la intención de usar esto pero generalmente permanece por debajo del tamaño máximo de su grupo, también podría reducir el tiempo de espera del hilo a un nivel bajo valor; y tenga en cuenta que no hay almacenamiento en caché - siempre nuevos hilos)
Chris Riddell
7

Tuve el mismo problema. Como ninguna otra respuesta reúne todos los problemas, agrego la mía:

Ahora está claramente escrito en documentos : si usa una cola que no bloquea ( LinkedBlockingQueue) la configuración de subprocesos máximos no tiene efecto, solo se usan subprocesos principales.

entonces:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

Este ejecutor tiene:

  1. No hay concepto de subprocesos máximos ya que estamos usando una cola ilimitada. Esto es bueno porque dicha cola puede hacer que el ejecutor cree una cantidad masiva de subprocesos extra no centrales si sigue su política habitual.

  2. Una cola de tamaño máximo Integer.MAX_VALUE. Submit()lanzará RejectedExecutionExceptionsi el número de tareas pendientes excede Integer.MAX_VALUE. No estoy seguro de que primero se nos acabe la memoria o esto sucederá.

  3. Tiene 4 hilos principales posibles. Los subprocesos de núcleo inactivo salen automáticamente si están inactivos durante 5 segundos. Entonces, sí, los subprocesos estrictamente a pedido. El número se puede variar utilizando el setThreads()método.

  4. Se asegura de que el número mínimo de subprocesos principales nunca sea inferior a uno, o submit()rechazará todas las tareas. Dado que los subprocesos centrales deben ser> = subprocesos máximos, el método también setThreads()establece subprocesos máximos, aunque la configuración de subprocesos máximos es inútil para una cola ilimitada.

Dakota del Sur
fuente
Creo que también debe establecer 'allowCoreThreadTimeOut' en 'verdadero', de lo contrario, una vez que se crean los hilos, los mantendrá para siempre: gist.github.com/ericdcobb/46b817b384f5ca9d5f5d
eric
Vaya, me perdí eso, lo siento, ¡tu respuesta es perfecta!
Eric
6

En su primer ejemplo, las tareas posteriores se rechazan porque AbortPolicyes el valor predeterminado RejectedExecutionHandler. ThreadPoolExecutor contiene las siguientes políticas, que puede cambiar mediante el setRejectedExecutionHandlermétodo:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Parece que quiere un grupo de subprocesos en caché con CallerRunsPolicy.

brianegge
fuente
5

Ninguna de las respuestas aquí solucionó mi problema, que tenía que ver con la creación de una cantidad limitada de conexiones HTTP utilizando el cliente HTTP de Apache (versión 3.x). Como me llevó algunas horas encontrar una buena configuración, compartiré:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Esto crea un ThreadPoolExecutorque comienza con cinco y contiene un máximo de diez hilos que se ejecutan simultáneamente CallerRunsPolicypara ejecutar.

Pablo
fuente
El problema con esta solución es que si aumenta el número o los productores, aumentará el número de subprocesos que ejecutan los subprocesos en segundo plano. En muchos casos, eso no es lo que quieres.
Gris
3

Según el Javadoc para ThreadPoolExecutor:

Si hay más subprocesos corePoolSize pero inferiores al máximo de PoolsSize en ejecución, se creará un nuevo subproceso solo si la cola está llena . Al configurar corePoolSize y maximumPoolSize de la misma manera, crea un grupo de subprocesos de tamaño fijo.

(El énfasis es mío).

La respuesta de Jitter es lo que quieres, aunque la mía responde a tu otra pregunta. :)

Jonathan Feinberg
fuente
2

Hay una opción más. En lugar de usar el nuevo SynchronousQueue, también puede usar cualquier otra cola, pero debe asegurarse de que su tamaño sea 1, de modo que obligue al servicio del ejecutor a crear un nuevo hilo.

Ashkrit
fuente
Creo que te refieres al tamaño 0 (por defecto), para que no haya tareas en cola y realmente obligue al servicio del ejecutor a crear nuevos hilos cada vez.
Leonmax
2

No parece que ninguna de las respuestas realmente responda la pregunta, de hecho, no puedo ver una forma de hacerlo, incluso si subclasifica de PooledExecutorService ya que muchos de los métodos / propiedades son privados, por ejemplo, haciendo que addIfUnderMaximumPoolSize esté protegido, podría Haz lo siguiente:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

Lo más cerca que estuve fue esto, pero incluso eso no es una muy buena solución

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

ps no probado lo anterior

Stuart
fuente
2

Aquí hay otra solución. Creo que esta solución se comporta como lo desea (aunque no está orgullosa de esta solución):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
Stuart
fuente
2

Esto es lo que quieres (al menos supongo que sí). Para una explicación, verifique la respuesta de Jonathan Feinberg

Executors.newFixedThreadPool(int n)

Crea un grupo de subprocesos que reutiliza un número fijo de subprocesos que operan en una cola compartida no acotada. En cualquier momento, a lo sumo, los hilos nThreads serán tareas de procesamiento activas. Si se envían tareas adicionales cuando todos los hilos están activos, esperarán en la cola hasta que haya un hilo disponible. Si algún subproceso termina debido a un error durante la ejecución antes del cierre, uno nuevo tomará su lugar si es necesario para ejecutar tareas posteriores. Los subprocesos en el grupo existirán hasta que se cierre explícitamente.

estar nervioso
fuente
44
Claro, podría usar un grupo de subprocesos fijo, pero eso dejaría n subprocesos para siempre, o hasta que llame al cierre. Quiero algo exactamente como el grupo de subprocesos en caché (crea subprocesos bajo demanda y luego los mata después de un tiempo de espera) pero con un límite en el número de subprocesos que puede crear.
Matt Crinklaw-Vogt el
0
  1. Puede usar ThreadPoolExecutorcomo lo sugiere @sjlee

    Puede controlar el tamaño del grupo de forma dinámica. Echa un vistazo a esta pregunta para más detalles:

    Grupo de subprocesos dinámicos

    O

  2. Puede usar newWorkStealingPool API, que se ha introducido con java 8.

    public static ExecutorService newWorkStealingPool()

    Crea un grupo de subprocesos que roba trabajo utilizando todos los procesadores disponibles como su nivel de paralelismo objetivo.

De forma predeterminada, el nivel de paralelismo se establece en el número de núcleos de CPU en su servidor. Si tiene un servidor de CPU de 4 núcleos, el tamaño del grupo de subprocesos sería 4. Esta API devuelve el ForkJoinPooltipo de ExecutorService y permite robar trabajo de subprocesos inactivos robando tareas de subprocesos ocupados en ForkJoinPool.

Ravindra babu
fuente
0

El problema se resumió de la siguiente manera:

Quiero algo exactamente como el grupo de subprocesos en caché (crea subprocesos bajo demanda y luego los mata después de un tiempo de espera) pero con un límite en el número de subprocesos que puede crear y la capacidad de continuar haciendo cola en tareas adicionales una vez que ha alcanzado su límite límite de hilo.

Antes de señalar la solución, explicaré por qué las siguientes soluciones no funcionan:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

Esto no pondrá en cola ninguna tarea cuando se alcance el límite de 3 porque SynchronousQueue, por definición, no puede contener ningún elemento.

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

Esto no creará más de un subproceso porque ThreadPoolExecutor solo crea subprocesos que exceden el corePoolSize si la cola está llena. Pero LinkedBlockingQueue nunca está lleno.

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

Esto no reutilizará subprocesos hasta que se haya alcanzado corePoolSize porque ThreadPoolExecutor aumenta el número de subprocesos hasta que se alcance corePoolSize incluso si los subprocesos existentes están inactivos. Si puede vivir con esta desventaja, entonces esta es la solución más fácil para el problema. También es la solución descrita en "Concurrencia de Java en la práctica" (nota al pie de página en p172).

La única solución completa al problema descrito parece ser la que implica anular el offermétodo de la cola y escribir un mensajeRejectedExecutionHandler como se explica en las respuestas a esta pregunta: ¿Cómo hacer que ThreadPoolExecutor aumente los hilos al máximo antes de hacer cola?

Stefan Feuerhahn
fuente
0

Esto funciona para Java8 + (y otros, por ahora ..)

     Executor executor = new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()){{allowCoreThreadTimeOut(true);}};

donde 3 es el límite de hilos y 5 es tiempo de espera para hilos inactivos.

Si desea verificar si funciona usted mismo , aquí está el código para hacer el trabajo:

public static void main(String[] args) throws InterruptedException {
    final int DESIRED_NUMBER_OF_THREADS=3; // limit of number of Threads for the task at a time
    final int DESIRED_THREAD_IDLE_DEATH_TIMEOUT=5; //any idle Thread ends if it remains idle for X seconds

    System.out.println( java.lang.Thread.activeCount() + " threads");
    Executor executor = new ThreadPoolExecutor(DESIRED_NUMBER_OF_THREADS, DESIRED_NUMBER_OF_THREADS, DESIRED_THREAD_IDLE_DEATH_TIMEOUT, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()) {{allowCoreThreadTimeOut(true);}};

    System.out.println(java.lang.Thread.activeCount() + " threads");

    for (int i = 0; i < 5; i++) {
        final int fi = i;
        executor.execute(() -> waitsout("starting hard thread computation " + fi, "hard thread computation done " + fi,2000));
    }
    System.out.println("If this is UP, it works");

    while (true) {
        System.out.println(
                java.lang.Thread.activeCount() + " threads");
        Thread.sleep(700);
    }

}

static void waitsout(String pre, String post, int timeout) {
    try {
        System.out.println(pre);
        Thread.sleep(timeout);
        System.out.println(post);
    } catch (Exception e) {
    }
}

salida del código anterior para mí es

1 threads
1 threads
If this is UP, it works
starting hard thread computation 0
4 threads
starting hard thread computation 2
starting hard thread computation 1
4 threads
4 threads
hard thread computation done 2
hard thread computation done 0
hard thread computation done 1
starting hard thread computation 3
starting hard thread computation 4
4 threads
4 threads
4 threads
hard thread computation done 3
hard thread computation done 4
4 threads
4 threads
4 threads
4 threads
3 threads
3 threads
3 threads
1 threads
1 threads
Ondřej
fuente