RxJS: ¿Cómo actualizaría "manualmente" un Observable?

139

Creo que debo estar malentendiendo algo fundamental, porque en mi opinión, este debería ser el caso más básico para un observable, pero durante mi vida no puedo encontrar la manera de hacerlo desde los documentos.

Básicamente, quiero poder hacer esto:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
   function(x){
     console.log('next: ' + x);
   }
...
var my_function = function(){
  eventObservable.push('foo'); 
  //'push' adds an event to the datastream, the observer gets it and prints 
  // next: foo
}

Pero no he podido encontrar un método como push. Estoy usando esto para un controlador de clics, y sé que lo tienen Observable.fromEventpara eso, pero estoy tratando de usarlo con React y prefiero poder simplemente actualizar el flujo de datos en una devolución de llamada, en lugar de usar un completamente diferente sistema de manejo de eventos. Entonces básicamente quiero esto:

$( "#target" ).click(function(e) {
  eventObservable.push(e.target.text()); 
});

Lo más cerca que conseguí fue usarlo observer.onNext('foo'), pero eso no pareció funcionar realmente y eso fue llamado por el observador, lo que no parece correcto. El observador debería ser lo que reacciona al flujo de datos, no lo cambia, ¿verdad?

¿No entiendo la relación observador / observable?

LiamD
fuente
Eche un vistazo a esto para aclarar su idea (Introducción a la programación reactiva que se ha estado perdiendo): gist.github.com/staltz/868e7e9bc2a7b8c1f754 . Aquí también hay un montón de recursos con los que puede mejorar su comprensión: github.com/Reactive-Extensions/RxJS#resources
user3743222
Había comprobado el primero, parece un recurso sólido. La segunda es una gran lista, en ella encontré aaronstacy.com/writings/reactive-programming-and-mvc que me ayudó a descubrir Rx.Subject, que resuelve mi problema. ¡Así que gracias! Una vez que haya escrito un poco más de aplicación, publicaré mi solución, solo quiero probarlo un poco.
LiamD
Jeje, muchas gracias por hacer esta pregunta, estaba a punto de hacer la misma pregunta con el mismo ejemplo de código en mente :-)
arturh

Respuestas:

154

En RX, Observador y Observable son entidades distintas. Un observador se suscribe a un Observable. Un observable emite elementos a sus observadores llamando a los métodos de los observadores. Si necesita llamar a los métodos de observador fuera de su alcance Observable.create(), puede usar un Asunto, que es un proxy que actúa como observador y Observable al mismo tiempo.

Puedes hacer así:

var eventStream = new Rx.Subject();

var subscription = eventStream.subscribe(
   function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);
    },
    function () {
        console.log('Completed');
    });

var my_function = function() {
  eventStream.next('foo'); 
}

Puede encontrar más información sobre temas aquí:

luisgabriel
fuente
1
¡Esto es exactamente lo que terminé haciendo! Seguí trabajando en ello para ver si podía encontrar una mejor manera de hacer lo que tenía que hacer, pero esta es definitivamente una solución viable. Lo vi por primera vez en este artículo: aaronstacy.com/writings/reactive-programming-and-mvc .
LiamD
34

Creo Observable.create()que no toma un observador como parámetro de devolución de llamada, sino un emisor. Entonces, si desea agregar un nuevo valor a su Observable, intente esto:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
  next: function(next) {
    console.log(next);
  },
  error: function(error) {
    console.log(error);
  },
  complete: function() {
    console.log("done");
  }
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();

//console output
//"foo"
//"bar"
//"baz"
//"done"

Sí, el Sujeto lo hace más fácil, proporcionando Observable y Observador en el mismo objeto, pero no es exactamente lo mismo, ya que el Sujeto le permite suscribir múltiples observadores al mismo observable cuando un observable solo envía datos al último observador suscrito, así que úselo conscientemente . Aquí hay un JsBin si quieres jugar con él.

theFreedomBanana
fuente
¿Está documentada la posibilidad de sobrescribir la propiedad del emisor en algún lugar de los manuales de RxJS?
Tomás
En este caso emittersolo habrá next()nuevos valores para el observador que suscribió el último. Un mejor enfoque sería recopilar todos emitterlos correos electrónicos en una matriz e iterar a través de todos ellos y nextel valor de cada uno de ellos
Eric Gopak