Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release/8.0] Fix cancellation unregistration in DataflowBlock.OutputAvailableAsync #102376

Merged
merged 2 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1434,63 +1434,48 @@ public static Task<bool> OutputAvailableAsync<TOutput>(this ISourceBlock<TOutput
public static Task<bool> OutputAvailableAsync<TOutput>(
this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
{
if (source is null)
{
throw new ArgumentNullException(nameof(source));
}

// Fast path for cancellation
if (cancellationToken.IsCancellationRequested)
return Common.CreateTaskFromCancellation<bool>(cancellationToken);

// In a method like this, normally we would want to check source.Completion.IsCompleted
// and avoid linking completely by simply returning a completed task. However,
// some blocks that are completed still have data available, like WriteOnceBlock,
// which completes as soon as it gets a value and stores that value forever.
// As such, OutputAvailableAsync must link from the source so that the source
// can push data to us if it has it, at which point we can immediately unlink.
return
source is null ? throw new ArgumentNullException(nameof(source)) :
cancellationToken.IsCancellationRequested ? Common.CreateTaskFromCancellation<bool>(cancellationToken) :
Impl(source, cancellationToken);

// Create a target task that will complete when it's offered a message (but it won't accept the message)
var target = new OutputAvailableAsyncTarget<TOutput>();
try
static async Task<bool> Impl(ISourceBlock<TOutput> source, CancellationToken cancellationToken)
{
// Link from the source. If the source propagates a message during or immediately after linking
// such that our target is already completed, just return its task.
target._unlinker = source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion);
// In a method like this, normally we would want to check source.Completion.IsCompleted
// and avoid linking completely by simply returning a completed task. However,
// some blocks that are completed still have data available, like WriteOnceBlock,
// which completes as soon as it gets a value and stores that value forever.
// As such, OutputAvailableAsync must link from the source so that the source
// can push data to us if it has it, at which point we can immediately unlink.

// If the task is already completed (an exception may have occurred, or the source may have propagated
// a message to the target during LinkTo or soon thereafter), just return the task directly.
if (target.Task.IsCompleted)
{
return target.Task;
}
// Create a target task that will complete when it's offered a message (but it won't accept the message)
var target = new OutputAvailableAsyncTarget<TOutput>();

// If cancellation could be requested, hook everything up to be notified of cancellation requests.
if (cancellationToken.CanBeCanceled)
// Link from the source.
using (source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion))
{
// When cancellation is requested, unlink the target from the source and cancel the target.
target._ctr = cancellationToken.Register(
CancellationTokenRegistration registration = default;
try
{
// Register for cancellation if the target isn't already completed (the source may have propagated
// a message to the target during LinkTo or soon thereafter).
if (!target.Task.IsCompleted)
{
registration =
#if NET6_0_OR_GREATER
OutputAvailableAsyncTarget<TOutput>.CancelAndUnlink,
cancellationToken.UnsafeRegister(static (state, cancellationToken) => ((OutputAvailableAsyncTarget<TOutput>)state!).TrySetCanceled(cancellationToken), target);
#else
static state => OutputAvailableAsyncTarget<TOutput>.CancelAndUnlink(state, default),
cancellationToken.Register(static state => ((OutputAvailableAsyncTarget<TOutput>)state!).TrySetCanceled(), target);
#endif
target);
}

return target.Task;
}
catch (Exception exc)
{
// Source.LinkTo could throw, as could cancellationToken.Register if cancellation was already requested
// such that it synchronously invokes the source's unlinker IDisposable, which could throw.
target.TrySetException(exc);

// Undo the link from the source to the target
target.AttemptThreadSafeUnlink();
}

// Return the now faulted task
return target.Task;
return await target.Task.ConfigureAwait(false);
}
finally
{
registration.Dispose();
}
}
}
}

Expand All @@ -1504,61 +1489,19 @@ public OutputAvailableAsyncTarget() :
{
}

/// <summary>
/// Cached continuation delegate that unregisters from cancellation and
/// marshals the antecedent's result to the return value.
/// </summary>
internal static readonly Func<Task<bool>, object?, bool> s_handleCompletion = (antecedent, state) =>
{
var target = state as OutputAvailableAsyncTarget<T>;
Debug.Assert(target != null, "Expected non-null target");
target._ctr.Dispose();
return antecedent.GetAwaiter().GetResult();
};

/// <summary>Cancels the target and unlinks the target from the source.</summary>
/// <param name="state">An OutputAvailableAsyncTarget.</param>
/// <param name="cancellationToken">The token that triggered cancellation</param>
internal static void CancelAndUnlink(object? state, CancellationToken cancellationToken)
{
var target = state as OutputAvailableAsyncTarget<T>;
Debug.Assert(target != null, "Expected a non-null target");

target.TrySetCanceled(cancellationToken);
target.AttemptThreadSafeUnlink();
}

/// <summary>Disposes of _unlinker if the target has been linked.</summary>
internal void AttemptThreadSafeUnlink()
{
// A race is possible. Therefore use an interlocked operation.
IDisposable? cachedUnlinker = _unlinker;
if (cachedUnlinker != null && Interlocked.CompareExchange(ref _unlinker, null, cachedUnlinker) == cachedUnlinker)
{
cachedUnlinker.Dispose();
}
}

/// <summary>The IDisposable used to unlink this target from its source.</summary>
internal IDisposable? _unlinker;
/// <summary>The registration used to unregister this target from the cancellation token.</summary>
internal CancellationTokenRegistration _ctr;

/// <summary>Completes the task when offered a message (but doesn't consume the message).</summary>
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T>? source, bool consumeToAccept)
{
if (!messageHeader.IsValid) throw new ArgumentException(SR.Argument_InvalidMessageHeader, nameof(messageHeader));
if (source == null) throw new ArgumentNullException(nameof(source));

TrySetResult(true);

return DataflowMessageStatus.DecliningPermanently;
}

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Complete"]/*' />
void IDataflowBlock.Complete()
{
TrySetResult(false);
}
void IDataflowBlock.Complete() => TrySetResult(false);

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Fault"]/*' />
void IDataflowBlock.Fault(Exception exception)
Expand All @@ -1572,13 +1515,13 @@ void IDataflowBlock.Fault(Exception exception)
}

/// <include file='XmlDocs/CommonXmlDocComments.xml' path='CommonXmlDocComments/Blocks/Member[@name="Completion"]/*' />
Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } }
Task IDataflowBlock.Completion => throw new NotSupportedException(SR.NotSupported_MemberNotNeeded);

/// <summary>The data to display in the debugger display attribute.</summary>
private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this)} IsCompleted = {base.Task.IsCompleted}";

/// <summary>Gets the data to display in the debugger display attribute for this instance.</summary>
object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } }
object IDebuggerDisplay.Content => DebuggerDisplayContent;
}
#endregion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
<PropertyGroup>
<TargetFrameworks>$(NetCoreAppCurrent);$(NetCoreAppPrevious);$(NetCoreAppMinimum);netstandard2.1;netstandard2.0;$(NetFrameworkMinimum)</TargetFrameworks>
<IsPackable>true</IsPackable>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<ServicingVersion>1</ServicingVersion>
<PackageDescription>TPL Dataflow promotes actor/agent-oriented designs through primitives for in-process message passing, dataflow, and pipelining. TDF builds upon the APIs and scheduling infrastructure provided by the Task Parallel Library (TPL), and integrates with the language support for asynchrony provided by C#, Visual Basic, and F#.

Commonly Used Types:
Expand Down