¿Es posible especificar un grupo de subprocesos personalizado para la secuencia paralela de Java 8 ? No puedo encontrarlo en ningún lado.
Imagine que tengo una aplicación de servidor y me gustaría usar flujos paralelos. Pero la aplicación es grande y multiproceso, por lo que quiero compartimentarla. No quiero una tarea de ejecución lenta en un módulo de las tareas de bloqueo de aplicaciones de otro módulo.
Si no puedo usar diferentes grupos de subprocesos para diferentes módulos, significa que no puedo usar flujos paralelos de forma segura en la mayoría de las situaciones del mundo real.
Prueba el siguiente ejemplo. Hay algunas tareas intensivas de CPU ejecutadas en subprocesos separados. Las tareas aprovechan flujos paralelos. La primera tarea se interrumpe, por lo que cada paso dura 1 segundo (simulado por el hilo de suspensión). El problema es que otros hilos se atascan y esperan a que termine la tarea rota. Este es un ejemplo artificial, pero imagine una aplicación de servlet y alguien que envíe una tarea de larga ejecución al grupo de unión de fork compartida.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
Respuestas:
En realidad, hay un truco sobre cómo ejecutar una operación paralela en un grupo de bifurcación específico. Si lo ejecuta como una tarea en un grupo fork-join, permanece allí y no usa el común.
El truco se basa en ForkJoinTask.fork que especifica: "Organiza la ejecución asincrónica de esta tarea en el grupo en el que se está ejecutando la tarea actual, si corresponde, o utiliza el ForkJoinPool.commonPool () si no está enForkJoinPool ()"
fuente
ForkJoinPool
o es un detalle de implementación? Un enlace a la documentación estaría bien.ForkJoinPool
instancia debería sershutdown()
cuando ya no sea necesaria para evitar una fuga de hilo. (ejemplo)Las secuencias paralelas usan el valor predeterminado,
ForkJoinPool.commonPool
que por defecto tiene un subproceso menos, ya que tiene procesadores , como se muestra enRuntime.getRuntime().availableProcessors()
(Esto significa que las secuencias paralelas usan todos sus procesadores porque también usan el hilo principal):Esto también significa que si ha anidado flujos paralelos o múltiples flujos paralelos iniciados simultáneamente, todos compartirán el mismo grupo. Ventaja: nunca usará más que el predeterminado (número de procesadores disponibles). Desventaja: es posible que no obtenga "todos los procesadores" asignados a cada flujo paralelo que inicie (si tiene más de uno). (Aparentemente, puedes usar un ManagedBlocker para evitar eso).
Para cambiar la forma en que se ejecutan las transmisiones paralelas, puede
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
oSystem.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
para un paralelismo objetivo de 20 hilos. Sin embargo, esto ya no funciona después del parche con respaldo https://bugs.openjdk.java.net/browse/JDK-8190974 .Ejemplo de esto último en mi máquina que tiene 8 procesadores. Si ejecuto el siguiente programa:
El resultado es:
Entonces puede ver que la secuencia paralela procesa 8 elementos a la vez, es decir, utiliza 8 subprocesos. Sin embargo, si descomento la línea comentada, el resultado es:
Esta vez, la secuencia paralela ha utilizado 20 subprocesos y los 20 elementos de la secuencia se han procesado simultáneamente.
fuente
commonPool
tiene en realidad uno menor queavailableProcessors
, lo que resulta en un paralelismo total igual aavailableProcessors
porque el hilo de llamada cuenta como uno.ForkJoinTask
. Para imitarparallel()
get()
es necesario:stream.parallel().forEach(soSomething)).get();
ForkJoinPool.submit(() -> stream.forEach(...))
ejecutará mis acciones de Stream con lo dadoForkJoinPool
. Esperaría que toda la Stream-Action se ejecute en ForJoinPool como ONE action, pero internamente todavía usa el ForkJoinPool predeterminado / común. ¿Dónde viste que el ForkJoinPool.submit () haría lo que tú dices?Alternativamente al truco de activar el cálculo paralelo dentro de su propio forkJoinPool, también puede pasar ese grupo al método CompletableFuture.supplyAsync como en:
fuente
La solución original (establecer la propiedad de paralelismo común de ForkJoinPool) ya no funciona. Mirando los enlaces en la respuesta original, una actualización que rompe esto se ha vuelto a portar a Java 8. Como se mencionó en los hilos enlazados, no se garantiza que esta solución funcione para siempre. Basado en eso, la solución es forkjoinpool.submit con la solución .get discutida en la respuesta aceptada. Creo que el backport también corrige la falta de fiabilidad de esta solución.
fuente
ForkJoinPool.commonPool().getParallelism()
en modo de depuración.unreported exception InterruptedException; must be caught or declared to be thrown
Incluso con todas lascatch
excepciones en el bucle.Podemos cambiar el paralelismo predeterminado usando la siguiente propiedad:
que se puede configurar para usar más paralelismo.
fuente
Para medir la cantidad real de hilos usados, puede verificar
Thread.activeCount()
:Esto puede producir en una CPU de 4 núcleos una salida como:
Sin
.parallel()
eso da:fuente
Hasta ahora, utilicé las soluciones descritas en las respuestas de esta pregunta. Ahora, se me ocurrió una pequeña biblioteca llamada Parallel Stream Support para eso:
Pero como @PabloMatiasGomez señaló en los comentarios, hay inconvenientes con respecto al mecanismo de división de las corrientes paralelas que depende en gran medida del tamaño del grupo común. Ver flujo paralelo desde un HashSet no se ejecuta en paralelo .
Estoy usando esta solución solo para tener grupos separados para diferentes tipos de trabajo, pero no puedo establecer el tamaño del grupo común en 1, incluso si no lo uso.
fuente
Nota: Parece que hay una solución implementada en JDK 10 que garantiza que el Grupo de subprocesos personalizado use el número esperado de subprocesos.
La ejecución de flujo paralelo dentro de un ForkJoinPool personalizado debe obedecer el paralelismo https://bugs.openjdk.java.net/browse/JDK-8190974
fuente
Probé el ForkJoinPool personalizado de la siguiente manera para ajustar el tamaño de la piscina:
Aquí está el resultado que dice que el grupo está usando más subprocesos que el predeterminado 4 .
Pero en realidad hay un bicho raro , cuando traté de lograr el mismo resultado usando
ThreadPoolExecutor
lo siguiente:Pero fallé.
Solo iniciará el paralelismo en un nuevo subproceso y luego todo lo demás será igual, lo que nuevamente prueba que
parallelStream
utilizará el ForkJoinPool para iniciar sus subprocesos secundarios.fuente
Ve a buscar AbacusUtil . El número de subproceso puede especificarse para flujo paralelo. Aquí está el código de ejemplo:
Divulgación: Soy el desarrollador de AbacusUtil.
fuente
Si no desea confiar en los hacks de implementación, siempre hay una manera de lograr lo mismo mediante la implementación de recopiladores personalizados que se combinarán
map
y lacollect
semántica ... y no estaría limitado a ForkJoinPool:Afortunadamente, ya está hecho aquí y disponible en Maven Central: http://github.com/pivovarit/parallel-collectors
Descargo de responsabilidad: lo escribí y asumo la responsabilidad.
fuente
Si no le importa usar una biblioteca de terceros, con cyclops-react puede mezclar Streams secuenciales y paralelos dentro de la misma tubería y proporcionar ForkJoinPools personalizados. Por ejemplo
O si deseamos continuar procesando dentro de una secuencia secuencial
[Divulgación Soy el desarrollador principal de cyclops-react]
fuente
Si no necesita un ThreadPool personalizado pero prefiere limitar el número de tareas simultáneas, puede usar:
(La pregunta duplicada que pide esto está bloqueada, así que por favor llévame aquí)
fuente
puede intentar implementar este ForkJoinWorkerThreadFactory e inyectarlo en la clase Fork-Join.
puede usar este constructor del grupo Fork-Join para hacer esto.
notas: 1. si usa esto, tenga en cuenta que, en función de su implementación de nuevos subprocesos, la programación de JVM se verá afectada, que generalmente programa subprocesos de unión de horquilla en diferentes núcleos (tratados como un subproceso computacional). 2. la programación de tareas por fork-join a hilos no se verá afectada. 3. Realmente no he descubierto cómo la secuencia paralela está eligiendo subprocesos de fork-join (no se pudo encontrar la documentación adecuada), así que intente usar una fábrica de nombres de subprocesos diferente para asegurarse de que si se seleccionan subprocesos en secuencia paralela de customThreadFactory que proporciona. 4. commonThreadPool no utilizará este customThreadFactory.
fuente