Skip to content

Commit

Permalink
[27-74] FlowSelectAsyncUnorderedSpec (#6574)
Browse files Browse the repository at this point in the history
* [27-74] `FlowSelectAsyncUnorderedSpec`

* Changes to `async` TestKit
  • Loading branch information
eaba authored Apr 11, 2023
1 parent 2dd7d26 commit 3c02d9e
Showing 1 changed file with 79 additions and 80 deletions.
159 changes: 79 additions & 80 deletions src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncUnorderedSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ public FlowSelectAsyncUnorderedSpec(ITestOutputHelper helper) : base(helper)
}

[WindowsFact(Skip ="Racy in Linux")]
public void A_Flow_with_SelectAsyncUnordered_must_produce_task_elements_in_the_order_they_are_ready()
public async Task A_Flow_with_SelectAsyncUnordered_must_produce_task_elements_in_the_order_they_are_ready()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var c = this.CreateManualSubscriberProbe<int>();
var latch = Enumerable.Range(0, 4).Select(_ => new TestLatch(1)).ToArray();

Expand All @@ -53,28 +52,28 @@ public void A_Flow_with_SelectAsyncUnordered_must_produce_task_elements_in_the_o
latch[n].Ready(TimeSpan.FromSeconds(5));
return n;
})).To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(5);

latch[1].CountDown();
c.ExpectNext(1);
await c.ExpectNextAsync(1);

latch[3].CountDown();
c.ExpectNext(3);
await c.ExpectNextAsync(3);

latch[2].CountDown();
c.ExpectNext(2);
await c.ExpectNextAsync(2);

latch[0].CountDown();
c.ExpectNext(0);
await c.ExpectNextAsync(0);

c.ExpectComplete();
await c.ExpectCompleteAsync();
}, Materializer);

}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_requested_elements()
public async Task A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_requested_elements()
{
var probe = CreateTestProbe();
var c = this.CreateManualSubscriberProbe<int>();
Expand All @@ -94,29 +93,32 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_requ
});
})
.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
c.ExpectNoMsg(TimeSpan.FromMilliseconds(200));
probe.ExpectNoMsg(TimeSpan.Zero);
var sub = await c.ExpectSubscriptionAsync();
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await probe.ExpectNoMsgAsync(TimeSpan.Zero);
sub.Request(1);
var got = new List<int> {c.ExpectNext()};
probe.ExpectMsgAllOf(new []{ 1, 2, 3, 4, 5 });
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
sub.Request(25);
probe.ExpectMsgAllOf(Enumerable.Range(6, 15).ToArray());
c.Within(TimeSpan.FromSeconds(3), () =>
await c.WithinAsync(TimeSpan.FromSeconds(3), async () =>
{
Enumerable.Range(2, 19).ForEach(_ => got.Add(c.ExpectNext()));
foreach(var i in Enumerable.Range(2, 19))
{
got.Add(await c.ExpectNextAsync());
}
//Enumerable.Range(2, 19).ForEach(_ => got.Add(c.ExpectNext()));
return NotUsed.Instance;
});
got.Should().BeEquivalentTo(Enumerable.Range(1, 20));
c.ExpectComplete();
await c.ExpectCompleteAsync();
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var latch = new TestLatch(1);
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
Expand All @@ -129,7 +131,7 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()
return n;
}))
.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().InnerException.Message.Should().Be("err1");
latch.CountDown();
Expand All @@ -138,10 +140,9 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure()


[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure_asap()
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_task_failure_asap()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var latch = CreateTestLatch();
var done = Source.From(Enumerable.Range(1, 5))
.Select(n =>
Expand All @@ -166,14 +167,14 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_task_failure_asap()

done.Invoking(d => d.Wait(RemainingOrDefault)).Should().Throw<Exception>().WithMessage("err1");
latch.CountDown();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_signal_error_from_SelectAsyncUnordered()
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_error_from_SelectAsyncUnordered()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var latch = new TestLatch(1);
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
Expand All @@ -189,40 +190,39 @@ public void A_Flow_with_SelectAsyncUnordered_must_signal_error_from_SelectAsyncU
});
})
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().Message.Should().Be("err2");
latch.CountDown();
}, Materializer);
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_resume_after_task_failure()
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_after_task_failure()
{
this.AssertAllStagesStopped(() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
this.AssertAllStagesStopped(() =>
{
Source.From(Enumerable.Range(1, 5))
.SelectAsyncUnordered(4, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectNextUnordered(1, 2, 4, 5)
.ExpectComplete();
await this.AssertAllStagesStoppedAsync(async() => {
await Source.From(Enumerable.Range(1, 5))
.SelectAsyncUnordered(4, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectNextUnordered(1, 2, 4, 5)
.ExpectCompleteAsync();
}, Materializer);
}, Materializer);
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_resume_after_multiple_failures()
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_after_multiple_failures()
{
this.AssertAllStagesStopped(async() =>
await this.AssertAllStagesStoppedAsync(async() =>
{
var futures = new[]
{
Expand All @@ -245,30 +245,29 @@ public void A_Flow_with_SelectAsyncUnordered_must_resume_after_multiple_failures
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_finish_after_task_failure()
public async Task A_Flow_with_SelectAsyncUnordered_must_finish_after_task_failure()
{
this.AssertAllStagesStopped(() =>
{
var t = Source.From(Enumerable.Range(1, 3))
.SelectAsyncUnordered(1, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3b");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.Grouped(10)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);

await this.AssertAllStagesStoppedAsync(() => {
var t = Source.From(Enumerable.Range(1, 3))
.SelectAsyncUnordered(1, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3b");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.Grouped(10)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
t.Wait(TimeSpan.FromSeconds(1)).Should().BeTrue();
t.Result.Should().BeEquivalentTo(new[] {1, 2});
t.Result.Should().BeEquivalentTo(new[] { 1, 2 });
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_resume_when_SelectAsyncUnordered_throws()
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_when_SelectAsyncUnordered_throws()
{
Source.From(Enumerable.Range(1, 5))
await Source.From(Enumerable.Range(1, 5))
.SelectAsyncUnordered(4, n =>
{
if (n == 3)
Expand All @@ -279,63 +278,61 @@ public void A_Flow_with_SelectAsyncUnordered_must_resume_when_SelectAsyncUnorder
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(10)
.ExpectNextUnordered(1, 2, 4, 5)
.ExpectComplete();
.ExpectCompleteAsync();
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_signal_NPE_when_task_is_completed_with_null()
public async Task A_Flow_with_SelectAsyncUnordered_must_signal_NPE_when_task_is_completed_with_null()
{
var c = this.CreateManualSubscriberProbe<string>();

Source.From(new[] {"a", "b"})
.SelectAsyncUnordered(4, _ => Task.FromResult(null as string))
.To(Sink.FromSubscriber(c)).Run(Materializer);

var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().Message.Should().StartWith(ReactiveStreamsCompliance.ElementMustNotBeNullMsg);
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_resume_when_task_is_completed_with_null()
public async Task A_Flow_with_SelectAsyncUnordered_must_resume_when_task_is_completed_with_null()
{
var c = this.CreateManualSubscriberProbe<string>();
Source.From(new[] { "a", "b", "c" })
.SelectAsyncUnordered(4, s => s.Equals("b") ? Task.FromResult(null as string) : Task.FromResult(s))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = c.ExpectSubscription();
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectNextUnordered("a", "c");
c.ExpectComplete();
await c.ExpectCompleteAsync();
}

[Fact]
public void A_Flow_with_SelectAsyncUnordered_must_handle_cancel_properly()
public async Task A_Flow_with_SelectAsyncUnordered_must_handle_cancel_properly()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var pub = this.CreateManualPublisherProbe<int>();
var sub = this.CreateManualSubscriberProbe<int>();

Source.FromPublisher(pub)
.SelectAsyncUnordered(4, _ => Task.FromResult(0))
.RunWith(Sink.FromSubscriber(sub), Materializer);

var upstream = pub.ExpectSubscription();
upstream.ExpectRequest();
var upstream = await pub.ExpectSubscriptionAsync();
await upstream.ExpectRequestAsync();

sub.ExpectSubscription().Cancel();
(await sub.ExpectSubscriptionAsync()).Cancel();

upstream.ExpectCancellation();
await upstream.ExpectCancellationAsync();
}, Materializer);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_configured()
public async Task A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_configured()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
const int parallelism = 8;
var counter = new AtomicCounter();
var queue = new BlockingQueue<(TaskCompletionSource<int>, long)>();
Expand Down Expand Up @@ -365,7 +362,7 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_conf
}
}
}, cancellation.Token);

Func<Task<int>> deferred = () =>
{
var promise = new TaskCompletionSource<int>();
Expand All @@ -390,6 +387,8 @@ public void A_Flow_with_SelectAsyncUnordered_must_not_run_more_futures_than_conf
{
cancellation.Cancel(false);
}

return Task.CompletedTask;
}, Materializer);
}
}
Expand Down

0 comments on commit 3c02d9e

Please sign in to comment.