Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleaned up IAsyncEnumerable Source to use local functions #6045

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 45 additions & 13 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3775,7 +3775,6 @@ private sealed class Logic : OutGraphStageLogic
private readonly Action<T> _onSuccess;
private readonly Action<Exception> _onFailure;
private readonly Action _onComplete;
private readonly Action<Task<bool>> _handleContinuation;

public Logic(SourceShape<T> shape, IAsyncEnumerator<T> enumerator) : base(shape)
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
{
Expand All @@ -3784,15 +3783,6 @@ public Logic(SourceShape<T> shape, IAsyncEnumerator<T> enumerator) : base(shape)
_onSuccess = GetAsyncCallback<T>(OnSuccess);
_onFailure = GetAsyncCallback<Exception>(OnFailure);
_onComplete = GetAsyncCallback(OnComplete);
_handleContinuation = task =>
{
// Since this Action is used as task continuation, we cannot safely call corresponding
// OnSuccess/OnFailure/OnComplete methods directly. We need to do that via async callbacks.
if (task.IsFaulted) _onFailure(task.Exception);
else if (task.IsCanceled) _onFailure(new TaskCanceledException(task));
else if (task.Result) _onSuccess(enumerator.Current);
else _onComplete();
};

SetHandler(_outlet, this);
}
Expand All @@ -3810,7 +3800,7 @@ public override void OnPull()
var vtask = _enumerator.MoveNextAsync();
if (vtask.IsCompletedSuccessfully)
{
// When MoveNextAsync returned immediatelly, we don't need to await.
// When MoveNextAsync returned immediately, we don't need to await.
// We can use fast path instead.
if (vtask.Result)
{
Expand All @@ -3825,9 +3815,31 @@ public override void OnPull()
}
else
{
vtask.AsTask().ContinueWith(_handleContinuation);
async Task ProcessTask()
{

// Since this Action is used as task continuation, we cannot safely call corresponding
// OnSuccess/OnFailure/OnComplete methods directly. We need to do that via async callbacks.
try
{
var completed = await vtask;
if (completed)
_onSuccess(_enumerator.Current);
else
_onComplete();
}
catch (Exception ex)
{
_onFailure(ex);
}
}

#pragma warning disable CS4014
ProcessTask();
#pragma warning restore CS4014
}
}

public override void OnDownstreamFinish(Exception cause)
{
var vtask = _enumerator.DisposeAsync();
Expand All @@ -3838,7 +3850,27 @@ public override void OnDownstreamFinish(Exception cause)
else
{
// for async disposals use async callback
vtask.GetAwaiter().OnCompleted(_onComplete);
async Task ProcessDisposal()
{
try
{
await vtask;
}
catch (Exception ex)
{
// might as well log the failure while shutting down
Log.Error(ex, "Encountered error while disposing {0}", _enumerator.GetType());
}
finally
{
_onComplete();
}
}

#pragma warning disable CS4014
ProcessDisposal();
#pragma warning restore CS4014

}
base.OnDownstreamFinish(cause);
}
Expand Down