Java 8 Stream con procesamiento por lotes

96

Tengo un archivo grande que contiene una lista de elementos.

Me gustaría crear un lote de elementos, realizar una solicitud HTTP con este lote (todos los elementos son necesarios como parámetros en la solicitud HTTP). Puedo hacerlo muy fácilmente con un forbucle, pero como amante de Java 8, quiero intentar escribir esto con el marco Stream de Java 8 (y aprovechar los beneficios del procesamiento diferido).

Ejemplo:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

Quiero hacer algo en la línea de lazyFileStream.group(500).map(processBatch).collect(toList())

Cuál sería la mejor forma de hacer esto?

Andy Dang
fuente
No puedo entender cómo realizar la agrupación, lo siento, pero Archivos # líneas leerán perezosamente el contenido del archivo.
Toby
1
entonces, básicamente, necesita un inverso de flatMap(+ un mapa plano adicional para colapsar las corrientes nuevamente)? No creo que algo así exista como un método conveniente en la biblioteca estándar. O tendrá que encontrar una biblioteca de terceros o escribir la suya propia basada en spliterators y / o un recopilador que
emita
3
Tal vez puedas combinar Stream.generatecon reader::readLiney limit, pero el problema es que las transmisiones no van bien con Excepciones. Además, esto probablemente no se pueda paralelizar bien. Creo que el forbucle sigue siendo la mejor opción.
tobias_k
Acabo de agregar un código de ejemplo. No creo que flatMap sea el camino a seguir. Sospechando que podría tener que escribir un Spliterator personalizado
Andy Dang
1
Estoy acuñando el término "Abuso de transmisiones" para preguntas como esta.
kervin

Respuestas:

13

¡Nota! Esta solución lee todo el archivo antes de ejecutar forEach.

Puede hacerlo con jOOλ , una biblioteca que extiende los flujos de Java 8 para casos de uso de flujo secuencial de un solo subproceso:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

Detrás de escena, zipWithIndex()es solo:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... mientras groupBy()que la API es conveniente para:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(Descargo de responsabilidad: trabajo para la empresa detrás de jOOλ)

Lukas Eder
fuente
Guau. Esto es EXACTAMENTE lo que estoy buscando. Nuestro sistema normalmente procesa flujos de datos en secuencia, por lo que sería una buena opción para pasar a Java 8.
Andy Dang
16
Tenga en cuenta que esta solución almacena innecesariamente todo el flujo de entrada en el intermedio Map(a diferencia de, por ejemplo, la solución de Ben Manes)
Tagir Valeev
124

Para completar, aquí hay una solución de guayaba .

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

En la pregunta, la colección está disponible, por lo que no se necesita una transmisión y se puede escribir como,

Iterables.partition(data, batchSize).forEach(this::process);
Ben Manes
fuente
11
Lists.partitiones otra variación que debería haber mencionado.
Ben Manes
2
esto es vago, ¿verdad? no llamará todo Streama la memoria antes de procesar el lote relevante
orirab
1
@orirab sí. Es perezoso entre lotes, ya que consumirá batchSizeelementos por iteración.
Ben Manes
Podría, por favor, eche un vistazo stackoverflow.com/questions/58666190/...
gstackoverflow
58

La implementación pura de Java-8 también es posible:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

Tenga en cuenta que, a diferencia de JOOl, puede funcionar bien en paralelo (siempre que datasea ​​una lista de acceso aleatorio).

Tagir Valeev
fuente
1
¿Qué pasa si sus datos son en realidad un flujo? (digamos líneas en un archivo, o incluso desde la red).
Omry Yadan
6
@OmryYadan, la pregunta era sobre recibir información del List(ver data.size(), data.get()en la pregunta). Estoy respondiendo a la pregunta formulada. Si tiene otra pregunta, hágala en su lugar (aunque creo que la pregunta de la transmisión ya se hizo).
Tagir Valeev
1
¿Cómo procesar los lotes en paralelo?
soup_boy
38

Solución pura Java 8 :

Podemos crear un recolector personalizado para hacer esto de manera elegante, que toma un batch sizey un Consumerpara procesar cada lote:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

Opcionalmente, luego cree una clase de utilidad auxiliar:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

Uso de ejemplo:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

También publiqué mi código en GitHub, si alguien quiere echar un vistazo:

Enlace a Github

rohitvats
fuente
1
Esta es una buena solución, a menos que no pueda colocar todos los elementos de su flujo en la memoria. Además, no funcionará en transmisiones interminables: el método de recopilación es terminal, lo que significa que en lugar de producir una transmisión de lotes, esperará hasta que se complete la transmisión y luego procesará el resultado en lotes.
Alex Ackerman
2
@AlexAckerman un flujo infinito significará que nunca se llamará al finalizador, pero se seguirá llamando al acumulador para que los elementos se sigan procesando. Además, solo requiere que el tamaño del lote de elementos esté en la memoria en cualquier momento.
Solubris
@Solubris, ¡tienes razón! Mi mal, gracias por señalar esto: no eliminaré el comentario como referencia, si alguien tiene la misma idea de cómo funciona el método de recopilación.
Alex Ackerman
La lista enviada al consumidor debe copiarse para que sea segura la modificación, por ejemplo: batchProcessor.accept (copyOf (ts))
Solubris
19

Escribí un Spliterator personalizado para escenarios como este. Llenará listas de un tamaño determinado del flujo de entrada. La ventaja de este enfoque es que realizará un procesamiento diferido y funcionará con otras funciones de flujo.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}
Bruce Hamilton
fuente
de mucha ayuda. Si alguien quiere agrupar en algunos criterios personalizados (por ejemplo, el tamaño de la colección en bytes), entonces puede delegar su predicado personalizado y usarlo en for-loop como condición (en mi humilde opinión, while loop será más legible entonces)
pls
No estoy seguro de que la implementación sea correcta. Por ejemplo, si las SUBSIZEDdivisiones devueltas del flujo base trySplitpueden tener más elementos que antes de la división (si la división ocurre en medio del lote).
Malta
@Malt, si mi comprensión Spliteratorses correcta, trySplit¿siempre debería dividir los datos en dos partes aproximadamente iguales para que el resultado nunca sea más grande que el original?
Bruce Hamilton
@BruceHamilton Desafortunadamente, según los documentos, las partes no pueden ser más o menos iguales. Ellos deben ser iguales:if this Spliterator is SUBSIZED, then estimateSize() for this spliterator before splitting must be equal to the sum of estimateSize() for this and the returned Spliterator after splitting.
malta
Sí, eso es consistente con mi entendimiento de la división de Spliterator. Sin embargo, me está costando entender cómo "las divisiones devueltas por trySplit pueden tener más elementos que antes de la división", ¿podría explicarnos lo que quiere decir?
Bruce Hamilton
13

Tuvimos un problema similar que resolver. Queríamos tomar una secuencia que fuera más grande que la memoria del sistema (iterando a través de todos los objetos en una base de datos) y aleatorizar el orden lo mejor posible; pensamos que estaría bien almacenar en búfer 10,000 elementos y aleatorizarlos.

El objetivo era una función que incluía una secuencia.

De las soluciones propuestas aquí, parece haber una variedad de opciones:

  • Utilice varias bibliotecas adicionales que no sean de Java 8
  • Comience con algo que no sea una transmisión, por ejemplo, una lista de acceso aleatorio
  • Tener una corriente que se pueda dividir fácilmente en un spliterator

Nuestro instinto fue originalmente usar un colector personalizado, pero esto significó dejar de transmitir. La solución de recopilación personalizada anterior es muy buena y casi la usamos.

Aquí hay una solución que engaña al usar el hecho de que Streams puede brindarle una Iteratorque puede usar como una trampilla de escape para permitirle hacer algo adicional que las transmisiones no admiten. El Iteratorse convierte de nuevo a una secuencia usando otro poco de StreamSupporthechicería de Java 8 .

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

Un ejemplo simple de usar esto se vería así:

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

Las impresiones de arriba

[A, B, C]
[D, E, F]

Para nuestro caso de uso, queríamos mezclar los lotes y luego mantenerlos como una secuencia; se veía así:

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

Esto genera algo como (es aleatorio, tan diferente cada vez)

A
C
B
E
D
F

La salsa secreta aquí es que siempre hay un flujo, por lo que puede operar en un flujo de lotes o hacer algo con cada lote y luego flatMapvolver a un flujo. Aún mejor, todo lo anterior sólo se ejecuta como las finales forEacho collectexpresiones u otros terminación TIRE los datos a través de la corriente.

¡Resulta que iteratores un tipo especial de operación de terminación en una secuencia y no hace que toda la secuencia se ejecute y llegue a la memoria! ¡Gracias a los chicos de Java 8 por un diseño brillante!

Ashley Frieze
fuente
Y es muy bueno que itere completamente sobre cada lote cuando se recopila y persista en un; Listno puede diferir la iteración de los elementos dentro del lote porque el consumidor puede querer omitir un lote completo, y si no consumió el elementos entonces no estarían saltando muy lejos. (He implementado uno de estos en C #, aunque fue sustancialmente más fácil.)
ErikE
9

También puede usar RxJava :

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

o

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

o

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
frhack
fuente
8

También puede echar un vistazo a cyclops-react , soy el autor de esta biblioteca. Implementa la interfaz jOOλ (y por extensión JDK 8 Streams), pero a diferencia de JDK 8 Parallel Streams, tiene un enfoque en operaciones asíncronas (como el bloqueo potencial de llamadas de E / S asíncronas). JDK Parallel Streams, por el contrario, se centra en el paralelismo de datos para las operaciones vinculadas a la CPU. Funciona mediante la gestión de agregados de tareas basadas en el futuro bajo el capó, pero presenta una API Stream extendida estándar para los usuarios finales.

Este código de muestra puede ayudarlo a comenzar

LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

Hay un tutorial sobre el procesamiento por lotes aquí.

Y un tutorial más general aquí

Para usar su propio Thread Pool (que probablemente sea más apropiado para bloquear E / S), puede comenzar a procesar con

     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();
John McClean
fuente
3

Ejemplo puro de Java 8 que también funciona con flujos paralelos.

Cómo utilizar:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

La declaración e implementación del método:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> {
        List<ElementType> fullBatch;

        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }

        batchProcessor.accept(fullBatch);
    });

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}
Nicolas Lacombe
fuente
2

Para ser justos, eche un vistazo a la elegante solución Vavr :

Stream.ofAll(data).grouped(BATCH_SIZE).forEach(this::process);
Nolequen
fuente
1

Ejemplo simple usando Spliterator

    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {              
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }           
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");               
        }           
    }       
}

La respuesta de Bruce es más completa, pero estaba buscando algo rápido y sucio para procesar un montón de archivos.

rhinmass
fuente
1

esta es una solución java pura que se evalúa de manera perezosa.

public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}
Hei
fuente
1

Puede utilizar apache.commons:

ListUtils.partition(ListOfLines, 500).stream()
                .map(partition -> processBatch(partition)
                .collect(Collectors.toList());

La parte de la partición se realiza de forma poco perezosa, pero después de particionar la lista, obtiene los beneficios de trabajar con secuencias (por ejemplo, usar secuencias paralelas, agregar filtros, etc.). Otras respuestas sugirieron soluciones más elaboradas, pero a veces la legibilidad y la capacidad de mantenimiento son más importantes (y a veces no lo son :-))

Tal Joffe
fuente
No estoy seguro de quién votó negativamente, pero sería bueno entender por qué. Di una respuesta que complementó las otras respuestas para las personas que no pueden usar Guava
Tal Joffe
Estás procesando una lista aquí, no una transmisión.
Drakemor
@Drakemor Estoy procesando un flujo de sublistas. observe la llamada a la función stream ()
Tal Joffe
Pero primero la conviertes en una lista de sub-listas, que no funcionan correctamente para los verdaderos datos de streaming. Aquí está la referencia a la partición: commons.apache.org/proper/commons-collections/apidocs/org/…
Drakemor
1
TBH No entiendo completamente su argumento, pero creo que podemos estar de acuerdo en no estar de acuerdo. Edité mi respuesta para reflejar nuestra conversación aquí. Gracias por la discusión
Tal Joffe
1

Se puede hacer fácilmente usando Reactor :

Flux.fromStream(fileReader.lines().onClose(() -> safeClose(fileReader)))
            .map(line -> someProcessingOfSingleLine(line))
            .buffer(BUFFER_SIZE)
            .subscribe(apiService::makeHttpRequest);
Alex
fuente
0

Con Java 8y com.google.common.collect.Lists, puede hacer algo como:

public class BatchProcessingUtil {
    public static <T,U> List<U> process(List<T> data, int batchSize, Function<List<T>, List<U>> processFunction) {
        List<List<T>> batches = Lists.partition(data, batchSize);
        return batches.stream()
                .map(processFunction) // Send each batch to the process function
                .flatMap(Collection::stream) // flat results to gather them in 1 stream
                .collect(Collectors.toList());
    }
}

Aquí Testá el tipo de elementos en la lista de entrada y Uel tipo de elementos en la lista de salida

Y puedes usarlo así:

List<String> userKeys = [... list of user keys]
List<Users> users = BatchProcessingUtil.process(
    userKeys,
    10, // Batch Size
    partialKeys -> service.getUsers(partialKeys)
);
josebui
fuente