¿Cómo obtener el valor actual de RxJS Subject u Observable?

207

Tengo un servicio Angular 2:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

Todo funciona muy bien. Pero tengo otro componente que no necesita suscribirse, solo necesita obtener el valor actual de isLoggedIn en un momento determinado. ¿Cómo puedo hacer esto?

Baconbeastnz
fuente

Respuestas:

341

A Subjecto Observableno tiene un valor actual. Cuando se emite un valor, se pasa a los suscriptores y Observablese hace con él.

Si desea tener un valor actual, use el BehaviorSubjectque está diseñado exactamente para ese propósito. BehaviorSubjectmantiene el último valor emitido y lo emite inmediatamente a los nuevos suscriptores.

También tiene un método getValue()para obtener el valor actual.

Günter Zöchbauer
fuente
1
Hola, es mi mal, son lo mismo ahora en el rxjs 5. El enlace del código fuente anterior se refería a rxjs4.
código de schrodinger
84
Nota importante del autor de RxJS 5: Usar getValue()es una GRAN bandera roja que estás haciendo algo mal. Está ahí como una escotilla de escape. En general, todo lo que haga con RxJS debe ser declarativo. getValue()Es imprescindible. Si está usando getValue(), hay un 99.9% de posibilidades de que esté haciendo algo mal o extraño.
Ben Lesh
1
@BenLesh, ¿qué pasa si tengo un BehaviorSubject<boolean>(false)y me gusta alternar?
bob
3
@BenLesh getValue () es muy útil para, por ejemplo, realizar una acción instantánea, como onClick-> dispatchToServer (x.getValue ()) pero no lo use en cadenas observables.
FlavorScape
2
@ IngoBürk La única forma en que puedo justificar su sugerencia es si no sabía que foo$era un BehaviorSubject(es decir, se define como Observabley tal vez hace calor o frío de una fuente diferida). Sin embargo, ya que luego pasa a usar .nextel $foomismo que significa que su aplicación se basa en que siendo una BehaviorSubjectpor lo que no hay justificación para el uso no sólo .valuepara obtener el valor actual en el primer lugar.
Simon_Weaver
145

La única forma en que debe estar recibiendo los valores "de" un observable / El sujeto está con subscribe!

Si está utilizando getValue(), está haciendo algo imperativo en el paradigma declarativo. Está ahí como una escotilla de escape, pero el 99.9% del tiempo NO debe usar getValue(). Hay algunas cosas interesantes que getValue()harán: arrojará un error si el sujeto se ha dado de baja, evitará que obtenga un valor si el sujeto está muerto porque está equivocado, etc. Pero, de nuevo, está ahí como un escape eclosionan por raras circunstancias.

Hay varias formas de obtener el último valor de un sujeto u observable de forma "Rx-y":

  1. Usando BehaviorSubject: Pero en realidad suscribiéndose a él . Cuando se suscribe por primera BehaviorSubjectvez, enviará sincrónicamente el valor anterior que recibió o con el que se inicializó.
  2. Uso de ReplaySubject(N): Esto almacenará en caché los Nvalores y los reproducirá a los nuevos suscriptores.
  3. A.withLatestFrom(B): Utilice este operador para obtener el valor más reciente de observable Bcuando se Aemite observable . Le dará ambos valores en una matriz [a, b].
  4. A.combineLatest(B): Utilice este operador para obtener los valores más recientes Ay Bcada vez que emite Ao Bemite. Le dará ambos valores en una matriz.
  5. shareReplay(): Realiza una multidifusión observable a través de a ReplaySubject, pero le permite volver a intentar la observable en caso de error. (Básicamente le da ese comportamiento prometedor de almacenamiento en caché).
  6. publishReplay(), publishBehavior(initialValue), multicast(subject: BehaviorSubject | ReplaySubject), Etc: Otros operadores que aprovechan BehaviorSubjecty ReplaySubject. Diferentes sabores de la misma cosa, básicamente multidifunden la fuente observable al canalizar todas las notificaciones a través de un tema. Debe llamar connect()para suscribirse a la fuente con el tema.
Ben Lesh
fuente
19
¿Puedes decir por qué usar getValue () es una bandera roja? ¿Qué sucede si necesito el valor actual del observable solo una vez, en el momento en que el usuario hace clic en un botón? ¿Debo suscribirme por el valor e inmediatamente darme de baja?
Flame
55
Está bien a veces. Pero a menudo es una señal de que el autor del código está haciendo algo muy imperativo (generalmente con efectos secundarios) donde no debería estar. También puede hacer click$.mergeMap(() => behaviorSubject.take(1))para resolver su problema.
Ben Lesh
1
¡Gran explicación! En realidad, si puedes mantener Observable y usar los métodos, es una forma mucho mejor. En un contexto de mecanografiado con Observable usado en todas partes, pero solo en uno definido como BehaviourSubject, entonces sería un código menos consistente. Los métodos que propuso me permitieron mantener los tipos Observables en todas partes.
JLavoie
2
es bueno si desea simplemente enviar el valor actual (como enviarlo al servidor al hacer clic), hacer un getValue () es muy útil. pero no lo use al encadenar operadores observables.
FlavorScape
1
Tengo un AuthenticationServiceque usa un BehaviourSubjectpara almacenar el estado de inicio de sesión actual ( boolean trueo false). Expone un isLoggedIn$observable para los suscriptores que desean saber cuándo cambia el estado. También expone una get isLoggedIn()propiedad, que devuelve el estado de inicio de sesión actual llamando getValue()al subyacente BehaviourSubject; esto lo utiliza mi guardia de autenticación para verificar el estado actual. Esto me parece un uso sensato de getValue()...
Dan King
13

Tuve una situación similar en la que los suscriptores tardíos se suscriben al Asunto después de que llegó su valor.

Encontré ReplaySubject, que es similar a BehaviorSubject, funciona de maravilla en este caso. Y aquí hay un enlace para una mejor explicación: http://reactivex.io/rxjs/manual/overview.html#replaysubject

Kfir Erez
fuente
1
eso me ayudó en mi aplicación Angular4: tuve que mover la suscripción del constructor del componente al ngOnInit () (dicho componente se comparte entre las rutas), solo dejándome aquí en caso de que alguien tenga un problema similar
Luca
1
Tuve un problema con una aplicación Angular 5 en la que estaba usando un servicio para obtener valores de una API y establecer variables en diferentes componentes. Estaba usando sujetos / observables pero no empujaría los valores después de un cambio de ruta. ReplaySubject fue una caída en el reemplazo de Subject y resolvió todo.
jafaircl
1
Asegúrate de estar usando ReplaySubject (1), de lo contrario, los nuevos suscriptores obtendrán todos los valores emitidos previamente en secuencia; esto no siempre es obvio en el tiempo de ejecución
Drenai
@Drenai, por lo que entiendo, ReplaySubject (1) se comporta igual que BehaviorSubject ()
Kfir Erez
No es lo mismo, la gran diferencia es que ReplaySubject no emite inmediatamente un valor predeterminado cuando se suscribe si su next()función aún no se ha invocado, mientras que BehaviourSubject sí lo hace. La emisión inmediata es muy útil para aplicar valores predeterminados a una vista, cuando BehaviourSubject se usa como la fuente de datos observable en un servicio, por ejemplo
Drenai el
5
const observable = of('response')

function hasValue(value: any) {
  return value !== null && value !== undefined;
}

function getValue<T>(observable: Observable<T>): Promise<T> {
  return observable
    .pipe(
      filter(hasValue),
      first()
    )
    .toPromise();
}

const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

Puede consultar el artículo completo sobre cómo implementarlo desde aquí. https://www.imkrish.com/how-to-get-current-value-of-observable-in-a-clean-way/

Keerati Limkulphong
fuente
3

Encontré el mismo problema en componentes secundarios donde inicialmente tendría que tener el valor actual del Asunto, luego suscribirse al Asunto para escuchar los cambios. Solo mantengo el valor actual en el Servicio para que esté disponible para que los componentes accedan, por ejemplo:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {

  isLoggedIn: boolean;

  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
    this.currIsLoggedIn = false;
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
    this.isLoggedIn = value;
  }
}

Un componente que necesita el valor actual podría acceder desde el servicio, es decir:

sessionStorage.isLoggedIn

No estoy seguro si esta es la práctica correcta :)

Molp Burnbright
fuente
2
Si necesita el valor de un observable en su vista de componentes, puede usar la asynctubería.
Ingo Bürk
2

Una similares buscando respuesta fue downvoted. Pero creo que puedo justificar lo que estoy sugiriendo aquí para casos limitados.


Si bien es cierto que un observable no tiene un valor actual , a menudo tendrá un valor disponible de inmediato . Por ejemplo, con las tiendas redux / flux / akita puede solicitar datos de una tienda central, en función de una cantidad de observables y ese valor generalmente estará disponible de inmediato.

Si este es el caso, entonces cuando usted subscribe, el valor volverá inmediatamente.

Entonces, digamos que recibió una llamada a un servicio, y al finalizar desea obtener el último valor de algo de su tienda, que posiblemente no emita :

Puede intentar hacer esto (y debe mantener todo lo posible 'cosas dentro de las tuberías'):

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                     .subscribe(([ serviceCallResponse, customer] => {

                        // we have serviceCallResponse and customer 
                     });

El problema con esto es que se bloqueará hasta que el observable secundario emita un valor, que potencialmente nunca podría ser.

Recientemente me encontré necesitando evaluar un observable solo si un valor estaba disponible de inmediato , y lo que es más importante, necesitaba poder detectar si no lo estaba. Terminé haciendo esto:

 serviceCallResponse$.pipe()
                     .subscribe(serviceCallResponse => {

                        // immediately try to subscribe to get the 'available' value
                        // note: immediately unsubscribe afterward to 'cancel' if needed
                        let customer = undefined;

                        // whatever the secondary observable is
                        const secondary$ = store$.select(x => x.customer);

                        // subscribe to it, and assign to closure scope
                        sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                        sub.unsubscribe();

                        // if there's a delay or customer isn't available the value won't have been set before we get here
                        if (customer === undefined) 
                        {
                           // handle, or ignore as needed
                           return throwError('Customer was not immediately available');
                        }
                     });

Tenga en cuenta que para todo lo anterior, estoy usando subscribepara obtener el valor (como comenta @Ben). No usar una .valuepropiedad, incluso si tuviera un BehaviorSubject.

Simon_Weaver
fuente
1
Por cierto, esto funciona porque por defecto 'horario' utiliza el hilo actual.
Simon_Weaver
1

Aunque puede sonar excesivo, esta es solo otra solución "posible" para mantener el tipo Observable y reducir la repetitiva ...

Siempre puede crear un getter de extensión para obtener el valor actual de un Observable.

Para hacer esto, necesitaría extender la Observable<T>interfaz en un global.d.tsarchivo de declaración de tipings. Luego implemente el getter de extensión en un observable.extension.tsarchivo y finalmente incluya ambos tipos y archivo de extensión en su aplicación.

Puede consultar esta Respuesta de StackOverflow para saber cómo incluir las extensiones en su aplicación Angular.

// global.d.ts
declare module 'rxjs' {
  interface Observable<T> {
    /**
     * _Extension Method_ - Returns current value of an Observable.
     * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
     */
    value: Observable<T>;
  }
}

// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
  get <T>(this: Observable<T>): Observable<T> {
    return this.pipe(
      filter(value => value !== null && value !== undefined),
      first());
  },
});

// using the extension getter example
this.myObservable$.value
  .subscribe(value => {
    // whatever code you need...
  });
j3ff
fuente
0

Puede almacenar el último valor emitido por separado del Observable. Luego léelo cuando sea necesario.

let lastValue: number;

const subscription = new Service().start();
subscription
    .subscribe((data) => {
        lastValue = data;
    }
);
Slawa
fuente
1
No es un enfoque reactivo almacenar algunas cosas fuera de lo observable. En su lugar, debe tener tantos datos como sea posible fluyendo dentro de las corrientes observables.
ganqqwerty
0

La mejor manera de hacer esto es usar Behaviur Subject, aquí hay un ejemplo:

var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5
yaya
fuente
0

Se puede crear una suscripción y después de tomar el primer elemento emitido destruido. Pipe es una función que utiliza un Observable como entrada y devuelve otro Observable como salida, sin modificar el primer observable. Angular 8.1.0. Paquetes: "rxjs": "6.5.3","rxjs-observable": "0.0.7"

  ngOnInit() {

    ...

    // If loading with previously saved value
    if (this.controlValue) {

      // Take says once you have 1, then close the subscription
      this.selectList.pipe(take(1)).subscribe(x => {
        let opt = x.find(y => y.value === this.controlValue);
        this.updateValue(opt);
      });

    }
  }
SushiGuy
fuente