Me gustaría duplicar una secuencia de Java 8 para poder lidiar con ella dos veces. Puedo collect
como una lista y obtener nuevas transmisiones de eso;
// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff
Pero creo que debería haber una forma más eficiente / elegante.
¿Existe alguna forma de copiar la secuencia sin convertirla en una colección?
De hecho, estoy trabajando con una secuencia de Either
s, así que quiero procesar la proyección izquierda de una manera antes de pasar a la proyección derecha y tratar con eso de otra manera. Algo así (que, hasta ahora, me veo obligado a usar el toList
truco).
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
java
lambda
java-8
java-stream
Toby
fuente
fuente
Respuestas:
Creo que su suposición sobre la eficiencia es un poco al revés. Obtiene esta enorme recuperación de la eficiencia si solo va a usar los datos una vez, porque no tiene que almacenarlos, y los flujos le brindan potentes optimizaciones de "fusión de bucle" que le permiten hacer fluir todos los datos de manera eficiente a través de la tubería.
Si desea reutilizar los mismos datos, entonces, por definición, debe generarlos dos veces (determinísticamente) o almacenarlos. Si ya está en una colección, genial; luego, repetirlo dos veces es barato.
Experimentamos en el diseño con "flujos bifurcados". Lo que encontramos fue que apoyar esto tenía costos reales; cargó el caso común (uso una vez) a expensas del caso poco común. El gran problema era lidiar con "qué sucede cuando las dos canalizaciones no consumen datos al mismo ritmo". Ahora has vuelto a almacenar en búfer de todos modos. Esta fue una característica que claramente no tuvo su peso.
Si desea operar con los mismos datos repetidamente, almacénelos o estructura sus operaciones como Consumidores y haga lo siguiente:
También puede buscar en la biblioteca RxJava, ya que su modelo de procesamiento se presta mejor a este tipo de "bifurcación de flujo".
fuente
toList
) para poder procesarlos (elEither
caso siendo el ejemplo)?Puede usar una variable local con
Supplier
para configurar partes comunes de la canalización de flujo.De http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :
fuente
Supplier
si elStream
está construido de una manera "costosa", usted paga ese costo por cada llamada aSupplier.get()
. es decir, si una consulta de base de datos ... esa consulta se realiza cada vezSet<Integer>
usocollect(Collectors.toSet())
... y hacer un par de operaciones en eso. Queríamax()
y si un valor específico estaba establecido como dos operaciones ...filter(d -> d == -1).count() == 1;
Utilice a
Supplier
para producir la secuencia para cada operación de terminación.Siempre que necesite una transmisión de esa colección, utilícela
streamSupplier.get()
para obtener una nueva transmisión.Ejemplos:
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
fuente
Implementamos un
duplicate()
método para transmisiones en jOOλ , una biblioteca de código abierto que creamos para mejorar las pruebas de integración para jOOQ . Básicamente, puedes escribir:Internamente, hay un búfer que almacena todos los valores que se han consumido de un flujo pero no del otro. Probablemente sea lo más eficiente posible si sus dos transmisiones se consumen aproximadamente al mismo ritmo y si puede vivir con la falta de seguridad de los subprocesos .
Así es como funciona el algoritmo:
Más código fuente aquí
Tuple2
es probablemente como suPair
tipo, mientras queSeq
tieneStream
algunas mejoras.fuente
Tuple2<Seq<A>>, Seq<A>> t = duplicate(stream); long count = t.collect(counting()); List<A> list = t.collect(toList());
, es mejorTuple2<Long, List<A>> t = stream.collect(Tuple.collectors(counting(), toList()));
. El uso deCollectors.mapping/reducing
uno puede expresar otras operaciones de flujo como recolectores y elementos de proceso de una manera bastante diferente creando una única tupla resultante. Entonces, en general, puede hacer muchas cosas consumiendo la transmisión una vez sin duplicación y será compatible con el paralelo.offer()
/poll()
API, peroArrayDeque
podría hacer lo mismo.Puede crear una secuencia de ejecutables (por ejemplo):
Dónde
failure
ysuccess
son las operaciones a aplicar. Sin embargo, esto creará bastantes objetos temporales y puede que no sea más eficiente que comenzar desde una colección y transmitirla / iterarla dos veces.fuente
Otra forma de manejar los elementos varias veces es usar Stream.peek (Consumer) :
peek(Consumer)
se puede encadenar tantas veces como sea necesario.fuente
cyclops-react , una biblioteca a la que contribuyo, tiene un método estático que te permitirá duplicar un Stream (y devuelve un jOOλ Tuple of Streams).
Consulte los comentarios, existe una penalización de rendimiento en la que se incurrirá al usar un duplicado en una secuencia existente. Una alternativa más eficaz sería utilizar Streamable: -
También hay una clase Streamable (perezosa) que se puede construir a partir de Stream, Iterable o Array y reproducir varias veces.
AsStreamable.synchronizedFromStream (stream): se puede usar para crear un Streamable que llenará perezosamente su colección de respaldo, de tal manera que se pueda compartir entre hilos. Streamable.fromStream (stream) no incurrirá en ninguna sobrecarga de sincronización.
fuente
List<Integer> list = stream.collect(Collectors.toList()); streams = new Tuple2<>(list.stream(), list.stream())
(como sugiere OP). También indique explícitamente en la respuesta que es el autor de cyclop-streams. Lee esto .Para este problema en particular, también puede utilizar particiones. Algo como
fuente
Podemos hacer uso de Stream Builder a la hora de leer o iterar un stream. Aquí está el documento de Stream Builder .
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html
Caso de uso
Digamos que tenemos un flujo de empleados y necesitamos usar este flujo para escribir datos de empleados en un archivo de Excel y luego actualizar la colección / tabla de empleados [Este es solo un caso de uso para mostrar el uso de Stream Builder]:
fuente
Tuve un problema similar y pude pensar en tres estructuras intermedias diferentes a partir de las cuales crear una copia de la secuencia: a
List
, una matriz y aStream.Builder
. Escribí un pequeño programa de referencia, que sugería que, desde el punto de vista del desempeño, elList
era aproximadamente un 30% más lento que los otros dos, que eran bastante similares.El único inconveniente de convertir a una matriz es que es complicado si su tipo de elemento es un tipo genérico (que en mi caso lo era); por eso prefiero usar un
Stream.Builder
.Terminé escribiendo una pequeña función que crea un
Collector
:Luego puedo hacer una copia de cualquier flujo
str
haciendo lostr.collect(copyCollector())
que se sienta bastante en consonancia con el uso idiomático de los flujos.fuente