diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs index b3a19dd9d30..c3a9a872c25 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowSkipWhileSpec.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.Supervision; using Akka.Streams.TestKit; @@ -28,68 +29,64 @@ public FlowSkipWhileSpec(ITestOutputHelper helper) : base(helper) } [Fact] - public void A_SkipWhile_must_skip_while_predicate_is_true() + public async Task A_SkipWhile_must_skip_while_predicate_is_true() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 4)) - .SkipWhile(x => x < 3) - .RunWith(this.SinkProbe(), Materializer) - .Request(2) - .ExpectNext( 3, 4) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(async() => { + await Source.From(Enumerable.Range(1, 4)) + .SkipWhile(x => x < 3) + .RunWith(this.SinkProbe(), Materializer) + .Request(2) + .ExpectNext(3, 4) + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_SkipWhile_must_complete_the_future_for_an_empty_stream() + public async Task A_SkipWhile_must_complete_the_future_for_an_empty_stream() { - this.AssertAllStagesStopped(() => - { - Source.Empty() - .SkipWhile(x => x < 2) - .RunWith(this.SinkProbe(), Materializer) - .Request(1) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(async() => { + await Source.Empty() + .SkipWhile(x => x < 2) + .RunWith(this.SinkProbe(), Materializer) + .Request(1) + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_SkipWhile_must_continue_if_error() + public async Task A_SkipWhile_must_continue_if_error() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 4)).SkipWhile(x => - { - if (x < 3) - return true; - throw new TestException(""); - }) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) - .RunWith(this.SinkProbe(), Materializer) - .Request(1) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(async() => { + await Source.From(Enumerable.Range(1, 4)).SkipWhile(x => + { + if (x < 3) + return true; + throw new TestException(""); + }) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(this.SinkProbe(), Materializer) + .Request(1) + .ExpectCompleteAsync(); }, Materializer); } [Fact] - public void A_SkipWhile_must_restart_with_strategy() + public async Task A_SkipWhile_must_restart_with_strategy() { - this.AssertAllStagesStopped(() => - { - Source.From(Enumerable.Range(1, 4)).SkipWhile(x => - { - if (x == 1 || x == 3) - return true; - if (x == 4) - return false; - throw new TestException(""); - }) - .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) - .RunWith(this.SinkProbe(), Materializer) - .Request(1) - .ExpectNext(4) - .ExpectComplete(); + await this.AssertAllStagesStoppedAsync(async() => { + await Source.From(Enumerable.Range(1, 4)).SkipWhile(x => + { + if (x == 1 || x == 3) + return true; + if (x == 4) + return false; + throw new TestException(""); + }) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.RestartingDecider)) + .RunWith(this.SinkProbe(), Materializer) + .Request(1) + .ExpectNext(4) + .ExpectCompleteAsync(); }, Materializer); } }