Uso de SignalR con conmutación por error de bus de mensajes de Redis mediante ConnectionUtils.Connect () de BookSleeve

112

Estoy tratando de crear un escenario de conmutación por error del bus de mensajes de Redis con una aplicación SignalR.

Al principio, probamos una simple conmutación por error del equilibrador de carga de hardware, que simplemente monitoreaba dos servidores Redis. La aplicación SignalR apuntó al punto final HLB singular. Luego fallé en un servidor, pero no pude transmitir con éxito ningún mensaje en el segundo servidor de Redis sin reciclar el grupo de aplicaciones de SignalR. Presumiblemente, esto se debe a que necesita enviar los comandos de configuración al nuevo bus de mensajes de Redis.

A partir de SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBususa Booksleeve RedisConnection()para conectarse a un solo Redis para pub / sub.

RedisMessageBusCluster()Creé una nueva clase, que usa Booksleeve's ConnectionUtils.Connect()para conectarse a una en un grupo de servidores Redis.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BookSleeve;
using Microsoft.AspNet.SignalR.Infrastructure;

namespace Microsoft.AspNet.SignalR.Redis
{
    /// <summary>
    /// WIP:  Getting scaleout for Redis working
    /// </summary>
    public class RedisMessageBusCluster : ScaleoutMessageBus
    {
        private readonly int _db;
        private readonly string[] _keys;
        private RedisConnection _connection;
        private RedisSubscriberConnection _channel;
        private Task _connectTask;

        private readonly TaskQueue _publishQueue = new TaskQueue();

        public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver)
            : base(resolver)
        {
            _db = db;
            _keys = keys.ToArray();

            // uses a list of connections
            _connection = ConnectionUtils.Connect(serverList);

            //_connection = new RedisConnection(host: server, port: port, password: password);

            _connection.Closed += OnConnectionClosed;
            _connection.Error += OnConnectionError;


            // Start the connection - TODO:  can remove this Open as the connection is already opened, but there's the _connectTask is used later on
            _connectTask = _connection.Open().Then(() =>
            {
                // Create a subscription channel in redis
                _channel = _connection.GetOpenSubscriberChannel();

                // Subscribe to the registered connections
                _channel.Subscribe(_keys, OnMessage);

                // Dirty hack but it seems like subscribe returns before the actual
                // subscription is properly setup in some cases
                while (_channel.SubscriptionCount == 0)
                {
                    Thread.Sleep(500);
                }
            });
        }


        protected override Task Send(Message[] messages)
        {
            return _connectTask.Then(msgs =>
            {
                var taskCompletionSource = new TaskCompletionSource<object>();

                // Group messages by source (connection id)
                var messagesBySource = msgs.GroupBy(m => m.Source);

                SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

                return taskCompletionSource.Task;
            },
            messages);
        }

        private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
        {
            if (!enumerator.MoveNext())
            {
                taskCompletionSource.TrySetResult(null);
            }
            else
            {
                IGrouping<string, Message> group = enumerator.Current;

                // Get the channel index we're going to use for this message
                int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length;

                string key = _keys[index];

                // Increment the channel number
                _connection.Strings.Increment(_db, key)
                                   .Then((id, k) =>
                                   {
                                       var message = new RedisMessage(id, group.ToArray());

                                       return _connection.Publish(k, message.GetBytes());
                                   }, key)
                                   .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
                                   .ContinueWithNotComplete(taskCompletionSource);
            }
        }

        private void OnConnectionClosed(object sender, EventArgs e)
        {
            // Should we auto reconnect?
            if (true)
            {
                ;
            }
        }

        private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
        {
            // How do we bubble errors?
            if (true)
            {
                ;
            }
        }

        private void OnMessage(string key, byte[] data)
        {
            // The key is the stream id (channel)
            var message = RedisMessage.Deserialize(data);

            _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_channel != null)
                {
                    _channel.Unsubscribe(_keys);
                    _channel.Close(abort: true);
                }

                if (_connection != null)
                {
                    _connection.Close(abort: true);
                }                
            }

            base.Dispose(disposing);
        }
    }
}

Booksleeve tiene su propio mecanismo para determinar un maestro y automáticamente pasará por error a otro servidor, y ahora estoy probando esto con SignalR.Chat.

En web.config, configuro la lista de servidores disponibles:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>

Luego en Application_Start():

        // Redis cluster server list
        string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];

        List<string> eventKeys = new List<string>();
        eventKeys.Add("SignalR.Redis.FailoverTest");
        GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);

Agregué dos métodos adicionales para Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys)
{
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
}

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys)
{
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
    resolver.Register(typeof(IMessageBus), () => bus.Value);

    return resolver;
}

Ahora el problema es que cuando tengo varios puntos de interrupción habilitados, hasta que se haya agregado un nombre de usuario, luego deshabilito todos los puntos de interrupción, la aplicación funciona como se esperaba. Sin embargo, con los puntos de interrupción deshabilitados desde el principio, parece haber alguna condición de carrera que puede fallar durante el proceso de conexión.

Así, en RedisMessageCluster():

    // Start the connection
    _connectTask = _connection.Open().Then(() =>
    {
        // Create a subscription channel in redis
        _channel = _connection.GetOpenSubscriberChannel();

        // Subscribe to the registered connections
        _channel.Subscribe(_keys, OnMessage);

        // Dirty hack but it seems like subscribe returns before the actual
        // subscription is properly setup in some cases
        while (_channel.SubscriptionCount == 0)
        {
            Thread.Sleep(500);
        }
    });

Intenté agregar un Task.Wait, e incluso uno adicional Sleep()(no se muestra arriba), que estaban esperando / etc, pero aún obtenían errores.

El error recurrente parece estar en Booksleeve.MessageQueue.cs~ ln 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821
   --- End of inner exception stack trace ---
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<---



public void Enqueue(RedisMessage item, bool highPri)
{
    lock (stdPriority)
    {
        if (closed)
        {
            throw new InvalidOperationException("The queue is closed");
        }

Donde se lanza una excepción de cola cerrada.

Preveo otro problema: dado que la conexión de Redis se establece, Application_Start()puede haber algunos problemas en la "reconexión" a otro servidor. Sin embargo, creo que esto es válido cuando se usa el singular RedisConnection(), donde solo hay una conexión para elegir. Sin embargo, con la introducción de ConnectionUtils.Connect()Me gustaría saber de @dfowlerlos otros chicos de SignalR cómo se maneja este escenario en SignalR.

ElHaix
fuente
Voy a echar un vistazo, pero: lo primero que ocurre es que no necesitas llamar Openya que la conexión que tienes ya debería estar abierta. Sin embargo, no podré mirar de inmediato, ya que me estoy preparando para un vuelo
Marc Gravell
Creo que hay dos cuestiones aquí. 1) cómo Booksleeve se enfrenta a una conmutación por error; 2) Cómo utiliza SignalR los cursores para realizar un seguimiento de los clientes. Cuando se inicializa un nuevo bus de mensajes, todos los cursores de mb1 no salen en mb2. Por lo tanto, al restablecer el grupo de aplicaciones de SignalR, comenzará a funcionar, no antes, lo que obviamente no es una opción viable.
ElHaix
2
Enlace que describe cómo SignalR usa los cursores: stackoverflow.com/questions/13054592/…
ElHaix
Intente utilizar la última versión del bus de mensajes de redis. Admite el paso en una fábrica de conexiones y maneja el reintento de conectarse cuando el servidor deja de funcionar.
Davidfowl
¿Tiene un enlace para las notas de la versión? Gracias.
ElHaix

Respuestas:

13

El equipo de SignalR ahora ha implementado soporte para una fábrica de conexiones personalizada con StackExchange.Redis , el sucesor de BookSleeve, que admite conexiones Redis redundantes a través de ConnectionMultiplexer.

El problema inicial que encontré fue que a pesar de crear mis propios métodos de extensión en BookSleeve para aceptar una colección de servidores, la conmutación por error no era posible.

Ahora, con la evolución de BookSleeve a StackExchange.Redis, ahora podemos configurar la colección de servidores / puertos directamente en la Connectinicialización.

La nueva implementación es mucho más simple que el camino que estaba siguiendo, en la creación de un UseRedisClustermétodo, y la plomada de back-end ahora admite una verdadera conmutación por error:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");

StackExchange.Redis también permite una configuración manual adicional como se describe en la Automatic and Manual Configurationsección de la documentación:

ConfigurationOptions config = new ConfigurationOptions
{
    EndPoints =
    {
        { "redis0", 6379 },
        { "redis1", 6380 }
    },
    CommandMap = CommandMap.Create(new HashSet<string>
    { // EXCLUDE a few commands
        "INFO", "CONFIG", "CLUSTER",
        "PING", "ECHO", "CLIENT"
    }, available: false),
    KeepAlive = 180,
    DefaultVersion = new Version(2, 8, 8),
    Password = "changeme"
};

En esencia, la capacidad de inicializar nuestro entorno de escalado horizontal de SignalR con una colección de servidores ahora resuelve el problema inicial.

ElHaix
fuente
¿Debería recompensar tu respuesta con una recompensa de 500 repeticiones? ;)
nicael
Bueno, si crees que esa es ahora la respuesta :)
ElHaix
@ElHaix, ya que hizo la pregunta, probablemente esté más calificado para decir si su respuesta es concluyente o si es solo una pieza en el rompecabezas; sugiero agregar una oración para indicar si resolvió su problema y posiblemente cómo
Lars Höppner
¿Entonces? Recompensa de premio? O puedo esperar hasta que atraiga más atención.
nicael
¿Me falta algo o es esto solo en una rama de características, no en el paquete nuget principal (2.1)? Además, parece que en la rama bug-stackexchange ( github.com/SignalR/SignalR/tree/bug-stackexchange/src/… ), todavía no hay una forma en la clase RedisScaleoutConfiguration de proporcionar su propio multiplexor.
Steve