Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleaned up IAsyncEnumerable Source to use local functions #6045

Conversation

Aaronontheweb
Copy link
Member

@Aaronontheweb Aaronontheweb commented Jul 18, 2022

Changes

Better to use await around ValueTask rather than converting to Task and doing ContinueWith

Cleanup from #6044

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

Better to use `await` around `ValueTask` rather than converting to `Task` and doing `ContinueWith`
Copy link
Contributor

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the code can be cleaned up further.

src/core/Akka.Streams/Implementation/Fusing/Ops.cs Outdated Show resolved Hide resolved
Copy link
Contributor

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb Need you to review the changes I've made

Copy link
Member Author

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb enabled auto-merge (squash) July 18, 2022 21:25
{
_enumerator = enumerator;
_completionCts = new CancellationTokenSource();
_enumerator = enumerable.GetAsyncEnumerator(_completionCts.Token);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting question, should this be in prestart vs the constructor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing that would throw, I guess, would be if there was something wrong with the IAsyncEnumerable (i.e. null)?

In that case I guess it'd be better to have this run in PreStart

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, I'll make the appropriate changes

@Aaronontheweb Aaronontheweb disabled auto-merge July 18, 2022 21:26
vtask.GetAwaiter().OnCompleted(_onComplete);
}
_completionCts.Cancel();
_completionCts.Dispose();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have guarantees that cancelling will dispose the enumerator if needed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, you're totally correct, I forgot that we need to dispose the enumerator... I guess the previous code is still needed here.

Copy link
Contributor

@Arkatufus Arkatufus Jul 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a big question, should we defer the base.OnDownstreamFinish() call and do this asynchronously or just block and wait until _enumerator.DisposeAsync is complete?

Copy link
Contributor

@Arkatufus Arkatufus Jul 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            public override void OnDownstreamFinish(Exception cause)
            {
                _completionCts.Cancel();
                _completionCts.Dispose();
                _enumerator.DisposeAsync().AsTask().ContinueWith(t => _baseOnDownstreamFinishCallback(cause));
            }

            private void BaseOnDownstreamFinishCallback(Exception cause)
            {
                CompleteStage();
                base.OnDownstreamFinish(cause);
            }

or

            public override void OnDownstreamFinish(Exception cause)
            {
                _completionCts.Cancel();
                _completionCts.Dispose();
                CompleteStage();

                _enumerator.DisposeAsync().GetAwaiter().GetResult();
                base.OnDownstreamFinish(cause);
            }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add the try...catch later, just wanted to get the point across first

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only problem that I can see here is a custom IAsyncEnumerator that deadlocked/froze when disposed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should go with the latter - I doubt there will be IAsyncEnumerator.DisposeAsync implementations that can block for prolonged periods of time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member Author

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb merged commit 0c92aac into akkadotnet:dev Jul 19, 2022
@Aaronontheweb Aaronontheweb deleted the IAsyncEnumerable-ContinueWith-refactor branch July 19, 2022 11:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants