diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowInterleaveSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowInterleaveSpec.cs index c692182603e..3bd31bf16cc 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowInterleaveSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowInterleaveSpec.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.Util.Internal; @@ -32,10 +33,9 @@ protected override TestSubscriber.Probe Setup(IPublisher p1, IPublishe } [Fact] - public void An_Interleave_for_Flow_must_work_in_the_happy_case() + public async Task An_Interleave_for_Flow_must_work_in_the_happy_case() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe = this.CreateManualSubscriberProbe(); Source.From(Enumerable.Range(0, 4)) @@ -52,16 +52,16 @@ public void An_Interleave_for_Flow_must_work_in_the_happy_case() collected.Add(probe.ExpectNext()); } - collected.Should().BeEquivalentTo(new[] {0, 1, 4, 7, 8, 9, 5, 2, 3, 10, 11, 6}); + collected.Should().BeEquivalentTo(new[] { 0, 1, 4, 7, 8, 9, 5, 2, 3, 10, 11, 6 }); probe.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void An_Interleave_for_Flow_must_work_when_segmentSize_is_not_equal_elements_in_stream() + public async Task An_Interleave_for_Flow_must_work_when_segmentSize_is_not_equal_elements_in_stream() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe = this.CreateManualSubscriberProbe(); Source.From(Enumerable.Range(0, 3)) @@ -69,16 +69,16 @@ public void An_Interleave_for_Flow_must_work_when_segmentSize_is_not_equal_eleme .RunWith(Sink.FromSubscriber(probe), Materializer); probe.ExpectSubscription().Request(10); - probe.ExpectNext( 0, 1, 3, 4, 2, 5); + probe.ExpectNext(0, 1, 3, 4, 2, 5); probe.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void An_Interleave_for_Flow_must_work_with_segmentSize_1() + public async Task An_Interleave_for_Flow_must_work_with_segmentSize_1() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe = this.CreateManualSubscriberProbe(); Source.From(Enumerable.Range(0, 3)) @@ -86,27 +86,27 @@ public void An_Interleave_for_Flow_must_work_with_segmentSize_1() .RunWith(Sink.FromSubscriber(probe), Materializer); probe.ExpectSubscription().Request(10); - probe.ExpectNext( 0, 3, 1, 4, 2, 5); + probe.ExpectNext(0, 3, 1, 4, 2, 5); probe.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void An_Interleave_for_Flow_must_not_work_with_segmentSize_0() + public async Task An_Interleave_for_Flow_must_not_work_with_segmentSize_0() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var source = Source.From(Enumerable.Range(0, 3)); source.Invoking(s => s.Interleave(Source.From(Enumerable.Range(3, 3)), 0)) .Should().Throw(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void An_Interleave_for_Flow_must_work_when_segmentSize_is_greater_than_stream_elements() + public async Task An_Interleave_for_Flow_must_work_when_segmentSize_is_greater_than_stream_elements() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var probe = this.CreateManualSubscriberProbe(); Source.From(Enumerable.Range(0, 3)) .Interleave(Source.From(Enumerable.Range(3, 13)), 10) @@ -126,14 +126,14 @@ public void An_Interleave_for_Flow_must_work_when_segmentSize_is_greater_than_st Enumerable.Range(21, 5).ForEach(i => probe2.ExpectNext(i)); Enumerable.Range(11, 10).ForEach(i => probe2.ExpectNext(i)); probe2.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void An_Interleave_for_Flow_must_work_with_one_immediately_completed_and_one_nonempty_publisher() + public async Task An_Interleave_for_Flow_must_work_with_one_immediately_completed_and_one_nonempty_publisher() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var subscriber1 = Setup(CompletedPublisher(), NonEmptyPublisher(Enumerable.Range(1, 4))); var subscription1 = subscriber1.ExpectSubscription(); subscription1.Request(4); @@ -146,14 +146,14 @@ public void An_Interleave_for_Flow_must_work_with_one_immediately_completed_and_ subscription2.Request(4); Enumerable.Range(1, 4).ForEach(i => subscriber2.ExpectNext(i)); subscriber2.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void An_Interleave_for_Flow_must_work_with_one_delayed_completed_and_one_nonempty_publisher() + public async Task An_Interleave_for_Flow_must_work_with_one_delayed_completed_and_one_nonempty_publisher() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var subscriber1 = Setup(SoonToCompletePublisher(), NonEmptyPublisher(Enumerable.Range(1, 4))); var subscription1 = subscriber1.ExpectSubscription(); subscription1.Request(4); @@ -166,6 +166,7 @@ public void An_Interleave_for_Flow_must_work_with_one_delayed_completed_and_one_ subscription2.Request(4); Enumerable.Range(1, 4).ForEach(i => subscriber2.ExpectNext(i)); subscriber2.ExpectComplete(); + return Task.CompletedTask; }, Materializer); } @@ -209,10 +210,9 @@ public void An_Interleave_for_Flow_must_work_with_one_delayed_failed_and_one_non } [Fact] - public void An_Interleave_for_Flow_must_pass_along_early_cancellation() + public async Task An_Interleave_for_Flow_must_pass_along_early_cancellation() { - this.AssertAllStagesStopped(() => - { + await this.AssertAllStagesStoppedAsync(() => { var up1 = this.CreateManualPublisherProbe(); var up2 = this.CreateManualPublisherProbe(); var down = this.CreateManualSubscriberProbe(); @@ -230,6 +230,7 @@ public void An_Interleave_for_Flow_must_pass_along_early_cancellation() up2.Subscribe(graphSubscriber2); up1.ExpectSubscription().ExpectCancellation(); up2.ExpectSubscription().ExpectCancellation(); + return Task.CompletedTask; }, Materializer); } }