Skip to content

Commit

Permalink
[21-74]FlowOnCompleteSpec (#6569)
Browse files Browse the repository at this point in the history
* [21-74]`FlowOnCompleteSpec`

* Changes to `async` TestKit

---------

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
eaba and Aaronontheweb authored Apr 13, 2023
1 parent c1cbb7f commit 220cffb
Showing 1 changed file with 31 additions and 33 deletions.
64 changes: 31 additions & 33 deletions src/core/Akka.Streams.Tests/Dsl/FlowOnCompleteSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
Expand All @@ -25,66 +26,62 @@ public FlowOnCompleteSpec(ITestOutputHelper helper) : base(helper)
}

[Fact]
public void A_Flow_with_OnComplete_must_invoke_callback_on_normal_completion()
public async Task A_Flow_with_OnComplete_must_invoke_callback_on_normal_completion()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var onCompleteProbe = CreateTestProbe();
var p = this.CreateManualPublisherProbe<int>();
Source.FromPublisher(p)
.To(Sink.OnComplete<int>(() => onCompleteProbe.Ref.Tell("done"), _ => { }))
.Run(Materializer);
var proc = p.ExpectSubscription();
proc.ExpectRequest();
var proc = await p.ExpectSubscriptionAsync();
await proc.ExpectRequestAsync();
proc.SendNext(42);
onCompleteProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
await onCompleteProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
proc.SendComplete();
onCompleteProbe.ExpectMsg("done");
await onCompleteProbe.ExpectMsgAsync("done");
}, Materializer);
}

[Fact]
public void A_Flow_with_OnComplete_must_yield_the_first_error()
public async Task A_Flow_with_OnComplete_must_yield_the_first_error()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var onCompleteProbe = CreateTestProbe();
var p = this.CreateManualPublisherProbe<int>();
Source.FromPublisher(p)
.To(Sink.OnComplete<int>(() => { }, ex => onCompleteProbe.Ref.Tell(ex)))
.Run(Materializer);
var proc = p.ExpectSubscription();
proc.ExpectRequest();
var proc = await p.ExpectSubscriptionAsync();
await proc.ExpectRequestAsync();
var cause = new TestException("test");
proc.SendError(cause);
onCompleteProbe.ExpectMsg(cause);
onCompleteProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
await onCompleteProbe.ExpectMsgAsync(cause);
await onCompleteProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
}, Materializer);
}

[Fact]
public void A_Flow_with_OnComplete_must_invoke_callback_for_an_empty_stream()
public async Task A_Flow_with_OnComplete_must_invoke_callback_for_an_empty_stream()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var onCompleteProbe = CreateTestProbe();
var p = this.CreateManualPublisherProbe<int>();
Source.FromPublisher(p)
.To(Sink.OnComplete<int>(() => onCompleteProbe.Ref.Tell("done"), _ => { }))
.Run(Materializer);
var proc = p.ExpectSubscription();
proc.ExpectRequest();
var proc = await p.ExpectSubscriptionAsync();
await proc.ExpectRequestAsync();
proc.SendComplete();
onCompleteProbe.ExpectMsg("done");
onCompleteProbe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
await onCompleteProbe.ExpectMsgAsync("done");
await onCompleteProbe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100));
}, Materializer);
}

[Fact]
public void A_Flow_with_OnComplete_must_invoke_callback_after_transform_and_foreach_steps()
public async Task A_Flow_with_OnComplete_must_invoke_callback_after_transform_and_foreach_steps()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var onCompleteProbe = CreateTestProbe();
var p = this.CreateManualPublisherProbe<int>();
var foreachSink = Sink.ForEach<int>(x => onCompleteProbe.Ref.Tell("foreach-" + x));
Expand All @@ -95,18 +92,19 @@ public void A_Flow_with_OnComplete_must_invoke_callback_after_transform_and_fore
}).RunWith(foreachSink, Materializer);
future.ContinueWith(t => onCompleteProbe.Tell(t.IsCompleted ? "done" : "failure"));

var proc = p.ExpectSubscription();
proc.ExpectRequest();
var proc = await p.ExpectSubscriptionAsync();
await proc.ExpectRequestAsync();
proc.SendNext(42);
proc.SendComplete();
onCompleteProbe.ExpectMsg("map-42");
onCompleteProbe.ExpectMsg("foreach-42");
onCompleteProbe.ExpectMsg("done");
await onCompleteProbe.ExpectMsgAsync("map-42");
await onCompleteProbe.ExpectMsgAsync("foreach-42");
await onCompleteProbe.ExpectMsgAsync("done");

}, Materializer);
}

[Fact]
public void A_Flow_with_OnComplete_must_yield_error_on_abrupt_termination()
public async Task A_Flow_with_OnComplete_must_yield_error_on_abrupt_termination()
{
var materializer = ActorMaterializer.Create(Sys);
var onCompleteProbe = CreateTestProbe();
Expand All @@ -115,11 +113,11 @@ public void A_Flow_with_OnComplete_must_yield_error_on_abrupt_termination()
Source.FromPublisher(publisher).To(Sink.OnComplete<int>(() => onCompleteProbe.Ref.Tell("done"),
ex => onCompleteProbe.Ref.Tell(ex)))
.Run(materializer);
var proc = publisher.ExpectSubscription();
proc.ExpectRequest();
var proc = await publisher.ExpectSubscriptionAsync();
await proc.ExpectRequestAsync();
materializer.Shutdown();

onCompleteProbe.ExpectMsg<AbruptTerminationException>();
await onCompleteProbe.ExpectMsgAsync<AbruptTerminationException>();
}
}
}
Expand Down

0 comments on commit 220cffb

Please sign in to comment.