Tengo un archivo grande que contiene una lista de elementos.
Me gustaría crear un lote de elementos, realizar una solicitud HTTP con este lote (todos los elementos son necesarios como parámetros en la solicitud HTTP). Puedo hacerlo muy fácilmente con un for
bucle, pero como amante de Java 8, quiero intentar escribir esto con el marco Stream de Java 8 (y aprovechar los beneficios del procesamiento diferido).
Ejemplo:
List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
batch.add(data.get(i));
if (batch.size() == BATCH_SIZE) process(batch);
}
if (batch.size() > 0) process(batch);
Quiero hacer algo en la línea de
lazyFileStream.group(500).map(processBatch).collect(toList())
Cuál sería la mejor forma de hacer esto?
java
java-8
batch-processing
java-stream
Andy Dang
fuente
fuente
flatMap
(+ un mapa plano adicional para colapsar las corrientes nuevamente)? No creo que algo así exista como un método conveniente en la biblioteca estándar. O tendrá que encontrar una biblioteca de terceros o escribir la suya propia basada en spliterators y / o un recopilador queStream.generate
conreader::readLine
ylimit
, pero el problema es que las transmisiones no van bien con Excepciones. Además, esto probablemente no se pueda paralelizar bien. Creo que elfor
bucle sigue siendo la mejor opción.Respuestas:
¡Nota! Esta solución lee todo el archivo antes de ejecutar forEach.
Puede hacerlo con jOOλ , una biblioteca que extiende los flujos de Java 8 para casos de uso de flujo secuencial de un solo subproceso:
Detrás de escena,
zipWithIndex()
es solo:... mientras
groupBy()
que la API es conveniente para:(Descargo de responsabilidad: trabajo para la empresa detrás de jOOλ)
fuente
Map
(a diferencia de, por ejemplo, la solución de Ben Manes)Para completar, aquí hay una solución de guayaba .
En la pregunta, la colección está disponible, por lo que no se necesita una transmisión y se puede escribir como,
fuente
Lists.partition
es otra variación que debería haber mencionado.Stream
a la memoria antes de procesar el lote relevantebatchSize
elementos por iteración.La implementación pura de Java-8 también es posible:
Tenga en cuenta que, a diferencia de JOOl, puede funcionar bien en paralelo (siempre que
data
sea una lista de acceso aleatorio).fuente
List
(verdata.size()
,data.get()
en la pregunta). Estoy respondiendo a la pregunta formulada. Si tiene otra pregunta, hágala en su lugar (aunque creo que la pregunta de la transmisión ya se hizo).Solución pura Java 8 :
Podemos crear un recolector personalizado para hacer esto de manera elegante, que toma un
batch size
y unConsumer
para procesar cada lote:Opcionalmente, luego cree una clase de utilidad auxiliar:
Uso de ejemplo:
También publiqué mi código en GitHub, si alguien quiere echar un vistazo:
Enlace a Github
fuente
Escribí un Spliterator personalizado para escenarios como este. Llenará listas de un tamaño determinado del flujo de entrada. La ventaja de este enfoque es que realizará un procesamiento diferido y funcionará con otras funciones de flujo.
fuente
SUBSIZED
divisiones devueltas del flujo basetrySplit
pueden tener más elementos que antes de la división (si la división ocurre en medio del lote).Spliterators
es correcta,trySplit
¿siempre debería dividir los datos en dos partes aproximadamente iguales para que el resultado nunca sea más grande que el original?if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
Tuvimos un problema similar que resolver. Queríamos tomar una secuencia que fuera más grande que la memoria del sistema (iterando a través de todos los objetos en una base de datos) y aleatorizar el orden lo mejor posible; pensamos que estaría bien almacenar en búfer 10,000 elementos y aleatorizarlos.
El objetivo era una función que incluía una secuencia.
De las soluciones propuestas aquí, parece haber una variedad de opciones:
Nuestro instinto fue originalmente usar un colector personalizado, pero esto significó dejar de transmitir. La solución de recopilación personalizada anterior es muy buena y casi la usamos.
Aquí hay una solución que engaña al usar el hecho de que
Stream
s puede brindarle unaIterator
que puede usar como una trampilla de escape para permitirle hacer algo adicional que las transmisiones no admiten. ElIterator
se convierte de nuevo a una secuencia usando otro poco deStreamSupport
hechicería de Java 8 .Un ejemplo simple de usar esto se vería así:
Las impresiones de arriba
Para nuestro caso de uso, queríamos mezclar los lotes y luego mantenerlos como una secuencia; se veía así:
Esto genera algo como (es aleatorio, tan diferente cada vez)
La salsa secreta aquí es que siempre hay un flujo, por lo que puede operar en un flujo de lotes o hacer algo con cada lote y luego
flatMap
volver a un flujo. Aún mejor, todo lo anterior sólo se ejecuta como las finalesforEach
ocollect
expresiones u otros terminación TIRE los datos a través de la corriente.¡Resulta que
iterator
es un tipo especial de operación de terminación en una secuencia y no hace que toda la secuencia se ejecute y llegue a la memoria! ¡Gracias a los chicos de Java 8 por un diseño brillante!fuente
List
no puede diferir la iteración de los elementos dentro del lote porque el consumidor puede querer omitir un lote completo, y si no consumió el elementos entonces no estarían saltando muy lejos. (He implementado uno de estos en C #, aunque fue sustancialmente más fácil.)También puede usar RxJava :
o
o
fuente
También puede echar un vistazo a cyclops-react , soy el autor de esta biblioteca. Implementa la interfaz jOOλ (y por extensión JDK 8 Streams), pero a diferencia de JDK 8 Parallel Streams, tiene un enfoque en operaciones asíncronas (como el bloqueo potencial de llamadas de E / S asíncronas). JDK Parallel Streams, por el contrario, se centra en el paralelismo de datos para las operaciones vinculadas a la CPU. Funciona mediante la gestión de agregados de tareas basadas en el futuro bajo el capó, pero presenta una API Stream extendida estándar para los usuarios finales.
Este código de muestra puede ayudarlo a comenzar
Hay un tutorial sobre el procesamiento por lotes aquí.
Y un tutorial más general aquí
Para usar su propio Thread Pool (que probablemente sea más apropiado para bloquear E / S), puede comenzar a procesar con
fuente
Ejemplo puro de Java 8 que también funciona con flujos paralelos.
Cómo utilizar:
La declaración e implementación del método:
fuente
Para ser justos, eche un vistazo a la elegante solución Vavr :
fuente
Ejemplo simple usando Spliterator
La respuesta de Bruce es más completa, pero estaba buscando algo rápido y sucio para procesar un montón de archivos.
fuente
esta es una solución java pura que se evalúa de manera perezosa.
fuente
Puede utilizar apache.commons:
La parte de la partición se realiza de forma poco perezosa, pero después de particionar la lista, obtiene los beneficios de trabajar con secuencias (por ejemplo, usar secuencias paralelas, agregar filtros, etc.). Otras respuestas sugirieron soluciones más elaboradas, pero a veces la legibilidad y la capacidad de mantenimiento son más importantes (y a veces no lo son :-))
fuente
Se puede hacer fácilmente usando Reactor :
fuente
Con
Java 8
ycom.google.common.collect.Lists
, puede hacer algo como:Aquí
T
está el tipo de elementos en la lista de entrada yU
el tipo de elementos en la lista de salidaY puedes usarlo así:
fuente