Skip to content

Commit

Permalink
[10-74]FlowDetacherSpec (#6554)
Browse files Browse the repository at this point in the history
* [10-74]`FlowDetacherSpec`

* Change to `async` TestKit
  • Loading branch information
eaba authored Apr 24, 2023
1 parent 8b6c0ff commit 4642f56
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions src/core/Akka.Streams.Tests/Dsl/FlowDetacherSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
Expand All @@ -26,22 +27,21 @@ public FlowDetacherSpec(ITestOutputHelper helper) : base(helper)
}

[Fact]
public void A_Detacher_must_pass_through_all_elements()
public async Task A_Detacher_must_pass_through_all_elements()
{
this.AssertAllStagesStopped(() =>
{
Source.From(Enumerable.Range(1, 100))
.Detach()
.RunWith(Sink.Seq<int>(), Materializer)
.Result.Should().BeEquivalentTo(Enumerable.Range(1, 100));
await this.AssertAllStagesStoppedAsync(() => {
Source.From(Enumerable.Range(1, 100))
.Detach()
.RunWith(Sink.Seq<int>(), Materializer)
.Result.Should().BeEquivalentTo(Enumerable.Range(1, 100));
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Detacher_must_pass_through_failure()
public async Task A_Detacher_must_pass_through_failure()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var ex = new TestException("buh");
var result = Source.From(Enumerable.Range(1, 100)).Select(x =>
{
Expand All @@ -51,17 +51,17 @@ public void A_Detacher_must_pass_through_failure()
}).Detach().RunWith(Sink.Seq<int>(), Materializer);

result.Invoking(r => r.Wait(TimeSpan.FromSeconds(2))).Should().Throw<TestException>().And.Should().Be(ex);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_Detacher_must_emit_the_last_element_when_completed_Without_demand()
public async Task A_Detacher_must_emit_the_last_element_when_completed_Without_demand()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var probe = Source.Single(42).Detach().RunWith(this.SinkProbe<int>(), Materializer).EnsureSubscription();
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
probe.RequestNext(42);
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
await probe.RequestNextAsync(42);
}, Materializer);
}
}
Expand Down

0 comments on commit 4642f56

Please sign in to comment.