Copie una transmisión para evitar que "la transmisión ya se haya operado o cerrado"

121

Me gustaría duplicar una secuencia de Java 8 para poder lidiar con ella dos veces. Puedo collectcomo una lista y obtener nuevas transmisiones de eso;

// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff

Pero creo que debería haber una forma más eficiente / elegante.

¿Existe alguna forma de copiar la secuencia sin convertirla en una colección?

De hecho, estoy trabajando con una secuencia de Eithers, así que quiero procesar la proyección izquierda de una manera antes de pasar a la proyección derecha y tratar con eso de otra manera. Algo así (que, hasta ahora, me veo obligado a usar el toListtruco).

List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());

Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );

Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
Toby
fuente
¿Podrías dar más detalles sobre el "proceso de una manera" ... estás consumiendo los objetos? ¿Mapearlos? partitionBy () y groupingBy () pueden llevarlo directamente a 2+ listas, pero puede beneficiarse de la asignación primero o simplemente de tener una bifurcación de decisión en su forEach ().
AjahnCharles
En algunos casos, convertirlo en una colección no podría ser una opción si estamos tratando con un flujo infinito. Puede encontrar una alternativa para la memorización aquí: dzone.com/articles/how-to-replay-java-streams
Miguel Gamboa

Respuestas:

88

Creo que su suposición sobre la eficiencia es un poco al revés. Obtiene esta enorme recuperación de la eficiencia si solo va a usar los datos una vez, porque no tiene que almacenarlos, y los flujos le brindan potentes optimizaciones de "fusión de bucle" que le permiten hacer fluir todos los datos de manera eficiente a través de la tubería.

Si desea reutilizar los mismos datos, entonces, por definición, debe generarlos dos veces (determinísticamente) o almacenarlos. Si ya está en una colección, genial; luego, repetirlo dos veces es barato.

Experimentamos en el diseño con "flujos bifurcados". Lo que encontramos fue que apoyar esto tenía costos reales; cargó el caso común (uso una vez) a expensas del caso poco común. El gran problema era lidiar con "qué sucede cuando las dos canalizaciones no consumen datos al mismo ritmo". Ahora has vuelto a almacenar en búfer de todos modos. Esta fue una característica que claramente no tuvo su peso.

Si desea operar con los mismos datos repetidamente, almacénelos o estructura sus operaciones como Consumidores y haga lo siguiente:

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });

También puede buscar en la biblioteca RxJava, ya que su modelo de procesamiento se presta mejor a este tipo de "bifurcación de flujo".

Brian Goetz
fuente
1
Quizás no debería haber usado "eficiencia", estoy entendiendo por qué me molestaría con las transmisiones (y no almacenar nada) si todo lo que hago es almacenar inmediatamente los datos ( toList) para poder procesarlos (el Eithercaso siendo el ejemplo)?
Toby
11
Las corrientes son expresivas y eficientes . Son expresivos porque le permiten configurar operaciones agregadas complejas sin muchos detalles accidentales (por ejemplo, resultados intermedios) en la forma de leer el código. También son eficientes, en el sentido de que (generalmente) realizan una sola pasada de los datos y no llenan contenedores de resultados intermedios. Estas dos propiedades juntas las convierten en un modelo de programación atractivo para muchas situaciones. Por supuesto, no todos los modelos de programación se adaptan a todos los problemas; aún debe decidir si está utilizando una herramienta adecuada para el trabajo.
Brian Goetz
1
Pero la incapacidad de reutilizar una secuencia provoca situaciones en las que el desarrollador se ve obligado a almacenar resultados intermedios (recopilación) para procesar una secuencia de dos formas diferentes. La implicación de que la transmisión se genera más de una vez (a menos que la recopile) parece clara, porque de lo contrario no necesitaría un método de recopilación.
Niall Connaughton
@NiallConnaughton No estoy seguro de querer tu punto. Si quieres atravesarlo dos veces, alguien tiene que almacenarlo o tienes que regenerarlo. ¿Está sugiriendo que la biblioteca debería almacenarlo en búfer en caso de que alguien lo necesite dos veces? Eso sería una tontería.
Brian Goetz
No sugiero que la biblioteca deba almacenarlo en búfer, pero dice que al tener flujos como únicos, obliga a las personas que desean reutilizar un flujo semilla (es decir, compartir la lógica declarativa utilizada para definirlo) a construir múltiples flujos derivados para recopilar el flujo de semillas, o tener acceso a una fábrica de proveedores que creará un duplicado del flujo de semillas. Ambas opciones tienen sus puntos débiles. Esta respuesta tiene muchos más detalles sobre el tema: stackoverflow.com/a/28513908/114200 .
Niall Connaughton
73

Puede usar una variable local con Supplierpara configurar partes comunes de la canalización de flujo.

De http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :

Reutilización de transmisiones

Los flujos de Java 8 no se pueden reutilizar. Tan pronto como llame a cualquier operación de terminal, la secuencia se cerrará:

Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

Calling `noneMatch` after `anyMatch` on the same stream results in the following exception:
java.lang.IllegalStateException: stream has already been operated upon or closed
at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at 
java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)

Para superar esta limitación, tenemos que crear una nueva cadena de flujo para cada operación de terminal que queramos ejecutar, por ejemplo, podríamos crear un proveedor de flujo para construir un nuevo flujo con todas las operaciones intermedias ya configuradas:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

Cada llamada a get() construye una nueva secuencia en la que guardamos para llamar a la operación de terminal deseada.

user4975679
fuente
2
solución agradable y elegante. mucho más java8-ish que la solución más votada.
dylaniato
Solo una nota sobre el uso Suppliersi el Streamestá construido de una manera "costosa", usted paga ese costo por cada llamada aSupplier.get() . es decir, si una consulta de base de datos ... esa consulta se realiza cada vez
Julien
Parece que no puede seguir este patrón después de un mapTo aunque use un IntStream. Descubrí que tenía que volver a convertirlo en un Set<Integer>uso collect(Collectors.toSet())... y hacer un par de operaciones en eso. Quería max()y si un valor específico estaba establecido como dos operaciones ...filter(d -> d == -1).count() == 1;
JGFMK
16

Utilice a Supplierpara producir la secuencia para cada operación de terminación.

Supplier<Stream<Integer>> streamSupplier = () -> list.stream();

Siempre que necesite una transmisión de esa colección, utilícela streamSupplier.get()para obtener una nueva transmisión.

Ejemplos:

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);
Carneros
fuente
Dale un voto positivo, ya que eres el primero en señalar Proveedores aquí.
EnzoBnl
9

Implementamos un duplicate()método para transmisiones en jOOλ , una biblioteca de código abierto que creamos para mejorar las pruebas de integración para jOOQ . Básicamente, puedes escribir:

Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();

Internamente, hay un búfer que almacena todos los valores que se han consumido de un flujo pero no del otro. Probablemente sea lo más eficiente posible si sus dos transmisiones se consumen aproximadamente al mismo ritmo y si puede vivir con la falta de seguridad de los subprocesos .

Así es como funciona el algoritmo:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final List<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

Más código fuente aquí

Tuple2es probablemente como su Pairtipo, mientras que Seqtiene Streamalgunas mejoras.

Lukas Eder
fuente
2
Esta solución no es segura para subprocesos: no puede pasar uno de los flujos a otro subproceso. Realmente no veo ningún escenario en el que ambas transmisiones se puedan consumir a la misma velocidad en un solo hilo y realmente necesite dos transmisiones distintas. Si desea producir dos resultados de la misma secuencia, sería mucho mejor utilizar recopiladores de combinación (que ya tiene en JOOL).
Tagir Valeev
@TagirValeev: Tienes razón sobre la seguridad de los hilos, buen punto. ¿Cómo podría hacerse esto combinando coleccionistas?
Lukas Eder
1
Quiero decir, si alguien quiere usar la misma transmisión dos veces así Tuple2<Seq<A>>, Seq<A>> t = duplicate(stream); long count = t.collect(counting()); List<A> list = t.collect(toList());, es mejor Tuple2<Long, List<A>> t = stream.collect(Tuple.collectors(counting(), toList()));. El uso de Collectors.mapping/reducinguno puede expresar otras operaciones de flujo como recolectores y elementos de proceso de una manera bastante diferente creando una única tupla resultante. Entonces, en general, puede hacer muchas cosas consumiendo la transmisión una vez sin duplicación y será compatible con el paralelo.
Tagir Valeev
2
En este caso, seguirá reduciendo un flujo tras otro. Por lo tanto, no tiene sentido hacer la vida más difícil presentando el sofisticado iterador que de todos modos recopilará todo el flujo en la lista debajo del capó. Puede simplemente recopilar la lista explícitamente y luego crear dos secuencias a partir de ella como lo indica OP (es el mismo número de líneas de código). Bueno, es posible que solo tenga alguna mejora si la primera reducción es un cortocircuito, pero no es el caso de OP.
Tagir Valeev
1
@maaartinus: Gracias, buen puntero. Creé un problema para el punto de referencia. Lo usé para offer()/ poll()API, pero ArrayDequepodría hacer lo mismo.
Lukas Eder
7

Puede crear una secuencia de ejecutables (por ejemplo):

results.stream()
    .flatMap(either -> Stream.<Runnable> of(
            () -> failure(either.left()),
            () -> success(either.right())))
    .forEach(Runnable::run);

Dónde failurey successson las operaciones a aplicar. Sin embargo, esto creará bastantes objetos temporales y puede que no sea más eficiente que comenzar desde una colección y transmitirla / iterarla dos veces.

Assylias
fuente
4

Otra forma de manejar los elementos varias veces es usar Stream.peek (Consumer) :

doSomething().stream()
.peek(either -> handleFailure(either.left()))
.foreach(either -> handleSuccess(either.right()));

peek(Consumer) se puede encadenar tantas veces como sea necesario.

doSomething().stream()
.peek(element -> handleFoo(element.foo()))
.peek(element -> handleBar(element.bar()))
.peek(element -> handleBaz(element.baz()))
.foreach(element-> handleQux(element.qux()));
Martín
fuente
Parece que peek no debe usarse para esto (consulte softwareengineering.stackexchange.com/a/308979/195787 )
HectorJ
2
@HectorJ El otro hilo trata sobre la modificación de elementos. Supuse que no se hace aquí.
Martin
2

cyclops-react , una biblioteca a la que contribuyo, tiene un método estático que te permitirá duplicar un Stream (y devuelve un jOOλ Tuple of Streams).

    Stream<Integer> stream = Stream.of(1,2,3);
    Tuple2<Stream<Integer>,Stream<Integer>> streams =  StreamUtils.duplicate(stream);

Consulte los comentarios, existe una penalización de rendimiento en la que se incurrirá al usar un duplicado en una secuencia existente. Una alternativa más eficaz sería utilizar Streamable: -

También hay una clase Streamable (perezosa) que se puede construir a partir de Stream, Iterable o Array y reproducir varias veces.

    Streamable<Integer> streamable = Streamable.of(1,2,3);
    streamable.stream().forEach(System.out::println);
    streamable.stream().forEach(System.out::println);

AsStreamable.synchronizedFromStream (stream): se puede usar para crear un Streamable que llenará perezosamente su colección de respaldo, de tal manera que se pueda compartir entre hilos. Streamable.fromStream (stream) no incurrirá en ninguna sobrecarga de sincronización.

John McClean
fuente
2
Y, por supuesto, debe tenerse en cuenta que los flujos resultantes tienen una sobrecarga significativa de CPU / memoria y un rendimiento paralelo muy deficiente. Además, esta solución no es segura para subprocesos (no puede pasar uno de los flujos resultantes a otro subproceso y procesarlo de forma segura en paralelo). Sería mucho más eficaz y seguro List<Integer> list = stream.collect(Collectors.toList()); streams = new Tuple2<>(list.stream(), list.stream())(como sugiere OP). También indique explícitamente en la respuesta que es el autor de cyclop-streams. Lee esto .
Tagir Valeev
Actualizado para reflejar que soy el autor. También es un buen punto para discutir las características de desempeño de cada uno. Su evaluación anterior es bastante acertada para StreamUtils.duplicate. StreamUtils.duplicate funciona almacenando datos en búfer de un flujo a otro, lo que genera una sobrecarga tanto de CPU como de memoria (según el caso de uso). Sin embargo, para Streamable. De (1,2,3), se crea un nuevo Stream directamente desde el Array cada vez y las características de rendimiento, incluido el rendimiento paralelo, serán las mismas que para el Stream creado normalmente.
John McClean
Además, existe una clase AsStreamable que permite la creación de una instancia Streamable desde un Stream pero sincroniza el acceso a la colección que respalda el Streamable a medida que se crea (AsStreamable.synchronizedFromStream). Haciéndolo más adecuado para su uso en subprocesos (si eso es lo que necesita, me imagino que el 99% del tiempo los flujos se crean y reutilizan en el mismo subproceso).
John McClean
Hola Tagir, ¿no debería revelar también en su comentario que es autor de una biblioteca de la competencia?
John McClean
1
Los comentarios no son respuestas y no publicito mi biblioteca aquí, ya que mi biblioteca no tiene una función para duplicar la transmisión (solo porque creo que es inútil), por lo que no competimos aquí. Por supuesto, cuando propongo una solución que involucra a mi biblioteca, siempre digo explícitamente que soy el autor.
Tagir Valeev
0

Para este problema en particular, también puede utilizar particiones. Algo como

     // Partition Eighters into left and right
     List<Either<Pair<A, Throwable>, A>> results = doSomething();
     Map<Boolean, Object> passingFailing = results.collect(Collectors.partitioningBy(s -> s.isLeft()));
     passingFailing.get(true) <- here will be all passing (left values)
     passingFailing.get(false) <- here will be all failing (right values)
Lubomir Varga
fuente
0

Podemos hacer uso de Stream Builder a la hora de leer o iterar un stream. Aquí está el documento de Stream Builder .

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html

Caso de uso

Digamos que tenemos un flujo de empleados y necesitamos usar este flujo para escribir datos de empleados en un archivo de Excel y luego actualizar la colección / tabla de empleados [Este es solo un caso de uso para mostrar el uso de Stream Builder]:

Stream.Builder<Employee> builder = Stream.builder();

employee.forEach( emp -> {
   //store employee data to excel file 
   // and use the same object to build the stream.
   builder.add(emp);
});

//Now this stream can be used to update the employee collection
Stream<Employee> newStream = builder.build();
Lokesh Singal
fuente
0

Tuve un problema similar y pude pensar en tres estructuras intermedias diferentes a partir de las cuales crear una copia de la secuencia: a List, una matriz y a Stream.Builder. Escribí un pequeño programa de referencia, que sugería que, desde el punto de vista del desempeño, elList era aproximadamente un 30% más lento que los otros dos, que eran bastante similares.

El único inconveniente de convertir a una matriz es que es complicado si su tipo de elemento es un tipo genérico (que en mi caso lo era); por eso prefiero usar unStream.Builder .

Terminé escribiendo una pequeña función que crea un Collector:

private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
    return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
        b2.build().forEach(b1);
        return b1;
    }, Stream.Builder::build);
}

Luego puedo hacer una copia de cualquier flujo strhaciendo lo str.collect(copyCollector())que se sienta bastante en consonancia con el uso idiomático de los flujos.

Jeremy Hicks
fuente