Rxjs: Observable.combineLatest vs Observable.forkJoin

84

Me pregunto cuáles son las diferencias entre Observable.combineLatesty Observable.forkJoin. Por lo que puedo ver, la única diferencia es que forkJoinespera que los Observables se completen, mientras combineLatestdevuelve los últimos valores.

Tuong Le
fuente
4
Nota: En rxjs6 +, ahora son solo funciones combineLatest()y forkJoin()que crean un observable. Hacen lo mismo, pero la sintaxis es diferente. No confunda combineLatestde rxjs/operatorscuál es un operador 'pipeable'. Si importa el incorrecto, obtendrá errores.
Simon_Weaver

Respuestas:

126

No solo forkJoinrequiere que se completen todos los observables de entrada, sino que también devuelve un observable que produce un valor único que es una matriz de los últimos valores producidos por los observables de entrada. En otras palabras, espera hasta que se complete la última entrada observable, y luego produce un valor único y se completa.

Por el contrario, combineLatestdevuelve un Observable que produce un nuevo valor cada vez que lo hacen los observables de entrada, una vez que todos los observables de entrada han producido al menos un valor. Esto significa que podría tener valores infinitos y es posible que no esté completo. También significa que los observables de entrada no tienen que completarse antes de producir un valor.

GregL
fuente
@GregL, ¿hay una función que funcione como forkJoin pero también funcionaría con llamadas http fallidas?
Tukkan
2
@Tukkan, encadenaría un operador que se recupere de los errores a cada http observable, de modo que defina los valores que se usarán para cada solicitud en caso de error, en lugar de buscar un operador que combine múltiples observables que podrían generar errores (I no estoy seguro de que exista tal operador). Los operadores que se recuperan de errores incluyen .catch(), .onErrorResumeNext()y posiblemente .retry()(si la llamada Http puede fallar de forma intermitente).
GregL
1
Solo para aclarar, ambos producen matrices.
Simon_Weaver
@Simon_Weaver No necesariamente. En el caso de combineLatest(), puede proporcionar una función de proyección para especificar cómo producir un valor de salida a partir de los últimos valores de los observables de entrada. De forma predeterminada, si no especifica uno, obtendrá una matriz de los últimos valores emitidos.
GregL
Cada emisión de combineLatest es una matriz de los valores recopilados. Solo quería agregar eso, ya que solo mencionaste array para forkjoin. Entonces, de esa manera, son iguales. Y sí, siempre hay sutilezas con RxJS, así que si me perdí algo, ¿puedes aclarar lo que quisiste decir?
Simon_Weaver
14

También:

combineLatest (...) ejecuta observables en secuencia, uno por uno

combineLatestutiliza internamente concat, lo que significa que se debe obtener un valor para cada observable en la matriz antes de pasar al siguiente.

// partial source of static combineLatest (uses the rxjs/operators combineLatest internally):
// If you're using typescript then the output array will be strongly typed based on type inference
return function (source) { return source.lift.call(from_1.from([source].concat(observables)), 
                           new combineLatest_1.CombineLatestOperator(project)); };

forkJoin (...) ejecuta observables en paralelo, al mismo tiempo

Si tiene 3 observables de origen, y cada uno de ellos tarda 5 segundos en ejecutarse, entonces tardará 15 segundos combineLatesten ejecutarse. Mientras forkJoinque se ejecutan en paralelo, tomará 5 segundos.

Por lo forkJointanto, funciona más bien Promise.all(...)cuando la orden no se aplica.

Consideración para el manejo de errores:

Si alguno de los errores observables sale, combineLatestlos siguientes no se ejecutarán, pero forkJointodos se están ejecutando. Por tanto, combineLatestpuede ser útil ejecutar una 'secuencia' de observables y recopilar el resultado en su conjunto.


Nota avanzada: si los observables de origen ya se están 'ejecutando' (suscritos por algo más) y los está usando share, entonces no verá este comportamiento.

Nota aún más avanzada: CombineLatest siempre le dará lo último de cada fuente, por lo que si uno de los observables de la fuente emite varios valores, obtendrá el último. No solo obtiene un valor único para cada fuente y pasa a la siguiente. Si necesita asegurarse de obtener solo el 'próximo elemento disponible' para cada fuente observable, puede agregar .pipe(take(1))a la fuente observable a medida que lo agrega a la matriz de entrada.

Simon_Weaver
fuente
Gracias por las consideraciones sobre el manejo de errores. El manejo de errores es bastante difícil de entender en caso de múltiples observables.
D Deshmane
2
Creo que ha malinterpretado lo que está haciendo concat()en el combineLatest()código. Me parece que es el Array.prototype.concatmétodo, no el concatmétodo RxJS . Suponiendo que tenga razón, entonces esta respuesta es engañosa e incorrecta, ya combineLatest()que no ejecuta los observables uno por uno, en secuencia. Los ejecuta en paralelo, igual que forkJoin(). La diferencia está en cuántos valores se producen y si los observables de origen deben completarse o no.
GregL
@GregL Array.concat no tiene sentido con los observables. Confío absolutamente en la naturaleza secuencial de concat; la clave para entender es que no obtienes ningún valor de salida hasta que se hayan ejecutado todos.
Simon_Weaver
Me estaba refiriendo a este código en el código fuente de muestra que publicaste:, [source].concat(observables)asumiendo que eso es lo que quisiste decir cuando dijiste que " combineLatestutiliza internamente concat". Me parece que está confundiendo array concat con RxJS concat. Este último de hecho ejecuta los observables de entrada en secuencia, esperando hasta que cada uno esté completo. Pero eso no es usado por combineLatest(), es un operador completamente separado.
GregL
3
Creo combineLatest()y forkJoin()ambos corren en paralelo. Ejecutar el siguiente código genera alrededor de 5000 para ambos. const start = new Date().getTime(); combineLatest([of(null).pipe(delay(5000)), of(null).pipe(delay(5000)), of(null).pipe(delay(5000))]).subscribe(() => console.log(new Date().getTime() - start)); forkJoin([of(null).pipe(delay(5000)), of(null).pipe(delay(5000)), of(null).pipe(delay(5000))]).subscribe(() => console.log(new Date().getTime() - start));
Jeremy
12

forkJoin : cuando se completen todos los observables, emite el último valor emitido de cada uno.

combineLatest : cuando cualquier observable emite un valor, emite el último valor de cada uno.

El uso es bastante similar, pero no debes olvidar darte de baja de combineLatest a diferencia de forkJoin .

Dmitry Grinko
fuente
1
si una, si la solicitud falla, todas las solicitudes anidadas se cancelan automáticamente, ¿cómo puedo resolver esto?
Sunil Garg
1
@SunilGarg Debe usar forkJoin o combineLatest si desea obtener todos los resultados solamente. Si no le importan todos los resultados, debe usar las suscripciones por separado.
Dmitry Grinko
¿Puede responder a esta? stackoverflow.com/questions/55558277/…
Sunil Garg
¿Puede explicar por qué es necesario cancelar la suscripción con un ejemplo de código
Sunil Garg