¿Crear una cola de bloqueo <T> en .NET?

163

Tengo un escenario en el que tengo varios subprocesos que se agregan a una cola y múltiples subprocesos que se leen desde la misma cola. Si la cola alcanza un tamaño específico, todos los hilos que llenen la cola se bloquearán al agregarlos hasta que se elimine un elemento de la cola.

La solución a continuación es lo que estoy usando en este momento y mi pregunta es: ¿cómo se puede mejorar esto? ¿Hay algún objeto que ya habilite este comportamiento en el BCL que debería estar usando?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
Eric Schoonover
fuente
55
.Net cómo ha incorporado las clases para ayudar con este escenario. La mayoría de las respuestas enumeradas aquí son obsoletas. Vea las respuestas más recientes al final. Mira en colecciones de bloqueo seguras para subprocesos. Las respuestas pueden ser obsoletas, ¡pero sigue siendo una buena pregunta!
Tom A
Creo que sigue siendo una buena idea aprender sobre Monitor.Wait / Pulse / PulseAll incluso si tenemos nuevas clases concurrentes en .NET.
thewpfguy
1
De acuerdo con @thewpfguy. Querrá comprender los mecanismos básicos de bloqueo detrás de escena. También vale la pena señalar que Systems.Collections.Concurrent no existía hasta abril de 2010 y solo en Visual Studio 2010 y versiones posteriores. Definitivamente no es una opción para los holdouts VS2008 ...
Vic
Si está leyendo esto ahora, eche un vistazo a System.Threading.Channels para una implementación multi-escritor / multi-lector, acotada, opcionalmente bloqueante de esto para .NET Core y .NET Standard.
Mark Rendle

Respuestas:

200

Eso parece muy inseguro (muy poca sincronización); ¿Qué tal algo como:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(editar)

En realidad, desearía una forma de cerrar la cola para que los lectores comiencen a salir limpiamente, tal vez algo así como una bandera de bool, si se establece, una cola vacía simplemente regresa (en lugar de bloquear):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
Marc Gravell
fuente
1
¿Qué hay de cambiar la espera a WaitAny y pasar un control de espera terminado en la construcción ...
Sam Saffron
1
@ La optimización de Marc-, si esperaba que la cola siempre alcanzara la capacidad, sería pasar el valor maxSize al constructor de la Cola <T>. Puede agregar otro constructor a su clase para acomodarlo.
RichardOD
3
¿Por qué SizeQueue, por qué no FixedSizeQueue?
mindless.panda
44
@Lasse: libera los bloqueos durante Wait, por lo que otros subprocesos pueden adquirirlo. Recupera las cerraduras cuando se despierta.
Marc Gravell
1
Agradable, como dije, había algo que no estaba entendiendo :) Eso me hace querer volver a visitar parte de mi código de hilo ...
Lasse V. Karlsen
14

"¿Cómo se puede mejorar esto?"

Bueno, debe mirar cada método en su clase y considerar qué sucedería si otro hilo llamara simultáneamente a ese método o cualquier otro método. Por ejemplo, coloca un bloqueo en el método Eliminar, pero no en el método Agregar. ¿Qué sucede si un hilo agrega al mismo tiempo que otro hilo elimina? Cosas malas.

También considere que un método puede devolver un segundo objeto que proporciona acceso a los datos internos del primer objeto, por ejemplo, GetEnumerator. Imagine que un hilo está pasando por ese enumerador, otro hilo está modificando la lista al mismo tiempo. No está bien.

Una buena regla general es hacer que esto sea más sencillo, reduciendo el número de métodos en la clase al mínimo absoluto.

En particular, no herede otra clase de contenedor, porque expondrá todos los métodos de esa clase, proporcionando una forma para que la persona que llama corrompa los datos internos o vea cambios parcialmente completos en los datos (igual de malo, porque los datos parece corrompido en ese momento). Oculta todos los detalles y sé completamente despiadado sobre cómo permites el acceso a ellos.

Le recomiendo encarecidamente que use soluciones estándar: obtenga un libro sobre subprocesos o use una biblioteca de terceros. De lo contrario, dado lo que está intentando, va a depurar su código durante mucho tiempo.

Además, ¿no tendría más sentido que Eliminar devuelva un elemento (por ejemplo, el que se agregó primero, ya que es una cola), en lugar de que la persona que llama elija un elemento específico? Y cuando la cola está vacía, tal vez Eliminar también debería bloquear.

Actualización: ¡la respuesta de Marc realmente implementa todas estas sugerencias! :) Pero lo dejaré aquí, ya que puede ser útil entender por qué su versión es una mejora.

Daniel Earwicker
fuente
12

Puede usar BlockingCollection y ConcurrentQueue en el espacio de nombres System.Collections.Concurrent

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}
Andreas
fuente
3
BlockingCollection se establece de manera predeterminada en Queue. Entonces, no creo que esto sea necesario.
Curtis White
¿BlockingCollection conserva el pedido como una cola?
joelc
Sí, cuando se inicializa con un ConcurrentQueue
Andreas
6

Acabo de hacer esto usando las extensiones reactivas y recordé esta pregunta:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

No necesariamente del todo seguro, pero muy simple.

Mark Rendle
fuente
¿Qué es el sujeto <t>? No tengo ningún solucionador para su espacio de nombres.
theJerm
Es parte de las extensiones reactivas.
Mark Rendle
No es una respuesta Esto no responde a la pregunta en absoluto.
makhdumi
5

Esto es lo que encontré para una cola de bloqueo limitada segura para subprocesos.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}
Kevin
fuente
¿Podría proporcionar algunos ejemplos de código de cómo pondría en cola algunas funciones de subproceso utilizando esta biblioteca, incluida la forma en que instanciaría esta clase?
theJerm
Esta pregunta / respuesta está un poco anticuada. Debe mirar el espacio de nombres System.Collections.Concurrent para bloquear el soporte de colas.
Kevin
2

No he explorado completamente el TPL pero pueden tener algo que se adapte a sus necesidades, o al menos, algo de forraje Reflector para inspirarse.

Espero que ayude.

TheMissingLINQ
fuente
Soy consciente de que esto es antiguo, pero mi comentario es para los recién llegados a SO, ya que OP ya lo sabe hoy. Esta no es una respuesta, esto debería haber sido un comentario.
John Demetriou
0

Bueno, podrías mirar la System.Threading.Semaphoreclase. Aparte de eso, no, tienes que hacer esto tú mismo. AFAIK no existe tal colección incorporada.

Vilx-
fuente
Observé eso para limitar la cantidad de subprocesos que están accediendo a un recurso, pero no le permite bloquear todo el acceso a un recurso en función de alguna condición (como Collection.Count). AFAIK de todos modos
Eric Schoonover
Bueno, haces esa parte tú mismo, como lo haces ahora. Simplemente, en lugar de MaxSize y _FullEvent, tiene el semáforo, que inicializa con el recuento correcto en el constructor. Luego, en cada Agregar / Eliminar, llama a WaitForOne () o Release ().
Vilx-
No es muy diferente de lo que tienes ahora. Simplemente más simple en mi humilde opinión.
Vilx-
¿Me puede dar un ejemplo que muestre esto funcionando? No vi cómo ajustar dinámicamente el tamaño de un semáforo que requiere este escenario. Dado que debe poder bloquear todos los recursos solo si la cola está llena.
Eric Schoonover
Ahh, cambiando de tamaño! ¿Por qué no lo dijiste de inmediato? OK, entonces un semáforo no es para ti. ¡Buena suerte con este enfoque!
Vilx-
-1

Si desea un rendimiento máximo, que permita la lectura de varios lectores y la escritura de un solo escritor, BCL tiene algo llamado ReaderWriterLockSlim que debería ayudar a reducir su código ...

DavidN
fuente
Sin embargo, no quiero que ninguno pueda escribir si la cola está llena.
Eric Schoonover
Entonces lo combinas con un candado. Aquí hay algunos ejemplos muy buenos albahari.com/threading/part2.aspx#_ProducerConsumerQWaitHandle albahari.com/threading/part4.aspx
DavidN
3
Con cola / cola, todo el mundo es escritor ... un bloqueo exclusivo quizás sea más pragmático
Marc Gravell
Soy consciente de que esto es antiguo, pero mi comentario es para los recién llegados a SO, ya que OP ya lo sabe hoy. Esta no es una respuesta, esto debería haber sido un comentario.
John Demetriou