¿Por qué no se recomiendan los sujetos en las extensiones reactivas de .NET?

111

Actualmente me estoy familiarizando con el marco de extensiones reactivas para .NET y estoy trabajando a través de los diversos recursos de introducción que he encontrado (principalmente http://www.introtorx.com )

Nuestra aplicación involucra una serie de interfaces de hardware que detectan marcos de red, estos serán mis IObservables, luego tendré una variedad de componentes que consumirán esos marcos o realizarán alguna forma de transformación en los datos y producirán un nuevo tipo de marco. También habrá otros componentes que deberán mostrar cada enésimo fotograma, por ejemplo. Estoy convencido de que Rx será útil para nuestra aplicación, sin embargo, estoy luchando con los detalles de implementación de la interfaz IObserver.

La mayoría (si no todos) de los recursos que he estado leyendo han dicho que no debería implementar la interfaz IObservable yo mismo, sino usar una de las funciones o clases proporcionadas. De mi investigación, parece que la creación de un Subject<IBaseFrame>me proporcionaría lo que necesito, tendría mi único hilo que lee datos de la interfaz de hardware y luego llama a la función OnNext de mi Subject<IBaseFrame>instancia. Los diferentes componentes de IObserver recibirían sus notificaciones de ese sujeto.

Mi confusión proviene del consejo que se da en el apéndice de este tutorial, donde dice:

Evite el uso de tipos de sujetos. Rx es efectivamente un paradigma de programación funcional. Usar sujetos significa que ahora estamos administrando el estado, que potencialmente está mutando. Tratar tanto con el estado mutante como con la programación asincrónica al mismo tiempo es muy difícil de hacer bien. Además, muchos de los operadores (métodos de extensión) se han escrito cuidadosamente para garantizar que se mantenga una vida útil correcta y coherente de las suscripciones y secuencias; cuando presenta los temas, puede romper esto. Las versiones futuras también pueden ver una degradación significativa del rendimiento si utiliza sujetos explícitamente.

Mi aplicación es bastante crítica para el rendimiento, obviamente voy a probar el rendimiento del uso de patrones Rx antes de que entre en el código de producción; sin embargo, me preocupa que estoy haciendo algo que va en contra del espíritu del marco Rx al usar la clase Asunto y que una versión futura del marco va a dañar el rendimiento.

¿Existe una mejor manera de hacer lo que quiero? El subproceso de sondeo de hardware se ejecutará continuamente, ya sea que haya observadores o no (el búfer de HW se respaldará de lo contrario), por lo que esta es una secuencia muy activa. Luego, necesito pasar los fotogramas recibidos a varios observadores.

Cualquier consejo será muy apreciado.

Antonio
fuente
1
Realmente me ayudó a comprender el tema, solo estoy aclarando las cosas en mi cabeza sobre cómo usarlo en mi aplicación. Sé que son lo correcto: tengo una canalización de componentes que están muy orientados al empuje y necesito hacer todo tipo de filtrado e invocación en el hilo de la interfaz de usuario para mostrar en una GUI, así como almacenar en búfer el último marco recibido, etc. etc - ¡Solo necesito asegurarme de hacerlo bien la primera vez!
Anthony

Respuestas:

70

Ok, si ignoramos mis formas dogmáticas e ignoramos "los temas son buenos / malos" todos juntos. Miremos el espacio del problema.

Apuesto a que tienes 1 de 2 estilos de sistema en los que necesitas integrarte.

  1. El sistema genera un evento o una devolución de llamada cuando llega un mensaje
  2. Necesita sondear el sistema para ver si hay algún mensaje para procesar

Para la opción 1, fácil, simplemente lo envolvemos con el método FromEvent apropiado y listo. ¡Al bar!

Para la opción 2, ahora debemos considerar cómo sondear esto y cómo hacerlo de manera eficiente. Además, cuando obtenemos el valor, ¿cómo lo publicamos?

Me imagino que querrías un hilo dedicado para las encuestas. No querrás que otro codificador golpee ThreadPool / TaskPool y te deje en una situación de inanición de ThreadPool. Alternativamente, no quiere la molestia de cambiar de contexto (supongo). Así que supongamos que tenemos nuestro propio hilo, probablemente tendremos algún tipo de ciclo While / Sleep en el que nos sentaremos para sondear. Cuando el cheque encuentra algunos mensajes los publicamos. Bueno, todo esto suena perfecto para Observable.Create. Ahora probablemente no podamos usar un bucle While, ya que eso no nos permitirá devolver un Desechable para permitir la cancelación. Afortunadamente, has leído todo el libro, ¡así que estás familiarizado con la programación recursiva!

Imagino que algo como esto podría funcionar. #No probado

public class MessageListener
{
    private readonly IObservable<IMessage> _messages;
    private readonly IScheduler _scheduler;

    public MessageListener()
    {
        _scheduler = new EventLoopScheduler();

        var messages = ListenToMessages()
                                    .SubscribeOn(_scheduler)
                                    .Publish();

        _messages = messages;
        messages.Connect();
    }

    public IObservable<IMessage> Messages
    {
        get {return _messages;}
    }

    private IObservable<IMessage> ListenToMessages()
    {
        return Observable.Create<IMessage>(o=>
        {
                return _scheduler.Schedule(recurse=>
                {
                    try
                    {           
                        var messages = GetMessages();
                        foreach (var msg in messages)
                        {
                            o.OnNext(msg);
                        }   
                        recurse();
                    }
                    catch (Exception ex)
                    {
                        o.OnError(ex);
                    }                   
                });
        });
    }

    private IEnumerable<IMessage> GetMessages()
    {
         //Do some work here that gets messages from a queue, 
         // file system, database or other system that cant push 
         // new data at us.
         // 
         //This may return an empty result when no new data is found.
    }
}

La razón por la que realmente no me gustan los temas es que, por lo general, el desarrollador no tiene un diseño claro sobre el problema. Hackear un tema, pincharlo aquí y allá, y luego dejar que el pobre desarrollador de soporte adivine que WTF estaba sucediendo. Cuando utiliza los métodos Crear / Generar, etc., está localizando los efectos en la secuencia. Puede verlo todo en un método y sabe que nadie más está produciendo un efecto secundario desagradable. Si veo los campos de una materia, ahora tengo que buscar todos los lugares de una clase en la que se está utilizando. Si algún MFer expone uno públicamente, entonces todas las apuestas están canceladas, ¡quién sabe cómo se está utilizando esta secuencia! Async / Concurrency / Rx es difícil. No necesita hacerlo más difícil permitiendo que los efectos secundarios y la programación de causalidad le hagan girar la cabeza aún más.

Lee Campbell
fuente
10
Estoy leyendo esta respuesta ahora, pero sentí que debería señalar que ¡nunca consideraría exponer la interfaz del sujeto! Lo estoy usando para proporcionar la implementación IObservable <> dentro de una clase sellada (que expone el IObservable <>). Definitivamente puedo ver por qué exponer la interfaz Subject <> sería algo malo ™
Anthony
oye, lamento ser grosero, pero realmente no entiendo tu código. ¿Qué hacen y regresan ListenToMessages () y GetMessages ()?
user10479
1
Para su proyecto personal @jeromerg, esto puede estar bien. Sin embargo, en mi experiencia, los desarrolladores luchan con WPF, MVVM, el diseño de GUI de pruebas unitarias y luego agregar Rx puede complicar las cosas. Probé el patrón BehaviourSubject-as-a-property. Sin embargo, descubrí que era mucho más adoptable para otros si usamos propiedades INPC estándar y luego usamos un método de extensión simple para convertir esto en IObservable. Además, necesitará enlaces WPF personalizados para trabajar con sus sujetos de comportamiento. Ahora su pobre equipo también tiene que aprender WPF, MVVM, Rx y su nuevo marco.
Lee Campbell
2
@LeeCampbell, para ponerlo en términos de su ejemplo de código, la forma normal sería que MessageListener sea construido por el sistema (probablemente registre el nombre de la clase de alguna manera), y le dice que el sistema llamará a OnCreate () y OnGoodbye (), y llamará message1 (), message2 () y message3 () a medida que se generen los mensajes. Parece que messageX [123] llamaría a OnNext sobre un tema, pero ¿hay una mejor manera?
James Moore
1
@JamesMoore ya que estas cosas son mucho más fáciles de explicar con ejemplos concretos. Si conoce una aplicación de Android de código abierto que usa Rx y Subjects, entonces tal vez pueda encontrar tiempo para ver si puedo proporcionar una mejor manera. Entiendo que no es muy útil pararse en un pedestal y decir que los sujetos son malos. Pero creo que cosas como IntroToRx, RxCookbook y ReactiveTrader dan varios niveles de ejemplo de cómo usar Rx.
Lee Campbell
38

En general, debes evitar usarlos Subject, sin embargo, para lo que estás haciendo aquí, creo que funcionan bastante bien. Hice una pregunta similar cuando encontré el mensaje "evitar temas" en los tutoriales de Rx.

Para citar a Dave Sexton (de Rxx)

"Los sujetos son los componentes con estado de Rx. Son útiles cuando necesita crear un observable similar a un evento como un campo o una variable local".

Tiendo a usarlos como punto de entrada a Rx. Entonces, si tengo algún código que necesita decir 'algo sucedió' (como tú), usaría un Subjecty llamar OnNext. Luego, exponga eso IObservablepara que otros se suscriban (puede usarlo AsObservable()en su tema para asegurarse de que nadie pueda enviar a un Asunto y estropear las cosas).

También podría lograr esto con un evento y uso .NET FromEventPattern, pero si solo voy a convertir el evento en un evento de IObservabletodos modos, no veo el beneficio de tener un evento en lugar de un Subject(lo que podría significar que me estoy perdiendo algo aqui)

Sin embargo, lo que debe evitar con bastante fuerza es suscribirse a un IObservablecon a Subject, es decir, no pasar un Subjectal IObservable.Subscribemétodo.

Wilka
fuente
¿Por qué necesitas el estado? Como muestra mi respuesta, si divide el problema en partes separadas, realmente no tiene que administrar el estado en absoluto. En este caso, no se deben utilizar sujetos .
casperOne
8
@casperOne No necesita un estado fuera del Asunto <T> o del evento (ambos tienen colecciones de cosas para llamar, observadores o controladores de eventos). Solo prefiero usar un Asunto si la única razón para agregar un evento es envolverlo con FromEventPattern. Aparte de un cambio en los esquemas de excepciones, que podría ser importante para usted, no veo ningún beneficio en evitar a Subject de esta manera. Una vez más, podría estar perdiendo algo más aquí que el evento es preferible al Sujeto. La mención del estado era solo una parte de la cita, y parecía mejor dejarla. ¿Quizás es más claro sin esa parte?
Wilka
@casperOne, pero tampoco debe crear un evento solo para envolverlo con FromEventPattern. Obviamente, es una idea terrible.
James Moore
3
He explicado mi cita con más profundidad en esta publicación de blog .
Dave Sexton
Tiendo a usarlos como punto de entrada a Rx. Esto me dio en el clavo. Tengo una situación en la que hay una API que cuando se invoca genera eventos que me gustaría pasar a través de una canalización de procesamiento reactivo. El Asunto fue la respuesta para mí, ya que FromEventPattern no parece existir en RxJava AFAICT.
scorpiodawg
31

A menudo, cuando administra un Asunto, en realidad solo está reimplementando funciones que ya están en Rx, y probablemente no de una manera tan sólida, simple y extensible.

Cuando intenta adaptar algún flujo de datos asincrónico a Rx (o crear un flujo de datos asincrónico a partir de uno que actualmente no es asincrónico), los casos más comunes suelen ser:

  • La fuente de datos es un evento : como dice Lee, este es el caso más simple: use FromEvent y diríjase al pub.

  • La fuente de datos proviene de una operación sincrónica y desea actualizaciones sondeadas (por ejemplo, un servicio web o una llamada a la base de datos): en este caso, podría usar el enfoque sugerido por Lee, o para casos simples, podría usar algo como Observable.Interval.Select(_ => <db fetch>). Es posible que desee utilizar DistinctUntilChanged () para evitar la publicación de actualizaciones cuando nada ha cambiado en los datos de origen.

  • La fuente de datos es algún tipo de API asincrónica que llama a su devolución de llamada : en este caso, use Observable.Create para conectar su devolución de llamada para llamar a OnNext / OnError / OnComplete en el observador.

  • La fuente de datos es una llamada que se bloquea hasta que haya nuevos datos disponibles (por ejemplo, algunas operaciones de lectura de socket síncronas): en este caso, puede usar Observable.Create para envolver el código imperativo que lee desde el socket y lo publica en Observer.OnNext cuando se leen los datos. Esto puede ser similar a lo que está haciendo con el sujeto.

Usar Observable.Create frente a crear una clase que gestiona un sujeto es bastante equivalente a usar la palabra clave yield frente a crear una clase completa que implemente IEnumerator. Por supuesto, puede escribir un IEnumerator para que sea tan limpio y tan bueno como el código de rendimiento, pero ¿cuál está mejor encapsulado y tiene un diseño más ordenado? Lo mismo ocurre con Observable.Create frente a la gestión de sujetos.

Observable.Create le brinda un patrón limpio para una configuración perezosa y un desmontaje limpio. ¿Cómo se logra esto con una clase que envuelve una asignatura? Necesita algún tipo de método de inicio ... ¿cómo sabe cuándo llamarlo? ¿O simplemente lo inicia siempre, incluso cuando nadie está escuchando? Y cuando haya terminado, ¿cómo puede hacer que deje de leer desde el socket / sondear la base de datos, etc.? Debe tener algún tipo de método de detención, y aún debe tener acceso no solo al IObservable al que está suscrito, sino a la clase que creó el Asunto en primer lugar.

Con Observable.Create, todo está reunido en un solo lugar. El cuerpo de Observable.Create no se ejecuta hasta que alguien se suscribe, por lo que si nadie se suscribe, nunca usas tu recurso. Y Observable.Create devuelve un Desechable que puede cerrar limpiamente sus recursos / devoluciones de llamada, etc., esto se llama cuando el Observador cancela la suscripción. La vida útil de los recursos que está utilizando para generar el Observable está estrechamente relacionada con la vida útil del Observable en sí.

Niall Connaughton
fuente
1
Explicación muy clara de Observable.Create. ¡Gracias!
Evan Moran
1
Todavía tengo casos en los que uso un sujeto, donde un objeto de corredor expone lo observable (digamos que es solo una propiedad cambiante). Diferentes componentes llamarán al corredor diciéndole cuándo cambia esa propiedad (con una llamada al método), y ese método hace un OnNext. Los consumidores se suscriben. Creo que usaría un BehaviorSubject en este caso, ¿es apropiado?
Frank Schwieterman
1
Depende de la situación. Un buen diseño de Rx tiende a transformar el sistema hacia una arquitectura asíncrona / reactiva. Puede resultar difícil integrar de forma limpia componentes pequeños de código reactivo con un sistema de diseño imperativo. La solución de curita es utilizar Sujetos para convertir acciones imperativas (llamadas a funciones, conjuntos de propiedades) en eventos observables. Entonces terminas con pequeños bolsillos de código reactivo y sin un verdadero "¡ajá!" momento. Cambiar el diseño para modelar el flujo de datos y reaccionar a él generalmente brinda un mejor diseño, pero es un cambio generalizado y requiere un cambio de mentalidad y la participación del equipo.
Niall Connaughton
1
Yo diría aquí (como Rx sin experiencia) que: Al usar Subjects, puede ingresar al mundo de Rx dentro de una aplicación imperativa creciente y transformarla lentamente. También para obtener primeras experiencias ... y ciertamente luego cambiar tu código a cómo debería haber sido desde el principio (risas). Pero para empezar, creo que podría valer la pena utilizar sujetos.
Robetto
9

El texto del bloque entre comillas explica en gran medida por qué no debería usarlo Subject<T>, pero para ponerlo más simple, está combinando las funciones de observador y observable, mientras inyecta algún tipo de estado en el medio (ya sea que esté encapsulando o extendiendo).

Aquí es donde se mete en problemas; estas responsabilidades deben estar separadas y ser distintas entre sí.

Dicho esto, en su caso específico , le recomiendo que divida sus inquietudes en partes más pequeñas.

Primero, tiene su hilo que está activo y siempre monitorea el hardware en busca de señales para generar notificaciones. ¿Cómo harías esto normalmente? Eventos . Así que comencemos con eso.

Definamos el EventArgsque disparará su evento.

// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
    public BaseFrameEventArgs(IBaseFrame baseFrame)
    {
        // Validate parameters.
        if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");

        // Set values.
        BaseFrame = baseFrame;
    }

    // Poor man's immutability.
    public IBaseFrame BaseFrame { get; private set; }
}

Ahora, la clase que disparará el evento. Tenga en cuenta que esto podría ser una clase estática (ya que siempre tiene un hilo en ejecución monitoreando el búfer de hardware), o algo que llame a pedido que se suscriba a eso . Tendrá que modificar esto según corresponda.

public class BaseFrameMonitor
{
    // You want to make this access thread safe
    public event EventHandler<BaseFrameEventArgs> HardwareEvent;

    public BaseFrameMonitor()
    {
        // Create/subscribe to your thread that
        // drains hardware signals.
    }
}

Entonces ahora tienes una clase que expone un evento. Los observables funcionan bien con eventos. Tanto es así que existe un soporte de primera clase para convertir flujos de eventos (piense en un flujo de eventos como múltiples disparos de un evento) en IObservable<T>implementaciones si sigue el patrón de evento estándar, a través del método estáticoFromEventPattern en la Observableclase .

Con la fuente de tus eventos y el FromEventPatternmétodo, podemos crear IObservable<EventPattern<BaseFrameEventArgs>>fácilmente (la EventPattern<TEventArgs>clase incorpora lo que verías en un evento .NET, en particular, una instancia derivada EventArgsy un objeto que representa al remitente), así:

// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();

// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
    FromEventPattern<BaseFrameEventArgs>(
        h => source.HardwareEvent += h,
        h => source.HardwareEvent -= h);

Por supuesto, quieres una IObservable<IBaseFrame>, pero eso es fácil, usando el Selectmétodo de extensión en la Observableclase para crear una proyección (como lo harías en LINQ, y podemos resumir todo esto en un método fácil de usar):

public IObservable<IBaseFrame> CreateHardwareObservable()
{
    // The event source.
    // Or you might not need this if your class is static and exposes
    // the event as a static event.
    var source = new BaseFrameMonitor();

    // Create the observable.  It's going to be hot
    // as the events are hot.
    IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
        FromEventPattern<BaseFrameEventArgs>(
            h => source.HardwareEvent += h,
            h => source.HardwareEvent -= h);

    // Return the observable, but projected.
    return observable.Select(i => i.EventArgs.BaseFrame);
}
casperOne
fuente
7
Gracias por su respuesta @casperOne, este fue mi enfoque inicial, pero me pareció "incorrecto" agregar un evento solo para poder envolverlo con Rx. Actualmente utilizo delegados (y sí, ¡sé que eso es exactamente lo que es un evento!) Para encajar con el código utilizado para cargar y guardar la configuración, esto tiene que poder reconstruir las canalizaciones de componentes y el sistema de delegados me dio más flexibilidad. Rx me está dando un dolor de cabeza en esta área ahora, pero el poder de todo lo demás en el marco hace que la solución del problema de configuración valga la pena.
Anthony
@Anthony Si puedes hacer que su código de muestra funcione, genial, pero como he comentado, no tiene sentido. En cuanto a sentirse "mal", no sé por qué subdividir las cosas en partes lógicas parece "incorrecto", pero no ha dado suficientes detalles en su publicación original para indicar cómo traducirlo mejor IObservable<T>como sin información sobre cómo Actualmente se está señalizando con esa información.
casperOne
@casperOne En su opinión, ¿sería apropiado el uso de Temas para un Bus de mensajes / Agregador de eventos?
kitsune
1
@kitsune No, no veo por qué lo harían. Si está pensando en "optimización", debe preguntarse si ese es el problema o no, ¿ha medido que Rx es la causa del problema?
casperOne
2
Estoy de acuerdo aquí con casperOne en que dividir las preocupaciones es una buena idea. Me gustaría señalar que si va con el patrón Hardware to Event to Rx, pierde la semántica del error. Las conexiones o sesiones perdidas, etc.no quedarán expuestas al consumidor. Ahora el consumidor no puede decidir si quiere volver a intentarlo, desconectarse, suscribirse a otra secuencia u otra cosa.
Lee Campbell
0

Es malo generalizar que los sujetos no son buenos para usar en una interfaz pública. Si bien es cierto que esta no es la forma en que debería verse un enfoque de programación reactiva, definitivamente es una buena opción de mejora / refactorización para su código clásico.

Si tiene una propiedad normal con un descriptor de acceso de conjunto público y desea notificar los cambios, no hay nada en contra de reemplazarla con un BehaviorSubject. El INPC u otros eventos adicionales simplemente no son tan limpios y personalmente me desgasta. Para este propósito, puede y debe usar BehaviorSubjects como propiedades públicas en lugar de propiedades normales y deshacerse de INPC u otros eventos.

Además, la interfaz de sujeto hace que los usuarios de su interfaz sean más conscientes de la funcionalidad de sus propiedades y es más probable que se suscriban en lugar de simplemente obtener el valor.

Es lo mejor para usar si desea que otros escuchen / se suscriban a los cambios de una propiedad.

Felix Keil
fuente