¿Las secuencias de Java 8 son similares a las observables de RxJava?
Definición de flujo de Java 8:
Las clases en el nuevo
java.util.stream
paquete proporcionan una API Stream para admitir operaciones de estilo funcional en secuencias de elementos.
java-8
java-stream
rx-java
observable
rahulrv
fuente
fuente
Respuestas:
TL; DR : todas las bibliotecas de procesamiento de secuencia / secuencia ofrecen API muy similar para la construcción de tuberías. Las diferencias están en la API para el manejo de subprocesos múltiples y la composición de tuberías.
RxJava es bastante diferente de Stream. De todas las cosas de JDK, la más cercana a rx.Observable es quizás
java.util.stream.CollectorStream + CompletableFuture combo (que tiene un costo de tratar con una capa adicional de mónada, es decir, tener que manejar la conversión entreStream<CompletableFuture<T>>
yCompletableFuture<Stream<T>>
).Existen diferencias significativas entre Observable y Stream:
Stream#parallel()
divide la secuencia en particiones,Observable#subscribeOn()
yObservable#observeOn()
no lo hace; es difícil emular elStream#parallel()
comportamiento con Observable, una vez tuvo un.parallel()
método, pero este método causó tanta confusión que el.parallel()
soporte se movió a un repositorio separado en github, RxJavaParallel. Más detalles están en otra respuesta .Stream#parallel()
no permite especificar un grupo de subprocesos para usar, a diferencia de la mayoría de los métodos de RxJava que aceptan el Programador opcional. Dado que todas las instancias de transmisión en una JVM usan el mismo grupo de bifurcación, la adición.parallel()
puede afectar accidentalmente el comportamiento en otro módulo de su programaObservable#interval()
,Observable#window()
y muchas otras; Esto se debe principalmente a que las transmisiones están basadas en extracción y la transmisión no tiene control sobre cuándo flujo emitir el siguiente elemento aguas abajo.takeWhile()
,takeUntil()
); la solución alternativaStream#anyMatch()
es limitada: es una operación terminal, por lo que no puede usarla más de una vez por transmisiónLos Streams son difíciles de construir por usted mismo, Observable se puede construir de muchas manerasEDITAR: Como se señaló en los comentarios, hay formas de construir Stream. Sin embargo, dado que no hay un cortocircuito no terminal, no puede, por ejemplo, generar fácilmente un flujo de líneas en el archivo (JDK proporciona archivos # líneas y líneas BufferedReader # fuera de la caja, y otros escenarios similares se pueden gestionar construyendo flujo de Iterator).Observable#using()
); puede envolver el flujo de E / S o mutex con él y asegurarse de que el usuario no se olvide de liberar el recurso; se eliminará automáticamente al finalizar la suscripción; Stream tiene unonClose(Runnable)
método, pero debe llamarlo manualmente o mediante try-with-resources. P.ej. debe tener en cuenta que Files # lines () debe estar encerrado en el bloque try-with-resources.Resumen: RxJava difiere significativamente de Streams. Las alternativas reales de RxJava son otras implementaciones de ReactiveStreams , por ejemplo, parte relevante de Akka.
Actualización . Hay un truco para usar el grupo de unión de bifurcación no predeterminado
Stream#parallel
, consulte Grupo de subprocesos personalizado en la secuencia paralela de Java 8Actualización . Todo lo anterior se basa en la experiencia con RxJava 1.x. Ahora que RxJava 2.x está aquí , esta respuesta puede estar desactualizada.
fuente
Stream.generate()
y pasar su propiaSupplier<U>
implementación, solo un método simple desde el que proporciona el siguiente elemento en la secuencia. Hay muchos otros métodos. Para construir fácilmente una secuenciaStream
que depende de valores anteriores, puede usar elinterate()
método, cada unoCollection
tiene unstream()
método yStream.of()
construye un aStream
partir de un varargs o matriz. FinalmenteStreamSupport
tiene soporte para una creación de flujo más avanzada usando spliterators o para tipos primitivos de flujos.takeWhile()
,takeUntil()
);" - JDK9 tiene estos, creo, en takeWhile () y dropWhile ()Java 8 Stream y RxJava se ve bastante similar. Tienen operadores parecidos (filtro, mapa, flatMap ...) pero no están diseñados para el mismo uso.
Puede realizar tareas asíncronas con RxJava.
Con Java 8 stream, atravesará elementos de su colección.
Puede hacer más o menos lo mismo en RxJava (elementos transversales de una colección) pero, dado que RxJava se enfoca en tareas concurrentes, ..., usa sincronización, bloqueo, ... Entonces, la misma tarea usando RxJava puede ser más lenta que con Java 8 stream.
Se puede comparar con RxJava
CompletableFuture
, pero eso puede ser capaz de calcular más de un solo valor.fuente
parallelStream
admite una sincronización similar de recorridos / mapas / filtros simples, etc.Existen algunas diferencias técnicas y conceptuales, por ejemplo, las secuencias de Java 8 son secuencias de valores síncronas de valores de un solo uso, mientras que los observables RxJava son secuencias de valores re-observables, basadas en push-pull adaptativas y potencialmente asíncronas. RxJava está dirigido a Java 6+ y también funciona en Android.
fuente
Java 8 Streams se basan en extracción. Se itera sobre una secuencia de Java 8 que consume cada elemento. Y podría ser una corriente interminable.
RXJava
Observable
está por defecto basado en push. Se suscribe a un Observable y se le notificará cuando llegue el siguiente elemento (onNext
), o cuando se complete la transmisión (onCompleted
), o cuando ocurra un error (onError
). Debido aObservable
que usted recibeonNext
,onCompleted
,onError
eventos, se pueden hacer algunas funciones de gran alcance como la combinación de diferentesObservable
s a una nueva (zip
,merge
,concat
). Otras cosas que podría hacer es el almacenamiento en caché, la aceleración, ... Y utiliza más o menos la misma API en diferentes idiomas (RxJava, RX en C #, RxJS, ...)Por defecto, RxJava es de un solo subproceso. A menos que comience a usar Programadores, todo sucederá en el mismo hilo.
fuente
Las respuestas existentes son completas y correctas, pero falta un ejemplo claro para principiantes. Permítanme poner algunos términos concretos detrás de "push / pull-based" y "re-observable". Nota : Odio el término
Observable
(es una transmisión por amor de Dios), así que simplemente me referiré a las transmisiones J8 vs RX.Considere una lista de enteros,
Un J8 Stream es una utilidad para modificar la colección. Por ejemplo, incluso los dígitos se pueden extraer como,
Esto es básicamente el mapa de Python , filtro, reducción , una adición muy agradable (y muy atrasado) a Java. Pero, ¿qué pasaría si los dígitos no se recopilaran antes de tiempo? ¿Qué pasaría si los dígitos se transmitieran mientras la aplicación se estaba ejecutando? ¿Podríamos filtrar los pares en tiempo real?
Imagine que un proceso de subproceso separado genera números enteros en momentos aleatorios mientras la aplicación se está ejecutando (
---
indica el tiempo)En RX,
even
puede reaccionar a cada nuevo dígito y aplicar el filtro en tiempo realNo es necesario almacenar listas de entrada y salida. Si desea una lista de salida, no hay problema que sea transferible también. De hecho, todo es una corriente.
Es por eso que términos como "sin estado" y "funcional" están más asociados con RX
fuente
RxJava también está estrechamente relacionado con la iniciativa de flujos reactivos y se considera una implementación simple de la API de flujos reactivos (por ejemplo, en comparación con la implementación de los flujos de Akka ). La principal diferencia es que las corrientes reactivas están diseñadas para poder manejar la contrapresión, pero si echa un vistazo a la página de corrientes reactivas, obtendrá la idea. Describen sus objetivos bastante bien y las corrientes también están estrechamente relacionadas con el manifiesto reactivo .
Las secuencias de Java 8 son más o menos la implementación de una colección ilimitada, bastante similar a la secuencia de Scala o la secuencia lenta de Clojure .
fuente
Java 8 Streams permite el procesamiento de colecciones realmente grandes de manera eficiente, al tiempo que aprovecha las arquitecturas multinúcleo. Por el contrario, RxJava tiene un solo subproceso de forma predeterminada (sin programadores). Por lo tanto, RxJava no aprovechará las máquinas multinúcleo a menos que usted mismo codifique esa lógica.
fuente