¿Cómo puedo evitar continuaciones síncronas en una tarea?

82

Tengo un código de biblioteca (redes de socket) que proporciona una TaskAPI basada en respuestas pendientes a solicitudes, según TaskCompletionSource<T>. Sin embargo, hay una molestia en el TPL porque parece imposible evitar las continuaciones sincrónicas. Lo que me gustaría poder hacer es:

  • decirle TaskCompletionSource<T>que no debe permitir que las personas que llaman se conecten con TaskContinuationOptions.ExecuteSynchronously, o
  • establezca el resultado ( SetResult/ TrySetResult) de una manera que especifique que TaskContinuationOptions.ExecuteSynchronouslydebe ignorarse, usando el grupo en su lugar

Específicamente, el problema que tengo es que los datos entrantes están siendo procesados ​​por un lector dedicado, y si una persona que llama puede conectarse TaskContinuationOptions.ExecuteSynchronously, puede detener al lector (lo que afecta a más que solo a ellos). Anteriormente, he trabajado en torno a esto mediante algún pirata informático que detecta si hay continuaciones presentes y, si lo están, empuja la finalización al ThreadPool, sin embargo, esto tiene un impacto significativo si la persona que llama ha saturado su cola de trabajo, ya que la finalización no se procesará. en el momento oportuno. Si están usando Task.Wait()(o similar), entonces esencialmente se bloquearán ellos mismos. Asimismo, esta es la razón por la que el lector está en un hilo dedicado en lugar de utilizar trabajadores.

Entonces; antes de intentar fastidiar al equipo de TPL: ¿me estoy perdiendo una opción?

Puntos clave:

  • No quiero que las personas que llaman externas puedan secuestrar mi hilo
  • No puedo usar el ThreadPoolcomo implementación, ya que debe funcionar cuando el grupo está saturado

El siguiente ejemplo produce resultados (el pedido puede variar según el tiempo):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

El problema es el hecho de que una persona que llama al azar logró obtener una continuación en "Hilo principal". En el código real, esto interrumpiría al lector principal; ¡cosas malas!

Código:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}
Marc Gravell
fuente
2
Intentaría ajustar TaskCompletionSourcecon mi propia API para evitar llamadas directas a ContinueWith, ya que ni TaskCompletionSource, ni Taskno se adapta bien a la herencia de ellos.
Dennis
1
@Dennis para ser claros, en realidad es el Taskque está expuesto, no el TaskCompletionSource. Eso (exponer una API diferente) es técnicamente una opción, pero es algo bastante extremo para hacer solo por esto ... No estoy seguro de que lo justifique
Marc Gravell
2
@MattH no realmente, simplemente reformula la pregunta: o usa el ThreadPoolpara esto (que ya mencioné, causa problemas), o tiene un hilo de "continuaciones pendientes" dedicado, y luego ellos (continuaciones con ExecuteSynchronouslyespecificado) pueden secuestrar eso uno en su lugar , lo que causa exactamente el mismo problema, porque significa que las continuaciones de otros mensajes se pueden estancar, lo que nuevamente afecta a múltiples personas que llaman
Marc Gravell
3
@Andrey eso (funciona como si todas las personas que llaman usaran ContinueWith sin exec-sync) es precisamente lo que quiero lograr. El problema es que si mi biblioteca le entrega a alguien una tarea, pueden hacer algo muy indeseable: pueden interrumpir a mi lector (desaconsejadamente) usando exec-sync. Esto es enormemente peligroso y por eso me gustaría evitarlo dentro de la biblioteca .
Marc Gravell
2
@Andrey porque a: muchas tareas nunca obtienen continuaciones en primer lugar (especialmente cuando se realiza un trabajo por lotes); esto obligaría a todas las tareas a tener una, yb: incluso aquellas que habrían tenido una continuación ahora tienen mucha más complejidad, gastos generales y operaciones de los trabajadores. Esto importa.
Marc Gravell

Respuestas:

50

Nuevo en .NET 4.6:

.NET 4.6 contiene un nuevo TaskCreationOptions: RunContinuationsAsynchronously.


Ya que está dispuesto a usar Reflection para acceder a campos privados ...

Puede marcar la Tarea de TCS con la TASK_STATE_THREAD_WAS_ABORTEDbandera, lo que provocaría que todas las continuaciones no estén alineadas.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Editar:

En lugar de usar Reflection emit, te sugiero que uses expresiones. Esto es mucho más legible y tiene la ventaja de ser compatible con PCL:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Sin usar Reflection:

Si alguien está interesado, he descubierto una manera de hacer esto sin Reflection, pero también es un poco "sucio" y, por supuesto, tiene una penalización de rendimiento no despreciable:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}
Eli Arbel
fuente
3
@MarcGravell Use esto para crear una pseudo-muestra para el equipo de TPL y haga una solicitud de cambio para poder hacer esto a través de las opciones del constructor o algo así.
Adam Houldsworth
1
@ Adam sí, si tiene que llamar a esta bandera "lo que hace" en lugar de "cuál es su causa", que sería algo así como TaskCreationOptions.DoNotInline- y ni siquiera necesitaría un cambio de firma ctor aTaskCompletionSource
Marc Gravell
2
@AdamHouldsworth y no te preocupes, ya les estoy enviando el mismo correo electrónico; p
Marc Gravell
1
Para su interés: aquí está, optimizado a través de ILGeneratoretc: github.com/StackExchange/StackExchange.Redis/blob/master/…
Marc Gravell
1
@Noseratio sí, los comprobé - gracias; todos están bien en mi opinión; Estoy de acuerdo en que esto es una solución alternativa, pero tiene exactamente los resultados correctos.
Marc Gravell
9

No creo que haya nada en TPL que proporcione un control API explícito sobre las TaskCompletionSource.SetResultcontinuaciones. Decidí mantener mi respuesta inicial para controlar este comportamiento para async/awaitescenarios.

Aquí hay otra solución que impone asincrónica ContinueWith, si la tcs.SetResultcontinuación activada tiene lugar en el mismo hilo en el que SetResultse llamó:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Actualizado para abordar el comentario:

No controlo a la persona que llama, no puedo hacer que use una variante específica de continuar con: si pudiera, el problema no existiría en primer lugar

No sabía que no controlas a la persona que llama. Sin embargo, si no lo controla, probablemente tampoco esté pasando el TaskCompletionSourceobjeto directamente a la persona que llama. Lógicamente, estarías pasando la parte del token , es decir tcs.Task. En cuyo caso, la solución podría ser aún más fácil, agregando otro método de extensión al anterior:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Utilizar:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

Esto realmente funciona para ambos awaityContinueWith ( violín ) y está libre de trucos de reflexión.

ratio nasal
fuente
1
No controlo a la persona que llama, no puedo hacer que use una variante específica de continuar con: si pudiera, el problema no existiría en primer lugar
Marc Gravell
@MarcGravell, no sabía que no puedes controlar a la persona que llama. Publiqué una actualización sobre cómo lidiaría con eso.
noseratio
el dilema del autor de la biblioteca; p Tenga en cuenta que alguien encontró una forma mucho más simple y directa de lograr el resultado deseado
Marc Gravell
4

¿Qué tal en lugar de hacer

var task = source.Task;

tu haces esto en su lugar

var task = source.Task.ContinueWith<Int32>( x => x.Result );

Por lo tanto, siempre está agregando una continuación que se ejecutará de forma asincrónica y luego no importa si los suscriptores quieren una continuación en el mismo contexto. Es una especie de curry la tarea, ¿no?

Ivan Zlatanov
fuente
1
Eso surgió en los comentarios (ver Andrey); el problema no es que obliga a todas las tareas para tener una continuación cuando no tendrían de otra manera, que es algo que tanto ContinueWithy awaitnormalmente se esfuerzan por evitar (mediante la comprobación de que ya completa, etc) - y ya que esto obligaría a todo en el trabajadores, en realidad agravaría la situación. Es una idea positiva, y se las agradezco, pero no ayudará en este escenario.
Marc Gravell
3

si puede y está dispuesto a utilizar la reflexión, debería hacerlo;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}
Fredou
fuente
Este truco simplemente puede dejar de funcionar en la próxima versión del Framework.
noseratio
@Noseratio, es cierto, pero funciona ahora y también podrían implementar una forma adecuada de hacerlo en la próxima versión
Fredou
Pero, ¿por qué necesitarías esto si simplemente puedes hacerlo Task.Run(() => tcs.SetResult(result))?
noseratio
@Noseratio, no lo sé, hazle esa pregunta a Marc :-), simplemente estoy quitando la bandera TaskContinuationOptions.ExecuteSynchronously en todas las tareas conectadas a TaskCompletionSource que se aseguran de que todas usen el hilo en lugar del hilo principal
Fredou
El truco m_continuationObject es en realidad el truco que ya utilizo para identificar tareas potencialmente problemáticas, por lo que esto no está fuera de consideración. Interesante, gracias. Esta es la opción más útil hasta ahora.
Marc Gravell
3

Actualizado , publiqué una respuesta separada para tratar ContinueWithen lugar de await(porque ContinueWithno le importa el contexto de sincronización actual).

Se puede usar un contexto de sincronización tonto como para imponer la asincronía en la continuación activa llamando SetResult/SetCancelled/SetExceptiona TaskCompletionSource. Creo que el contexto de sincronización actual (en el punto de await tcs.Task) es el criterio que usa TPL para decidir si hacer dicha continuación sincrónica o asincrónica.

Lo siguiente funciona para mí:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync se implementa así:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext es muy barato en términos de gastos generales que agrega. De hecho, la implementación de WPFDispatcher.BeginInvoke adopta un enfoque muy similar .

TPL compara el contexto de sincronización de destino en el punto de awaitcon el del punto de tcs.SetResult. Si el contexto de sincronización es el mismo (o no hay contexto de sincronización en ambos lugares), la continuación se llama directamente, sincrónicamente. De lo contrario, se pone SynchronizationContext.Posten cola usando el contexto de sincronización de destino, es decir, el awaitcomportamiento normal . Lo que hace este enfoque es siempre imponer el SynchronizationContext.Postcomportamiento (o una continuación del hilo del grupo si no hay un contexto de sincronización de destino).

Actualizado , esto no funcionará task.ContinueWith, porque ContinueWithno le importa el contexto de sincronización actual. Sin embargo, funciona para await task( violín ). También funciona para await task.ConfigureAwait(false).

OTOH, este enfoque funciona ContinueWith.

ratio nasal
fuente
Tentador, pero cambiar el contexto de sincronización seguramente afectaría a la aplicación de llamada; por ejemplo, una aplicación web o de Windows que simplemente esté usando mi biblioteca no debería encontrar que el contexto de sincronización cambia cientos de veces por segundo.
Marc Gravell
@MarcGravell, solo lo cambio por el alcance de la tcs.SetResultllamada. Es un poco se convierte atómica y seguro para subprocesos esta manera, debido a la continuación en sí va a suceder en cualquiera otro hilo piscina o en la sincronización inicial. contexto capturado en await tcs.Task. Y en SynchronizationContext.SetSynchronizationContextsí mismo es muy barato, mucho más barato que un interruptor de hilo.
noseratio
Sin embargo, es posible que esto no satisfaga su segundo requisito: no usar ThreadPool. Con esta solución, el TPL de hecho se utilizará ThreadPool, si no hubo sincronización. contexto (o era el predeterminado básico) en await tcs.Task. Pero este es el comportamiento estándar de TPL.
noseratio
Hmmm ... dado que el contexto de sincronización es por subproceso, esto podría ser viable, y no necesitaría seguir cambiando el ctx, solo configúrelo una vez para el subproceso de trabajo. Necesitaré jugar con él
Marc Gravell
1
@Noseration ah, cierto: no estaba claro que el punto clave fuera que fueran diferentes . Veré. Gracias.
Marc Gravell
3

El enfoque de simulación de aborto se veía realmente bien, pero llevó a los hilos de secuestro de TPL en algunos escenarios .

Luego tuve una implementación que era similar a verificar el objeto de continuación , pero solo verificaba cualquier continuación, ya que en realidad hay demasiados escenarios para que el código dado funcione bien, pero eso significaba que incluso cosas como Task.Waitresultaron en una búsqueda de grupo de subprocesos.

En última instancia, después de inspeccionar montones y montones de IL, el único escenario seguro y útil es el SetOnInvokeMresescenario (continuación manual-reset-event-slim). Hay muchos otros escenarios:

  • algunos no son seguros y conducen al secuestro de hilos
  • el resto no son útiles, ya que en última instancia conducen al grupo de subprocesos

Así que al final, opté por buscar un objeto de continuación no nulo; si es nulo, bien (sin continuaciones); si no es nulo, verifique en caso especial SetOnInvokeMres- si es eso: fino (seguro para invocar); de lo contrario, deje que el grupo de subprocesos realice el TrySetComplete, sin decirle a la tarea que haga nada especial como abortar la suplantación. Task.Waitutiliza el SetOnInvokeMresenfoque, que es el escenario específico en el que queremos esforzarnos mucho para no bloquearlo.

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
Marc Gravell
fuente