Tengo un código de biblioteca (redes de socket) que proporciona una Task
API basada en respuestas pendientes a solicitudes, según TaskCompletionSource<T>
. Sin embargo, hay una molestia en el TPL porque parece imposible evitar las continuaciones sincrónicas. Lo que me gustaría poder hacer es:
- decirle
TaskCompletionSource<T>
que no debe permitir que las personas que llaman se conecten conTaskContinuationOptions.ExecuteSynchronously
, o - establezca el resultado (
SetResult
/TrySetResult
) de una manera que especifique queTaskContinuationOptions.ExecuteSynchronously
debe ignorarse, usando el grupo en su lugar
Específicamente, el problema que tengo es que los datos entrantes están siendo procesados por un lector dedicado, y si una persona que llama puede conectarse TaskContinuationOptions.ExecuteSynchronously
, puede detener al lector (lo que afecta a más que solo a ellos). Anteriormente, he trabajado en torno a esto mediante algún pirata informático que detecta si hay continuaciones presentes y, si lo están, empuja la finalización al ThreadPool
, sin embargo, esto tiene un impacto significativo si la persona que llama ha saturado su cola de trabajo, ya que la finalización no se procesará. en el momento oportuno. Si están usando Task.Wait()
(o similar), entonces esencialmente se bloquearán ellos mismos. Asimismo, esta es la razón por la que el lector está en un hilo dedicado en lugar de utilizar trabajadores.
Entonces; antes de intentar fastidiar al equipo de TPL: ¿me estoy perdiendo una opción?
Puntos clave:
- No quiero que las personas que llaman externas puedan secuestrar mi hilo
- No puedo usar el
ThreadPool
como implementación, ya que debe funcionar cuando el grupo está saturado
El siguiente ejemplo produce resultados (el pedido puede variar según el tiempo):
Continuation on: Main thread
Press [return]
Continuation on: Thread pool
El problema es el hecho de que una persona que llama al azar logró obtener una continuación en "Hilo principal". En el código real, esto interrumpiría al lector principal; ¡cosas malas!
Código:
using System;
using System.Threading;
using System.Threading.Tasks;
static class Program
{
static void Identify()
{
var thread = Thread.CurrentThread;
string name = thread.IsThreadPoolThread
? "Thread pool" : thread.Name;
if (string.IsNullOrEmpty(name))
name = "#" + thread.ManagedThreadId;
Console.WriteLine("Continuation on: " + name);
}
static void Main()
{
Thread.CurrentThread.Name = "Main thread";
var source = new TaskCompletionSource<int>();
var task = source.Task;
task.ContinueWith(delegate {
Identify();
});
task.ContinueWith(delegate {
Identify();
}, TaskContinuationOptions.ExecuteSynchronously);
source.TrySetResult(123);
Console.WriteLine("Press [return]");
Console.ReadLine();
}
}
fuente
TaskCompletionSource
con mi propia API para evitar llamadas directas aContinueWith
, ya que niTaskCompletionSource
, niTask
no se adapta bien a la herencia de ellos.Task
que está expuesto, no elTaskCompletionSource
. Eso (exponer una API diferente) es técnicamente una opción, pero es algo bastante extremo para hacer solo por esto ... No estoy seguro de que lo justifiqueThreadPool
para esto (que ya mencioné, causa problemas), o tiene un hilo de "continuaciones pendientes" dedicado, y luego ellos (continuaciones conExecuteSynchronously
especificado) pueden secuestrar eso uno en su lugar , lo que causa exactamente el mismo problema, porque significa que las continuaciones de otros mensajes se pueden estancar, lo que nuevamente afecta a múltiples personas que llamanRespuestas:
Nuevo en .NET 4.6:
.NET 4.6 contiene un nuevo
TaskCreationOptions
:RunContinuationsAsynchronously
.Ya que está dispuesto a usar Reflection para acceder a campos privados ...
Puede marcar la Tarea de TCS con la
TASK_STATE_THREAD_WAS_ABORTED
bandera, lo que provocaría que todas las continuaciones no estén alineadas.const int TASK_STATE_THREAD_WAS_ABORTED = 134217728; var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance); stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
Editar:
En lugar de usar Reflection emit, te sugiero que uses expresiones. Esto es mucho más legible y tiene la ventaja de ser compatible con PCL:
var taskParameter = Expression.Parameter(typeof (Task)); const string stateFlagsFieldName = "m_stateFlags"; var setter = Expression.Lambda<Action<Task>>( Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();
Sin usar Reflection:
Si alguien está interesado, he descubierto una manera de hacer esto sin Reflection, pero también es un poco "sucio" y, por supuesto, tiene una penalización de rendimiento no despreciable:
try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException) { source.TrySetResult(123); Thread.ResetAbort(); }
fuente
TaskCreationOptions.DoNotInline
- y ni siquiera necesitaría un cambio de firma ctor aTaskCompletionSource
ILGenerator
etc: github.com/StackExchange/StackExchange.Redis/blob/master/…No creo que haya nada en TPL que proporcione un control API explícito sobre las
TaskCompletionSource.SetResult
continuaciones. Decidí mantener mi respuesta inicial para controlar este comportamiento paraasync/await
escenarios.Aquí hay otra solución que impone asincrónica
ContinueWith
, si latcs.SetResult
continuación activada tiene lugar en el mismo hilo en el queSetResult
se llamó:public static class TaskExt { static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks = new ConcurrentDictionary<Task, Thread>(); // SetResultAsync static public void SetResultAsync<TResult>( this TaskCompletionSource<TResult> @this, TResult result) { s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread); try { @this.SetResult(result); } finally { Thread thread; s_tcsTasks.TryRemove(@this.Task, out thread); } } // ContinueWithAsync, TODO: more overrides static public Task ContinueWithAsync<TResult>( this Task<TResult> @this, Action<Task<TResult>> action, TaskContinuationOptions continuationOptions = TaskContinuationOptions.None) { return @this.ContinueWith((Func<Task<TResult>, Task>)(t => { Thread thread = null; s_tcsTasks.TryGetValue(t, out thread); if (Thread.CurrentThread == thread) { // same thread which called SetResultAsync, avoid potential deadlocks // using thread pool return Task.Run(() => action(t)); // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread) // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning); } else { // continue on the same thread var task = new Task(() => action(t)); task.RunSynchronously(); return Task.FromResult(task); } }), continuationOptions).Unwrap(); } }
Actualizado para abordar el comentario:
No sabía que no controlas a la persona que llama. Sin embargo, si no lo controla, probablemente tampoco esté pasando el
TaskCompletionSource
objeto directamente a la persona que llama. Lógicamente, estarías pasando la parte del token , es decirtcs.Task
. En cuyo caso, la solución podría ser aún más fácil, agregando otro método de extensión al anterior:// ImposeAsync, TODO: more overrides static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this) { return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent => { Thread thread = null; s_tcsTasks.TryGetValue(antecedent, out thread); if (Thread.CurrentThread == thread) { // continue on a pool thread return antecedent.ContinueWith(t => t, TaskContinuationOptions.None).Unwrap(); } else { return antecedent; } }), TaskContinuationOptions.ExecuteSynchronously).Unwrap(); }
Utilizar:
// library code var source = new TaskCompletionSource<int>(); var task = source.Task.ImposeAsync(); // ... // client code task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); // ... // library code source.SetResultAsync(123);
Esto realmente funciona para ambos
await
yContinueWith
( violín ) y está libre de trucos de reflexión.fuente
¿Qué tal en lugar de hacer
var task = source.Task;
tu haces esto en su lugar
var task = source.Task.ContinueWith<Int32>( x => x.Result );
Por lo tanto, siempre está agregando una continuación que se ejecutará de forma asincrónica y luego no importa si los suscriptores quieren una continuación en el mismo contexto. Es una especie de curry la tarea, ¿no?
fuente
ContinueWith
yawait
normalmente se esfuerzan por evitar (mediante la comprobación de que ya completa, etc) - y ya que esto obligaría a todo en el trabajadores, en realidad agravaría la situación. Es una idea positiva, y se las agradezco, pero no ayudará en este escenario.si puede y está dispuesto a utilizar la reflexión, debería hacerlo;
public static class MakeItAsync { static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result) { var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var continuations = (List<object>)continuation.GetValue(source.Task); foreach (object c in continuations) { var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var options = (TaskContinuationOptions)option.GetValue(c); options &= ~TaskContinuationOptions.ExecuteSynchronously; option.SetValue(c, options); } source.TrySetResult(result); } }
fuente
Task.Run(() => tcs.SetResult(result))
?Actualizado , publiqué una respuesta separada para tratar
ContinueWith
en lugar deawait
(porqueContinueWith
no le importa el contexto de sincronización actual).Se puede usar un contexto de sincronización tonto como para imponer la asincronía en la continuación activa llamando
SetResult/SetCancelled/SetException
aTaskCompletionSource
. Creo que el contexto de sincronización actual (en el punto deawait tcs.Task
) es el criterio que usa TPL para decidir si hacer dicha continuación sincrónica o asincrónica.Lo siguiente funciona para mí:
if (notifyAsync) { tcs.SetResultAsync(null); } else { tcs.SetResult(null); }
SetResultAsync
se implementa así:public static class TaskExt { static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result) { FakeSynchronizationContext.Execute(() => tcs.SetResult(result)); } // FakeSynchronizationContext class FakeSynchronizationContext : SynchronizationContext { private static readonly ThreadLocal<FakeSynchronizationContext> s_context = new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext()); private FakeSynchronizationContext() { } public static FakeSynchronizationContext Instance { get { return s_context.Value; } } public static void Execute(Action action) { var savedContext = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance); try { action(); } finally { SynchronizationContext.SetSynchronizationContext(savedContext); } } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { throw new NotImplementedException("OperationStarted"); } public override void OperationCompleted() { throw new NotImplementedException("OperationCompleted"); } public override void Post(SendOrPostCallback d, object state) { throw new NotImplementedException("Post"); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } } }
SynchronizationContext.SetSynchronizationContext
es muy barato en términos de gastos generales que agrega. De hecho, la implementación de WPFDispatcher.BeginInvoke
adopta un enfoque muy similar .TPL compara el contexto de sincronización de destino en el punto de
await
con el del punto detcs.SetResult
. Si el contexto de sincronización es el mismo (o no hay contexto de sincronización en ambos lugares), la continuación se llama directamente, sincrónicamente. De lo contrario, se poneSynchronizationContext.Post
en cola usando el contexto de sincronización de destino, es decir, elawait
comportamiento normal . Lo que hace este enfoque es siempre imponer elSynchronizationContext.Post
comportamiento (o una continuación del hilo del grupo si no hay un contexto de sincronización de destino).Actualizado , esto no funcionará
task.ContinueWith
, porqueContinueWith
no le importa el contexto de sincronización actual. Sin embargo, funciona paraawait task
( violín ). También funciona paraawait task.ConfigureAwait(false)
.OTOH, este enfoque funciona
ContinueWith
.fuente
tcs.SetResult
llamada. Es un poco se convierte atómica y seguro para subprocesos esta manera, debido a la continuación en sí va a suceder en cualquiera otro hilo piscina o en la sincronización inicial. contexto capturado enawait tcs.Task
. Y enSynchronizationContext.SetSynchronizationContext
sí mismo es muy barato, mucho más barato que un interruptor de hilo.ThreadPool
. Con esta solución, el TPL de hecho se utilizaráThreadPool
, si no hubo sincronización. contexto (o era el predeterminado básico) enawait tcs.Task
. Pero este es el comportamiento estándar de TPL.El enfoque de simulación de aborto se veía realmente bien, pero llevó a los hilos de secuestro de TPL en algunos escenarios .
Luego tuve una implementación que era similar a verificar el objeto de continuación , pero solo verificaba cualquier continuación, ya que en realidad hay demasiados escenarios para que el código dado funcione bien, pero eso significaba que incluso cosas como
Task.Wait
resultaron en una búsqueda de grupo de subprocesos.En última instancia, después de inspeccionar montones y montones de IL, el único escenario seguro y útil es el
SetOnInvokeMres
escenario (continuación manual-reset-event-slim). Hay muchos otros escenarios:Así que al final, opté por buscar un objeto de continuación no nulo; si es nulo, bien (sin continuaciones); si no es nulo, verifique en caso especial
SetOnInvokeMres
- si es eso: fino (seguro para invocar); de lo contrario, deje que el grupo de subprocesos realice elTrySetComplete
, sin decirle a la tarea que haga nada especial como abortar la suplantación.Task.Wait
utiliza elSetOnInvokeMres
enfoque, que es el escenario específico en el que queremos esforzarnos mucho para no bloquearlo.Type taskType = typeof(Task); FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic); Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic); if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null) { var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true); var il = method.GetILGenerator(); var hasContinuation = il.DefineLabel(); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel(); // check if null il.Emit(OpCodes.Brtrue_S, nonNull); il.MarkLabel(goodReturn); il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Ret); // check if is a SetOnInvokeMres - if so, we're OK il.MarkLabel(nonNull); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); il.Emit(OpCodes.Isinst, safeScenario); il.Emit(OpCodes.Brtrue_S, goodReturn); il.Emit(OpCodes.Ldc_I4_0); il.Emit(OpCodes.Ret); IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));
fuente