From e3f8dfa20736de9bfd290a53263c7a2d74d00b83 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Sat, 25 Mar 2023 16:00:57 +0100 Subject: [PATCH 1/2] [44-74] `GraphMergeSpec` --- .../Akka.Streams.Tests/Dsl/GraphMergeSpec.cs | 51 +++++++++---------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs index 8a70adb2c26..0ccdcc97019 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.TestKit; @@ -46,10 +47,9 @@ public MergeFixture(GraphDsl.Builder builder) : base(builder) } [Fact] - public void A_Merge_must_work_in_the_happy_case() + public async Task A_Merge_must_work_in_the_happy_case() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { // Different input sizes(4 and 6) var source1 = Source.From(Enumerable.Range(0, 4)); var source2 = Source.From(Enumerable.Range(4, 6)); @@ -63,8 +63,8 @@ public void A_Merge_must_work_in_the_happy_case() var sink = Sink.FromSubscriber(probe); b.From(source1).To(m1.In(0)); - b.From(m1.Out).Via(Flow.Create().Select(x => x*2)).To(m2.In(0)); - b.From(m2.Out).Via(Flow.Create().Select(x => x / 2).Select(x=>x+1)).To(sink); + b.From(m1.Out).Via(Flow.Create().Select(x => x * 2)).To(m2.In(0)); + b.From(m2.Out).Via(Flow.Create().Select(x => x / 2).Select(x => x + 1)).To(sink); b.From(source2).To(m1.In(1)); b.From(source3).To(m2.In(1)); @@ -84,6 +84,7 @@ public void A_Merge_must_work_in_the_happy_case() collected.Should().BeEquivalentTo(Enumerable.Range(1, 10).ToArray()); probe.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } @@ -149,62 +150,59 @@ public void A_Merge_must_work_with_n_way_merge() } [Fact] - public void A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher() + public async Task A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var subscriber1 = Setup(CompletedPublisher(), NonEmptyPublisher(Enumerable.Range(1, 4))); var subscription1 = subscriber1.ExpectSubscription(); subscription1.Request(4); - subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete(); + subscriber1.ExpectNext(1, 2, 3, 4).ExpectComplete(); var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), CompletedPublisher()); var subscription2 = subscriber2.ExpectSubscription(); subscription2.Request(4); - subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete(); + subscriber2.ExpectNext(1, 2, 3, 4).ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher() + public async Task A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var subscriber1 = Setup(SoonToCompletePublisher(), NonEmptyPublisher(Enumerable.Range(1, 4))); var subscription1 = subscriber1.ExpectSubscription(); subscription1.Request(4); - subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete(); + subscriber1.ExpectNext(1, 2, 3, 4).ExpectComplete(); var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToCompletePublisher()); var subscription2 = subscriber2.ExpectSubscription(); subscription2.Request(4); - subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete(); + subscriber2.ExpectNext(1, 2, 3, 4).ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact(Skip = "This is nondeterministic, multiple scenarios can happen")] - public void A_Merge_must_work_with_one_immediately_failed_and_one_nonempty_publisher() + public async Task A_Merge_must_work_with_one_immediately_failed_and_one_nonempty_publisher() { - this.AssertAllStagesStopped(() => - { - + await this.AssertAllStagesStoppedAsync(() => { + return Task.CompletedTask; }, Materializer); } [Fact(Skip = "This is nondeterministic, multiple scenarios can happen")] - public void A_Merge_must_work_with_one_delayed_failed_and_one_nonempty_publisher() + public async Task A_Merge_must_work_with_one_delayed_failed_and_one_nonempty_publisher() { - this.AssertAllStagesStopped(() => - { - + await this.AssertAllStagesStoppedAsync(() => { + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_Merge_must_pass_along_early_cancellation() + public async Task A_Merge_must_pass_along_early_cancellation() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var up1 = this.CreateManualPublisherProbe(); var up2 = this.CreateManualPublisherProbe(); var down = this.CreateManualSubscriberProbe(); @@ -232,6 +230,7 @@ public void A_Merge_must_pass_along_early_cancellation() upSub1.ExpectCancellation(); var upSub2 = up2.ExpectSubscription(); upSub2.ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } } From 37fd72a8dc24cfc5700314a79c64a96bb86f95f3 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Fri, 31 Mar 2023 17:31:03 +0100 Subject: [PATCH 2/2] Changes to `async` TestKit --- .../Akka.Streams.Tests/Dsl/GraphMergeSpec.cs | 52 +++++++++---------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs index 0ccdcc97019..48a6fe81baf 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs @@ -49,7 +49,7 @@ public MergeFixture(GraphDsl.Builder builder) : base(builder) [Fact] public async Task A_Merge_must_work_in_the_happy_case() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { // Different input sizes(4 and 6) var source1 = Source.From(Enumerable.Range(0, 4)); var source2 = Source.From(Enumerable.Range(4, 6)); @@ -71,20 +71,19 @@ await this.AssertAllStagesStoppedAsync(() => { return ClosedShape.Instance; })).Run(Materializer); - var subscription = probe.ExpectSubscription(); + var subscription = await probe.ExpectSubscriptionAsync(); var collected = new List(); for (var i = 1; i <= 10; i++) { subscription.Request(1); - collected.Add(probe.ExpectNext()); + collected.Add(await probe.ExpectNextAsync()); } collected.Where(i => i <= 4).ShouldOnlyContainInOrder(1, 2, 3, 4); collected.Where(i => i >= 5).ShouldOnlyContainInOrder(5, 6, 7, 8, 9, 10); collected.Should().BeEquivalentTo(Enumerable.Range(1, 10).ToArray()); - probe.ExpectComplete(); - return Task.CompletedTask; + await probe.ExpectCompleteAsync(); }, Materializer); } @@ -110,7 +109,7 @@ public void A_Merge_must_work_with_one_way_merge() } [Fact] - public void A_Merge_must_work_with_n_way_merge() + public async Task A_Merge_must_work_with_n_way_merge() { var source1 = Source.Single(1); var source2 = Source.Single(2); @@ -136,50 +135,48 @@ public void A_Merge_must_work_with_n_way_merge() return ClosedShape.Instance; })).Run(Materializer); - var subscription = probe.ExpectSubscription(); + var subscription = await probe.ExpectSubscriptionAsync(); var collected = new List(); for (var i = 1; i <= 5; i++) { subscription.Request(1); - collected.Add(probe.ExpectNext()); + collected.Add(await probe.ExpectNextAsync()); } collected.Should().BeEquivalentTo(Enumerable.Range(1, 5)); - probe.ExpectComplete(); + await probe.ExpectCompleteAsync(); } [Fact] public async Task A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var subscriber1 = Setup(CompletedPublisher(), NonEmptyPublisher(Enumerable.Range(1, 4))); - var subscription1 = subscriber1.ExpectSubscription(); + var subscription1 = await subscriber1.ExpectSubscriptionAsync(); subscription1.Request(4); - subscriber1.ExpectNext(1, 2, 3, 4).ExpectComplete(); + await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync(); var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), CompletedPublisher()); - var subscription2 = subscriber2.ExpectSubscription(); + var subscription2 = await subscriber2.ExpectSubscriptionAsync(); subscription2.Request(4); - subscriber2.ExpectNext(1, 2, 3, 4).ExpectComplete(); - return Task.CompletedTask; + await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync(); }, Materializer); } [Fact] public async Task A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var subscriber1 = Setup(SoonToCompletePublisher(), NonEmptyPublisher(Enumerable.Range(1, 4))); - var subscription1 = subscriber1.ExpectSubscription(); + var subscription1 = await subscriber1.ExpectSubscriptionAsync(); subscription1.Request(4); - subscriber1.ExpectNext(1, 2, 3, 4).ExpectComplete(); + await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync(); var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToCompletePublisher()); - var subscription2 = subscriber2.ExpectSubscription(); + var subscription2 = await subscriber2.ExpectSubscriptionAsync(); subscription2.Request(4); - subscriber2.ExpectNext(1, 2, 3, 4).ExpectComplete(); - return Task.CompletedTask; + await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync(); }, Materializer); } @@ -202,7 +199,7 @@ await this.AssertAllStagesStoppedAsync(() => { [Fact] public async Task A_Merge_must_pass_along_early_cancellation() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async() => { var up1 = this.CreateManualPublisherProbe(); var up2 = this.CreateManualPublisherProbe(); var down = this.CreateManualSubscriberProbe(); @@ -222,15 +219,14 @@ await this.AssertAllStagesStoppedAsync(() => { return ClosedShape.Instance; })).Run(Materializer); - var downstream = down.ExpectSubscription(); + var downstream = await down.ExpectSubscriptionAsync(); downstream.Cancel(); up1.Subscribe(t.Item1); up2.Subscribe(t.Item2); - var upSub1 = up1.ExpectSubscription(); - upSub1.ExpectCancellation(); - var upSub2 = up2.ExpectSubscription(); - upSub2.ExpectCancellation(); - return Task.CompletedTask; + var upSub1 = await up1.ExpectSubscriptionAsync(); + await upSub1.ExpectCancellationAsync(); + var upSub2 = await up2.ExpectSubscriptionAsync(); + await upSub2.ExpectCancellationAsync(); }, Materializer); } }