Las secuencias Java infinitas paralelas se quedan sin memoria

16

Estoy tratando de entender por qué el siguiente programa Java da un OutOfMemoryError, mientras que el programa correspondiente sin .parallel()no.

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

Tengo dos preguntas:

  1. ¿Cuál es el resultado previsto de este programa?

    Sin .parallel()que parezca que esto simplemente sum(1+2+3+...)sale, lo que significa que simplemente "se atasca" en la primera secuencia en el plano, lo cual tiene sentido.

    Con el paralelo, no sé si hay un comportamiento esperado, pero supongo que de alguna manera intercaló los primeros nflujos, donde nestá el número de trabajadores paralelos. También podría ser ligeramente diferente en función del comportamiento de fragmentación / almacenamiento en búfer.

  2. ¿Qué hace que se quede sin memoria? Estoy tratando específicamente de entender cómo se implementan estas transmisiones bajo el capó.

    Supongo que algo bloquea el flujo, por lo que nunca termina y puede deshacerse de los valores generados, pero no sé exactamente en qué orden se evalúan las cosas y dónde se produce el almacenamiento en búfer.

Editar: en caso de que sea relevante, estoy usando Java 11.

Editt 2: Aparentemente, sucede lo mismo incluso para el programa simple IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum(), por lo que podría tener que ver con la pereza de limitmás que con flatMap.

Thomas Ahle
fuente
parallel () usa internamente ForkJoinPool. Supongo que ForkJoin Framework está en Java desde Java 7
aravind

Respuestas:

9

Usted dice " pero no sé exactamente en qué orden se evalúan las cosas y dónde ocurre el almacenamiento en búfer ", que es precisamente de lo que se tratan las corrientes paralelas. El orden de evaluación no está especificado.

Un aspecto crítico de su ejemplo es el .limit(100_000_000). Esto implica que la implementación no solo puede sumar valores arbitrarios, sino que debe sumar los primeros 100,000,000 números. Tenga en cuenta que en la implementación de referencia, .unordered().limit(100_000_000)no cambia el resultado, lo que indica que no hay una implementación especial para el caso no ordenado, pero eso es un detalle de implementación.

Ahora, cuando los subprocesos de trabajo procesan los elementos, no pueden simplemente resumirlos, ya que tienen que saber qué elementos pueden consumir, lo que depende de cuántos elementos precedan a su carga de trabajo específica. Dado que esta secuencia no conoce los tamaños, esto solo puede conocerse cuando se han procesado los elementos del prefijo, lo que nunca sucede para secuencias infinitas. Por lo tanto, los subprocesos de trabajo siguen almacenando en el búfer por el momento, esta información está disponible.

En principio, cuando un subproceso de trabajo sabe que procesa el fragmento de trabajo más a la izquierda, podría resumir los elementos de inmediato, contarlos y señalar el final al alcanzar el límite. Por lo tanto, la transmisión podría terminar, pero esto depende de muchos factores.

En su caso, un escenario plausible es que los otros hilos de trabajo son más rápidos en la asignación de buffers de lo que cuenta el trabajo más a la izquierda. En este escenario, los cambios sutiles en el tiempo podrían hacer que la transmisión vuelva ocasionalmente con un valor.

Cuando ralentizamos todos los subprocesos de trabajo, excepto el que procesa el fragmento más a la izquierda, podemos hacer que la secuencia finalice (al menos en la mayoría de las ejecuciones):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

Following Estoy siguiendo una sugerencia de Stuart Marks para usar el orden de izquierda a derecha cuando se habla del orden de encuentro en lugar del orden de procesamiento.

Holger
fuente
Muy buena respuesta! Me pregunto si existe el riesgo de que todos los subprocesos comiencen a ejecutar las operaciones flatMap y que ninguno se asigne para vaciar realmente los búferes (sumando). En mi caso de uso real, las secuencias infinitas son, en cambio, archivos demasiado grandes para guardar en la memoria. Me pregunto cómo puedo reescribir la transmisión para mantener bajo el uso de memoria.
Thomas Ahle
1
Estas usando Files.lines(…)? Se ha mejorado significativamente en Java 9.
Holger
1
Esto es lo que hace en Java 8. En los JRE más nuevos, seguirá recurriendo a BufferedReader.lines()ciertas circunstancias (no el sistema de archivos predeterminado, un juego de caracteres especial o un tamaño mayor que Integer.MAX_FILES). Si se aplica uno de estos, una solución personalizada podría ayudar. Esto valdría una nueva Q & A ...
Holger
1
Integer.MAX_VALUE, por supuesto ...
Holger
1
¿Qué es la secuencia externa, una secuencia de archivos? ¿Tiene un tamaño predecible?
Holger
5

Mi mejor conjetura es que agregar parallel()cambios flatMap()cuyo comportamiento interno ya tenía problemas para ser evaluado perezosamente antes .

El OutOfMemoryErrorerror que está recibiendo se informó en [JDK-8202307] Obteniendo un java.lang.OutOfMemoryError: espacio de almacenamiento dinámico Java al llamar a Stream.iterator (). Next () en una secuencia que utiliza una secuencia infinita / muy grande en flatMap . Si observa el ticket, es más o menos el mismo rastro de pila que está obteniendo. El ticket se cerró porque no se solucionará con el siguiente motivo:

Los métodos iterator()y spliterator()son "trampillas de escape" que se utilizarán cuando no sea posible utilizar otras operaciones. Tienen algunas limitaciones porque convierten lo que es un modelo push de la implementación del flujo en un modelo pull. Dicha transición requiere almacenamiento en búfer en ciertos casos, como cuando un elemento se asigna (plano) a dos o más elementos . Complicaría significativamente la implementación del flujo, probablemente a expensas de casos comunes, para soportar una noción de contrapresión para comunicar cuántos elementos extraer a través de capas anidadas de producción de elementos.

Karol Dowbecki
fuente
¡Esto es muy interesante! Tiene sentido que la transición push / pull requiera almacenamiento en búfer, lo que puede agotar la memoria. Sin embargo, en mi caso, ¿parece que usar solo push debería funcionar bien y simplemente descartar los elementos restantes tal como aparecen? ¿O tal vez estás diciendo que flapmap hace que se cree un iterador?
Thomas Ahle
3

OOME es causada no por la corriente siendo infinito, sino por el hecho de que no lo es .

Es decir, si comentas, .limit(...)nunca se quedará sin memoria, pero, por supuesto, tampoco terminará.

Una vez que se divide, la secuencia solo puede realizar un seguimiento de la cantidad de elementos si se acumulan dentro de cada subproceso (parece que el acumulador real es Spliterators$ArraySpliterator#array).

Parece que puedes reproducirlo sin flatMap, solo ejecuta lo siguiente con -Xmx128m:

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

Sin embargo, después de comentarlo limit(), debería funcionar bien hasta que decida ahorrar su computadora portátil.

Además de los detalles de implementación reales, esto es lo que creo que está sucediendo:

Con limit, el sumreductor quiere que los primeros elementos X se sumen, por lo que ningún hilo puede emitir sumas parciales. Cada "corte" (hilo) necesitará acumular elementos y pasarlos. Sin límite, no existe tal restricción, por lo que cada "porción" solo calculará la suma parcial de los elementos que obtiene (para siempre), suponiendo que eventualmente emitirá el resultado.

Costi Ciudatu
fuente
¿Qué quieres decir con "una vez que se divide"? ¿El límite lo divide de alguna manera?
Thomas Ahle
@ThomasAhle parallel()lo usará ForkJoinPoolinternamente para lograr paralelismo. Se Spliteratorutilizará para asignar trabajo a cada ForkJointarea, creo que podemos llamar a la unidad de trabajo aquí como "dividida".
Karol Dowbecki
Pero, ¿por qué eso solo sucede con límite?
Thomas Ahle
@ThomasAhle Edité la respuesta con mis dos centavos.
Costi Ciudatu
1
@ThomasAhle estableció un punto de interrupción Integer.sum(), utilizado por el IntStream.sumreductor. Verá que la versión sin límite llama que funciona todo el tiempo, mientras que la versión limitada nunca puede llamarla antes de OOM.
Costi Ciudatu