¿Por qué IEnumerable.ToObservable es tan lento?

9

Estoy tratando de enumerar una gran IEnumerableuna vez, y observar la enumeración con varios operadores conectados ( Count, Sum, Averageetc.). La forma obvia es transformarlo en an IObservablecon el método ToObservable, y luego suscribirle un observador. Noté que esto es mucho más lento que otros métodos, como hacer un bucle simple y notificar al observador en cada iteración, o usar el Observable.Createmétodo en lugar de ToObservable. La diferencia es sustancial: es 20-30 veces más lenta. Es lo que es, o estoy haciendo algo mal?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Salida:

ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec

.NET Core 3.0, C # 8, System.Reactive 4.3.2, Windows 10, aplicación de consola, versión integrada


Actualización: Aquí hay un ejemplo de la funcionalidad real que quiero lograr:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");

Salida:

Cuenta: 10,000,000, Suma: 49,999,995,000,000, Promedio: 4,999,999.5

La diferencia importante de este enfoque en comparación con el uso de operadores LINQ estándar , es que la fuente enumerable se enumera solo una vez.


Una observación más: el uso ToObservable(Scheduler.Immediate)es ligeramente más rápido (alrededor del 20%) que ToObservable().

Theodor Zoulias
fuente
2
Una medición de 1 vez no es demasiado confiable. Considere configurar un punto de referencia con BenchmarkDotNet, por ejemplo. (No afiliado)
Fildor
1
@TheodorZoulias Hay más que eso, por ejemplo, cuestionaría su punto de referencia, ya que actualmente se encuentra en el orden de ejecución dentro de esa ejecución única que podría estar causando grandes diferencias.
Oliver
1
El cronómetro puede ser suficiente si reúne estadísticas. No solo una sola muestra.
Fildor
2
@Fildor - Bastante justo. Quiero decir que las cifras son representativas de lo que uno debería esperar.
Enigmatividad
2
@TheodorZoulias - Buena pregunta, por cierto.
Enigmatividad

Respuestas:

6

Esta es la diferencia entre un observable que se comporta bien y un observable "rodar-tu-propio-porque-piensas-más rápido-es-mejor-pero-no-es".

Cuando te sumerges lo suficiente en la fuente, descubres esta pequeña línea encantadora:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));

El está llamando efectivamente hasNext = enumerator.MoveNext();una vez por iteración recursiva programada.

Esto le permite elegir el planificador para su .ToObservable(schedulerOfYourChoice)llamada.

Con las otras opciones que ha elegido, ha creado una serie de llamadas completas .OnNextque prácticamente no hacen nada. Method2Ni siquiera tiene una .Subscribellamada.

Ambos Method2y Method1ejecutar mediante el hilo actual y ambos se termina de ejecutar antes de que termine la suscripción. Están bloqueando llamadas. Pueden causar condiciones de carrera.

Method1es el único que se comporta bien como observable. Es asíncrono y puede ejecutarse independientemente del suscriptor.

Tenga en cuenta que los observables son colecciones que se ejecutan a lo largo del tiempo. Por lo general, tienen una fuente asíncrona o un temporizador o responden al estímulo externo. No suelen escapar de un simple enumerable. Si está trabajando con un enumerable, se debe esperar que el trabajo sincrónico se ejecute más rápido.

La velocidad no es el objetivo de Rx. El objetivo es realizar consultas complejas sobre valores empujados basados ​​en el tiempo.

Enigmatividad
fuente
2
"rodar-tu-propio-porque-piensas-más-rápido-es-mejor-pero-no-es-" - ¡excelente!
Fildor
Gracias Enigmativity por la respuesta detallada! Actualicé mi pregunta con un ejemplo de lo que realmente quiero lograr, que es un cálculo de naturaleza sincrónica. ¿Crees que en lugar de extensiones reactivas debería buscar otra herramienta, dado que el rendimiento es crítico en mi caso?
Theodor Zoulias
@TheodorZoulias - Esta es la forma enumerables para hacer su ejemplo en su pregunta: source.Aggregate(new { count = 0, sum = 0L }, (a, x) => new { count = a.count + 1, sum = a.sum + x }, a => new { a.count, a.sum, average = (double)a.sum / a.count }). Solo una iteración y más de 10 veces más rápido que Rx.
Enigmatividad
Lo acabo de probar y, de hecho, es más rápido, pero solo alrededor de x2 más rápido (en comparación con RX sin ToObservable). Este es el otro extremo, donde tengo el mejor rendimiento, pero me veo obligado a volver a implementar todos los operadores LINQ dentro de una expresión lambda compleja. Es propenso a errores y menos mantenible, teniendo en cuenta que mis cálculos reales involucran aún más operadores y combinaciones de ellos. Creo que es bastante tentador pagar un precio de rendimiento x2 por tener una solución clara y legible. Por otro lado pagar x10 o x20, ¡no tanto!
Theodor Zoulias
Quizás si publicaste exactamente lo que intentas hacer, ¿podría sugerirte una alternativa?
Enigmatividad
-1

Porque el Sujeto no hace nada.

Parece que el rendimiento de la instrucción de bucle es diferente para 2 casos:

for(int i=0;i<1000000;i++)
    total++;

o

for(int i=0;i<1000000;i++)
    DoHeavyJob();

Si usa otro Asunto, con una implementación lenta de OnNext, el resultado será más aceptable

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 100;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    class My_Slow_Subject : SubjectBase<int>
    {

        public override void OnNext(int value)
        {
            //do a job which spend 3ms
            System.Threading.Thread.Sleep(3);
        }


        bool _disposed;
        public override bool IsDisposed => _disposed;
        public override void Dispose() => _disposed = true;
        public override void OnCompleted() { }
        public override void OnError(Exception error) { }
        public override bool HasObservers => false;
        public override IDisposable Subscribe(IObserver<int> observer) 
                => throw new NotImplementedException();
    }

    static SubjectBase<int> CreateSubject()
    {
        return new My_Slow_Subject();
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Salida

ToObservable: 434 msec
Loop & Notify: 398 msec
Observable.Create: 394 msec

El sistema de soporte ToObservable.Reactive.Concurrency.IScheduler

Eso significa que puede implementar su propio IScheduler y decidir cuándo ejecutar cada tarea

Espero que esto ayude

Saludos

BlazorPlus
fuente
¿Te das cuenta de que OP está hablando explícitamente de COUNT valores con una magnitud 100.000 veces mayor?
Fildor
Gracias BlazorPlus por la respuesta. He actualizado mi pregunta agregando un ejemplo más realista de mi caso de uso. El subjectes observado por otros operadores que realizan cálculos, así que no es no hacer nada. La penalización de rendimiento del uso ToObservablesigue siendo sustancial, porque los cálculos son muy leves.
Theodor Zoulias