Flujo paralelo de Java: orden de invocar el método parallel () [cerrado]

11
AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
     .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
     .parallel()           
     .filter(record -> doSomeOperation())
     .findFirst()

Cuando escribí esto, asumí que los hilos se generarán solo en la llamada del mapa, ya que el paralelo se coloca después del mapa. Pero algunas líneas en el archivo obtenían diferentes números de registro para cada ejecución.

Leí la documentación oficial de la transmisión de Java y algunos sitios web para comprender cómo funcionan las transmisiones bajo el capó.

Unas cuantas preguntas:

  • El flujo paralelo de Java funciona basado en SplitIterator , que se implementa en cada colección como ArrayList, LinkedList, etc. Cuando construimos un flujo paralelo a partir de esas colecciones, el iterador de división correspondiente se usará para dividir e iterar la colección. Esto explica por qué el paralelismo ocurrió en el nivel de la fuente de entrada original (líneas de archivo) en lugar del resultado del mapa (es decir, Grabar pojo). ¿Es correcto mi entendimiento?

  • En mi caso, la entrada es un archivo IO stream. ¿Qué iterador dividido se usará?

  • No importa dónde lo ubiquemos parallel()en la tubería. La fuente de entrada original siempre se dividirá y se aplicarán las operaciones intermedias restantes.

    En este caso, Java no debería permitir a los usuarios colocar operaciones paralelas en ningún lugar de la tubería, excepto en la fuente original. Porque, está dando una comprensión errónea para aquellos que no saben cómo funciona Java Stream internamente. Sé que la parallel()operación se habría definido para el tipo de objeto Stream y, por lo tanto, funciona de esta manera. Pero, es mejor proporcionar alguna solución alternativa.

  • En el fragmento de código anterior, estoy tratando de agregar un número de línea a cada registro en el archivo de entrada, por lo que debería ordenarse. Sin embargo, quiero aplicar doSomeOperation()en paralelo ya que es una lógica pesada. La única forma de lograrlo es escribir mi propio iterador dividido personalizado. ¿Hay alguna otra manera?

explorador
fuente
2
Tiene más que ver con cómo los creadores de Java decidieron diseñar la interfaz. Coloca sus solicitudes en la tubería y todo lo que no sea una operación final se recopilará primero. parallel()no es más que una solicitud de modificación general que se aplica al objeto de flujo subyacente. Recuerde que solo hay un flujo fuente si no aplica las operaciones finales a la tubería, es decir, siempre que no se "ejecute" nada. Dicho esto, básicamente solo cuestionas las opciones de diseño de Java. Que se basa en la opinión y realmente no podemos ayudar con eso.
Zabuzard
1
Entiendo totalmente su punto y confusión, pero no creo que haya soluciones mucho mejores. El método se ofrece Streamdirectamente en la interfaz y, debido a la buena conexión en cascada, cada operación devuelve Streamnuevamente. Imagine que alguien quiere darle una Streampero ya ha aplicado un par de operaciones como mapesta. Usted, como usuario, aún quiere poder decidir si desea que se ejecute en paralelo o no. Por lo tanto, debe ser posible llamar parallel()aún, aunque la transmisión ya existe.
Zabuzard
1
Además, preferiría preguntar por qué querría ejecutar una parte de una secuencia secuencialmente y luego, cambiar a paralelo. Si la secuencia ya es lo suficientemente grande como para calificar para la ejecución paralela, entonces esto probablemente también se aplica a todo lo que está antes en la tubería. Entonces, ¿por qué no usar también la ejecución paralela para esa parte? Entiendo que hay casos extremos como si aumenta drásticamente el tamaño con flatMapo si ejecuta métodos no seguros para subprocesos o similares.
Zabuzard
1
@Zabuza No estoy cuestionando la elección del diseño de Java, pero solo estoy planteando mi preocupación. Cualquier usuario básico de Java Stream podría tener la misma confusión a menos que comprenda el funcionamiento de Stream. Sin embargo, estoy totalmente de acuerdo con tu segundo comentario. Acabo de destacar una posible solución que podría tener su propio inconveniente, como usted ha mencionado. Pero, podemos ver si se puede resolver de otra manera. Con respecto a su tercer comentario, ya he mencionado mi caso de uso en el último punto de mi descripción
explorador el
1
@Eugene cuando Pathestá en el sistema de archivos local y está utilizando un JDK reciente, el spliterator tendrá una mejor capacidad de procesamiento en paralelo que los lotes de múltiplos de 1024. Pero la división equilibrada puede ser incluso contraproducente en algunos findFirstescenarios ...
Holger

Respuestas:

8

Esto explica por qué el paralelismo ocurrió en el nivel de la fuente de entrada original (líneas de archivo) en lugar del resultado del mapa (es decir, Grabar pojo).

Todo el flujo es paralelo o secuencial. No seleccionamos un subconjunto de operaciones para ejecutar de forma secuencial o en paralelo.

Cuando se inicia la operación del terminal, la tubería de flujo se ejecuta secuencialmente o en paralelo, dependiendo de la orientación del flujo en el que se invoca. [...] Cuando se inicia la operación del terminal, la tubería de flujo se ejecuta secuencialmente o en paralelo, dependiendo del modo del flujo en el que se invoca. misma fuente

Como mencionas, los flujos paralelos usan iteradores divididos. Claramente, esto es para particionar los datos antes de que las operaciones comiencen a ejecutarse.


En mi caso, la entrada es un archivo IO stream. ¿Qué iterador dividido se usará?

Mirando la fuente, veo que usa java.nio.file.FileChannelLinesSpliterator


No importa dónde coloquemos paralelo () en la tubería. La fuente de entrada original siempre se dividirá y se aplicarán las operaciones intermedias restantes.

Derecha. Incluso puedes llamar parallel()y sequential()varias veces. El último invocado ganará. Cuando llamamos parallel(), configuramos eso para la secuencia que se devuelve; y como se indicó anteriormente, todas las operaciones se ejecutan secuencialmente o en paralelo.


En este caso, Java no debería permitir a los usuarios colocar operaciones paralelas en ningún lugar de la tubería, excepto en la fuente original ...

Esto se convierte en una cuestión de opiniones. Creo que Zabuza da una buena razón para apoyar la elección de los diseñadores de JDK.


La única forma de lograrlo es escribir mi propio iterador dividido personalizado. ¿Hay alguna otra manera?

Esto depende de tus operaciones

  • Si findFirst()es su operación de terminal real, entonces ni siquiera tiene que preocuparse por la ejecución en paralelo, porque de doSomething()todos modos no habrá muchas llamadas ( findFirst()está en cortocircuito). .parallel()de hecho, puede hacer que se procese más de un elemento, mientras que findFirst()en una secuencia secuencial evitaría eso.
  • Si su operación de terminal no crea muchos datos, entonces tal vez pueda crear sus Recordobjetos usando una secuencia secuencial y luego procesar el resultado en paralelo:

    List<Record> smallData = Files.lines(inputFile.toPath(), 
                                         StandardCharsets.UTF_8)
      .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
      .collect(Collectors.toList())
      .parallelStream()     
      .filter(record -> doSomeOperation())
      .collect(Collectors.toList());
    
  • Si su canalización cargaría muchos datos en la memoria (que puede ser la razón por la que está usando Files.lines()), entonces tal vez necesite un iterador dividido personalizado. Sin embargo, antes de ir allí, buscaría otras opciones (como guardar líneas con una columna de identificación para empezar, esa es solo mi opinión).
    También intentaría procesar registros en lotes más pequeños, como este:

    AtomicInteger recordNumber = new AtomicInteger();
    final int batchSize = 10;
    
    try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), 
            StandardCharsets.UTF_8);) {
        Supplier<List<Record>> batchSupplier = () -> {
            List<Record> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                String nextLine;
                try {
                    nextLine = reader.readLine();
                } catch (IOException e) {
                    //hanlde exception
                    throw new RuntimeException(e);
                }
    
                if(null == nextLine) 
                    return batch;
                batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
            }
            System.out.println("next batch");
    
            return batch;
        };
    
        Stream.generate(batchSupplier)
            .takeWhile(list -> list.size() >= batchSize)
            .map(list -> list.parallelStream()
                             .filter(record -> doSomeOperation())
                             .collect(Collectors.toList()))
            .flatMap(List::stream)
            .forEach(System.out::println);
    }
    

    Esto se ejecuta doSomeOperation()en paralelo sin cargar todos los datos en la memoria. Pero tenga en cuenta que batchSizehabrá que pensarlo.

ernest_k
fuente
1
Gracias por la aclaración. Es bueno saber acerca de la tercera solución que ha resaltado. Echaré un vistazo ya que no he usado takeWhile y Supplier.
explorador
2
Una Spliteratorimplementación personalizada no sería más complicada que esto, al tiempo que permite un procesamiento paralelo más eficiente ...
Holger
1
Cada una de sus parallelStreamoperaciones internas tiene una sobrecarga fija para iniciar la operación y esperar el resultado final, mientras se limita a un paralelismo de batchSize. Primero, necesita un múltiplo del número de núcleos de CPU disponibles actualmente para evitar subprocesos inactivos. Entonces, el número debe ser lo suficientemente alto como para compensar la sobrecarga fija, pero cuanto mayor sea el número, mayor será la pausa impuesta por la operación de lectura secuencial que ocurre incluso antes de que comience el procesamiento paralelo.
Holger
1
Girar el flujo externo en paralelo causaría una mala interferencia con el interno en la implementación actual, además del punto que Stream.generateproduce un flujo no ordenado, que no funciona con los casos de uso previstos del OP como findFirst(). Por el contrario, una única secuencia paralela con un spliterator que devuelve fragmentos trySplitfunciona directamente y permite que los subprocesos de trabajo procesen el siguiente fragmento sin esperar a que se complete el anterior.
Holger
2
No hay razón para suponer que una findFirst()operación procesará solo un pequeño número de elementos. La primera coincidencia aún puede ocurrir después de procesar el 90% de todos los elementos. Además, cuando se tienen diez millones de líneas, incluso encontrar una coincidencia después del 10% todavía requiere procesar un millón de líneas.
Holger
7

El diseño original de Stream incluía la idea de admitir etapas de canalización posteriores con diferentes configuraciones de ejecución paralelas, pero esta idea ha sido abandonada. La API puede provenir de este momento, pero por otro lado, un diseño de API que obligue a la persona que llama a tomar una sola decisión inequívoca para la ejecución paralela o secuencial sería mucho más complicado.

El Spliteratoruso real por Files.lines(…)depende de la implementación. En Java 8 (Oracle u OpenJDK), siempre obtienes lo mismo que con BufferedReader.lines(). En los JDK más recientes, si Pathpertenece al sistema de archivos predeterminado y el juego de caracteres es uno de los admitidos para esta función, obtendrá un Stream con una Spliteratorimplementación dedicada , el java.nio.file.FileChannelLinesSpliterator. Si no se cumplen las condiciones previas, se obtiene lo mismo que con BufferedReader.lines(), que todavía se basa en una vía Iteratorimplementada BufferedReadery envuelta Spliterators.spliteratorUnknownSize.

Su tarea específica se maneja mejor con una costumbre Spliteratorque puede realizar la numeración de línea directamente en la fuente, antes del procesamiento paralelo, para permitir el procesamiento paralelo posterior sin restricciones.

public static Stream<Record> records(Path p) throws IOException {
    LineNoSpliterator sp = new LineNoSpliterator(p);
    return StreamSupport.stream(sp, false).onClose(sp);
}

private static class LineNoSpliterator implements Spliterator<Record>, Runnable {
    int chunkSize = 100;
    SeekableByteChannel channel;
    LineNumberReader reader;

    LineNoSpliterator(Path path) throws IOException {
        channel = Files.newByteChannel(path, StandardOpenOption.READ);
        reader=new LineNumberReader(Channels.newReader(channel,StandardCharsets.UTF_8));
    }

    @Override
    public void run() {
        try(Closeable c1 = reader; Closeable c2 = channel) {}
        catch(IOException ex) { throw new UncheckedIOException(ex); }
        finally { reader = null; channel = null; }
    }

    @Override
    public boolean tryAdvance(Consumer<? super Record> action) {
        try {
            String line = reader.readLine();
            if(line == null) return false;
            action.accept(new Record(reader.getLineNumber(), line));
            return true;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    @Override
    public Spliterator<Record> trySplit() {
        Record[] chunks = new Record[chunkSize];
        int read;
        for(read = 0; read < chunks.length; read++) {
            int pos = read;
            if(!tryAdvance(r -> chunks[pos] = r)) break;
        }
        return Spliterators.spliterator(chunks, 0, read, characteristics());
    }

    @Override
    public long estimateSize() {
        try {
            return (channel.size() - channel.position()) / 60;
        } catch (IOException ex) {
            return 0;
        }
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | DISTINCT;
    }
}
Holger
fuente
0

Y la siguiente es una demostración simple de cuándo se aplica la aplicación de paralelo. El resultado del vistazo muestra claramente la diferencia entre los dos ejemplos. Nota: la mapllamada se lanza para agregar otro método antes parallel.

IntStream.rangeClosed (1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).sum();
System.out.println();
IntStream.rangeClosed(1,20).peek(a->System.out.print(a+" "))
        .map(a->a + 200).parallel().sum();
WJS
fuente