From 44ac8e90fbbc6bbb89fd885c7d536b3d95918183 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 16 May 2024 23:32:13 -0400 Subject: [PATCH 1/2] Fix cancellation unregistration in DataflowBlock.OutputAvailableAsync (#99632) OutputAvailableAsync is not unregistering from the supplied CancellationToken. If a cancelable token is supplied and is long lived, each call with that token to OutputAvailableAsync will add another callback into that token, and that will continue to grow until either the token is dropped or has been cancellation requested. For a long-lived cancellation token, this is akin to a leak. The implementation was trying to be too clever in avoiding an additional continuation that was previously there. However, this continuation makes it a lot easier to avoid possible deadlocks that can occur if a cancellation request comes in concurrently with a message being pushed. Instead of trying to avoid it, just use an async method, which still incurs the extra task but does so with less allocation and greatly simplifies the code while also fixing the issue, as all cleanup can now be done in the continuation as part of the async method. --- .../src/Base/DataflowBlock.cs | 131 +++++------------- 1 file changed, 37 insertions(+), 94 deletions(-) diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs index 63cdedf4b466a..7007b0c80f5f7 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs @@ -1434,63 +1434,48 @@ public static Task OutputAvailableAsync(this ISourceBlock OutputAvailableAsync( this ISourceBlock source, CancellationToken cancellationToken) { - if (source is null) - { - throw new ArgumentNullException(nameof(source)); - } - - // Fast path for cancellation - if (cancellationToken.IsCancellationRequested) - return Common.CreateTaskFromCancellation(cancellationToken); - - // In a method like this, normally we would want to check source.Completion.IsCompleted - // and avoid linking completely by simply returning a completed task. However, - // some blocks that are completed still have data available, like WriteOnceBlock, - // which completes as soon as it gets a value and stores that value forever. - // As such, OutputAvailableAsync must link from the source so that the source - // can push data to us if it has it, at which point we can immediately unlink. + return + source is null ? throw new ArgumentNullException(nameof(source)) : + cancellationToken.IsCancellationRequested ? Common.CreateTaskFromCancellation(cancellationToken) : + Impl(source, cancellationToken); - // Create a target task that will complete when it's offered a message (but it won't accept the message) - var target = new OutputAvailableAsyncTarget(); - try + static async Task Impl(ISourceBlock source, CancellationToken cancellationToken) { - // Link from the source. If the source propagates a message during or immediately after linking - // such that our target is already completed, just return its task. - target._unlinker = source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion); + // In a method like this, normally we would want to check source.Completion.IsCompleted + // and avoid linking completely by simply returning a completed task. However, + // some blocks that are completed still have data available, like WriteOnceBlock, + // which completes as soon as it gets a value and stores that value forever. + // As such, OutputAvailableAsync must link from the source so that the source + // can push data to us if it has it, at which point we can immediately unlink. - // If the task is already completed (an exception may have occurred, or the source may have propagated - // a message to the target during LinkTo or soon thereafter), just return the task directly. - if (target.Task.IsCompleted) - { - return target.Task; - } + // Create a target task that will complete when it's offered a message (but it won't accept the message) + var target = new OutputAvailableAsyncTarget(); - // If cancellation could be requested, hook everything up to be notified of cancellation requests. - if (cancellationToken.CanBeCanceled) + // Link from the source. + using (source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion)) { - // When cancellation is requested, unlink the target from the source and cancel the target. - target._ctr = cancellationToken.Register( + CancellationTokenRegistration registration = default; + try + { + // Register for cancellation if the target isn't already completed (the source may have propagated + // a message to the target during LinkTo or soon thereafter). + if (!target.Task.IsCompleted) + { + registration = #if NET6_0_OR_GREATER - OutputAvailableAsyncTarget.CancelAndUnlink, + cancellationToken.UnsafeRegister(static (state, cancellationToken) => ((OutputAvailableAsyncTarget)state!).TrySetCanceled(cancellationToken), target); #else - static state => OutputAvailableAsyncTarget.CancelAndUnlink(state, default), + cancellationToken.Register(static state => ((OutputAvailableAsyncTarget)state!).TrySetCanceled(), target); #endif - target); - } - - return target.Task; - } - catch (Exception exc) - { - // Source.LinkTo could throw, as could cancellationToken.Register if cancellation was already requested - // such that it synchronously invokes the source's unlinker IDisposable, which could throw. - target.TrySetException(exc); - - // Undo the link from the source to the target - target.AttemptThreadSafeUnlink(); + } - // Return the now faulted task - return target.Task; + return await target.Task.ConfigureAwait(false); + } + finally + { + registration.Dispose(); + } + } } } @@ -1504,46 +1489,6 @@ public OutputAvailableAsyncTarget() : { } - /// - /// Cached continuation delegate that unregisters from cancellation and - /// marshals the antecedent's result to the return value. - /// - internal static readonly Func, object?, bool> s_handleCompletion = (antecedent, state) => - { - var target = state as OutputAvailableAsyncTarget; - Debug.Assert(target != null, "Expected non-null target"); - target._ctr.Dispose(); - return antecedent.GetAwaiter().GetResult(); - }; - - /// Cancels the target and unlinks the target from the source. - /// An OutputAvailableAsyncTarget. - /// The token that triggered cancellation - internal static void CancelAndUnlink(object? state, CancellationToken cancellationToken) - { - var target = state as OutputAvailableAsyncTarget; - Debug.Assert(target != null, "Expected a non-null target"); - - target.TrySetCanceled(cancellationToken); - target.AttemptThreadSafeUnlink(); - } - - /// Disposes of _unlinker if the target has been linked. - internal void AttemptThreadSafeUnlink() - { - // A race is possible. Therefore use an interlocked operation. - IDisposable? cachedUnlinker = _unlinker; - if (cachedUnlinker != null && Interlocked.CompareExchange(ref _unlinker, null, cachedUnlinker) == cachedUnlinker) - { - cachedUnlinker.Dispose(); - } - } - - /// The IDisposable used to unlink this target from its source. - internal IDisposable? _unlinker; - /// The registration used to unregister this target from the cancellation token. - internal CancellationTokenRegistration _ctr; - /// Completes the task when offered a message (but doesn't consume the message). DataflowMessageStatus ITargetBlock.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock? source, bool consumeToAccept) { @@ -1551,14 +1496,12 @@ DataflowMessageStatus ITargetBlock.OfferMessage(DataflowMessageHeader message if (source == null) throw new ArgumentNullException(nameof(source)); TrySetResult(true); + return DataflowMessageStatus.DecliningPermanently; } /// - void IDataflowBlock.Complete() - { - TrySetResult(false); - } + void IDataflowBlock.Complete() => TrySetResult(false); /// void IDataflowBlock.Fault(Exception exception) @@ -1572,13 +1515,13 @@ void IDataflowBlock.Fault(Exception exception) } /// - Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } } + Task IDataflowBlock.Completion => throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); /// The data to display in the debugger display attribute. private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this)} IsCompleted = {base.Task.IsCompleted}"; /// Gets the data to display in the debugger display attribute for this instance. - object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } } + object IDebuggerDisplay.Content => DebuggerDisplayContent; } #endregion From dfbec3c251950c33218fa0a22f00cc1623c2a07d Mon Sep 17 00:00:00 2001 From: Eric StJohn Date: Wed, 22 May 2024 12:21:56 -0700 Subject: [PATCH 2/2] Enable DataFlow package in servicing --- .../src/System.Threading.Tasks.Dataflow.csproj | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj b/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj index 9835aa30e3b35..3a4156130e82f 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj @@ -2,6 +2,8 @@ $(NetCoreAppCurrent);$(NetCoreAppPrevious);$(NetCoreAppMinimum);netstandard2.1;netstandard2.0;$(NetFrameworkMinimum) true + true + 1 TPL Dataflow promotes actor/agent-oriented designs through primitives for in-process message passing, dataflow, and pipelining. TDF builds upon the APIs and scheduling infrastructure provided by the Task Parallel Library (TPL), and integrates with the language support for asynchrony provided by C#, Visual Basic, and F#. Commonly Used Types: