Estoy tratando de entender por qué el siguiente programa Java da un OutOfMemoryError
, mientras que el programa correspondiente sin .parallel()
no.
System.out.println(Stream
.iterate(1, i -> i+1)
.parallel()
.flatMap(n -> Stream.iterate(n, i -> i+n))
.mapToInt(Integer::intValue)
.limit(100_000_000)
.sum()
);
Tengo dos preguntas:
¿Cuál es el resultado previsto de este programa?
Sin
.parallel()
que parezca que esto simplementesum(1+2+3+...)
sale, lo que significa que simplemente "se atasca" en la primera secuencia en el plano, lo cual tiene sentido.Con el paralelo, no sé si hay un comportamiento esperado, pero supongo que de alguna manera intercaló los primeros
n
flujos, donden
está el número de trabajadores paralelos. También podría ser ligeramente diferente en función del comportamiento de fragmentación / almacenamiento en búfer.¿Qué hace que se quede sin memoria? Estoy tratando específicamente de entender cómo se implementan estas transmisiones bajo el capó.
Supongo que algo bloquea el flujo, por lo que nunca termina y puede deshacerse de los valores generados, pero no sé exactamente en qué orden se evalúan las cosas y dónde se produce el almacenamiento en búfer.
Editar: en caso de que sea relevante, estoy usando Java 11.
Editt 2: Aparentemente, sucede lo mismo incluso para el programa simple IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()
, por lo que podría tener que ver con la pereza de limit
más que con flatMap
.
fuente
Respuestas:
Usted dice " pero no sé exactamente en qué orden se evalúan las cosas y dónde ocurre el almacenamiento en búfer ", que es precisamente de lo que se tratan las corrientes paralelas. El orden de evaluación no está especificado.
Un aspecto crítico de su ejemplo es el
.limit(100_000_000)
. Esto implica que la implementación no solo puede sumar valores arbitrarios, sino que debe sumar los primeros 100,000,000 números. Tenga en cuenta que en la implementación de referencia,.unordered().limit(100_000_000)
no cambia el resultado, lo que indica que no hay una implementación especial para el caso no ordenado, pero eso es un detalle de implementación.Ahora, cuando los subprocesos de trabajo procesan los elementos, no pueden simplemente resumirlos, ya que tienen que saber qué elementos pueden consumir, lo que depende de cuántos elementos precedan a su carga de trabajo específica. Dado que esta secuencia no conoce los tamaños, esto solo puede conocerse cuando se han procesado los elementos del prefijo, lo que nunca sucede para secuencias infinitas. Por lo tanto, los subprocesos de trabajo siguen almacenando en el búfer por el momento, esta información está disponible.
En principio, cuando un subproceso de trabajo sabe que procesa el fragmento de trabajo más a la izquierda, podría resumir los elementos de inmediato, contarlos y señalar el final al alcanzar el límite. Por lo tanto, la transmisión podría terminar, pero esto depende de muchos factores.
En su caso, un escenario plausible es que los otros hilos de trabajo son más rápidos en la asignación de buffers de lo que cuenta el trabajo más a la izquierda. En este escenario, los cambios sutiles en el tiempo podrían hacer que la transmisión vuelva ocasionalmente con un valor.
Cuando ralentizamos todos los subprocesos de trabajo, excepto el que procesa el fragmento más a la izquierda, podemos hacer que la secuencia finalice (al menos en la mayoría de las ejecuciones):
Following Estoy siguiendo una sugerencia de Stuart Marks para usar el orden de izquierda a derecha cuando se habla del orden de encuentro en lugar del orden de procesamiento.
fuente
Files.lines(…)
? Se ha mejorado significativamente en Java 9.BufferedReader.lines()
ciertas circunstancias (no el sistema de archivos predeterminado, un juego de caracteres especial o un tamaño mayor queInteger.MAX_FILES
). Si se aplica uno de estos, una solución personalizada podría ayudar. Esto valdría una nueva Q & A ...Integer.MAX_VALUE
, por supuesto ...Mi mejor conjetura es que agregar
parallel()
cambiosflatMap()
cuyo comportamiento interno ya tenía problemas para ser evaluado perezosamente antes .El
OutOfMemoryError
error que está recibiendo se informó en [JDK-8202307] Obteniendo un java.lang.OutOfMemoryError: espacio de almacenamiento dinámico Java al llamar a Stream.iterator (). Next () en una secuencia que utiliza una secuencia infinita / muy grande en flatMap . Si observa el ticket, es más o menos el mismo rastro de pila que está obteniendo. El ticket se cerró porque no se solucionará con el siguiente motivo:fuente
OOME es causada no por la corriente siendo infinito, sino por el hecho de que no lo es .
Es decir, si comentas,
.limit(...)
nunca se quedará sin memoria, pero, por supuesto, tampoco terminará.Una vez que se divide, la secuencia solo puede realizar un seguimiento de la cantidad de elementos si se acumulan dentro de cada subproceso (parece que el acumulador real es
Spliterators$ArraySpliterator#array
).Parece que puedes reproducirlo sin
flatMap
, solo ejecuta lo siguiente con-Xmx128m
:Sin embargo, después de comentarlo
limit()
, debería funcionar bien hasta que decida ahorrar su computadora portátil.Además de los detalles de implementación reales, esto es lo que creo que está sucediendo:
Con
limit
, elsum
reductor quiere que los primeros elementos X se sumen, por lo que ningún hilo puede emitir sumas parciales. Cada "corte" (hilo) necesitará acumular elementos y pasarlos. Sin límite, no existe tal restricción, por lo que cada "porción" solo calculará la suma parcial de los elementos que obtiene (para siempre), suponiendo que eventualmente emitirá el resultado.fuente
parallel()
lo usaráForkJoinPool
internamente para lograr paralelismo. SeSpliterator
utilizará para asignar trabajo a cadaForkJoin
tarea, creo que podemos llamar a la unidad de trabajo aquí como "dividida".Integer.sum()
, utilizado por elIntStream.sum
reductor. Verá que la versión sin límite llama que funciona todo el tiempo, mientras que la versión limitada nunca puede llamarla antes de OOM.