Skip to content

Commit

Permalink
[37-74] FlowWatchTerminationSpec (#6584)
Browse files Browse the repository at this point in the history
* [37-74] `FlowWatchTerminationSpec`

* Change to `async` TestKit

---------

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
eaba and Aaronontheweb authored Apr 10, 2023
1 parent 2042810 commit 1ebbb4d
Showing 1 changed file with 34 additions and 34 deletions.
68 changes: 34 additions & 34 deletions src/core/Akka.Streams.Tests/Dsl/FlowWatchTerminationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
Expand All @@ -28,83 +29,81 @@ public FlowWatchTerminationSpec(ITestOutputHelper helper) : base(helper)
}

[Fact]
public void A_WatchTermination_must_complete_the_future_when_stream_is_completed()
public async Task A_WatchTermination_must_complete_the_future_when_stream_is_completed()
{
this.AssertAllStagesStopped(() =>
{
var t =
Source.From(Enumerable.Range(1, 4))
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
await this.AssertAllStagesStoppedAsync(async() => {
var t =
Source.From(Enumerable.Range(1, 4))
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var future = t.Item1;
var p = t.Item2;

p.Request(4).ExpectNext( 1, 2, 3, 4);
p.Request(4).ExpectNext(1, 2, 3, 4);
future.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
p.ExpectComplete();
await p.ExpectCompleteAsync();
}, Materializer);
}

[Fact]
public void A_WatchTermination_must_complete_the_future_when_stream_is_cancelled_from_downstream()
public async Task A_WatchTermination_must_complete_the_future_when_stream_is_cancelled_from_downstream()
{
this.AssertAllStagesStopped(() =>
{
var t =
Source.From(Enumerable.Range(1, 4))
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
await this.AssertAllStagesStoppedAsync(() => {
var t =
Source.From(Enumerable.Range(1, 4))
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var future = t.Item1;
var p = t.Item2;

p.Request(3).ExpectNext( 1, 2, 3);
p.Request(3).ExpectNext(1, 2, 3);
p.Cancel();
future.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_WatchTermination_must_fail_the_future_when_stream_is_failed()
public async Task A_WatchTermination_must_fail_the_future_when_stream_is_failed()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var ex = new Exception("Stream failed.");
var t = this.SourceProbe<int>().WatchTermination(Keep.Both).To(Sink.Ignore<int>()).Run(Materializer);
var p = t.Item1;
var future = t.Item2;
p.SendNext(1);
p.SendError(ex);
future.Invoking(f => f.Wait()).Should().Throw<Exception>().WithMessage("Stream failed.");
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void A_WatchTermination_must_complete_the_future_for_an_empty_stream()
public async Task A_WatchTermination_must_complete_the_future_for_an_empty_stream()
{
this.AssertAllStagesStopped(() =>
{
var t =
Source.Empty<int>()
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
await this.AssertAllStagesStoppedAsync(() => {
var t =
Source.Empty<int>()
.WatchTermination(Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var future = t.Item1;
var p = t.Item2;
p.Request(1);
future.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
return Task.CompletedTask;
}, Materializer);
}

[Fact(Skip = "We need a way to combine multiple sources with different materializer types")]
public void A_WatchTermination_must_complete_the_future_for_graph()
public async Task A_WatchTermination_must_complete_the_future_for_graph()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
//var first = this.SourceProbe<int>().WatchTermination(Keep.Both);
//var second = Source.From(Enumerable.Range(2, 4)).MapMaterializedValue(new Func<NotUsed, (TestPublisher.Probe<int>, Task)>(_ => null));

//var t = Source.FromGraph(
// GraphDsl.Create<SourceShape<int>, (TestPublisher.Probe<int>, Task)>(b =>
// {
Expand All @@ -128,6 +127,7 @@ public void A_WatchTermination_must_complete_the_future_for_graph()

//sourceProbe.SendComplete();
//sinkProbe.ExpectNextN(new[] {2, 3, 4, 5}).ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

Expand Down

0 comments on commit 1ebbb4d

Please sign in to comment.