Digamos que tengo un Observable
, así:
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
Entonces, tengo un segundo Observable
:
var two = someOtherObservable.take(1);
Ahora, quiero subscribe()
a two
, pero yo quiero para asegurarse de que one
se ha completado antes de que el two
se dispara suscriptor.
¿Qué tipo de método de almacenamiento en búfer puedo usar two
para hacer que el segundo espere a que se complete el primero?
Supongo que estoy buscando hacer una pausa two
hasta que one
se complete.
javascript
observable
rxjs
Stephen
fuente
fuente
Respuestas:
Un par de formas en las que puedo pensar
import {take, publish} from 'rxjs/operators' import {concat} from 'rxjs' //Method one var one = someObservable.pipe(take(1)); var two = someOtherObservable.pipe(take(1)); concat(one, two).subscribe(function() {/*do something */}); //Method two, if they need to be separate for some reason var one = someObservable.pipe(take(1)); var two = someOtherObservable.pipe(take(1), publish()); two.subscribe(function(){/*do something */}); one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
fuente
pause
y enresume
lugar depublish
yconnect
, pero el ejemplo dos es esencialmente la ruta que tomé.one
) se resuelva antes que el segundo (two
) dentro de la función subscribe ()?Observable.forkJoin()
? Vea este enlace learnrxjs.io/operators/combination/forkjoin.htmlforkJoin
se suscribe simultáneamente.Si desea asegurarse de que se mantenga el orden de ejecución, puede usar flatMap como el siguiente ejemplo
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i)); const second = Rx.Observable.of(11).delay(500).do(i => console.log(i)); const third = Rx.Observable.of(111).do(i => console.log(i)); first .flatMap(() => second) .flatMap(() => third) .subscribe(()=> console.log('finished'));
El resultado sería:
"1" "11" "111" "finished"
fuente
skipUntil () con last ()
skipUntil: ignora los elementos emitidos hasta que otro observable haya emitido
último: emite el último valor de una secuencia (es decir, espere hasta que se complete y luego emita)
Tenga en cuenta que cualquier cosa emitida desde el observable pasado a
skipUntil
cancelará la omisión, por lo que debemos agregarlast()
: esperar a que se complete la transmisión.Oficial: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
Posible problema: tenga en cuenta que,
last()
por sí solo, se producirá un error si no se emite nada. Ellast()
operador tiene undefault
parámetro, pero solo cuando se usa junto con un predicado. Creo que si esta situación es un problema para usted (sisequence2$
puede completarse sin emitir), entonces uno de estos debería funcionar (actualmente no probado):main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last())) main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
Tenga en cuenta que
undefined
es un elemento válido para ser emitido, pero en realidad podría tener cualquier valor. También tenga en cuenta que esta es la tubería conectadasequence2$
y no lamain$
tubería.fuente
Aquí hay otra posibilidad que aprovecha el selector de resultados de switchMap
var one$ = someObservable.take(1); var two$ = someOtherObservable.take(1); two$.switchMap( /** Wait for first Observable */ () => one$, /** Only return the value we're actually interested in */ (value2, value1) => value2 ) .subscribe((value2) => { /* do something */ });
Dado que el selector de resultados de switchMap se ha depreciado, aquí hay una versión actualizada
const one$ = someObservable.pipe(take(1)); const two$ = someOtherObservable.pipe( take(1), switchMap(value2 => one$.map(_ => value2)) ); two$.subscribe(value2 => { /* do something */ });
fuente
Aquí hay una forma reutilizable de hacerlo (es mecanografiado pero puede adaptarlo a js):
export function waitFor<T>(signal: Observable<any>) { return (source: Observable<T>) => new Observable<T>(observer => signal.pipe(first()) .subscribe(_ => source.subscribe(observer) ) ); }
y puedes usarlo como cualquier operador:
var two = someOtherObservable.pipe(waitFor(one), take(1));
Es básicamente un operador que pospone la suscripción a la fuente observable hasta que la señal observable emite el primer evento.
fuente
Si el segundo observable está caliente , hay otra forma de pausar / reanudar :
var pauser = new Rx.Subject(); var source1 = Rx.Observable.interval(1000).take(1); /* create source and pause */ var source2 = Rx.Observable.interval(1000).pausable(pauser); source1.doOnCompleted(function () { /* resume paused source2 */ pauser.onNext(true); }).subscribe(function(){ // do something }); source2.subscribe(function(){ // start to recieve data });
También puede usar la versión pausableBuffered en búfer para mantener los datos durante la pausa.
fuente
Aquí hay otro enfoque, pero me siento más sencillo e intuitivo (o al menos natural si estás acostumbrado a Promesas). Básicamente, creas un Observable usando
Observable.create()
para envolverone
ytwo
como un Observable único. Esto es muy similar a cómoPromise.all()
puede funcionar.var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ // observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); });
Entonces, ¿qué está pasando aquí? Primero, creamos un nuevo Observable. La función pasada
Observable.create()
, apropiadamente nombradaonSubscription
, se pasa al observador (construido a partir de los parámetros a los que le pasasubscribe()
), que es similar aresolve
yreject
combina en un solo objeto al crear una nueva Promesa. Así es como hacemos que la magia funcione.En
onSubscription
, nos suscribimos al primer Observable (en el ejemplo anterior, esto se llamóone
). Cómo lo manejamosnext
yerror
depende de usted, pero el valor predeterminado proporcionado en mi muestra debería ser apropiado en general. Sin embargo, cuando recibimos elcomplete
evento, lo que significa queone
ya está hecho, podemos suscribirnos al siguiente Observable; disparando así el segundo Observable después de que el primero esté completo.El observador de ejemplo proporcionado para el segundo Observable es bastante simple. Básicamente,
second
ahora actúa como lo que esperaríastwo
que actuara en el OP. Más específicamente,second
emitirá el primer y único valor emitido porsomeOtherObservable
(debido atake(1)
) y luego se completará, asumiendo que no hay error.Ejemplo
Aquí hay un ejemplo completo y funcional que puede copiar / pegar si desea ver mi ejemplo funcionando en la vida real:
var someObservable = Observable.from([1, 2, 3, 4, 5]); var someOtherObservable = Observable.from([6, 7, 8, 9]); var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); }).subscribe( function onNext(value) { console.log(value); }, function onError(error) { console.error(error); }, function onComplete() { console.log("Done!"); } );
Si mira la consola, el ejemplo anterior se imprimirá:
fuente
Aquí hay un operador personalizado escrito con TypeScript que espera una señal antes de emitir resultados:
export function waitFor<T>( signal$: Observable<any> ) { return (source$: Observable<T>) => new Observable<T>(observer => { // combineLatest emits the first value only when // both source and signal emitted at least once combineLatest([ source$, signal$.pipe( first(), ), ]) .subscribe(([v]) => observer.next(v)); }); }
Puedes usarlo así:
two.pipe(waitFor(one)) .subscribe(value => ...);
fuente
bueno, sé que esto es bastante antiguo, pero creo que lo que podrías necesitar es:
var one = someObservable.take(1); var two = someOtherObservable.pipe( concatMap((twoRes) => one.pipe(mapTo(twoRes))), take(1) ).subscribe((twoRes) => { // one is completed and we get two's subscription. })
fuente
Puede usar el resultado emitido desde Observable anterior gracias al operador mergeMap (o su alias flatMap ) como este:
const one = Observable.of('https://api.github.com/users'); const two = (c) => ajax(c);//ajax from Rxjs/dom library one.mergeMap(two).subscribe(c => console.log(c))
fuente