Programación de red asíncrona usando extensiones reactivas

25

Después de hacer algunos (más o menos) socketprogramación asíncrona de "bajo nivel" hace años (en forma de patrón asincrónico basado en eventos (EAP) ) y recientemente "subir" a un TcpListener(modelo de programación asincrónica (APM) ) y luego tratando de pasar a async/await(Patrón asíncrono basado en tareas (TAP) ) Casi lo he tenido con tener que molestarme con toda esta 'fontanería de bajo nivel'. Entonces estaba pensando; ¿por qué no intentarloRX ( extensiones reactivas ) ya que podría ajustarse más cómodamente a mi dominio problemático?

Una gran cantidad de código que escribo tiene que ver con muchos clientes que se conectan a través de Tcp a mi aplicación que luego inicia una comunicación bidireccional (asíncrona). El cliente o el servidor pueden decidir en cualquier momento que se debe enviar un mensaje y hacerlo, por lo que esta no es su request/responseconfiguración clásica , sino más bien una "línea" bidireccional en tiempo real abierta a ambas partes para enviar lo que quieran , cuando quieran. (¡Si alguien tiene un nombre decente para describir esto, me encantaría escucharlo!).

El "protocolo" difiere según la aplicación (y no es realmente relevante para mi pregunta). Sí, sin embargo, tengo una pregunta inicial:

  1. Dado que solo se está ejecutando un "servidor", pero tiene que realizar un seguimiento de muchas (generalmente miles) de conexiones (por ejemplo, clientes) que tienen (por falta de una mejor descripción) su propia "máquina de estado" para realizar un seguimiento de sus estados, etc., ¿qué enfoque preferirías? EAP / TAP / APM? ¿RX incluso se consideraría una opción? Si no, ¿por qué?

Por lo tanto, necesito trabajar Async ya que a) no es un protocolo de solicitud / respuesta, por lo que no puedo tener un hilo / cliente en una llamada de bloqueo de "mensaje en espera" o llamada de bloqueo de "envío de mensaje" (sin embargo, si el envío es bloqueando para ese cliente solo yo podría vivir con él) yb) necesito manejar muchas conexiones concurrentes. No veo forma de hacerlo (de manera confiable) usando llamadas de bloqueo.

La mayoría de mis aplicaciones están relacionadas con VoiP; ya sean mensajes SIP de clientes SIP o mensajes PBX (relacionados) de aplicaciones como FreeSwitch / OpenSIPS, etc. pero puede, en su forma más simple, tratar de imaginar un servidor de "chat" tratando de manejar muchos clientes de "chat". La mayoría de los protocolos están basados ​​en texto (ASCII).

Entonces, después de haber implementado muchas permutaciones diferentes de las técnicas antes mencionadas, me gustaría simplificar mi trabajo creando un objeto que pueda simplemente crear instancias, decirle sobre qué IPEndpointescuchar y que me diga cuando ocurra algo de interés (lo cual generalmente use eventos para, por lo que algunos EAP generalmente se mezclan con las otras dos técnicas). La clase no debería molestarse en tratar de 'entender' el protocolo; simplemente debe manejar cadenas entrantes / salientes. Y así, teniendo mi ojo en RX esperando que eso (al final) simplificara el trabajo, creé un nuevo "violín" desde cero:

using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Reactive.Linq;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        var f = new FiddleServer(new IPEndPoint(IPAddress.Any, 8084));
        f.Start();
        Console.ReadKey();
        f.Stop();
        Console.ReadKey();
    }
}

public class FiddleServer
{
    private TcpListener _listener;
    private ConcurrentDictionary<ulong, FiddleClient> _clients;
    private ulong _currentid = 0;

    public IPEndPoint LocalEP { get; private set; }

    public FiddleServer(IPEndPoint localEP)
    {
        this.LocalEP = localEP;
        _clients = new ConcurrentDictionary<ulong, FiddleClient>();
    }

    public void Start()
    {
        _listener = new TcpListener(this.LocalEP);
        _listener.Start();
        Observable.While(() => true, Observable.FromAsync(_listener.AcceptTcpClientAsync)).Subscribe(
            //OnNext
            tcpclient =>
            {
                //Create new FSClient with unique ID
                var fsclient = new FiddleClient(_currentid++, tcpclient);
                //Keep track of clients
                _clients.TryAdd(fsclient.ClientId, fsclient);
                //Initialize connection
                fsclient.Send("connect\n\n");

                Console.WriteLine("Client {0} accepted", fsclient.ClientId);
            },
            //OnError
            ex =>
            {

            },
            //OnComplete
            () =>
            {
                Console.WriteLine("Client connection initialized");
                //Accept new connections
                _listener.AcceptTcpClientAsync();
            }
        );
        Console.WriteLine("Started");
    }

    public void Stop()
    {
        _listener.Stop();
        Console.WriteLine("Stopped");
    }

    public void Send(ulong clientid, string rawmessage)
    {
        FiddleClient fsclient;
        if (_clients.TryGetValue(clientid, out fsclient))
        {
            fsclient.Send(rawmessage);
        }
    }
}

public class FiddleClient
{
    private TcpClient _tcpclient;

    public ulong ClientId { get; private set; }

    public FiddleClient(ulong id, TcpClient tcpclient)
    {
        this.ClientId = id;
        _tcpclient = tcpclient;
    }

    public void Send(string rawmessage)
    {
        Console.WriteLine("Sending {0}", rawmessage);
        var data = Encoding.ASCII.GetBytes(rawmessage);
        _tcpclient.GetStream().WriteAsync(data, 0, data.Length);    //Write vs WriteAsync?
    }
}

Soy consciente de que, en este "violín", hay un pequeño detalle específico de implementación; en este caso, estoy trabajando con FreeSwitch ESL, por lo que "connect\n\n"debería eliminarse en el violín cuando se refactoriza a un enfoque más genérico.

También soy consciente de que necesito refactorizar los métodos anónimos a métodos de instancia privada en la clase Servidor; No estoy seguro de qué convención (por ejemplo, " OnSomething" por ejemplo) usar para sus nombres de método.

Esta es mi base / punto de partida / base (que necesita algunos "ajustes"). Tengo algunas preguntas sobre esto:

  1. Ver la pregunta anterior "1"
  2. ¿Estoy en el camino correcto? ¿O son injustas mis decisiones de "diseño"?
  3. Concurrencia: esto podría hacer frente a miles de clientes (analizando / manejando los mensajes reales a un lado)
  4. En excepciones: no estoy seguro de cómo obtener excepciones generadas dentro de los clientes "hasta" al servidor ("RX-wise"); ¿Cuál sería una buena manera?
  5. Ahora puedo obtener cualquier cliente conectado de mi clase de servidor (usando su ClientId), suponiendo que exponga a los clientes de una forma u otra, y llame a los métodos directamente. También puedo llamar a métodos a través de la clase Servidor (por ejemplo, el Send(clientId, rawmessage)método (mientras que este último enfoque sería un método "conveniente" para enviar rápidamente un mensaje al otro lado).
  6. No estoy muy seguro de dónde (y cómo) ir desde aquí:
    • a) Necesito manejar los mensajes entrantes; ¿Cómo iba a configurar esto? Puedo obtener el flujo de curso, pero ¿ dónde manejaría la recuperación de los bytes recibidos? Creo que necesito algún tipo de "ObservableStream", ¿algo a lo que pueda suscribirme? ¿Pondría esto en el FiddleCliento FiddleServer?
    • b) Suponiendo que quiero evitar el uso de eventos hasta que estas FiddleClient/ FiddleServerclases se implementen más específicamente para adaptar su manejo de protocolo específico de la aplicación, etc., usando clases FooClient/ más específicas FooServer: ¿cómo pasaría de obtener los datos en las clases 'Fiddle' subyacentes a sus contrapartes más específicas?

Artículos / enlaces que ya leí / hojeé / utilicé como referencia:

RobIII
fuente
Eche un vistazo a la biblioteca ReactiveSockets existente
Flagbug
2
No busco bibliotecas o enlaces (aunque se les agradece como referencia), sino aportes / consejos / ayuda sobre mis preguntas y configuración general. Quiero aprender a mejorar mi propio código y estar en mejores condiciones para decidir en qué dirección tomar esto, sopesar los pros y los contras, etc. Sin hacer referencia a alguna biblioteca, colocarla y seguir adelante. Quiero aprender de esta experiencia y tener más experiencia con la programación de Rx / red.
RobIII
Claro, pero ya que la biblioteca es de código abierto se puede ver cómo su implementó allí
Flagbug
1
Claro, pero mirar el código fuente no explica por qué se tomaron / se toman algunas decisiones de diseño. Y debido a que soy relativamente nuevo en Rx en combinación con la programación en red, no tengo suficiente experiencia para saber si esta biblioteca es buena, si el diseño tiene sentido, si se tomaron las decisiones correctas e incluso si es adecuado para mí.
RobIII
Creo que será bueno repensar al servidor como un miembro activo de un apretón de manos, por lo tanto, ese servidor inicia la conexión en lugar de escuchar las conexiones. Por ejemplo, aquí: codeproject.com/Articles/20250/Reverse-Connection-Shell

Respuestas:

1

... Me gustaría simplificar mi trabajo creando un objeto que pueda simplemente instanciar, decirle en qué IPEndpoint escuchar y que me diga siempre que ocurra algo interesante ...

Después de leer esa declaración, inmediatamente pensé "actores". Los actores son muy similares a los objetos, excepto que solo tienen una sola entrada donde se le pasa el mensaje (en lugar de llamar directamente a los métodos del objeto) y funcionan de forma asíncrona. En un ejemplo muy simplificado ... crearía el actor y le enviaría un mensaje con el IPEndpoint y la dirección del actor al que enviar el resultado. Se apaga y funciona en segundo plano. Solo te responde cuando ocurre "algo de interés". Puede crear una instancia de tantos actores como sea necesario para manejar la carga.

No estoy familiarizado con ninguna biblioteca de actores en .Net, aunque sé que hay algunas. Estoy familiarizado con la biblioteca TPL Dataflow (habrá una sección que la cubre en mi libro http://DataflowBook.com ) y debería ser fácil implementar un modelo de actor simple con esa biblioteca.

Matt Carkci
fuente
Eso se ve interesante. Prepararé un proyecto de juguete para ver si me conviene. Gracias por la sugerencia.
RobIII