AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
Cuando escribí esto, asumí que los hilos se generarán solo en la llamada del mapa, ya que el paralelo se coloca después del mapa. Pero algunas líneas en el archivo obtenían diferentes números de registro para cada ejecución.
Leí la documentación oficial de la transmisión de Java y algunos sitios web para comprender cómo funcionan las transmisiones bajo el capó.
Unas cuantas preguntas:
El flujo paralelo de Java funciona basado en SplitIterator , que se implementa en cada colección como ArrayList, LinkedList, etc. Cuando construimos un flujo paralelo a partir de esas colecciones, el iterador de división correspondiente se usará para dividir e iterar la colección. Esto explica por qué el paralelismo ocurrió en el nivel de la fuente de entrada original (líneas de archivo) en lugar del resultado del mapa (es decir, Grabar pojo). ¿Es correcto mi entendimiento?
En mi caso, la entrada es un archivo IO stream. ¿Qué iterador dividido se usará?
No importa dónde lo ubiquemos
parallel()
en la tubería. La fuente de entrada original siempre se dividirá y se aplicarán las operaciones intermedias restantes.En este caso, Java no debería permitir a los usuarios colocar operaciones paralelas en ningún lugar de la tubería, excepto en la fuente original. Porque, está dando una comprensión errónea para aquellos que no saben cómo funciona Java Stream internamente. Sé que la
parallel()
operación se habría definido para el tipo de objeto Stream y, por lo tanto, funciona de esta manera. Pero, es mejor proporcionar alguna solución alternativa.En el fragmento de código anterior, estoy tratando de agregar un número de línea a cada registro en el archivo de entrada, por lo que debería ordenarse. Sin embargo, quiero aplicar
doSomeOperation()
en paralelo ya que es una lógica pesada. La única forma de lograrlo es escribir mi propio iterador dividido personalizado. ¿Hay alguna otra manera?
fuente
parallel()
no es más que una solicitud de modificación general que se aplica al objeto de flujo subyacente. Recuerde que solo hay un flujo fuente si no aplica las operaciones finales a la tubería, es decir, siempre que no se "ejecute" nada. Dicho esto, básicamente solo cuestionas las opciones de diseño de Java. Que se basa en la opinión y realmente no podemos ayudar con eso.Stream
directamente en la interfaz y, debido a la buena conexión en cascada, cada operación devuelveStream
nuevamente. Imagine que alguien quiere darle unaStream
pero ya ha aplicado un par de operaciones comomap
esta. Usted, como usuario, aún quiere poder decidir si desea que se ejecute en paralelo o no. Por lo tanto, debe ser posible llamarparallel()
aún, aunque la transmisión ya existe.flatMap
o si ejecuta métodos no seguros para subprocesos o similares.Path
está en el sistema de archivos local y está utilizando un JDK reciente, el spliterator tendrá una mejor capacidad de procesamiento en paralelo que los lotes de múltiplos de 1024. Pero la división equilibrada puede ser incluso contraproducente en algunosfindFirst
escenarios ...Respuestas:
Todo el flujo es paralelo o secuencial. No seleccionamos un subconjunto de operaciones para ejecutar de forma secuencial o en paralelo.
Como mencionas, los flujos paralelos usan iteradores divididos. Claramente, esto es para particionar los datos antes de que las operaciones comiencen a ejecutarse.
Mirando la fuente, veo que usa
java.nio.file.FileChannelLinesSpliterator
Derecha. Incluso puedes llamar
parallel()
ysequential()
varias veces. El último invocado ganará. Cuando llamamosparallel()
, configuramos eso para la secuencia que se devuelve; y como se indicó anteriormente, todas las operaciones se ejecutan secuencialmente o en paralelo.Esto se convierte en una cuestión de opiniones. Creo que Zabuza da una buena razón para apoyar la elección de los diseñadores de JDK.
Esto depende de tus operaciones
findFirst()
es su operación de terminal real, entonces ni siquiera tiene que preocuparse por la ejecución en paralelo, porque dedoSomething()
todos modos no habrá muchas llamadas (findFirst()
está en cortocircuito)..parallel()
de hecho, puede hacer que se procese más de un elemento, mientras quefindFirst()
en una secuencia secuencial evitaría eso.Si su operación de terminal no crea muchos datos, entonces tal vez pueda crear sus
Record
objetos usando una secuencia secuencial y luego procesar el resultado en paralelo:Si su canalización cargaría muchos datos en la memoria (que puede ser la razón por la que está usando
Files.lines()
), entonces tal vez necesite un iterador dividido personalizado. Sin embargo, antes de ir allí, buscaría otras opciones (como guardar líneas con una columna de identificación para empezar, esa es solo mi opinión).También intentaría procesar registros en lotes más pequeños, como este:
Esto se ejecuta
doSomeOperation()
en paralelo sin cargar todos los datos en la memoria. Pero tenga en cuenta quebatchSize
habrá que pensarlo.fuente
Spliterator
implementación personalizada no sería más complicada que esto, al tiempo que permite un procesamiento paralelo más eficiente ...parallelStream
operaciones internas tiene una sobrecarga fija para iniciar la operación y esperar el resultado final, mientras se limita a un paralelismo debatchSize
. Primero, necesita un múltiplo del número de núcleos de CPU disponibles actualmente para evitar subprocesos inactivos. Entonces, el número debe ser lo suficientemente alto como para compensar la sobrecarga fija, pero cuanto mayor sea el número, mayor será la pausa impuesta por la operación de lectura secuencial que ocurre incluso antes de que comience el procesamiento paralelo.Stream.generate
produce un flujo no ordenado, que no funciona con los casos de uso previstos del OP comofindFirst()
. Por el contrario, una única secuencia paralela con un spliterator que devuelve fragmentostrySplit
funciona directamente y permite que los subprocesos de trabajo procesen el siguiente fragmento sin esperar a que se complete el anterior.findFirst()
operación procesará solo un pequeño número de elementos. La primera coincidencia aún puede ocurrir después de procesar el 90% de todos los elementos. Además, cuando se tienen diez millones de líneas, incluso encontrar una coincidencia después del 10% todavía requiere procesar un millón de líneas.El diseño original de Stream incluía la idea de admitir etapas de canalización posteriores con diferentes configuraciones de ejecución paralelas, pero esta idea ha sido abandonada. La API puede provenir de este momento, pero por otro lado, un diseño de API que obligue a la persona que llama a tomar una sola decisión inequívoca para la ejecución paralela o secuencial sería mucho más complicado.
El
Spliterator
uso real porFiles.lines(…)
depende de la implementación. En Java 8 (Oracle u OpenJDK), siempre obtienes lo mismo que conBufferedReader.lines()
. En los JDK más recientes, siPath
pertenece al sistema de archivos predeterminado y el juego de caracteres es uno de los admitidos para esta función, obtendrá un Stream con unaSpliterator
implementación dedicada , eljava.nio.file.FileChannelLinesSpliterator
. Si no se cumplen las condiciones previas, se obtiene lo mismo que conBufferedReader.lines()
, que todavía se basa en una víaIterator
implementadaBufferedReader
y envueltaSpliterators.spliteratorUnknownSize
.Su tarea específica se maneja mejor con una costumbre
Spliterator
que puede realizar la numeración de línea directamente en la fuente, antes del procesamiento paralelo, para permitir el procesamiento paralelo posterior sin restricciones.fuente
Y la siguiente es una demostración simple de cuándo se aplica la aplicación de paralelo. El resultado del vistazo muestra claramente la diferencia entre los dos ejemplos. Nota: la
map
llamada se lanza para agregar otro método antesparallel
.fuente