Me gustaría conocer await
el resultado de BlockingCollection<T>.Take()
forma asincrónica, por lo que no bloqueo el hilo. Buscando algo como esto:
var item = await blockingCollection.TakeAsync();
Sé que podría hacer esto:
var item = await Task.Run(() => blockingCollection.Take());
pero eso mata un poco toda la idea, porque otro hilo (de ThreadPool
) se bloquea en su lugar.
¿Existe alguna alternativa?
await Task.Run(() => blockingCollection.Take())
la tarea se realizará en otro hilo y su hilo de IU no se bloqueará. ¿No es ese el punto?Task
API basada en la exportación de bibliotecas . Se puede utilizar desde ASP.NET, por ejemplo. El código en cuestión no se escalaría bien allí.ConfigureAwait
se usara después delRun()
? [ed. no importa, veo lo que estás diciendo ahora]Respuestas:
Hay cuatro alternativas que conozco.
El primero es Channels , que proporciona una cola segura para subprocesos que admite operaciones
Read
yWrite
operaciones asincrónicas . Los canales están altamente optimizados y, opcionalmente, admiten la eliminación de algunos elementos si se alcanza un umbral.El siguiente es
BufferBlock<T>
de TPL Dataflow . Si solo tiene un consumidor, puede usarOutputAvailableAsync
oReceiveAsync
, o simplemente vincularlo a unActionBlock<T>
. Para obtener más información, consulte mi blog .Los dos últimos son tipos que creé, disponibles en mi biblioteca AsyncEx .
AsyncCollection<T>
esasync
casi equivalente aBlockingCollection<T>
, capaz de envolver una colección de productor / consumidor concurrente comoConcurrentQueue<T>
oConcurrentBag<T>
. Puede usarloTakeAsync
para consumir elementos de la colección de forma asincrónica. Para obtener más información, consulte mi blog .AsyncProducerConsumerQueue<T>
es unaasync
cola de productor / consumidor más portátil y compatible. Puede utilizarloDequeueAsync
para consumir elementos de la cola de forma asincrónica. Para obtener más información, consulte mi blog .Las últimas tres de estas alternativas permiten operaciones de compra y venta sincrónicas y asincrónicas.
fuente
AsyncCollection.TryTakeAsync
, pero no puedo encontrarlo en laNito.AsyncEx.Coordination.dll 5.0.0.0
versión descargada (última versión). El Nito.AsyncEx.Concurrent.dll al que se hace referencia no existe en el paquete . ¿Qué me estoy perdiendo?while ((result = await collection.TryTakeAsync()).Success) { }
. ¿Por qué fue eliminado?... o puedes hacer esto:
using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public class AsyncQueue<T> { private readonly SemaphoreSlim _sem; private readonly ConcurrentQueue<T> _que; public AsyncQueue() { _sem = new SemaphoreSlim(0); _que = new ConcurrentQueue<T>(); } public void Enqueue(T item) { _que.Enqueue(item); _sem.Release(); } public void EnqueueRange(IEnumerable<T> source) { var n = 0; foreach (var item in source) { _que.Enqueue(item); n++; } _sem.Release(n); } public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken)) { for (; ; ) { await _sem.WaitAsync(cancellationToken); T item; if (_que.TryDequeue(out item)) { return item; } } } }
Cola FIFO asincrónica simple y completamente funcional.
fuente
for
? si se libera el semáforo, la cola tiene al menos un elemento para retirar, ¿no?TryDequeue
are, retorna con un valor o no retorna en absoluto. Técnicamente, si tiene más de 1 lector, el mismo lector puede consumir dos (o más) elementos antes de que cualquier otro lector esté completamente despierto. Un éxitoWaitAsync
es solo una señal de que puede haber artículos en la cola para consumir, no es una garantía.If the value of the CurrentCount property is zero before this method is called, the method also allows releaseCount threads or tasks blocked by a call to the Wait or WaitAsync method to enter the semaphore.
de docs.microsoft.com/en-us/dotnet/api/… ¿Cómo es posible queWaitAsync
no haya elementos en cola correctamente ? Si la liberación de N despierta a más consumidores de N, los quesemaphore
se rompen. ¿No es así?Aquí hay una implementación muy básica de un
BlockingCollection
que admite la espera, con muchas características faltantes. Utiliza laAsyncEnumerable
biblioteca, que hace posible la enumeración asincrónica para versiones de C # anteriores a 8.0.public class AsyncBlockingCollection<T> { // Missing features: cancellation, boundedCapacity, TakeAsync private Queue<T> _queue = new Queue<T>(); private SemaphoreSlim _semaphore = new SemaphoreSlim(0); private int _consumersCount = 0; private bool _isAddingCompleted; public void Add(T item) { lock (_queue) { if (_isAddingCompleted) throw new InvalidOperationException(); _queue.Enqueue(item); } _semaphore.Release(); } public void CompleteAdding() { lock (_queue) { if (_isAddingCompleted) return; _isAddingCompleted = true; if (_consumersCount > 0) _semaphore.Release(_consumersCount); } } public IAsyncEnumerable<T> GetConsumingEnumerable() { lock (_queue) _consumersCount++; return new AsyncEnumerable<T>(async yield => { while (true) { lock (_queue) { if (_queue.Count == 0 && _isAddingCompleted) break; } await _semaphore.WaitAsync(); bool hasItem; T item = default; lock (_queue) { hasItem = _queue.Count > 0; if (hasItem) item = _queue.Dequeue(); } if (hasItem) await yield.ReturnAsync(item); } }); } }
Ejemplo de uso:
var abc = new AsyncBlockingCollection<int>(); var producer = Task.Run(async () => { for (int i = 1; i <= 10; i++) { await Task.Delay(100); abc.Add(i); } abc.CompleteAdding(); }); var consumer = Task.Run(async () => { await abc.GetConsumingEnumerable().ForEachAsync(async item => { await Task.Delay(200); await Console.Out.WriteAsync(item + " "); }); }); await Task.WhenAll(producer, consumer);
Salida:
Actualización: con el lanzamiento de C # 8, la enumeración asincrónica se ha convertido en una característica de lenguaje incorporada. Las clases requeridas (
IAsyncEnumerable
,IAsyncEnumerator
) están integradas en .NET Core 3.0 y se ofrecen como un paquete para .NET Framework 4.6.1+ ( Microsoft.Bcl.AsyncInterfaces ).Aquí hay una
GetConsumingEnumerable
implementación alternativa , con la nueva sintaxis de C # 8:public async IAsyncEnumerable<T> GetConsumingEnumerable() { lock (_queue) _consumersCount++; while (true) { lock (_queue) { if (_queue.Count == 0 && _isAddingCompleted) break; } await _semaphore.WaitAsync(); bool hasItem; T item = default; lock (_queue) { hasItem = _queue.Count > 0; if (hasItem) item = _queue.Dequeue(); } if (hasItem) yield return item; } }
Nótese la coexistencia de
await
yyield
en el mismo método.Ejemplo de uso (C # 8):
var consumer = Task.Run(async () => { await foreach (var item in abc.GetConsumingEnumerable()) { await Task.Delay(200); await Console.Out.WriteAsync(item + " "); } });
Tenga en cuenta el
await
antes delforeach
.fuente
AsyncBlockingCollection
tiene sentido. Algo no puede ser asincrónico y bloqueante al mismo tiempo, ¡ya que estos dos conceptos son exactamente opuestos!Si no le importa un pequeño truco, puede probar estas extensiones.
public static async Task AddAsync<TEntity>( this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt) { while (true) { try { if (Bc.TryAdd(item, 0, abortCt)) return; else await Task.Delay(100, abortCt); } catch (Exception) { throw; } } } public static async Task<TEntity> TakeAsync<TEntity>( this BlockingCollection<TEntity> Bc, CancellationToken abortCt) { while (true) { try { TEntity item; if (Bc.TryTake(out item, 0, abortCt)) return item; else await Task.Delay(100, abortCt); } catch (Exception) { throw; } } }
fuente