Skip to content

Commit

Permalink
[38-74] FlowWhereSpec (#6585)
Browse files Browse the repository at this point in the history
* [38-74] `FlowWhereSpec`

* Changes to `async` TestKit
  • Loading branch information
eaba authored Apr 6, 2023
1 parent 607de41 commit 146929f
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions src/core/Akka.Streams.Tests/Dsl/FlowWhereSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;
using Akka.Streams.TestKit;
Expand Down Expand Up @@ -46,7 +47,7 @@ public void A_Where_must_filter()
}

[Fact]
public void A_Where_must_not_blow_up_with_high_request_counts()
public async Task A_Where_must_not_blow_up_with_high_request_counts()
{
var settings = ActorMaterializerSettings.Create(Sys).WithInputBuffer(1, 1);
var materializer = ActorMaterializer.Create(Sys, settings);
Expand All @@ -60,18 +61,17 @@ public void A_Where_must_not_blow_up_with_high_request_counts()
for (var i = 1; i <= 1000; i++)
subscription.Request(int.MaxValue);

probe.ExpectNext(1);
probe.ExpectComplete();
await probe.ExpectNextAsync(1);
await probe.ExpectCompleteAsync();
}

[Fact]
public void A_Where_must_continue_if_error()
public async Task A_Where_must_continue_if_error()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var ex = new TestException("Test");

Source.From(Enumerable.Range(1, 3))
await Source.From(Enumerable.Range(1, 3))
.Where(x =>
{
if (x == 2)
Expand All @@ -81,8 +81,8 @@ public void A_Where_must_continue_if_error()
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(this.SinkProbe<int>(), Materializer)
.Request(3)
.ExpectNext( 1, 3)
.ExpectComplete();
.ExpectNext(1, 3)
.ExpectCompleteAsync();
}, Materializer);
}

Expand Down

0 comments on commit 146929f

Please sign in to comment.