Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams ensure cts cancel fix statecheck #6935

Merged

Conversation

to11mtm
Copy link
Member

@to11mtm to11mtm commented Sep 29, 2023

Fixes #6903

Changes

Changed cleanup pattern to fix AsyncEnumerable stage, also fixed race condition in ValueTask state check.

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

_enumerable = enumerable;
_outlet = shape.Outlet;
_onSuccess = GetAsyncCallback<T>(OnSuccess);
_onFailure = GetAsyncCallback<Exception>(OnFailure);
_onComplete = GetAsyncCallback(OnComplete);

_completionCts = new CancellationTokenSource();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Controversial possibly, but I felt it safer to move this to ctor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call IMHO

@@ -3859,26 +3880,12 @@ public override void OnPull()
// if result is false, it means enumerator was closed. Complete stage in that case.
CompleteStage();
}
}
else if (vtask.IsCompleted) // IsCompleted covers Faulted, Cancelled, and RanToCompletion async state
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is racy, so removed. Exception case is not as big a deal anyway.

#pragma warning disable CS4014
ProcessTask();
#pragma warning restore CS4014
_ = ProcessTask();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discard = no pragma disable needed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handy!

}
}

public override void OnDownstreamFinish(Exception cause)
{
_completionCts.Cancel();
_completionCts.Dispose();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to PostStop(). AFAIK we will call it after CompleteStage() in interpreter, plz correct if I'm wrong.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that's the case.

Comment on lines +3849 to +3850
_completionCts.Cancel();
_completionCts.Dispose();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should ALWAYS be disposing the CTS.

What is more interesting to ask, is whether it's worth us having some sort of flag to know whether we should bother canceling or not. I went this route because in theory, under happy-completion path nobody cares about this CTS anyway.

That said there -may- be value in some cases of not calling cancel, but can't think of any.

}
try
{
_enumerator.DisposeAsync().ConfigureAwait(false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just feels like the right thing to do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this is the actual fix, the enumerator needs to be disposed for the resources to be released.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, me too

Comment on lines +272 to +286
[Fact]
public async Task AsyncEnumerableSource_Disposes_OnCancel()
{
var resource = new Resource();
var tcs = new System.Threading.Tasks.TaskCompletionSource<NotUsed>(TaskCreationOptions
.RunContinuationsAsynchronously);
var src = Source.From(() =>
CancelTestGenerator(tcs, resource, default));
src.To(Sink.Ignore<int>()).Run(Materializer);
await tcs.Task;
Materializer.Shutdown();
await Task.Delay(500);
Assert.False(resource.IsActive);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adapted from #6903 report.
Calling Materializer.Shutdown() is, as far as I know, going to happen when the actorsystem shuts down.

OTOH this test was added in a red->green refactoring, so I know without the changes in AsyncEnumerable it fails.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM, but had a suggestion around handling of DisposeAsync

_enumerable = enumerable;
_outlet = shape.Outlet;
_onSuccess = GetAsyncCallback<T>(OnSuccess);
_onFailure = GetAsyncCallback<Exception>(OnFailure);
_onComplete = GetAsyncCallback(OnComplete);

_completionCts = new CancellationTokenSource();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call IMHO

src/core/Akka.Streams/Implementation/Fusing/Ops.cs Outdated Show resolved Hide resolved
#pragma warning disable CS4014
ProcessTask();
#pragma warning restore CS4014
_ = ProcessTask();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handy!

}
}

public override void OnDownstreamFinish(Exception cause)
{
_completionCts.Cancel();
_completionCts.Dispose();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that's the case.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb merged commit 9bf14af into akkadotnet:dev Oct 3, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Source from IAsyncEnumerable is not disposed on shutdown, while interrupted because of back-pressure
3 participants