Estoy tratando de enumerar una gran IEnumerable
una vez, y observar la enumeración con varios operadores conectados ( Count
, Sum
, Average
etc.). La forma obvia es transformarlo en an IObservable
con el método ToObservable
, y luego suscribirle un observador. Noté que esto es mucho más lento que otros métodos, como hacer un bucle simple y notificar al observador en cada iteración, o usar el Observable.Create
método en lugar de ToObservable
. La diferencia es sustancial: es 20-30 veces más lenta. Es lo que es, o estoy haciendo algo mal?
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
static void Main(string[] args)
{
const int COUNT = 10_000_000;
Method1(COUNT);
Method2(COUNT);
Method3(COUNT);
}
static void Method1(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method2(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
foreach (var item in source) subject.OnNext(item);
subject.OnCompleted();
Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static void Method3(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
Observable.Create<int>(o =>
{
foreach (var item in source) o.OnNext(item);
o.OnCompleted();
return Disposable.Empty;
}).Subscribe(subject);
Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
}
Salida:
ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec
.NET Core 3.0, C # 8, System.Reactive 4.3.2, Windows 10, aplicación de consola, versión integrada
Actualización: Aquí hay un ejemplo de la funcionalidad real que quiero lograr:
var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");
Salida:
Cuenta: 10,000,000, Suma: 49,999,995,000,000, Promedio: 4,999,999.5
La diferencia importante de este enfoque en comparación con el uso de operadores LINQ estándar , es que la fuente enumerable se enumera solo una vez.
Una observación más: el uso ToObservable(Scheduler.Immediate)
es ligeramente más rápido (alrededor del 20%) que ToObservable()
.
fuente
Respuestas:
Esta es la diferencia entre un observable que se comporta bien y un observable "rodar-tu-propio-porque-piensas-más rápido-es-mejor-pero-no-es".
Cuando te sumerges lo suficiente en la fuente, descubres esta pequeña línea encantadora:
El está llamando efectivamente
hasNext = enumerator.MoveNext();
una vez por iteración recursiva programada.Esto le permite elegir el planificador para su
.ToObservable(schedulerOfYourChoice)
llamada.Con las otras opciones que ha elegido, ha creado una serie de llamadas completas
.OnNext
que prácticamente no hacen nada.Method2
Ni siquiera tiene una.Subscribe
llamada.Ambos
Method2
yMethod1
ejecutar mediante el hilo actual y ambos se termina de ejecutar antes de que termine la suscripción. Están bloqueando llamadas. Pueden causar condiciones de carrera.Method1
es el único que se comporta bien como observable. Es asíncrono y puede ejecutarse independientemente del suscriptor.Tenga en cuenta que los observables son colecciones que se ejecutan a lo largo del tiempo. Por lo general, tienen una fuente asíncrona o un temporizador o responden al estímulo externo. No suelen escapar de un simple enumerable. Si está trabajando con un enumerable, se debe esperar que el trabajo sincrónico se ejecute más rápido.
La velocidad no es el objetivo de Rx. El objetivo es realizar consultas complejas sobre valores empujados basados en el tiempo.
fuente
source.Aggregate(new { count = 0, sum = 0L }, (a, x) => new { count = a.count + 1, sum = a.sum + x }, a => new { a.count, a.sum, average = (double)a.sum / a.count })
. Solo una iteración y más de 10 veces más rápido que Rx.ToObservable
). Este es el otro extremo, donde tengo el mejor rendimiento, pero me veo obligado a volver a implementar todos los operadores LINQ dentro de una expresión lambda compleja. Es propenso a errores y menos mantenible, teniendo en cuenta que mis cálculos reales involucran aún más operadores y combinaciones de ellos. Creo que es bastante tentador pagar un precio de rendimiento x2 por tener una solución clara y legible. Por otro lado pagar x10 o x20, ¡no tanto!Porque el Sujeto no hace nada.
Parece que el rendimiento de la instrucción de bucle es diferente para 2 casos:
o
Si usa otro Asunto, con una implementación lenta de OnNext, el resultado será más aceptable
Salida
El sistema de soporte ToObservable.Reactive.Concurrency.IScheduler
Eso significa que puede implementar su propio IScheduler y decidir cuándo ejecutar cada tarea
Espero que esto ayude
Saludos
fuente
subject
es observado por otros operadores que realizan cálculos, así que no es no hacer nada. La penalización de rendimiento del usoToObservable
sigue siendo sustancial, porque los cálculos son muy leves.