From c2b578c9c6d77b26e3e2ae84600029be595e6ba5 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Sat, 25 Mar 2023 12:24:40 +0100 Subject: [PATCH 1/2] [29-74] `FlowSkipWhileSpec` --- .../Dsl/FlowSkipWhileSpec.cs | 93 ++++++++++--------- 1 file changed, 47 insertions(+), 46 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs index b3a19dd9d30..98c6806d406 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.Supervision; using Akka.Streams.TestKit; @@ -28,68 +29,68 @@ public FlowSkipWhileSpec(ITestOutputHelper helper) : base(helper) } [Fact] - public void A_SkipWhile_must_skip_while_predicate_is_true() + public async Task A_SkipWhile_must_skip_while_predicate_is_true() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 4)) - .SkipWhile(x => x < 3) - .RunWith(this.SinkProbe(), Materializer) - .Request(2) - .ExpectNext( 3, 4) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(() => { + Source.From(Enumerable.Range(1, 4)) + .SkipWhile(x => x < 3) + .RunWith(this.SinkProbe(), Materializer) + .Request(2) + .ExpectNext(3, 4) + .ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_SkipWhile_must_complete_the_future_for_an_empty_stream() + public async Task A_SkipWhile_must_complete_the_future_for_an_empty_stream() { - this.AssertAllStagesStopped(() => - { - Source.Empty() - .SkipWhile(x => x < 2) - .RunWith(this.SinkProbe(), Materializer) - .Request(1) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(() => { + Source.Empty() + .SkipWhile(x => x < 2) + .RunWith(this.SinkProbe(), Materializer) + .Request(1) + .ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_SkipWhile_must_continue_if_error() + public async Task A_SkipWhile_must_continue_if_error() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 4)).SkipWhile(x => - { - if (x < 3) - return true; - throw new TestException(""); - }) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) - .RunWith(this.SinkProbe(), Materializer) - .Request(1) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(() => { + Source.From(Enumerable.Range(1, 4)).SkipWhile(x => + { + if (x < 3) + return true; + throw new TestException(""); + }) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(this.SinkProbe(), Materializer) + .Request(1) + .ExpectComplete(); + return Task.CompletedTask; }, Materializer); } [Fact] - public void A_SkipWhile_must_restart_with_strategy() + public async Task A_SkipWhile_must_restart_with_strategy() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 4)).SkipWhile(x => - { - if (x == 1 || x == 3) - return true; - if (x == 4) - return false; - throw new TestException(""); - }) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) - .RunWith(this.SinkProbe(), Materializer) - .Request(1) - .ExpectNext(4) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(() => { + Source.From(Enumerable.Range(1, 4)).SkipWhile(x => + { + if (x == 1 || x == 3) + return true; + if (x == 4) + return false; + throw new TestException(""); + }) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(this.SinkProbe(), Materializer) + .Request(1) + .ExpectNext(4) + .ExpectComplete(); + return Task.CompletedTask; }, Materializer); } } From 68f761ac58c49c70791a5dbccfb13f364877ca5e Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Mon, 3 Apr 2023 13:20:24 +0100 Subject: [PATCH 2/2] Changes to `async` TestKit --- .../Dsl/FlowSkipWhileSpec.cs | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs index 98c6806d406..c3a9a872c25 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs @@ -31,35 +31,33 @@ public FlowSkipWhileSpec(ITestOutputHelper helper) : base(helper) [Fact] public async Task A_SkipWhile_must_skip_while_predicate_is_true() { - await this.AssertAllStagesStoppedAsync(() => { - Source.From(Enumerable.Range(1, 4)) + await this.AssertAllStagesStoppedAsync(async() => { + await Source.From(Enumerable.Range(1, 4)) .SkipWhile(x => x < 3) .RunWith(this.SinkProbe(), Materializer) .Request(2) .ExpectNext(3, 4) - .ExpectComplete(); - return Task.CompletedTask; + .ExpectCompleteAsync(); }, Materializer); } [Fact] public async Task A_SkipWhile_must_complete_the_future_for_an_empty_stream() { - await this.AssertAllStagesStoppedAsync(() => { - Source.Empty() + await this.AssertAllStagesStoppedAsync(async() => { + await Source.Empty() .SkipWhile(x => x < 2) .RunWith(this.SinkProbe(), Materializer) .Request(1) - .ExpectComplete(); - return Task.CompletedTask; + .ExpectCompleteAsync(); }, Materializer); } [Fact] public async Task A_SkipWhile_must_continue_if_error() { - await this.AssertAllStagesStoppedAsync(() => { - Source.From(Enumerable.Range(1, 4)).SkipWhile(x => + await this.AssertAllStagesStoppedAsync(async() => { + await Source.From(Enumerable.Range(1, 4)).SkipWhile(x => { if (x < 3) return true; @@ -68,16 +66,15 @@ await this.AssertAllStagesStoppedAsync(() => { .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) .RunWith(this.SinkProbe(), Materializer) .Request(1) - .ExpectComplete(); - return Task.CompletedTask; + .ExpectCompleteAsync(); }, Materializer); } [Fact] public async Task A_SkipWhile_must_restart_with_strategy() { - await this.AssertAllStagesStoppedAsync(() => { - Source.From(Enumerable.Range(1, 4)).SkipWhile(x => + await this.AssertAllStagesStoppedAsync(async() => { + await Source.From(Enumerable.Range(1, 4)).SkipWhile(x => { if (x == 1 || x == 3) return true; @@ -89,8 +86,7 @@ await this.AssertAllStagesStoppedAsync(() => { .RunWith(this.SinkProbe(), Materializer) .Request(1) .ExpectNext(4) - .ExpectComplete(); - return Task.CompletedTask; + .ExpectCompleteAsync(); }, Materializer); } }