Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Streams SelectAsync. Log errors and improve test #6884

Merged
merged 4 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 126 additions & 64 deletions src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
using Xunit;
using Xunit.Abstractions;
using FluentAssertions.Extensions;
using static FluentAssertions.FluentActions;
using Directive = Akka.Streams.Supervision.Directive;

// ReSharper disable InvokeAsExtensionMethod
#pragma warning disable 162
Expand Down Expand Up @@ -87,63 +89,96 @@ public async void A_Flow_with_SelectAsync_must_produce_task_elements_in_order()
await c.ExpectCompleteAsync();
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
// Turning this on in CI/CD for now
[Fact]
public async Task A_Flow_with_SelectAsync_must_not_run_more_futures_than_requested_parallelism()
{
var probe = CreateTestProbe();
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 20))
.SelectAsync(8, n => Task.Run(() =>
{
probe.Ref.Tell(n);
return n;
}))
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
sub.Request(1);
probe.ReceiveN(9).Should().BeEquivalentTo(Enumerable.Range(1, 9));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
sub.Request(2);
probe.ReceiveN(2).Should().BeEquivalentTo(Enumerable.Range(10, 2));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
sub.Request(10);
probe.ReceiveN(9).Should().BeEquivalentTo(Enumerable.Range(12, 9));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));

foreach (var n in Enumerable.Range(1, 13))
await c.ExpectNextAsync(n);
//Enumerable.Range(1, 13).ForEach(n => c.ExpectNext(n));
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await this.AssertAllStagesStoppedAsync(async () =>
{
var probe = CreateTestProbe();
var c = this.CreateManualSubscriberProbe<int>();

Source.From(Enumerable.Range(1, 20))
.SelectAsync(8, async n =>
{
await Task.Yield();
probe.Ref.Tell(n);
return n;
})
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
sub.Request(1);
(await probe.ReceiveNAsync(9).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 9));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
sub.Request(2);
(await probe.ReceiveNAsync(2).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(10, 2));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
sub.Request(10);
(await probe.ReceiveNAsync(9).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(12, 9));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));

foreach (var n in Enumerable.Range(1, 13))
await c.ExpectNextAsync(n);

await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
}, Materializer).ShouldCompleteWithin(RemainingOrDefault);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public async Task A_Flow_with_SelectAsync_must_signal_task_failure()
// Turning this on in CI/CD for now
[Fact]
public async Task A_Flow_with_parallel_execution_SelectAsync_must_signal_task_failure()
{
await this.AssertAllStagesStoppedAsync(async() => {
var latch = new TestLatch(1);
var c = this.CreateManualSubscriberProbe<int>();

Source.From(Enumerable.Range(1, 5))
.SelectAsync(4, n => Task.Run(() =>
.SelectAsync(4, async n =>
{
if (n == 3)
if (n == 4)
throw new TestException("err1");
await Task.Delay(10.Seconds());

latch.Ready(TimeSpan.FromSeconds(10));
return n;
}))
})
.To(Sink.FromSubscriber(c)).Run(Materializer);

var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().InnerException.Message.Should().Be("err1");
latch.CountDown();
}, Materializer);

var exception = await c.ExpectErrorAsync();
exception.InnerException!.Message.Should().Be("err1");
}, Materializer).ShouldCompleteWithin(RemainingOrDefault);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_signal_task_failure()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - this is exactly the behavior we expect.

{
await this.AssertAllStagesStoppedAsync(async() => {
var probe = Source.From(Enumerable.Range(1, 5))
.SelectAsync(1, async n =>
{
await Task.Delay(10);
if (n == 3)
throw new TestException("err1");

return n;
})
.RunWith(this.SinkProbe<int>(), Materializer);

var exception = await probe.AsyncBuilder()
.Request(10)
.ExpectNextN(new[]{1, 2})
.ExpectErrorAsync()
.ShouldCompleteWithin(RemainingOrDefault);
exception.InnerException!.Message.Should().Be("err1");
}, Materializer);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_signal_task_failure_asap()
{
await this.AssertAllStagesStoppedAsync(() => {
await this.AssertAllStagesStoppedAsync(async () => {
var latch = CreateTestLatch();
var done = Source.From(Enumerable.Range(1, 5))
.Select(n =>
Expand All @@ -165,60 +200,87 @@ await this.AssertAllStagesStoppedAsync(() => {
return Task.FromResult(n);
}).RunWith(Sink.Ignore<int>(), Materializer);

done.Invoking(d => d.Wait(RemainingOrDefault)).Should().Throw<Exception>().WithMessage("err1");
await Awaiting(async () => await done).Should()
.ThrowAsync<Exception>()
.WithMessage("err1")
.ShouldCompleteWithin(RemainingOrDefault);

latch.CountDown();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync()
{
await this.AssertAllStagesStoppedAsync(async() => {
var latch = new TestLatch(1);
await this.AssertAllStagesStoppedAsync(async () => {
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
.SelectAsync(4, n =>
{
if (n == 3)
throw new TestException("err2");

return Task.Run(() =>
return Task.Run(async () =>
{
latch.Ready(TimeSpan.FromSeconds(10));
await Task.Delay(10.Seconds());
return n;
});
})
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().Message.Should().Be("err2");
latch.CountDown();
}, Materializer);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_resume_after_task_failure()
public async Task A_Flow_with_SelectAsync_must_invoke_supervision_strategy_on_task_failure()
{
await this.AssertAllStagesStoppedAsync(async() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
await this.AssertAllStagesStoppedAsync(async () => {
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
.SelectAsync(4, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
foreach (var i in new[] { 1, 2, 4, 5 })
await c.ExpectNextAsync(i);
await c.ExpectCompleteAsync();
}, Materializer);
var invoked = false;
var probe = Source.From(Enumerable.Range(1, 5))
.SelectAsync(1, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(_ =>
{
invoked = true;
return Directive.Stop;
}))
.RunWith(this.SinkProbe<int>(), Materializer);

await probe.AsyncBuilder()
.Request(10)
.ExpectNextN(new[] { 1, 2 })
.ExpectErrorAsync();

invoked.Should().BeTrue();
}, Materializer);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_resume_after_task_failure()
{
await this.AssertAllStagesStoppedAsync(async () => {
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
.SelectAsync(4, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
foreach (var i in new[] { 1, 2, 4, 5 })
await c.ExpectNextAsync(i);
await c.ExpectCompleteAsync();
}, Materializer);
}

Expand All @@ -227,7 +289,7 @@ public async Task A_Flow_with_SelectAsync_must_resume_after_multiple_failures()
{
await this.AssertAllStagesStoppedAsync(() => {
var futures = new[]
{
{
Task.Run(() => { throw new TestException("failure1"); return "";}),
Task.Run(() => { throw new TestException("failure2"); return "";}),
Task.Run(() => { throw new TestException("failure3"); return "";}),
Expand Down
49 changes: 42 additions & 7 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2571,9 +2571,10 @@ public Logic(Attributes inheritedAttributes, SelectAsync<TIn, TOut> stage) : bas

public override void OnPush()
{
var message = Grab(_stage.In);
try
{
var task = _stage._mapFunc(Grab(_stage.In));
var task = _stage._mapFunc(message);
var holder = new Holder<TOut>(NotYetThere, _taskCallback);
_buffer.Enqueue(holder);

Expand All @@ -2590,8 +2591,21 @@ public override void OnPush()
}
catch (Exception e)
{
if (_decider(e) == Directive.Stop)
FailStage(e);
var strategy = _decider(e);
Log.Error(e, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", message, strategy);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - should resolve #6878

switch (strategy)
{
case Directive.Stop:
FailStage(e);
break;

case Directive.Resume:
case Directive.Restart:
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", e);
}
}
if (Todo < _stage._parallelism && !HasBeenPulled(_stage.In))
TryPull(_stage.In);
Expand Down Expand Up @@ -2644,10 +2658,31 @@ private void PushOne()
private void HolderCompleted(Holder<TOut> holder)
{
var element = holder.Element;
if (!element.IsSuccess && _decider(element.Exception) == Directive.Stop)
FailStage(element.Exception);
else if (IsAvailable(_stage.Out))
PushOne();
if (element.IsSuccess)
{
if (IsAvailable(_stage.Out))
PushOne();
return;
}

var exception = element.Exception;
var strategy = _decider(exception);
Log.Error(exception, "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", strategy);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

switch (strategy)
{
case Directive.Stop:
FailStage(exception);
break;

case Directive.Resume:
case Directive.Restart:
if (IsAvailable(_stage.Out))
PushOne();
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", exception);
}
}

public override string ToString() => $"SelectAsync.Logic(buffer={_buffer})";
Expand Down