La idea Parallel.ForEach()es que tienes un conjunto de hilos y cada hilo procesa parte de la colección. Como notó, esto no funciona con async- await, donde desea liberar el hilo durante la duración de la llamada asincrónica.
Podrías "arreglarlo" bloqueando los ForEach()hilos, pero eso derrota todo el punto de async- await.
Lo que podría hacer es usar TPL Dataflow en lugar de hacerlo Parallel.ForEach(), que también admite asíncronos Task.
Específicamente, su código podría escribirse usando un TransformBlockque transforma cada identificación en un Customeruso de asynclambda. Este bloque se puede configurar para ejecutarse en paralelo. Debería vincular ese bloque a uno ActionBlockque escriba cada uno Customeren la consola. Después de configurar la red de bloque, puede Post()cada identificación a TransformBlock.
En codigo:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
Aunque probablemente desee limitar el paralelismo de la TransformBlocka una pequeña constante. Además, puede limitar la capacidad de los elementos TransformBlocky agregarlos de forma asincrónica SendAsync(), por ejemplo, si la colección es demasiado grande.
Como un beneficio adicional en comparación con su código (si funcionó) es que la escritura comenzará tan pronto como termine un solo elemento, y no espere hasta que todo el procesamiento haya finalizado.
Parallel.ForEach()dePost()elementos en paralelo no debería tener ningún efecto real.La respuesta de svick es (como siempre) excelente.
Sin embargo, creo que Dataflow es más útil cuando en realidad tiene grandes cantidades de datos para transferir. O cuando necesita una
asynccola compatible.En su caso, una solución más simple es usar el
asyncparalelismo de estilo:fuente
Parallel.ForEach()). Pero creo que actualmente es la mejor opción para hacer casi cualquierasynctrabajo con colecciones.ParallelOptionsva a ayudar? Solo es aplicable aParallel.For/ForEach/Invoke, que como el OP establecido no sirve de nada aquí.GetCustomermétodo está devolviendo aTask<T>, ¿se debería usarSelect(async i => { await repo.GetCustomer(i);});?Parallel.ForEachno es compatibleasync.Usar DataFlow como sugirió svick puede ser excesivo, y la respuesta de Stephen no proporciona los medios para controlar la concurrencia de la operación. Sin embargo, eso se puede lograr simplemente:
Las
ToArray()llamadas se pueden optimizar utilizando una matriz en lugar de una lista y reemplazando las tareas completadas, pero dudo que haga una gran diferencia en la mayoría de los escenarios. Ejemplo de uso según la pregunta del OP:El usuario de EDIT Fellow SO y el experto en TPL, Eli Arbel, me señaló un artículo relacionado de Stephen Toub . Como de costumbre, su implementación es elegante y eficiente:
fuente
Partitioner.Createusos de particiones fragmentadas, que proporciona elementos dinámicamente a las diferentes tareas para que el escenario que describió no tenga lugar. También tenga en cuenta que la partición estática (predeterminada) puede ser más rápida en algunos casos debido a una menor sobrecarga (específicamente sincronización). Para obtener más información, consulte: msdn.microsoft.com/en-us/library/dd997411(v=vs.110).aspx .Task.WhenAll) contendrá la excepción (dentro deAggregateException) y, en consecuencia, si dicha persona usaraawait, se lanzaría una excepción en el sitio de la llamada. Sin embargo,Task.WhenAllaún esperará a que se completen todas las tareas yGetPartitionsasignará elementos dinámicamente cuandopartition.MoveNextse llame hasta que no queden más elementos para procesar. Esto significa que, a menos que agregue su propio mecanismo para detener el procesamiento (por ejemploCancellationToken), no sucederá por sí solo.var current = partition.Currentantesawait bodyy luego usarlocurrenten la continuación (ContinueWith(t => { ... }).Puede ahorrar esfuerzo con el nuevo paquete AsyncEnumerator NuGet , que no existía hace 4 años cuando la pregunta se publicó originalmente. Le permite controlar el grado de paralelismo:
Descargo de responsabilidad: soy el autor de la biblioteca AsyncEnumerator, de código abierto y con licencia del MIT, y estoy publicando este mensaje solo para ayudar a la comunidad.
fuente
AsyncStreamseso y debo decir que es excelente. No puedo recomendar esta biblioteca lo suficiente.Envuelva el
Parallel.Foreachen ay enTask.Run()lugar delawaituso de la palabra clave[yourasyncmethod].Result(necesita hacer la tarea. Ejecutar para no bloquear el hilo de la interfaz de usuario)
Algo como esto:
fuente
Parallel.ForEachhacer el trabajo paralelo, que bloquea hasta que todo esté listo, y luego empuje todo a un hilo de fondo para tener una interfaz de usuario receptiva. ¿Algún problema con eso? Tal vez ese es un hilo dormido demasiado, pero es un código corto y legible.Task.RuncuandoTaskCompletionSourcees preferible.TaskCompletionSourcepreferible?awaitse puede mover al frente para guardar el nombre adicional de la variable.Esto debería ser bastante eficiente y más fácil que hacer que todo el flujo de datos TPL funcione:
fuente
awaitcomovar customers = await ids.SelectAsync(async i => { ... });:?Llego un poco tarde a la fiesta, pero es posible que desee considerar usar GetAwaiter.GetResult () para ejecutar su código asíncrono en contexto de sincronización, pero como se muestra a continuación;
fuente
Un método de extensión para esto que hace uso de SemaphoreSlim y también permite establecer el grado máximo de paralelismo
Uso de muestra:
fuente
Después de introducir un montón de métodos auxiliares, podrá ejecutar consultas paralelas con esta sintaxis simple:
Lo que sucede aquí es: dividimos la colección de origen en 10 fragmentos (
.Split(DegreeOfParallelism)), luego ejecutamos 10 tareas cada una procesando sus elementos uno por uno (.SelectManyAsync(...)) y los fusionamos nuevamente en una sola lista.Vale la pena mencionar que hay un enfoque más simple:
Pero necesita una precaución : si tiene una colección de origen que es demasiado grande, programará
Taskde inmediato un elemento para cada elemento, lo que puede causar impactos significativos en el rendimiento.Los métodos de extensión utilizados en los ejemplos anteriores tienen el siguiente aspecto:
fuente