Skip to content

Commit

Permalink
Change AssertAllStagesStopped to AssertAllStagesStoppedAsync - `A…
Browse files Browse the repository at this point in the history
…ctorRefBackpressureSinkSpec`! (#6539)
  • Loading branch information
eaba authored Mar 23, 2023
1 parent 6351bd9 commit 29da910
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions src/core/Akka.Streams.Tests/Dsl/ActorRefBackpressureSinkSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Streams.Dsl;
Expand Down Expand Up @@ -77,10 +78,9 @@ public ActorRefBackpressureSinkSpec(ITestOutputHelper output) : base(output, Str
private IActorRef CreateActor<T>() => Sys.ActorOf(Props.Create(typeof(T), TestActor).WithDispatcher("akka.test.stream-dispatcher"));

[Fact]
public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef()
public async Task ActorBackpressureSink_should_send_the_elements_to_the_ActorRef()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var fw = CreateActor<Fw>();
Source.From(Enumerable.Range(1, 3))
.RunWith(Sink.ActorRefWithAck<int>(fw, InitMessage, AckMessage, CompleteMessage), Materializer);
Expand All @@ -89,14 +89,14 @@ public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef()
ExpectMsg(2);
ExpectMsg(3);
ExpectMsg(CompleteMessage);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2()
public async Task ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var fw = CreateActor<Fw>();
var probe =
this.SourceProbe<int>()
Expand All @@ -111,14 +111,14 @@ public void ActorBackpressureSink_should_send_the_elements_to_the_ActorRef2()
ExpectMsg(3);
probe.SendComplete();
ExpectMsg(CompleteMessage);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ActorBackpressureSink_should_cancel_stream_when_actor_terminates()
public async Task ActorBackpressureSink_should_cancel_stream_when_actor_terminates()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var fw = CreateActor<Fw>();
var publisher =
this.SourceProbe<int>()
Expand All @@ -129,14 +129,14 @@ public void ActorBackpressureSink_should_cancel_stream_when_actor_terminates()
ExpectMsg(1);
Sys.Stop(fw);
publisher.ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ActorBackpressureSink_should_send_message_only_when_backpressure_received()
public async Task ActorBackpressureSink_should_send_message_only_when_backpressure_received()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var fw = CreateActor<Fw2>();
var publisher = this.SourceProbe<int>()
.To(Sink.ActorRefWithAck<int>(fw, InitMessage, AckMessage, CompleteMessage))
Expand All @@ -156,14 +156,14 @@ public void ActorBackpressureSink_should_send_message_only_when_backpressure_rec
ExpectMsg(3);

ExpectMsg(CompleteMessage);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_has_been_full()
public async Task ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_has_been_full()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var bufferSize = 16;
var streamElementCount = bufferSize + 4;
var fw = CreateActor<Fw2>();
Expand All @@ -187,14 +187,14 @@ public void ActorBackpressureSink_should_keep_on_sending_even_after_the_buffer_h
fw.Tell(TriggerAckMessage.Instance);
}
ExpectMsg(CompleteMessage);
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void ActorBackpressureSink_should_work_with_one_element_buffer()
public async Task ActorBackpressureSink_should_work_with_one_element_buffer()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var fw = CreateActor<Fw2>();
var publisher =
this.SourceProbe<int>()
Expand All @@ -216,6 +216,7 @@ public void ActorBackpressureSink_should_work_with_one_element_buffer()

publisher.SendComplete();
ExpectMsg(CompleteMessage);
return Task.CompletedTask;
}, Materializer);
}

Expand Down

0 comments on commit 29da910

Please sign in to comment.