Skip to content

Commit

Permalink
[69-74] KeepGoingStageSpec (#6617)
Browse files Browse the repository at this point in the history
* [69-74] `KeepGoingStageSpec`

* Changes

---------

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
eaba and Aaronontheweb authored Mar 28, 2023
1 parent 40fdc1b commit 7472bd5
Showing 1 changed file with 39 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,143 +200,137 @@ public KeepGoingStageSpec(ITestOutputHelper helper = null) : base(helper)
}

[Fact]
public void A_stage_with_keep_going_must_still_be_alive_after_all_ports_have_been_closed_until_explicity_closed()
public async Task A_stage_with_keep_going_must_still_be_alive_after_all_ports_have_been_closed_until_explicity_closed()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var t = Source.Maybe<int>().ToMaterialized(new PingableSink(true), Keep.Both).Run(Materializer);
var maybePromise = t.Item1;
var pingerFuture = t.Item2;
pingerFuture.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var pinger = pingerFuture.Result;
var pinger = await pingerFuture;

pinger.Register(TestActor);

//Before completion
pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

maybePromise.TrySetResult(0);
ExpectMsg<UpstreamCompleted>();
await ExpectMsgAsync<UpstreamCompleted>();

ExpectNoMsg(200);
await ExpectNoMsgAsync(200);

pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

pinger.Stop();
// PostStop should not be concurrent with the event handler. This event here tests this.
ExpectMsg<EndOfEventHandler>();
ExpectMsg<PostStop>();
await ExpectMsgAsync<EndOfEventHandler>();
await ExpectMsgAsync<PostStop>();
}, Materializer);
}

[Fact]
public void A_stage_with_keep_going_must_still_be_alive_after_all_ports_have_been_closed_until_explicitly_failed()
public async Task A_stage_with_keep_going_must_still_be_alive_after_all_ports_have_been_closed_until_explicitly_failed()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var t = Source.Maybe<int>().ToMaterialized(new PingableSink(true), Keep.Both).Run(Materializer);
var maybePromise = t.Item1;
var pingerFuture = t.Item2;
pingerFuture.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var pinger = pingerFuture.Result;
var pinger = await pingerFuture;

pinger.Register(TestActor);

//Before completion
pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

maybePromise.TrySetResult(0);
ExpectMsg<UpstreamCompleted>();
await ExpectMsgAsync<UpstreamCompleted>();

ExpectNoMsg(200);
await ExpectNoMsgAsync(200);

pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

pinger.Fail();
// PostStop should not be concurrent with the event handler. This event here tests this.
ExpectMsg<EndOfEventHandler>();
ExpectMsg<PostStop>();

await ExpectMsgAsync<EndOfEventHandler>();
await ExpectMsgAsync<PostStop>();
}, Materializer);
}

[Fact]
public void A_stage_with_keep_going_must_still_be_alive_after_all_ports_have_been_closed_until_implicity_failed_via_exception()
public async Task A_stage_with_keep_going_must_still_be_alive_after_all_ports_have_been_closed_until_implicity_failed_via_exception()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var t = Source.Maybe<int>().ToMaterialized(new PingableSink(true), Keep.Both).Run(Materializer);
var maybePromise = t.Item1;
var pingerFuture = t.Item2;
pingerFuture.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var pinger = pingerFuture.Result;
var pinger = await pingerFuture;

pinger.Register(TestActor);

//Before completion
pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

maybePromise.TrySetResult(0);
ExpectMsg<UpstreamCompleted>();
await ExpectMsgAsync<UpstreamCompleted>();

ExpectNoMsg(200);

pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

// We need to catch the exception otherwise the test fails
// ReSharper disable once EmptyGeneralCatchClause
try { pinger.ThrowEx();} catch { }
try { pinger.ThrowEx(); } catch { }
// PostStop should not be concurrent with the event handler. This event here tests this.
ExpectMsg<EndOfEventHandler>();
ExpectMsg<PostStop>();

await ExpectMsgAsync<EndOfEventHandler>();
await ExpectMsgAsync<PostStop>();
}, Materializer);
}

[Fact]
public void A_stage_with_keep_going_must_close_down_earls_if_keepAlive_is_not_requested()
public async Task A_stage_with_keep_going_must_close_down_earls_if_keepAlive_is_not_requested()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(async() => {
var t = Source.Maybe<int>().ToMaterialized(new PingableSink(false), Keep.Both).Run(Materializer);
var maybePromise = t.Item1;
var pingerFuture = t.Item2;
pingerFuture.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
var pinger = pingerFuture.Result;
var pinger = await pingerFuture;

pinger.Register(TestActor);

//Before completion
pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

pinger.Ping();
ExpectMsg<Pong>();
await ExpectMsgAsync<Pong>();

maybePromise.TrySetResult(0);
ExpectMsg<UpstreamCompleted>();
ExpectMsg<PostStop>();
await ExpectMsgAsync<UpstreamCompleted>();
await ExpectMsgAsync<PostStop>();
}, Materializer);
}
}
Expand Down

0 comments on commit 7472bd5

Please sign in to comment.