Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[27-74] FlowSelectAsyncUnorderedSpec #6574

Merged
merged 6 commits into from
Apr 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 79 additions & 80 deletions src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ public FlowSelectAsyncUnorderedSpec(ITestOutputHelper helper) : base(helper)
}

[WindowsFact(Skip ="Racy in Linux")]
public void A_Flow_with_SelectAsyncUnordered_must_produce_task_elements_in_the_order_they_are_ready()
public async Task A_Flow_with_SelectAsyncUnordered_must_produce_task_elements_in_the_order_they_are_ready()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c = this.CreateManualSubscriberProbe<int>();
var latch = Enumerable.Range(0, 4).Select(_ => new TestLatch(1)).ToArray();

Expand All @@ -53,28 +52,28 @@ public void A_Flow_with_SelectAsyncUnordered_must_produce_task_elements_in_the_o
latch[n].Ready(TimeSpan.FromSeconds(5));
return n;
})).To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(5);

latch[1].CountDown();
c.ExpectNext(1);
await c.ExpectNextAsync(1);

latch[3].CountDown();
c.ExpectNext(3);
await c.ExpectNextAsync(3);

latch[2].CountDown();
c.ExpectNext(2);
await c.ExpectNextAsync(2);

latch[0].CountDown();
c.ExpectNext(0);
await c.ExpectNextAsync(0);

c.ExpectComplete();
await c.ExpectCompleteAsync();
}, Materializer);

}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_requested_elements()
public async Task A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_requested_elements()
{
var probe = CreateTestProbe();
var c = this.CreateManualSubscriberProbe<int>();
Expand All @@ -94,29 +93,32 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_requ
});
})
.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
probe.ExpectNoMsg(TimeSpan.Zero);
var sub = await c.ExpectSubscriptionAsync();
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await probe.ExpectNoMsgAsync(TimeSpan.Zero);
sub.Request(1);
var got = new List<int> {c.ExpectNext()};
probe.ExpectMsgAllOf(new []{ 1, 2, 3, 4, 5 });
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
sub.Request(25);
probe.ExpectMsgAllOf(Enumerable.Range(6, 15).ToArray());
c.Within(TimeSpan.FromSeconds(3), () =>
await c.WithinAsync(TimeSpan.FromSeconds(3), async () =>
{
Enumerable.Range(2, 19).ForEach(_ => got.Add(c.ExpectNext()));
foreach(var i in Enumerable.Range(2, 19))
{
got.Add(await c.ExpectNextAsync());
}
//Enumerable.Range(2, 19).ForEach(_ => got.Add(c.ExpectNext()));
return NotUsed.Instance;
});
got.Should().BeEquivalentTo(Enumerable.Range(1, 20));
c.ExpectComplete();
await c.ExpectCompleteAsync();
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var latch = new TestLatch(1);
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
Expand All @@ -129,7 +131,7 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()
return n;
}))
.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().InnerException.Message.Should().Be("err1");
latch.CountDown();
Expand All @@ -138,10 +140,9 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()


[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure_asap()
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_task_failure_asap()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var latch = CreateTestLatch();
var done = Source.From(Enumerable.Range(1, 5))
.Select(n =>
Expand All @@ -166,14 +167,14 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure_asap()

done.Invoking(d => d.Wait(RemainingOrDefault)).Should().Throw<Exception>().WithMessage("err1");
latch.CountDown();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_signal_error_from_SelectAsyncUnordered()
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_error_from_SelectAsyncUnordered()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var latch = new TestLatch(1);
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
Expand All @@ -189,40 +190,39 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_error_from_SelectAsyncU
});
})
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().Message.Should().Be("err2");
latch.CountDown();
}, Materializer);
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_resume_after_task_failure()
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_after_task_failure()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
this.AssertAllStagesStopped(() =>
{
Source.From(Enumerable.Range(1, 5))
.SelectAsyncUnordered(4, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectNextUnordered(1, 2, 4, 5)
.ExpectComplete();
await this.AssertAllStagesStoppedAsync(async() => {
await Source.From(Enumerable.Range(1, 5))
.SelectAsyncUnordered(4, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectNextUnordered(1, 2, 4, 5)
.ExpectCompleteAsync();
}, Materializer);
}, Materializer);
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_resume_after_multiple_failures()
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_after_multiple_failures()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var futures = new[]
{
Expand All @@ -245,30 +245,29 @@ public void A_Flow_with_SelectAsyncUnordered_must_resume_after_multiple_failures
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_finish_after_task_failure()
public async Task A_Flow_with_SelectAsyncUnordered_must_finish_after_task_failure()
{
this.AssertAllStagesStopped(() =>
{
var t = Source.From(Enumerable.Range(1, 3))
.SelectAsyncUnordered(1, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3b");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.Grouped(10)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);

await this.AssertAllStagesStoppedAsync(() => {
var t = Source.From(Enumerable.Range(1, 3))
.SelectAsyncUnordered(1, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3b");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.Grouped(10)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
t.Wait(TimeSpan.FromSeconds(1)).Should().BeTrue();
t.Result.Should().BeEquivalentTo(new[] {1, 2});
t.Result.Should().BeEquivalentTo(new[] { 1, 2 });
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_resume_when_SelectAsyncUnordered_throws()
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_when_SelectAsyncUnordered_throws()
{
Source.From(Enumerable.Range(1, 5))
await Source.From(Enumerable.Range(1, 5))
.SelectAsyncUnordered(4, n =>
{
if (n == 3)
Expand All @@ -279,63 +278,61 @@ public void A_Flow_with_SelectAsyncUnordered_must_resume_when_SelectAsyncUnorder
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectNextUnordered(1, 2, 4, 5)
.ExpectComplete();
.ExpectCompleteAsync();
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_signal_NPE_when_task_is_completed_with_null()
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_NPE_when_task_is_completed_with_null()
{
var c = this.CreateManualSubscriberProbe<string>();

Source.From(new[] {"a", "b"})
.SelectAsyncUnordered(4, _ => Task.FromResult(null as string))
.To(Sink.FromSubscriber(c)).Run(Materializer);

var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().Message.Should().StartWith(ReactiveStreamsCompliance.ElementMustNotBeNullMsg);
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_resume_when_task_is_completed_with_null()
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_when_task_is_completed_with_null()
{
var c = this.CreateManualSubscriberProbe<string>();
Source.From(new[] { "a", "b", "c" })
.SelectAsyncUnordered(4, s => s.Equals("b") ? Task.FromResult(null as string) : Task.FromResult(s))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectNextUnordered("a", "c");
c.ExpectComplete();
await c.ExpectCompleteAsync();
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_handle_cancel_properly()
public async Task A_Flow_with_SelectAsyncUnordered_must_handle_cancel_properly()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var pub = this.CreateManualPublisherProbe<int>();
var sub = this.CreateManualSubscriberProbe<int>();

Source.FromPublisher(pub)
.SelectAsyncUnordered(4, _ => Task.FromResult(0))
.RunWith(Sink.FromSubscriber(sub), Materializer);

var upstream = pub.ExpectSubscription();
upstream.ExpectRequest();
var upstream = await pub.ExpectSubscriptionAsync();
await upstream.ExpectRequestAsync();

sub.ExpectSubscription().Cancel();
(await sub.ExpectSubscriptionAsync()).Cancel();

upstream.ExpectCancellation();
await upstream.ExpectCancellationAsync();
}, Materializer);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_configured()
public async Task A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_configured()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
const int parallelism = 8;
var counter = new AtomicCounter();
var queue = new BlockingQueue<(TaskCompletionSource<int>, long)>();
Expand Down Expand Up @@ -365,7 +362,7 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_conf
}
}
}, cancellation.Token);

Func<Task<int>> deferred = () =>
{
var promise = new TaskCompletionSource<int>();
Expand All @@ -390,6 +387,8 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_conf
{
cancellation.Cancel(false);
}

return Task.CompletedTask;
}, Materializer);
}
}
Expand Down