Skip to content

Commit

Permalink
[16-74]FlowInterleaveSpec (#6560)
Browse files Browse the repository at this point in the history
  • Loading branch information
eaba authored Mar 24, 2023
1 parent 23245ae commit 8729c00
Showing 1 changed file with 28 additions and 27 deletions.
55 changes: 28 additions & 27 deletions src/core/Akka.Streams.Tests/Dsl/FlowInterleaveSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.Util.Internal;
Expand All @@ -32,10 +33,9 @@ protected override TestSubscriber.Probe<int> Setup(IPublisher<int> p1, IPublishe
}

[Fact]
public void An_Interleave_for_Flow_must_work_in_the_happy_case()
public async Task An_Interleave_for_Flow_must_work_in_the_happy_case()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var probe = this.CreateManualSubscriberProbe<int>();

Source.From(Enumerable.Range(0, 4))
Expand All @@ -52,61 +52,61 @@ public void An_Interleave_for_Flow_must_work_in_the_happy_case()
collected.Add(probe.ExpectNext());
}

collected.Should().BeEquivalentTo(new[] {0, 1, 4, 7, 8, 9, 5, 2, 3, 10, 11, 6});
collected.Should().BeEquivalentTo(new[] { 0, 1, 4, 7, 8, 9, 5, 2, 3, 10, 11, 6 });
probe.ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void An_Interleave_for_Flow_must_work_when_segmentSize_is_not_equal_elements_in_stream()
public async Task An_Interleave_for_Flow_must_work_when_segmentSize_is_not_equal_elements_in_stream()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var probe = this.CreateManualSubscriberProbe<int>();

Source.From(Enumerable.Range(0, 3))
.Interleave(Source.From(Enumerable.Range(3, 3)), 2)
.RunWith(Sink.FromSubscriber(probe), Materializer);

probe.ExpectSubscription().Request(10);
probe.ExpectNext( 0, 1, 3, 4, 2, 5);
probe.ExpectNext(0, 1, 3, 4, 2, 5);
probe.ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void An_Interleave_for_Flow_must_work_with_segmentSize_1()
public async Task An_Interleave_for_Flow_must_work_with_segmentSize_1()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var probe = this.CreateManualSubscriberProbe<int>();

Source.From(Enumerable.Range(0, 3))
.Interleave(Source.From(Enumerable.Range(3, 3)), 1)
.RunWith(Sink.FromSubscriber(probe), Materializer);

probe.ExpectSubscription().Request(10);
probe.ExpectNext( 0, 3, 1, 4, 2, 5);
probe.ExpectNext(0, 3, 1, 4, 2, 5);
probe.ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void An_Interleave_for_Flow_must_not_work_with_segmentSize_0()
public async Task An_Interleave_for_Flow_must_not_work_with_segmentSize_0()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var source = Source.From(Enumerable.Range(0, 3));
source.Invoking(s => s.Interleave(Source.From(Enumerable.Range(3, 3)), 0))
.Should().Throw<ArgumentException>();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void An_Interleave_for_Flow_must_work_when_segmentSize_is_greater_than_stream_elements()
public async Task An_Interleave_for_Flow_must_work_when_segmentSize_is_greater_than_stream_elements()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var probe = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(0, 3))
.Interleave(Source.From(Enumerable.Range(3, 13)), 10)
Expand All @@ -126,14 +126,14 @@ public void An_Interleave_for_Flow_must_work_when_segmentSize_is_greater_than_st
Enumerable.Range(21, 5).ForEach(i => probe2.ExpectNext(i));
Enumerable.Range(11, 10).ForEach(i => probe2.ExpectNext(i));
probe2.ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void An_Interleave_for_Flow_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
public async Task An_Interleave_for_Flow_must_work_with_one_immediately_completed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var subscriber1 = Setup(CompletedPublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
var subscription1 = subscriber1.ExpectSubscription();
subscription1.Request(4);
Expand All @@ -146,14 +146,14 @@ public void An_Interleave_for_Flow_must_work_with_one_immediately_completed_and_
subscription2.Request(4);
Enumerable.Range(1, 4).ForEach(i => subscriber2.ExpectNext(i));
subscriber2.ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void An_Interleave_for_Flow_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
public async Task An_Interleave_for_Flow_must_work_with_one_delayed_completed_and_one_nonempty_publisher()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var subscriber1 = Setup(SoonToCompletePublisher<int>(), NonEmptyPublisher(Enumerable.Range(1, 4)));
var subscription1 = subscriber1.ExpectSubscription();
subscription1.Request(4);
Expand All @@ -166,6 +166,7 @@ public void An_Interleave_for_Flow_must_work_with_one_delayed_completed_and_one_
subscription2.Request(4);
Enumerable.Range(1, 4).ForEach(i => subscriber2.ExpectNext(i));
subscriber2.ExpectComplete();
return Task.CompletedTask;
}, Materializer);
}

Expand Down Expand Up @@ -209,10 +210,9 @@ public void An_Interleave_for_Flow_must_work_with_one_delayed_failed_and_one_non
}

[Fact]
public void An_Interleave_for_Flow_must_pass_along_early_cancellation()
public async Task An_Interleave_for_Flow_must_pass_along_early_cancellation()
{
this.AssertAllStagesStopped(() =>
{
await this.AssertAllStagesStoppedAsync(() => {
var up1 = this.CreateManualPublisherProbe<int>();
var up2 = this.CreateManualPublisherProbe<int>();
var down = this.CreateManualSubscriberProbe<int>();
Expand All @@ -230,6 +230,7 @@ public void An_Interleave_for_Flow_must_pass_along_early_cancellation()
up2.Subscribe(graphSubscriber2);
up1.ExpectSubscription().ExpectCancellation();
up2.ExpectSubscription().ExpectCancellation();
return Task.CompletedTask;
}, Materializer);
}
}
Expand Down

0 comments on commit 8729c00

Please sign in to comment.