Skip to content

Commit

Permalink
Fix cancellation unregistration in DataflowBlock.OutputAvailableAsync (
Browse files Browse the repository at this point in the history
…#99632)

OutputAvailableAsync is not unregistering from the supplied CancellationToken. If a cancelable token is supplied and is long lived, each call with that token to OutputAvailableAsync will add another callback into that token, and that will continue to grow until either the token is dropped or has been cancellation requested.  For a long-lived cancellation token, this is akin to a leak.

The implementation was trying to be too clever in avoiding an additional continuation that was previously there. However, this continuation makes it a lot easier to avoid possible deadlocks that can occur if a cancellation request comes in concurrently with a message being pushed.  Instead of trying to avoid it, just use an async method, which still incurs the extra task but does so with less allocation and greatly simplifies the code while also fixing the issue, as all cleanup can now be done in the continuation as part of the async method.
  • Loading branch information
stephentoub authored May 17, 2024
1 parent 3ce07d3 commit 84885d7
Showing 1 changed file with 37 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1434,63 +1434,48 @@ public static Task<bool> OutputAvailableAsync<TOutput>(this ISourceBlock<TOutput
public static Task<bool> OutputAvailableAsync<TOutput>(
this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
{
if (source is null)
{
throw new ArgumentNullException(nameof(source));
}

// Fast path for cancellation
if (cancellationToken.IsCancellationRequested)
return Common.CreateTaskFromCancellation<bool>(cancellationToken);

// In a method like this, normally we would want to check source.Completion.IsCompleted
// and avoid linking completely by simply returning a completed task. However,
// some blocks that are completed still have data available, like WriteOnceBlock,
// which completes as soon as it gets a value and stores that value forever.
// As such, OutputAvailableAsync must link from the source so that the source
// can push data to us if it has it, at which point we can immediately unlink.
return
source is null ? throw new ArgumentNullException(nameof(source)) :
cancellationToken.IsCancellationRequested ? Common.CreateTaskFromCancellation<bool>(cancellationToken) :
Impl(source, cancellationToken);

// Create a target task that will complete when it's offered a message (but it won't accept the message)
var target = new OutputAvailableAsyncTarget<TOutput>();
try
static async Task<bool> Impl(ISourceBlock<TOutput> source, CancellationToken cancellationToken)
{
// Link from the source. If the source propagates a message during or immediately after linking
// such that our target is already completed, just return its task.
target._unlinker = source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion);
// In a method like this, normally we would want to check source.Completion.IsCompleted
// and avoid linking completely by simply returning a completed task. However,
// some blocks that are completed still have data available, like WriteOnceBlock,
// which completes as soon as it gets a value and stores that value forever.
// As such, OutputAvailableAsync must link from the source so that the source
// can push data to us if it has it, at which point we can immediately unlink.

// If the task is already completed (an exception may have occurred, or the source may have propagated
// a message to the target during LinkTo or soon thereafter), just return the task directly.
if (target.Task.IsCompleted)
{
return target.Task;
}
// Create a target task that will complete when it's offered a message (but it won't accept the message)
var target = new OutputAvailableAsyncTarget<TOutput>();

// If cancellation could be requested, hook everything up to be notified of cancellation requests.
if (cancellationToken.CanBeCanceled)
// Link from the source.
using (source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion))
{
// When cancellation is requested, unlink the target from the source and cancel the target.
target._ctr = cancellationToken.Register(
CancellationTokenRegistration registration = default;
try
{
// Register for cancellation if the target isn't already completed (the source may have propagated
// a message to the target during LinkTo or soon thereafter).
if (!target.Task.IsCompleted)
{
registration =
#if NET
OutputAvailableAsyncTarget<TOutput>.CancelAndUnlink,
cancellationToken.UnsafeRegister(static (state, cancellationToken) => ((OutputAvailableAsyncTarget<TOutput>)state!).TrySetCanceled(cancellationToken), target);
#else
static state => OutputAvailableAsyncTarget<TOutput>.CancelAndUnlink(state, default),
cancellationToken.Register(static state => ((OutputAvailableAsyncTarget<TOutput>)state!).TrySetCanceled(), target);
#endif
target);
}

return target.Task;
}
catch (Exception exc)
{
// Source.LinkTo could throw, as could cancellationToken.Register if cancellation was already requested
// such that it synchronously invokes the source's unlinker IDisposable, which could throw.
target.TrySetException(exc);

// Undo the link from the source to the target
target.AttemptThreadSafeUnlink();
}

// Return the now faulted task
return target.Task;
return await target.Task.ConfigureAwait(false);
}
finally
{
registration.Dispose();
}
}
}
}

Expand All @@ -1504,61 +1489,19 @@ public OutputAvailableAsyncTarget() :
{
}

/// <summary>
/// Cached continuation delegate that unregisters from cancellation and
/// marshals the antecedent's result to the return value.
/// </summary>
internal static readonly Func<Task<bool>, object?, bool> s_handleCompletion = (antecedent, state) =>
{
var target = state as OutputAvailableAsyncTarget<T>;
Debug.Assert(target != null, "Expected non-null target");
target._ctr.Dispose();
return antecedent.GetAwaiter().GetResult();
};

/// <summary>Cancels the target and unlinks the target from the source.</summary>
/// <param name="state">An OutputAvailableAsyncTarget.</param>
/// <param name="cancellationToken">The token that triggered cancellation</param>
internal static void CancelAndUnlink(object? state, CancellationToken cancellationToken)
{
var target = state as OutputAvailableAsyncTarget<T>;
Debug.Assert(target != null, "Expected a non-null target");

target.TrySetCanceled(cancellationToken);
target.AttemptThreadSafeUnlink();
}

/// <summary>Disposes of _unlinker if the target has been linked.</summary>
internal void AttemptThreadSafeUnlink()
{
// A race is possible. Therefore use an interlocked operation.
IDisposable? cachedUnlinker = _unlinker;
if (cachedUnlinker != null && Interlocked.CompareExchange(ref _unlinker, null, cachedUnlinker) == cachedUnlinker)
{
cachedUnlinker.Dispose();
}
}

/// <summary>The IDisposable used to unlink this target from its source.</summary>
internal IDisposable? _unlinker;
/// <summary>The registration used to unregister this target from the cancellation token.</summary>
internal CancellationTokenRegistration _ctr;

/// <summary>Completes the task when offered a message (but doesn't consume the message).</summary>
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? source, bool consumeToAccept)
{
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (source == null) throw new ArgumentNullException(nameof(source));

TrySetResult(true);

return DataflowMessageStatus.DecliningPermanently;
}

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
void IDataflowBlock.Complete()
{
TrySetResult(false);
}
void IDataflowBlock.Complete() => TrySetResult(false);

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
Expand All @@ -1572,13 +1515,13 @@ void IDataflowBlock.Fault(Exception exception)
}

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } }
Task IDataflowBlock.Completion => throw new NotSupportedException(SR.NotSupported_MemberNotNeeded);

/// <summary>The data to display in the debugger display attribute.</summary>
private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this)} IsCompleted = {base.Task.IsCompleted}";

/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
object IDebuggerDisplay.Content => DebuggerDisplayContent;
}
#endregion

Expand Down

0 comments on commit 84885d7

Please sign in to comment.