diff --git a/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs b/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs index 8a70adb2c26..48a6fe81baf 100644 --- a/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/GraphMergeSpec.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.TestKit; @@ -46,10 +47,9 @@ public MergeFixture(GraphDsl.Builder builder) : base(builder) } [Fact] - public void A_Merge_must_work_in_the_happy_case() + public async Task A_Merge_must_work_in_the_happy_case() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { // Different input sizes(4 and 6) var source1 = Source.From(Enumerable.Range(0, 4)); var source2 = Source.From(Enumerable.Range(4, 6)); @@ -63,27 +63,27 @@ public void A_Merge_must_work_in_the_happy_case() var sink = Sink.FromSubscriber(probe); b.From(source1).To(m1.In(0)); - b.From(m1.Out).Via(Flow.Create().Select(x => x*2)).To(m2.In(0)); - b.From(m2.Out).Via(Flow.Create().Select(x => x / 2).Select(x=>x+1)).To(sink); + b.From(m1.Out).Via(Flow.Create().Select(x => x * 2)).To(m2.In(0)); + b.From(m2.Out).Via(Flow.Create().Select(x => x / 2).Select(x => x + 1)).To(sink); b.From(source2).To(m1.In(1)); b.From(source3).To(m2.In(1)); return ClosedShape.Instance; })).Run(Materializer); - var subscription = probe.ExpectSubscription(); + var subscription = await probe.ExpectSubscriptionAsync(); var collected = new List(); for (var i = 1; i <= 10; i++) { subscription.Request(1); - collected.Add(probe.ExpectNext()); + collected.Add(await probe.ExpectNextAsync()); } collected.Where(i => i <= 4).ShouldOnlyContainInOrder(1, 2, 3, 4); collected.Where(i => i >= 5).ShouldOnlyContainInOrder(5, 6, 7, 8, 9, 10); collected.Should().BeEquivalentTo(Enumerable.Range(1, 10).ToArray()); - probe.ExpectComplete(); + await probe.ExpectCompleteAsync(); }, Materializer); } @@ -109,7 +109,7 @@ public void A_Merge_must_work_with_one_way_merge() } [Fact] - public void A_Merge_must_work_with_n_way_merge() + public async Task A_Merge_must_work_with_n_way_merge() { var source1 = Source.Single(1); var source2 = Source.Single(2); @@ -135,76 +135,71 @@ public void A_Merge_must_work_with_n_way_merge() return ClosedShape.Instance; })).Run(Materializer); - var subscription = probe.ExpectSubscription(); + var subscription = await probe.ExpectSubscriptionAsync(); var collected = new List(); for (var i = 1; i <= 5; i++) { subscription.Request(1); - collected.Add(probe.ExpectNext()); + collected.Add(await probe.ExpectNextAsync()); } collected.Should().BeEquivalentTo(Enumerable.Range(1, 5)); - probe.ExpectComplete(); + await probe.ExpectCompleteAsync(); } [Fact] - public void A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher() + public async Task A_Merge_must_work_with_one_immediately_completed_and_one_nonempty_publisher() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var subscriber1 = Setup(CompletedPublisher(), NonEmptyPublisher(Enumerable.Range(1, 4))); - var subscription1 = subscriber1.ExpectSubscription(); + var subscription1 = await subscriber1.ExpectSubscriptionAsync(); subscription1.Request(4); - subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete(); + await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync(); var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), CompletedPublisher()); - var subscription2 = subscriber2.ExpectSubscription(); + var subscription2 = await subscriber2.ExpectSubscriptionAsync(); subscription2.Request(4); - subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete(); + await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher() + public async Task A_Merge_must_work_with_one_delayed_completed_and_one_nonempty_publisher() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var subscriber1 = Setup(SoonToCompletePublisher(), NonEmptyPublisher(Enumerable.Range(1, 4))); - var subscription1 = subscriber1.ExpectSubscription(); + var subscription1 = await subscriber1.ExpectSubscriptionAsync(); subscription1.Request(4); - subscriber1.ExpectNext( 1, 2, 3, 4).ExpectComplete(); + await subscriber1.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync(); var subscriber2 = Setup(NonEmptyPublisher(Enumerable.Range(1, 4)), SoonToCompletePublisher()); - var subscription2 = subscriber2.ExpectSubscription(); + var subscription2 = await subscriber2.ExpectSubscriptionAsync(); subscription2.Request(4); - subscriber2.ExpectNext( 1, 2, 3, 4).ExpectComplete(); + await subscriber2.ExpectNext(1, 2, 3, 4).ExpectCompleteAsync(); }, Materializer); } [Fact(Skip = "This is nondeterministic, multiple scenarios can happen")] - public void A_Merge_must_work_with_one_immediately_failed_and_one_nonempty_publisher() + public async Task A_Merge_must_work_with_one_immediately_failed_and_one_nonempty_publisher() { - this.AssertAllStagesStopped(() => - { - + await this.AssertAllStagesStoppedAsync(() => { + return Task.CompletedTask; }, Materializer); } [Fact(Skip = "This is nondeterministic, multiple scenarios can happen")] - public void A_Merge_must_work_with_one_delayed_failed_and_one_nonempty_publisher() + public async Task A_Merge_must_work_with_one_delayed_failed_and_one_nonempty_publisher() { - this.AssertAllStagesStopped(() => - { - + await this.AssertAllStagesStoppedAsync(() => { + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_Merge_must_pass_along_early_cancellation() + public async Task A_Merge_must_pass_along_early_cancellation() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(async() => { var up1 = this.CreateManualPublisherProbe(); var up2 = this.CreateManualPublisherProbe(); var down = this.CreateManualSubscriberProbe(); @@ -224,14 +219,14 @@ public void A_Merge_must_pass_along_early_cancellation() return ClosedShape.Instance; })).Run(Materializer); - var downstream = down.ExpectSubscription(); + var downstream = await down.ExpectSubscriptionAsync(); downstream.Cancel(); up1.Subscribe(t.Item1); up2.Subscribe(t.Item2); - var upSub1 = up1.ExpectSubscription(); - upSub1.ExpectCancellation(); - var upSub2 = up2.ExpectSubscription(); - upSub2.ExpectCancellation(); + var upSub1 = await up1.ExpectSubscriptionAsync(); + await upSub1.ExpectCancellationAsync(); + var upSub2 = await up2.ExpectSubscriptionAsync(); + await upSub2.ExpectCancellationAsync(); }, Materializer); } }