Diferencia entre CompletableFuture, Future y RxJava's Observable

194

Me gustaría saber la diferencia entre CompletableFuture, Futurey Observable RxJava.

Lo que sé es que todos son asincrónicos pero

Future.get() bloquea el hilo

CompletableFuture da los métodos de devolución de llamada

RxJava Observable--- similar a CompletableFutureotros beneficios (no estoy seguro)

Por ejemplo: si el cliente necesita hacer múltiples llamadas de servicio y cuando usamos Futures(Java) Future.get()se ejecutará secuencialmente ... me gustaría saber cómo es mejor en RxJava.

Y la documentación http://reactivex.io/intro.html dice

Es difícil usar Futures para componer de manera óptima flujos de ejecución asincrónicos condicionales (o imposible, ya que las latencias de cada solicitud varían en tiempo de ejecución). Esto se puede hacer, por supuesto, pero rápidamente se vuelve complicado (y por lo tanto propenso a errores) o se bloquea prematuramente en Future.get (), lo que elimina el beneficio de la ejecución asincrónica.

Realmente interesado en saber cómo RxJavaresuelve este problema. Me resultó difícil de entender por la documentación.

shiv455
fuente
¿Has leído la documentación de cada uno? No estoy totalmente familiarizado con RxJava, pero la documentación parece extremadamente completa de un vistazo. No parece particularmente comparable a los dos futuros.
FThompson
He pasado pero no puedo entender qué tan diferente es de los futuros de Java ...
corrígeme
¿En qué se parecen los observables a los futuros?
FThompson
2
Me gustaría saber dónde es diferente, ¿cómo es diferente en la gestión de subprocesos? EX: Future.get () bloquea el hilo ... ¿cómo se manejará en Observable ???
shiv455
2
al menos es un poco confuso para mí ... ¡una diferencia de alto nivel sería realmente útil!
shiv455

Respuestas:

281

Futuros

Los futuros se introdujeron en Java 5 (2004). Básicamente son marcadores de posición para el resultado de una operación que aún no ha terminado. Una vez que finalice la operación, Futurecontendrá ese resultado. Por ejemplo, una operación puede ser una instancia ejecutable o invocable que se envía a un ExecutorService . El remitente de la operación puede usar el Futureobjeto para verificar si la operación esDone () , o esperar a que termine de usar el método get () de bloqueo .

Ejemplo:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

Futuros Completables

CompletableFutures se introdujeron en Java 8 (2014). De hecho, son una evolución de Futures regulares, inspirados en los Futuros escuchables de Google , parte de la biblioteca Guava . Son futuros que también le permiten unir tareas en una cadena. Puede usarlos para decirle a algún subproceso de trabajo que "vaya a hacer una tarea X, y cuando haya terminado, haga otra cosa usando el resultado de X". Con CompletableFutures, puede hacer algo con el resultado de la operación sin bloquear realmente un subproceso para esperar el resultado. Aquí hay un ejemplo simple:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava es una biblioteca completa para programación reactiva creada en Netflix. De un vistazo, parecerá ser similar a las transmisiones de Java 8 . Lo es, excepto que es mucho más poderoso.

De manera similar a Futures, RxJava se puede usar para unir un conjunto de acciones síncronas o asíncronas para crear una tubería de procesamiento. A diferencia de los futuros, que son de un solo uso, RxJava funciona en flujos de cero o más elementos. Incluyendo secuencias interminables con un número infinito de elementos. También es mucho más flexible y potente gracias a un conjunto increíblemente rico de operadores .

A diferencia de las transmisiones de Java 8, RxJava también tiene un mecanismo de contrapresión , que le permite manejar casos en los que diferentes partes de su tubería de procesamiento operan en diferentes hilos, a diferentes velocidades .

La desventaja de RxJava es que a pesar de la documentación sólida, es una biblioteca difícil de aprender debido al cambio de paradigma involucrado. El código Rx también puede ser una pesadilla para depurar, especialmente si hay varios subprocesos involucrados, y aún peor, si se necesita contrapresión.

Si desea entrar en él, hay una página completa de varios tutoriales en el sitio web oficial, además de la documentación oficial y Javadoc . También puede echar un vistazo a algunos de los videos, como este, que ofrece una breve introducción a Rx y también habla sobre las diferencias entre Rx y Futures.

Bonus: secuencias reactivas de Java 9

Los flujos reactivos de Java 9, también conocidos como Flow API, son un conjunto de interfaces implementadas por varias bibliotecas de flujos reactivos como RxJava 2 , Akka Streams y Vertx . Permiten que estas bibliotecas reactivas se interconecten, al tiempo que conservan la importante contrapresión.

Malta
fuente
Sería bueno dar un código de ejemplo de cómo Rx hace esto
Zinan Xing
Entonces, usando Reactive Streams, podemos mezclar RxJava, Akka y Vertx en una sola aplicación.
IgorGanapolsky
1
@IgorGanapolsky Sí.
Malta
En CompletableFutures utilizamos métodos de devolución de llamada, estos métodos de devolución de llamada también se bloquearán si la salida de un método es la entrada de otra devolución de llamada. Como bloque futuro con llamada Future.get (). Por qué se dice que Future.get () está bloqueando la llamada mientras que CompletableFutures no bloquea. Por favor explique
Deepak
1
@Federico Claro. Cada uno Futurees un marcador de posición para un único resultado que puede o no haberse completado todavía. Si vuelve a realizar la misma operación, obtendrá una nueva Futureinstancia. RxJava trata con flujos de resultados que pueden venir en cualquier momento. Por lo tanto, una serie de operaciones puede devolver un solo RxJava observable que generará muchos resultados. Es un poco como la diferencia entre un solo sobre postal y un tubo neumático que sigue bombeando correo.
Malta
21

He estado trabajando con Rx Java desde 0.9, ahora en 1.3.2 y pronto migrando a 2.x Lo uso en un proyecto privado en el que ya trabajo durante 8 años.

Ya no programaría sin esta biblioteca. Al principio era escéptico, pero es un estado mental completamente diferente que debes crear. Tranquilo difícil al principio. A veces miraba las canicas por horas .. jajaja

Es solo una cuestión de práctica y realmente conocer el flujo (también conocido como contrato de observables y observadores), una vez que llegue allí, odiará hacerlo de otra manera.

Para mí no hay realmente un inconveniente en esa biblioteca.

Caso de uso: tengo una vista de monitor que contiene 9 indicadores (CPU, MEM, red, etc.). Al iniciar la vista, la vista se suscribe a una clase de monitor del sistema que devuelve un observable (intervalo) que contiene todos los datos de los 9 metros. Llevará cada segundo un nuevo resultado a la vista (¡así que no sondeará!). Ese observable utiliza un mapa plano para obtener simultáneamente (¡asíncrono!) Recuperar datos de 9 fuentes diferentes y comprime el resultado en un nuevo modelo que su vista obtendrá en onNext ().

¿Cómo diablos vas a hacer eso con futuros, completables, etc.? ¡Buena suerte! :)

Rx Java resuelve muchos problemas en la programación para mí y hace que sea mucho más fácil ...

Ventajas:

  • Statelss !!! (Lo importante es mencionar, lo más importante tal vez)
  • Gestión de subprocesos fuera de la caja
  • Cree secuencias que tengan su propio ciclo de vida.
  • Todo es observable, así que encadenar es fácil
  • Menos código para escribir
  • Jarra individual en classpath (muy ligera)
  • Altamente concurrente
  • Ya no hay devolución de llamadas
  • Basado en suscriptor (contrato estricto entre consumidor y productor)
  • Estrategias de contrapresión (disyuntor similar)
  • Espléndido manejo de errores y recuperación
  • Muy buena documentación (canicas <3)
  • Control completo
  • Mucho mas ...

Desventajas: - Difícil de probar

Kristoffer
fuente
13
~ " No programaría sin esta biblioteca nunca más " . ¿Entonces RxJava es el final de todos los proyectos de software?
IgorGanapolsky
¿Es útil incluso si no tengo eventos Stream of Asynchronous?
Mukesh Verma