Tengo un código de biblioteca (redes de socket) que proporciona una TaskAPI basada en respuestas pendientes a solicitudes, según TaskCompletionSource<T>. Sin embargo, hay una molestia en el TPL porque parece imposible evitar las continuaciones sincrónicas. Lo que me gustaría poder hacer es:
- decirle 
TaskCompletionSource<T>que no debe permitir que las personas que llaman se conecten conTaskContinuationOptions.ExecuteSynchronously, o - establezca el resultado ( 
SetResult/TrySetResult) de una manera que especifique queTaskContinuationOptions.ExecuteSynchronouslydebe ignorarse, usando el grupo en su lugar 
Específicamente, el problema que tengo es que los datos entrantes están siendo procesados por un lector dedicado, y si una persona que llama puede conectarse TaskContinuationOptions.ExecuteSynchronously, puede detener al lector (lo que afecta a más que solo a ellos). Anteriormente, he trabajado en torno a esto mediante algún pirata informático que detecta si hay continuaciones presentes y, si lo están, empuja la finalización al ThreadPool, sin embargo, esto tiene un impacto significativo si la persona que llama ha saturado su cola de trabajo, ya que la finalización no se procesará. en el momento oportuno. Si están usando Task.Wait()(o similar), entonces esencialmente se bloquearán ellos mismos. Asimismo, esta es la razón por la que el lector está en un hilo dedicado en lugar de utilizar trabajadores.
Entonces; antes de intentar fastidiar al equipo de TPL: ¿me estoy perdiendo una opción?
Puntos clave:
- No quiero que las personas que llaman externas puedan secuestrar mi hilo
 - No puedo usar el 
ThreadPoolcomo implementación, ya que debe funcionar cuando el grupo está saturado 
El siguiente ejemplo produce resultados (el pedido puede variar según el tiempo):
Continuation on: Main thread
Press [return]
Continuation on: Thread pool
El problema es el hecho de que una persona que llama al azar logró obtener una continuación en "Hilo principal". En el código real, esto interrumpiría al lector principal; ¡cosas malas!
Código:
using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}
    fuente

TaskCompletionSourcecon mi propia API para evitar llamadas directas aContinueWith, ya que niTaskCompletionSource, niTaskno se adapta bien a la herencia de ellos.Taskque está expuesto, no elTaskCompletionSource. Eso (exponer una API diferente) es técnicamente una opción, pero es algo bastante extremo para hacer solo por esto ... No estoy seguro de que lo justifiqueThreadPoolpara esto (que ya mencioné, causa problemas), o tiene un hilo de "continuaciones pendientes" dedicado, y luego ellos (continuaciones conExecuteSynchronouslyespecificado) pueden secuestrar eso uno en su lugar , lo que causa exactamente el mismo problema, porque significa que las continuaciones de otros mensajes se pueden estancar, lo que nuevamente afecta a múltiples personas que llamanRespuestas:
Nuevo en .NET 4.6:
.NET 4.6 contiene un nuevo
TaskCreationOptions:RunContinuationsAsynchronously.Ya que está dispuesto a usar Reflection para acceder a campos privados ...
Puede marcar la Tarea de TCS con la
TASK_STATE_THREAD_WAS_ABORTEDbandera, lo que provocaría que todas las continuaciones no estén alineadas.const int TASK_STATE_THREAD_WAS_ABORTED = 134217728; var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance); stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);Editar:
En lugar de usar Reflection emit, te sugiero que uses expresiones. Esto es mucho más legible y tiene la ventaja de ser compatible con PCL:
var taskParameter = Expression.Parameter(typeof (Task)); const string stateFlagsFieldName = "m_stateFlags"; var setter = Expression.Lambda<Action<Task>>( Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();Sin usar Reflection:
Si alguien está interesado, he descubierto una manera de hacer esto sin Reflection, pero también es un poco "sucio" y, por supuesto, tiene una penalización de rendimiento no despreciable:
try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException) { source.TrySetResult(123); Thread.ResetAbort(); }fuente
TaskCreationOptions.DoNotInline- y ni siquiera necesitaría un cambio de firma ctor aTaskCompletionSourceILGeneratoretc: github.com/StackExchange/StackExchange.Redis/blob/master/…No creo que haya nada en TPL que proporcione un control API explícito sobre las
TaskCompletionSource.SetResultcontinuaciones. Decidí mantener mi respuesta inicial para controlar este comportamiento paraasync/awaitescenarios.Aquí hay otra solución que impone asincrónica
ContinueWith, si latcs.SetResultcontinuación activada tiene lugar en el mismo hilo en el queSetResultse llamó:public static class TaskExt { static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks = new ConcurrentDictionary<Task, Thread>(); // SetResultAsync static public void SetResultAsync<TResult>( this TaskCompletionSource<TResult> @this, TResult result) { s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread); try { @this.SetResult(result); } finally { Thread thread; s_tcsTasks.TryRemove(@this.Task, out thread); } } // ContinueWithAsync, TODO: more overrides static public Task ContinueWithAsync<TResult>( this Task<TResult> @this, Action<Task<TResult>> action, TaskContinuationOptions continuationOptions = TaskContinuationOptions.None) { return @this.ContinueWith((Func<Task<TResult>, Task>)(t => { Thread thread = null; s_tcsTasks.TryGetValue(t, out thread); if (Thread.CurrentThread == thread) { // same thread which called SetResultAsync, avoid potential deadlocks // using thread pool return Task.Run(() => action(t)); // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread) // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning); } else { // continue on the same thread var task = new Task(() => action(t)); task.RunSynchronously(); return Task.FromResult(task); } }), continuationOptions).Unwrap(); } }Actualizado para abordar el comentario:
No sabía que no controlas a la persona que llama. Sin embargo, si no lo controla, probablemente tampoco esté pasando el
TaskCompletionSourceobjeto directamente a la persona que llama. Lógicamente, estarías pasando la parte del token , es decirtcs.Task. En cuyo caso, la solución podría ser aún más fácil, agregando otro método de extensión al anterior:// ImposeAsync, TODO: more overrides static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this) { return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent => { Thread thread = null; s_tcsTasks.TryGetValue(antecedent, out thread); if (Thread.CurrentThread == thread) { // continue on a pool thread return antecedent.ContinueWith(t => t, TaskContinuationOptions.None).Unwrap(); } else { return antecedent; } }), TaskContinuationOptions.ExecuteSynchronously).Unwrap(); }Utilizar:
// library code var source = new TaskCompletionSource<int>(); var task = source.Task.ImposeAsync(); // ... // client code task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); // ... // library code source.SetResultAsync(123);Esto realmente funciona para ambos
awaityContinueWith( violín ) y está libre de trucos de reflexión.fuente
¿Qué tal en lugar de hacer
var task = source.Task;tu haces esto en su lugar
var task = source.Task.ContinueWith<Int32>( x => x.Result );Por lo tanto, siempre está agregando una continuación que se ejecutará de forma asincrónica y luego no importa si los suscriptores quieren una continuación en el mismo contexto. Es una especie de curry la tarea, ¿no?
fuente
ContinueWithyawaitnormalmente se esfuerzan por evitar (mediante la comprobación de que ya completa, etc) - y ya que esto obligaría a todo en el trabajadores, en realidad agravaría la situación. Es una idea positiva, y se las agradezco, pero no ayudará en este escenario.si puede y está dispuesto a utilizar la reflexión, debería hacerlo;
public static class MakeItAsync { static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result) { var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var continuations = (List<object>)continuation.GetValue(source.Task); foreach (object c in continuations) { var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var options = (TaskContinuationOptions)option.GetValue(c); options &= ~TaskContinuationOptions.ExecuteSynchronously; option.SetValue(c, options); } source.TrySetResult(result); } }fuente
Task.Run(() => tcs.SetResult(result))?Actualizado , publiqué una respuesta separada para tratar
ContinueWithen lugar deawait(porqueContinueWithno le importa el contexto de sincronización actual).Se puede usar un contexto de sincronización tonto como para imponer la asincronía en la continuación activa llamando
SetResult/SetCancelled/SetExceptionaTaskCompletionSource. Creo que el contexto de sincronización actual (en el punto deawait tcs.Task) es el criterio que usa TPL para decidir si hacer dicha continuación sincrónica o asincrónica.Lo siguiente funciona para mí:
if (notifyAsync) { tcs.SetResultAsync(null); } else { tcs.SetResult(null); }SetResultAsyncse implementa así:public static class TaskExt { static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result) { FakeSynchronizationContext.Execute(() => tcs.SetResult(result)); } // FakeSynchronizationContext class FakeSynchronizationContext : SynchronizationContext { private static readonly ThreadLocal<FakeSynchronizationContext> s_context = new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext()); private FakeSynchronizationContext() { } public static FakeSynchronizationContext Instance { get { return s_context.Value; } } public static void Execute(Action action) { var savedContext = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance); try { action(); } finally { SynchronizationContext.SetSynchronizationContext(savedContext); } } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { throw new NotImplementedException("OperationStarted"); } public override void OperationCompleted() { throw new NotImplementedException("OperationCompleted"); } public override void Post(SendOrPostCallback d, object state) { throw new NotImplementedException("Post"); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } } }SynchronizationContext.SetSynchronizationContextes muy barato en términos de gastos generales que agrega. De hecho, la implementación de WPFDispatcher.BeginInvokeadopta un enfoque muy similar .TPL compara el contexto de sincronización de destino en el punto de
awaitcon el del punto detcs.SetResult. Si el contexto de sincronización es el mismo (o no hay contexto de sincronización en ambos lugares), la continuación se llama directamente, sincrónicamente. De lo contrario, se poneSynchronizationContext.Posten cola usando el contexto de sincronización de destino, es decir, elawaitcomportamiento normal . Lo que hace este enfoque es siempre imponer elSynchronizationContext.Postcomportamiento (o una continuación del hilo del grupo si no hay un contexto de sincronización de destino).Actualizado , esto no funcionará
task.ContinueWith, porqueContinueWithno le importa el contexto de sincronización actual. Sin embargo, funciona paraawait task( violín ). También funciona paraawait task.ConfigureAwait(false).OTOH, este enfoque funciona
ContinueWith.fuente
tcs.SetResultllamada. Es un poco se convierte atómica y seguro para subprocesos esta manera, debido a la continuación en sí va a suceder en cualquiera otro hilo piscina o en la sincronización inicial. contexto capturado enawait tcs.Task. Y enSynchronizationContext.SetSynchronizationContextsí mismo es muy barato, mucho más barato que un interruptor de hilo.ThreadPool. Con esta solución, el TPL de hecho se utilizaráThreadPool, si no hubo sincronización. contexto (o era el predeterminado básico) enawait tcs.Task. Pero este es el comportamiento estándar de TPL.El enfoque de simulación de aborto se veía realmente bien, pero llevó a los hilos de secuestro de TPL en algunos escenarios .
Luego tuve una implementación que era similar a verificar el objeto de continuación , pero solo verificaba cualquier continuación, ya que en realidad hay demasiados escenarios para que el código dado funcione bien, pero eso significaba que incluso cosas como
Task.Waitresultaron en una búsqueda de grupo de subprocesos.En última instancia, después de inspeccionar montones y montones de IL, el único escenario seguro y útil es el
SetOnInvokeMresescenario (continuación manual-reset-event-slim). Hay muchos otros escenarios:Así que al final, opté por buscar un objeto de continuación no nulo; si es nulo, bien (sin continuaciones); si no es nulo, verifique en caso especial
SetOnInvokeMres- si es eso: fino (seguro para invocar); de lo contrario, deje que el grupo de subprocesos realice elTrySetComplete, sin decirle a la tarea que haga nada especial como abortar la suplantación.Task.Waitutiliza elSetOnInvokeMresenfoque, que es el escenario específico en el que queremos esforzarnos mucho para no bloquearlo.Type taskType = typeof(Task); FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic); Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic); if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null) { var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true); var il = method.GetILGenerator(); var hasContinuation = il.DefineLabel(); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel(); // check if null il.Emit(OpCodes.Brtrue_S, nonNull); il.MarkLabel(goodReturn); il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Ret); // check if is a SetOnInvokeMres - if so, we're OK il.MarkLabel(nonNull); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); il.Emit(OpCodes.Isinst, safeScenario); il.Emit(OpCodes.Brtrue_S, goodReturn); il.Emit(OpCodes.Ldc_I4_0); il.Emit(OpCodes.Ret); IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));fuente