La idea Parallel.ForEach()
es que tienes un conjunto de hilos y cada hilo procesa parte de la colección. Como notó, esto no funciona con async
- await
, donde desea liberar el hilo durante la duración de la llamada asincrónica.
Podrías "arreglarlo" bloqueando los ForEach()
hilos, pero eso derrota todo el punto de async
- await
.
Lo que podría hacer es usar TPL Dataflow en lugar de hacerlo Parallel.ForEach()
, que también admite asíncronos Task
.
Específicamente, su código podría escribirse usando un TransformBlock
que transforma cada identificación en un Customer
uso de async
lambda. Este bloque se puede configurar para ejecutarse en paralelo. Debería vincular ese bloque a uno ActionBlock
que escriba cada uno Customer
en la consola. Después de configurar la red de bloque, puede Post()
cada identificación a TransformBlock
.
En codigo:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Aunque probablemente desee limitar el paralelismo de la TransformBlock
a una pequeña constante. Además, puede limitar la capacidad de los elementos TransformBlock
y agregarlos de forma asincrónica SendAsync()
, por ejemplo, si la colección es demasiado grande.
Como un beneficio adicional en comparación con su código (si funcionó) es que la escritura comenzará tan pronto como termine un solo elemento, y no espere hasta que todo el procesamiento haya finalizado.
Parallel.ForEach()
dePost()
elementos en paralelo no debería tener ningún efecto real.La respuesta de svick es (como siempre) excelente.
Sin embargo, creo que Dataflow es más útil cuando en realidad tiene grandes cantidades de datos para transferir. O cuando necesita una
async
cola compatible.En su caso, una solución más simple es usar el
async
paralelismo de estilo:fuente
Parallel.ForEach()
). Pero creo que actualmente es la mejor opción para hacer casi cualquierasync
trabajo con colecciones.ParallelOptions
va a ayudar? Solo es aplicable aParallel.For/ForEach/Invoke
, que como el OP establecido no sirve de nada aquí.GetCustomer
método está devolviendo aTask<T>
, ¿se debería usarSelect(async i => { await repo.GetCustomer(i);});
?Parallel.ForEach
no es compatibleasync
.Usar DataFlow como sugirió svick puede ser excesivo, y la respuesta de Stephen no proporciona los medios para controlar la concurrencia de la operación. Sin embargo, eso se puede lograr simplemente:
Las
ToArray()
llamadas se pueden optimizar utilizando una matriz en lugar de una lista y reemplazando las tareas completadas, pero dudo que haga una gran diferencia en la mayoría de los escenarios. Ejemplo de uso según la pregunta del OP:El usuario de EDIT Fellow SO y el experto en TPL, Eli Arbel, me señaló un artículo relacionado de Stephen Toub . Como de costumbre, su implementación es elegante y eficiente:
fuente
Partitioner.Create
usos de particiones fragmentadas, que proporciona elementos dinámicamente a las diferentes tareas para que el escenario que describió no tenga lugar. También tenga en cuenta que la partición estática (predeterminada) puede ser más rápida en algunos casos debido a una menor sobrecarga (específicamente sincronización). Para obtener más información, consulte: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .Task.WhenAll
) contendrá la excepción (dentro deAggregateException
) y, en consecuencia, si dicha persona usaraawait
, se lanzaría una excepción en el sitio de la llamada. Sin embargo,Task.WhenAll
aún esperará a que se completen todas las tareas yGetPartitions
asignará elementos dinámicamente cuandopartition.MoveNext
se llame hasta que no queden más elementos para procesar. Esto significa que, a menos que agregue su propio mecanismo para detener el procesamiento (por ejemploCancellationToken
), no sucederá por sí solo.var current = partition.Current
antesawait body
y luego usarlocurrent
en la continuación (ContinueWith(t => { ... }
).Puede ahorrar esfuerzo con el nuevo paquete AsyncEnumerator NuGet , que no existía hace 4 años cuando la pregunta se publicó originalmente. Le permite controlar el grado de paralelismo:
Descargo de responsabilidad: soy el autor de la biblioteca AsyncEnumerator, de código abierto y con licencia del MIT, y estoy publicando este mensaje solo para ayudar a la comunidad.
fuente
AsyncStreams
eso y debo decir que es excelente. No puedo recomendar esta biblioteca lo suficiente.Envuelva el
Parallel.Foreach
en ay enTask.Run()
lugar delawait
uso de la palabra clave[yourasyncmethod].Result
(necesita hacer la tarea. Ejecutar para no bloquear el hilo de la interfaz de usuario)
Algo como esto:
fuente
Parallel.ForEach
hacer el trabajo paralelo, que bloquea hasta que todo esté listo, y luego empuje todo a un hilo de fondo para tener una interfaz de usuario receptiva. ¿Algún problema con eso? Tal vez ese es un hilo dormido demasiado, pero es un código corto y legible.Task.Run
cuandoTaskCompletionSource
es preferible.TaskCompletionSource
preferible?await
se puede mover al frente para guardar el nombre adicional de la variable.Esto debería ser bastante eficiente y más fácil que hacer que todo el flujo de datos TPL funcione:
fuente
await
comovar customers = await ids.SelectAsync(async i => { ... });
:?Llego un poco tarde a la fiesta, pero es posible que desee considerar usar GetAwaiter.GetResult () para ejecutar su código asíncrono en contexto de sincronización, pero como se muestra a continuación;
fuente
Un método de extensión para esto que hace uso de SemaphoreSlim y también permite establecer el grado máximo de paralelismo
Uso de muestra:
fuente
Después de introducir un montón de métodos auxiliares, podrá ejecutar consultas paralelas con esta sintaxis simple:
Lo que sucede aquí es: dividimos la colección de origen en 10 fragmentos (
.Split(DegreeOfParallelism)
), luego ejecutamos 10 tareas cada una procesando sus elementos uno por uno (.SelectManyAsync(...)
) y los fusionamos nuevamente en una sola lista.Vale la pena mencionar que hay un enfoque más simple:
Pero necesita una precaución : si tiene una colección de origen que es demasiado grande, programará
Task
de inmediato un elemento para cada elemento, lo que puede causar impactos significativos en el rendimiento.Los métodos de extensión utilizados en los ejemplos anteriores tienen el siguiente aspecto:
fuente