¿IObserver <T> de .NET estaba destinado a suscribirse a múltiples IObservables?

9

Hay interfaces IObservable e IObserver en .NET (también aquí y aquí ). Curiosamente, la implementación concreta del IObserver no tiene una referencia directa al IObservable. No sabe a quién está suscrito. Solo puede invocar al suscriptor. "Por favor, tire del pin para darse de baja".

editar: El suscriptor implementa el IDisposable. Creo que este esquema se empleó para evitar el problema del oyente caducado .

Sin embargo, hay dos cosas que no me quedan del todo claras.

  1. ¿La clase interna Unsubscriber proporciona el comportamiento de suscribirse y olvidar? ¿Quién (y cuándo exactamente) llama IDisposable.Dispose()a Unsubscriber? El recolector de basura (GC) no es determinista.
    [Descargo de responsabilidad: en general, he pasado más tiempo con C y C ++ que con C #.]
  2. ¿Qué debería suceder si quiero suscribir un observador K a un L1 observable y el observador ya está suscrito a algún otro L2 observable?

    K.Subscribe(L1);
    K.Subscribe(L2);
    K.Unsubscribe();
    L1.PublishObservation(1003);
    L2.PublishObservation(1004);
    

    Cuando ejecuté este código de prueba contra el ejemplo de MSDN, el observador permaneció suscrito a L1. Esto sería peculiar en el desarrollo real. Potencialmente, hay 3 vías para mejorar esto:

    • Si el observador ya tiene una instancia de cancelación de suscripción (es decir, ya está suscrita), se da de baja silenciosamente del proveedor original antes de suscribirse a una nueva. Este enfoque oculta el hecho de que ya no está suscrito al proveedor original, lo que puede convertirse en una sorpresa más adelante.
    • Si el observador ya tiene una instancia para cancelar la suscripción, se produce una excepción. Un código de llamada con buen comportamiento tiene que cancelar la suscripción del observador explícitamente.
    • El observador se suscribe a múltiples proveedores. Esta es la opción más intrigante, pero ¿se puede implementar con IObservable e IObserver? Veamos. Es posible que el observador mantenga una lista de objetos para cancelar la suscripción: uno para cada fuente. Lamentablemente, IObserver.OnComplete()no proporciona una referencia al proveedor que lo envió. Por lo tanto, la implementación de IObserver con múltiples proveedores no podría determinar de cuál darse de baja.
  3. ¿IObserver de .NET estaba destinado a suscribirse a múltiples IObservables?
    ¿La definición de libro de texto del patrón de observador requiere que un observador pueda suscribirse a múltiples proveedores? ¿O es opcional y depende de la implementación?

Nick Alexeev
fuente

Respuestas:

5

Las dos interfaces son en realidad parte de Extensiones Reactivas (Rx para abreviar), debe usar esa biblioteca casi siempre que quiera usarlas.

Las interfaces están técnicamente en mscrolib, no en ninguno de los ensamblados Rx. Creo que esto es para facilitar la interoperabilidad: de esta manera, las bibliotecas como TPL Dataflow pueden proporcionar miembros que trabajen con esas interfaces , sin hacer referencia a Rx.

Si usa Rx's Subjectcomo su implementación IObservable, Subscribedevolverá uno IDisposableque puede usarse para cancelar la suscripción:

var observable = new Subject<int>();

var unsubscriber =
    observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("1: {0}", i)));
observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("2: {0}", i)));

unsubscriber.Dispose();

observable.OnNext(1003);
observable.OnNext(1004);
svick
fuente
5

Solo para aclarar algunas cosas que están bien documentadas en las Pautas oficiales de diseño de Rx y en detalle en mi sitio web IntroToRx.com :

  • No confía en el GC para limpiar sus suscripciones. Cubierto en detalle aquí
  • No hay Unsubscribemétodo Se suscribe a una secuencia observable y recibe una suscripción . A continuación, puede deshacerse de esa suscripción que indica que ya no desea que se invoquen sus devoluciones de llamada.
  • Una secuencia observable no se puede completar más de una vez (consulte la sección 4 de las Pautas de diseño de Rx).
  • Existen numerosas formas de consumir múltiples secuencias observables. También hay una gran cantidad de información al respecto en Reactivex.io y nuevamente en IntroToRx.

Para ser específico y responder directamente a la pregunta original, su uso es al revés. No empujas muchas secuencias observables en un solo observador. Usted compone secuencias observables en una sola secuencia observable. Luego te suscribes a esa secuencia única.

En vez de

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

Que es solo un pseudocódigo y no funcionaría en la implementación .NET de Rx, debe hacer lo siguiente:

var source1 = new Subject<int>(); //was L1
var source2 = new Subject<int>(); //was L2

var subscription = source1
    .Merge(source2)
    .Subscribe(value=>Console.WriteLine("OnNext({0})", value));


source1.OnNext(1003);
source2.OnNext(1004);

subscription.Dispose();

Ahora, esto no encaja exactamente con la pregunta inicial, pero no sé qué K.Unsubscribe()se suponía que debía hacer (¿cancelar la suscripción de todos, la última o la primera suscripción?)

Lee Campbell
fuente
¿Puedo simplemente encapsular el objeto de suscripción en un bloque "en uso"?
Robert Oschler
1
En este caso síncrono puede, sin embargo, se supone que Rx es asíncrono. En el caso asíncrono, normalmente no puede usar el usingbloque. El costo de una declaración de suscripción debe ser prácticamente cero, por lo que tendría Neter el bloque usando, suscribir, dejar el bloque usando (por lo tanto darse de baja) haciendo que el código no tiene mucho sentido
Lee Campbell
3

Tienes razón. El ejemplo funciona mal para múltiples IObservables.

Supongo que OnComplete () no proporciona una referencia porque no quieren que IObservable tenga que mantenerlo. Si escribiera que probablemente admitiría varias suscripciones haciendo que Subscribe tome un identificador como un segundo parámetro, que se devuelve a la llamada OnComplete (). Entonces podrías decir

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

Tal como está, parece que .NET IObserver no es adecuado para múltiples observadores. Pero supongo que su objeto principal (LocationReporter en el ejemplo) podría tener

public Dictionary<String,IObserver> Observers;

y eso te permitiría apoyar

K.Subscribe(L1,"L1")
K.Subscribe(L2,"L2")
K.Unsubscribe("L1")

también.

Supongo que Microsoft podría argumentar que, por lo tanto, no es necesario que admitan directamente IObservables múltiples en las interfaces.

psr
fuente
También estaba pensando que la implementación observable puede tener una lista de observadores. También he notado que IObserver.OnComplete()no identifica de quién proviene la llamada. Si el observador está suscrito a más de un observable, entonces no sabe de quién darse de baja. Anticlimactico. Me pregunto, ¿tiene .NET una mejor interfaz para el patrón de observación?
Nick Alexeev
Si desea tener una referencia a algo, debería usar una referencia, no una cadena.
svick
Esta respuesta me ayudó con un error de la vida real. Estaba usando Observable.Create()para construir un observable, y encadenando varios observables de origen en él usando Subscribe(). Inadvertidamente pasé un observable completado en una ruta de código. Esto completó mi observable recién creado, aunque las otras fuentes no estaban completas. Me tomó años a trabajar en lo que tenía que hacer - interruptor Observable.Empty()para Observable.Never().
Olly
0

Sé que es muy tarde para la fiesta, pero ...

Las interfaces I Observable<T>y noIObserver<T> son parte de Rx ... son tipos centrales ... pero Rx las usa ampliamente.

Usted es libre de tener tantos (o tan pocos) observadores como desee. Si anticipa múltiples observadores, es responsabilidad OnNext()del observador enrutar las llamadas a los observadores apropiados para cada evento observado. El observable puede necesitar una lista o un diccionario como sugiere.

Hay buenos casos para permitir solo uno, y buenos para permitir muchos. Por ejemplo, en una implementación de CQRS / ES, puede aplicar un único controlador de comandos por tipo de comando en un bus de comandos, mientras que puede notificar varias transformaciones del lado de lectura para un tipo de evento determinado en el almacén de eventos.

Como se indicó en otras respuestas, no hay Unsubscribe. Desechar lo que te dan cuando Subscribegeneralmente haces el trabajo sucio. El observador, o un agente del mismo, es responsable de retener el token hasta que ya no quiera recibir más notificaciones . (pregunta 1)

Entonces, en tu ejemplo:

K.Subscribe(L1);
K.Subscribe(L2);
K.Unsubscribe();
L1.PublishObservation(1003);
L2.PublishObservation(1004);

... sería más como:

using ( var l1Token = K.Subscribe( L1 ) )
{
  using ( var l2Token = K.Subscribe( L2 );
  {
    L1.PublishObservation( 1003 );
    L2.PublishObservation( 1004 );
  } //--> effectively unsubscribing to L2 here

  L2.PublishObservation( 1005 );
}

... donde K escucharía 1003 y 1004 pero no 1005.

Para mí, esto todavía parece divertido porque nominalmente, las suscripciones son cosas de larga duración ... a menudo durante la duración del programa. No son diferentes a este respecto de los eventos normales .Net.

En muchos ejemplos que he visto, el Disposetoken funciona para eliminar al observador de la lista de observadores del observable. Prefiero que el token no contenga tanto conocimiento ... y, por lo tanto, he generalizado mis tokens de suscripción para llamar a una lambda pasada (con información de identificación capturada en el momento de la suscripción:

public class SubscriptionToken<T>: IDisposable
{
  private readonly Action unsubscribe;

  private SubscriptionToken( ) { }
  public SubscriptionToken( Action unsubscribe )
  {
    this.unsubscribe = unsubscribe;
  }

  public void Dispose( )
  {
    unsubscribe( );
  }
}

... y el observable puede instalar el comportamiento de cancelación de suscripción durante la suscripción:

IDisposable Subscribe<T>( IObserver<T> observer )
{
  var subscriberId = Guid.NewGuid( );
  subscribers.Add( subscriberId, observer );

  return new SubscriptionToken<T>
  (
    ( ) =>
    subscribers.Remove( subscriberId );
  );
}

Si su observador está captando eventos de múltiples observables, es posible que desee asegurarse de que haya algún tipo de información de correlación en los eventos mismos ... como lo hacen los eventos .Net con el sender. Depende de usted si eso importa o no. No está horneado, como has razonado correctamente. (Pregunta 3)

Arcilla
fuente