Skip to content

Commit

Permalink
[14-74]FlowIdleInjectSpec (#6558)
Browse files Browse the repository at this point in the history
* [14-74]`FlowIdleInjectSpec`

* Changes to `async` TestKit

* Changes to `async` TestKit

* Revert "Changes to `async` TestKit"

This reverts commit 87f02c4.

---------

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
eaba and Aaronontheweb authored Apr 25, 2023
1 parent 272ea2d commit f3dddac
Showing 1 changed file with 65 additions and 67 deletions.
132 changes: 65 additions & 67 deletions src/core/Akka.Streams.Tests/Dsl/FlowIdleInjectSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
Expand All @@ -31,43 +32,40 @@ public FlowIdleInjectSpec(ITestOutputHelper helper) : base(helper)
}

[Fact]
public void KeepAlive_must_not_emit_additional_elements_if_upstream_is_fastEnough()
public async Task KeepAlive_must_not_emit_additional_elements_if_upstream_is_fastEnough()
{
this.AssertAllStagesStopped(() =>
{
var result = Source.From(Enumerable.Range(1, 10))
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.Grouped(1000)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);

await this.AssertAllStagesStoppedAsync(() => {
var result = Source.From(Enumerable.Range(1, 10))
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.Grouped(1000)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);
result.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
result.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10));
return Task.CompletedTask;
}, Materializer);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public void KeepAlive_must_emit_elements_periodically_after_silent_periods()
public async Task KeepAlive_must_emit_elements_periodically_after_silent_periods()
{
this.AssertAllStagesStopped(() =>
{
var sourceWithIdleGap = Source.Combine(Source.From(Enumerable.Range(1, 5)),
Source.From(Enumerable.Range(6, 5)).InitialDelay(TimeSpan.FromSeconds(2)),
await this.AssertAllStagesStoppedAsync(() => {
var sourceWithIdleGap = Source.Combine(Source.From(Enumerable.Range(1, 5)),
Source.From(Enumerable.Range(6, 5)).InitialDelay(TimeSpan.FromSeconds(2)),
i => new Merge<int, int>(i));


var result = sourceWithIdleGap
.KeepAlive(TimeSpan.FromSeconds(0.6), () => 0)
.Grouped(1000)
.RunWith(Sink.First<IEnumerable<int>>(), Materializer);

result.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
result.Result.Should().BeEquivalentTo(
Enumerable.Range(1, 5).Concat(new[] {0, 0, 0}).Concat(Enumerable.Range(6, 5)));
Enumerable.Range(1, 5).Concat(new[] { 0, 0, 0 }).Concat(Enumerable.Range(6, 5)));
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public void KeepAlive_must_immediately_pull_upstream()
public async Task KeepAlive_must_immediately_pull_upstream()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -76,17 +74,17 @@ public void KeepAlive_must_immediately_pull_upstream()
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.Request(1);
await downstream.RequestAsync(1);

upstream.SendNext(1);
downstream.ExpectNext(1);
await upstream.SendNextAsync(1);
await downstream.ExpectNextAsync(1);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_immediately_pull_upstream_after_busy_period()
public async Task KeepAlive_must_immediately_pull_upstream_after_busy_period()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -96,20 +94,20 @@ public void KeepAlive_must_immediately_pull_upstream_after_busy_period()
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.Request(10);
await downstream.RequestAsync(10);
downstream.ExpectNextN(10).Should().BeEquivalentTo(Enumerable.Range(1, 10));

downstream.Request(1);
await downstream.RequestAsync(1);

upstream.SendNext(1);
downstream.ExpectNext(1);
await upstream.SendNextAsync(1);
await downstream.ExpectNextAsync(1);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_work_if_timer_fires_before_initial_request()
public async Task KeepAlive_must_work_if_timer_fires_before_initial_request()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -118,17 +116,17 @@ public void KeepAlive_must_work_if_timer_fires_before_initial_request()
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.EnsureSubscription();
downstream.ExpectNoMsg(TimeSpan.FromSeconds(1.5));
downstream.Request(1);
downstream.ExpectNext(0);
await downstream.EnsureSubscriptionAsync();
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await downstream.RequestAsync(1);
await downstream.ExpectNextAsync(0);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_work_if_timer_fires_before_initial_request_after_busy_period()
public async Task KeepAlive_must_work_if_timer_fires_before_initial_request_after_busy_period()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -138,19 +136,19 @@ public void KeepAlive_must_work_if_timer_fires_before_initial_request_after_busy
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.Request(10);
await downstream.RequestAsync(10);
downstream.ExpectNextN(Enumerable.Range(1, 10));

downstream.ExpectNoMsg(TimeSpan.FromSeconds(1.5));
downstream.Request(1);
downstream.ExpectNext(0);
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await downstream.RequestAsync(1);
await downstream.ExpectNextAsync(0);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_prefer_upstream_element_over_injected()
public async Task KeepAlive_must_prefer_upstream_element_over_injected()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -159,20 +157,20 @@ public void KeepAlive_must_prefer_upstream_element_over_injected()
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.EnsureSubscription();
downstream.ExpectNoMsg(TimeSpan.FromSeconds(1.5));
upstream.SendNext(1);
downstream.ExpectNoMsg(TimeSpan.FromSeconds(0.5));
await downstream.EnsureSubscriptionAsync();
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await upstream.SendNextAsync(1);
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(0.5));

downstream.Request(1);
downstream.ExpectNext(1);
await downstream.RequestAsync(1);
await downstream.ExpectNextAsync(1);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_prefer_upstream_element_over_injected_after_busy_period()
public async Task KeepAlive_must_prefer_upstream_element_over_injected_after_busy_period()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -182,22 +180,22 @@ public void KeepAlive_must_prefer_upstream_element_over_injected_after_busy_peri
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.Request(10);
await downstream.RequestAsync(10);
downstream.ExpectNextN(Enumerable.Range(1, 10));

downstream.ExpectNoMsg(TimeSpan.FromSeconds(1.5));
upstream.SendNext(1);
downstream.ExpectNoMsg(TimeSpan.FromSeconds(0.5));
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(1.5));
await upstream.SendNextAsync(1);
await downstream.ExpectNoMsgAsync(TimeSpan.FromSeconds(0.5));

downstream.Request(1);
downstream.ExpectNext(1);
await downstream.RequestAsync(1);
await downstream.ExpectNextAsync(1);

upstream.SendComplete();
downstream.ExpectComplete();
await upstream.SendCompleteAsync();
await downstream.ExpectCompleteAsync();
}

[Fact]
public void KeepAlive_must_reset_deadline_properly_after_injected_element()
public async Task KeepAlive_must_reset_deadline_properly_after_injected_element()
{
var upstream = this.CreatePublisherProbe<int>();
var downstream = this.CreateSubscriberProbe<int>();
Expand All @@ -206,12 +204,12 @@ public void KeepAlive_must_reset_deadline_properly_after_injected_element()
.KeepAlive(TimeSpan.FromSeconds(1), () => 0)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

downstream.Request(2);
downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
downstream.ExpectNext(0);
await downstream.RequestAsync(2);
await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
await downstream.ExpectNextAsync(0);

downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
downstream.ExpectNext(0);
await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
await downstream.ExpectNextAsync(0);
}
}
}

0 comments on commit f3dddac

Please sign in to comment.