From 87f02c48549d84c32808d4a54b1f2f257d5d5865 Mon Sep 17 00:00:00 2001 From: Ebere Abanonu Date: Tue, 4 Apr 2023 14:44:19 +0100 Subject: [PATCH] Changes to `async` TestKit --- .../Dsl/FlowGroupedWithinSpec.cs | 266 +++++++++--------- 1 file changed, 135 insertions(+), 131 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs index d2c3c4a1c33..26428a591df 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading.Tasks; using Akka.Streams.Dsl; using Akka.Streams.TestKit; using Akka.Util.Internal; @@ -35,7 +36,7 @@ public FlowGroupedWithinSpec(ITestOutputHelper helper) : base(helper) [Fact] public void A_GroupedWithin_must_group_elements_within_the_duration() { - this.AssertAllStagesStopped(() => + this.AssertAllStagesStopped(async() => { var input = new Iterator(Enumerable.Range(1, 10000)); var p = this.CreateManualPublisherProbe(); @@ -46,43 +47,43 @@ public void A_GroupedWithin_must_group_elements_within_the_duration() .To(Sink.FromSubscriber(c)) .Run(Materializer); - var pSub = p.ExpectSubscription(); - var cSub = c.ExpectSubscription(); + var pSub = await p.ExpectSubscriptionAsync(); + var cSub = await c.ExpectSubscriptionAsync(); cSub.Request(100); - var demand1 = (int)pSub.ExpectRequest(); + var demand1 = (int)await pSub.ExpectRequestAsync(); for (var i = 1; i <= demand1; i++) pSub.SendNext(input.Next()); - var demand2 = (int)pSub.ExpectRequest(); + var demand2 = (int)await pSub.ExpectRequestAsync(); for (var i = 1; i <= demand2; i++) pSub.SendNext(input.Next()); - var demand3 = (int)pSub.ExpectRequest(); + var demand3 = (int)await pSub.ExpectRequestAsync(); c.ExpectNext().Should().BeEquivalentTo(Enumerable.Range(1, demand1 + demand2)); for (var i = 1; i <= demand3; i++) pSub.SendNext(input.Next()); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); - c.ExpectNext() + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(300)); + (await c.ExpectNextAsync()) .Should().BeEquivalentTo(Enumerable.Range(demand1 + demand2 + 1, demand3)); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); - pSub.ExpectRequest(); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(300)); + await pSub.ExpectRequestAsync(); var last = input.Next(); pSub.SendNext(last); pSub.SendComplete(); c.ExpectNext().Should().HaveCount(1).And.HaveElementAt(0, last); - c.ExpectComplete(); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + await c.ExpectCompleteAsync(); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); }, Materializer); } [Fact] - public void A_GroupedWithin_must_deliver_buffered_elements_OnComplete_before_the_timeout() + public async Task A_GroupedWithin_must_deliver_buffered_elements_OnComplete_before_the_timeout() { var c = this.CreateManualSubscriberProbe>(); @@ -91,16 +92,16 @@ public void A_GroupedWithin_must_deliver_buffered_elements_OnComplete_before_the .To(Sink.FromSubscriber(c)) .Run(Materializer); - var cSub = c.ExpectSubscription(); + var cSub = await c.ExpectSubscriptionAsync(); cSub.Request(100); - c.ExpectNext().Should().BeEquivalentTo(new[] { 1, 2, 3 }); - c.ExpectComplete(); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); + (await c.ExpectNextAsync()).Should().BeEquivalentTo(new[] { 1, 2, 3 }); + await c.ExpectCompleteAsync(); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200)); } [Fact] - public void A_GroupedWithin_must_buffer_groups_until_requested_from_downstream() + public async Task A_GroupedWithin_must_buffer_groups_until_requested_from_downstream() { var input = new Iterator(Enumerable.Range(1, 10000)); var p = this.CreateManualPublisherProbe(); @@ -111,30 +112,30 @@ public void A_GroupedWithin_must_buffer_groups_until_requested_from_downstream() .To(Sink.FromSubscriber(c)) .Run(Materializer); - var pSub = p.ExpectSubscription(); - var cSub = c.ExpectSubscription(); + var pSub = await p.ExpectSubscriptionAsync(); + var cSub = await c.ExpectSubscriptionAsync(); cSub.Request(1); - var demand1 = (int)pSub.ExpectRequest(); + var demand1 = (int)await pSub.ExpectRequestAsync(); for (var i = 1; i <= demand1; i++) pSub.SendNext(input.Next()); - c.ExpectNext().Should().BeEquivalentTo(Enumerable.Range(1, demand1)); + (await c.ExpectNextAsync()).Should().BeEquivalentTo(Enumerable.Range(1, demand1)); - var demand2 = (int)pSub.ExpectRequest(); + var demand2 = (int)await pSub.ExpectRequestAsync(); for (var i = 1; i <= demand2; i++) pSub.SendNext(input.Next()); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(300)); cSub.Request(1); - c.ExpectNext().Should().BeEquivalentTo(Enumerable.Range(demand1 + 1, demand2)); + (await c.ExpectNextAsync()).Should().BeEquivalentTo(Enumerable.Range(demand1 + 1, demand2)); pSub.SendComplete(); - c.ExpectComplete(); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c.ExpectCompleteAsync(); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); } [Fact] - public void A_GroupedWithin_must_drop_empty_groups() + public async Task A_GroupedWithin_must_drop_empty_groups() { var p = this.CreateManualPublisherProbe(); var c = this.CreateManualSubscriberProbe>(); @@ -144,27 +145,27 @@ public void A_GroupedWithin_must_drop_empty_groups() .To(Sink.FromSubscriber(c)) .Run(Materializer); - var pSub = p.ExpectSubscription(); - var cSub = c.ExpectSubscription(); + var pSub = await p.ExpectSubscriptionAsync(); + var cSub = await c.ExpectSubscriptionAsync(); cSub.Request(2); - pSub.ExpectRequest(); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(600)); + await pSub.ExpectRequestAsync(); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(600)); pSub.SendNext(1); pSub.SendNext(2); - c.ExpectNext().Should().BeEquivalentTo(new[] { 1, 2 }); + (await c.ExpectNextAsync()).Should().BeEquivalentTo(new[] { 1, 2 }); // nothing more requested - c.ExpectNoMsg(TimeSpan.FromMilliseconds(1100)); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(1100)); cSub.Request(3); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(600)); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(600)); pSub.SendComplete(); - c.ExpectComplete(); - c.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await c.ExpectCompleteAsync(); + await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); } [Fact] - public void A_GroupedWithin_must_not_emit_empty_group_when_finished_while_not_being_pushed() + public async Task A_GroupedWithin_must_not_emit_empty_group_when_finished_while_not_being_pushed() { var p = this.CreateManualPublisherProbe(); var c = this.CreateManualSubscriberProbe>(); @@ -174,17 +175,18 @@ public void A_GroupedWithin_must_not_emit_empty_group_when_finished_while_not_be .To(Sink.FromSubscriber(c)) .Run(Materializer); - var pSub = p.ExpectSubscription(); - var cSub = c.ExpectSubscription(); + var pSub = await p.ExpectSubscriptionAsync(); + var cSub = await c.ExpectSubscriptionAsync(); cSub.Request(1); - pSub.ExpectRequest(); + await pSub.ExpectRequestAsync(); pSub.SendComplete(); - c.ExpectComplete(); + await c.ExpectCompleteAsync(); } - [Fact(Skip = "Skipped for async_testkit conversion build")] - public void A_GroupedWithin_must_reset_time_window_when_max_elements_reached() + // [Fact(Skip = "Skipped for async_testkit conversion build")] + [Fact] + public async Task A_GroupedWithin_must_reset_time_window_when_max_elements_reached() { var input = new Iterator(Enumerable.Range(1, 10000)); var upstream = this.CreatePublisherProbe(); @@ -195,27 +197,29 @@ public void A_GroupedWithin_must_reset_time_window_when_max_elements_reached() .To(Sink.FromSubscriber(downstream)) .Run(Materializer); - downstream.Request(2); - downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(1000)); + await downstream.RequestAsync(2); + await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(1000)); - Enumerable.Range(1, 4).ForEach(_ => upstream.SendNext(input.Next())); - downstream.Within(TimeSpan.FromMilliseconds(1000), () => + foreach (var _ in Enumerable.Range(1, 4)) + upstream.SendNext(input.Next()); + //Enumerable.Range(1, 4).ForEach(_ => upstream.SendNext(input.Next())); + await downstream.WithinAsync(TimeSpan.FromMilliseconds(1000), async() => { - downstream.ExpectNext().Should().BeEquivalentTo(new[] { 1, 2, 3 }); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new[] { 1, 2, 3 }); return NotUsed.Instance; }); - downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(1500)); + await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(1500)); - downstream.Within(TimeSpan.FromMilliseconds(1000), () => + await downstream.WithinAsync(TimeSpan.FromMilliseconds(1000), async() => { - downstream.ExpectNext().Should().BeEquivalentTo(new[] { 4 }); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new[] { 4 }); return NotUsed.Instance; }); upstream.SendComplete(); - downstream.ExpectComplete(); - downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + await downstream.ExpectCompleteAsync(); + await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); } [Fact] @@ -282,7 +286,7 @@ public FlowGroupedWeightedWithinSpec(ITestOutputHelper helper) : base(helper) } [Fact] - public void A_GroupedWeightedWithin_must_handle_handle_elements_larger_than_the_limit() + public async Task A_GroupedWeightedWithin_must_handle_handle_elements_larger_than_the_limit() { var downstream = this.CreateSubscriberProbe>(); @@ -291,17 +295,17 @@ public void A_GroupedWeightedWithin_must_handle_handle_elements_larger_than_the_ .To(Sink.FromSubscriber(downstream)) .Run(Materializer); - downstream.Request(1); - downstream.ExpectNext().Should().BeEquivalentTo(new List { 1, 2, 3 }); - downstream.Request(1); - downstream.ExpectNext().Should().BeEquivalentTo(new List { 101 }); - downstream.Request(1); - downstream.ExpectNext().Should().BeEquivalentTo(new List { 4, 5, 6 }); - downstream.ExpectComplete(); + await downstream.RequestAsync(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 1, 2, 3 }); + await downstream.RequestAsync(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 101 }); + await downstream.RequestAsync(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 4, 5, 6 }); + await downstream.ExpectCompleteAsync(); } [Fact] - public void A_GroupedWeightedWithin_must_not_drop_a_pending_last_element_on_upstream_finish() + public async Task A_GroupedWeightedWithin_must_not_drop_a_pending_last_element_on_upstream_finish() { var upstream = this.CreatePublisherProbe(); var downstream = this.CreateSubscriberProbe>(); @@ -311,22 +315,22 @@ public void A_GroupedWeightedWithin_must_not_drop_a_pending_last_element_on_upst .To(Sink.FromSubscriber(downstream)) .Run(Materializer); - downstream.EnsureSubscription(); - downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - upstream.SendNext(1); - upstream.SendNext(2); - upstream.SendNext(3); - upstream.SendComplete(); - downstream.Request(1); - downstream.ExpectNext().Should().BeEquivalentTo(new List { 1, 2 }); - downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - downstream.Request(1); - downstream.ExpectNext().Should().BeEquivalentTo(new List { 3 }); - downstream.ExpectComplete(); + await downstream.EnsureSubscriptionAsync(); + await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); + await upstream.SendNextAsync(1); + await upstream.SendNextAsync(2); + await upstream.SendNextAsync(3); + await upstream.SendCompleteAsync(); + await downstream.RequestAsync(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 1, 2 }); + await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); + await downstream.RequestAsync(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 3 }); + await downstream.ExpectCompleteAsync(); } [Fact] - public void A_GroupedWeightedWithin_must_append_zero_weighted_elements_to_a_full_group_before_timeout_received_if_downstream_hasnt_pulled_yet() + public async Task A_GroupedWeightedWithin_must_append_zero_weighted_elements_to_a_full_group_before_timeout_received_if_downstream_hasnt_pulled_yet() { var upstream = this.CreatePublisherProbe(); var downstream = this.CreateSubscriberProbe>(); @@ -336,24 +340,24 @@ public void A_GroupedWeightedWithin_must_append_zero_weighted_elements_to_a_full .To(Sink.FromSubscriber(downstream)) .Run(Materializer); - downstream.EnsureSubscription(); - upstream.SendNext("333"); - upstream.SendNext("22"); - upstream.SendNext(""); - upstream.SendNext(""); - upstream.SendNext(""); - downstream.Request(1); - downstream.ExpectNext().Should().BeEquivalentTo(new List { "333", "22", "", "", "" }); - upstream.SendNext(""); - upstream.SendNext(""); - upstream.SendComplete(); - downstream.Request(1); - downstream.ExpectNext().Should().BeEquivalentTo(new List { "", "" }); - downstream.ExpectComplete(); + await downstream.EnsureSubscriptionAsync(); + await upstream.SendNextAsync("333"); + await upstream.SendNextAsync("22"); + await upstream.SendNextAsync(""); + await upstream.SendNextAsync(""); + await upstream.SendNextAsync(""); + await downstream.RequestAsync(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { "333", "22", "", "", "" }); + await upstream.SendNextAsync(""); + await upstream.SendNextAsync(""); + await upstream.SendCompleteAsync(); + await downstream.RequestAsync(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { "", "" }); + await downstream.ExpectCompleteAsync(); } [Fact] - public void A_GroupedWeightedWithin_must_not_emit_an_empty_group_if_first_element_is_heavier_than_maxWeight() + public async Task A_GroupedWeightedWithin_must_not_emit_an_empty_group_if_first_element_is_heavier_than_maxWeight() { var upstream = this.CreatePublisherProbe(); var downstream = this.CreateSubscriberProbe>(); @@ -363,16 +367,16 @@ public void A_GroupedWeightedWithin_must_not_emit_an_empty_group_if_first_elemen .To(Sink.FromSubscriber(downstream)) .Run(Materializer); - downstream.EnsureSubscription(); - downstream.Request(1); - upstream.SendNext(11); - downstream.ExpectNext().Should().BeEquivalentTo(new List { 11 }); - upstream.SendComplete(); - downstream.ExpectComplete(); + await downstream.EnsureSubscriptionAsync(); + await downstream.RequestAsync(1); + await upstream.SendNextAsync(11); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 11 }); + await upstream.SendCompleteAsync(); + await downstream.ExpectCompleteAsync(); } [Fact] - public void A_GroupedWeightedWithin_must_handle_zero_cost_function_to_get_only_timed_based_grouping_without_limit() + public async Task A_GroupedWeightedWithin_must_handle_zero_cost_function_to_get_only_timed_based_grouping_without_limit() { var upstream = this.CreatePublisherProbe(); var downstream = this.CreateSubscriberProbe>(); @@ -382,20 +386,20 @@ public void A_GroupedWeightedWithin_must_handle_zero_cost_function_to_get_only_t .To(Sink.FromSubscriber(downstream)) .Run(Materializer); - downstream.EnsureSubscription(); - downstream.Request(1); - upstream.SendNext("333"); - upstream.SendNext("22"); - upstream.SendNext("333"); - upstream.SendNext("22"); - downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(50)); - downstream.ExpectNext().Should().BeEquivalentTo(new List { "333", "22", "333", "22" }); - upstream.SendComplete(); - downstream.ExpectComplete(); + await downstream.EnsureSubscriptionAsync(); + await downstream.RequestAsync(1); + await upstream.SendNextAsync("333"); + await upstream.SendNextAsync("22"); + await upstream.SendNextAsync("333"); + await upstream.SendNextAsync("22"); + await downstream.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(50)); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { "333", "22", "333", "22" }); + await upstream.SendCompleteAsync(); + await downstream.ExpectCompleteAsync(); } [Fact] - public void A_GroupedWeightedWithin_must_group_by_max_weight_and_max_number_of_elements_reached() + public async Task A_GroupedWeightedWithin_must_group_by_max_weight_and_max_number_of_elements_reached() { var upstream = this.CreatePublisherProbe(); var downstream = this.CreateSubscriberProbe>(); @@ -405,33 +409,33 @@ public void A_GroupedWeightedWithin_must_group_by_max_weight_and_max_number_of_e .To(Sink.FromSubscriber(downstream)) .Run(Materializer); - downstream.EnsureSubscription(); - upstream.SendNext(1); - upstream.SendNext(2); - upstream.SendNext(3); - upstream.SendNext(4); - upstream.SendNext(5); - upstream.SendNext(6); - upstream.SendNext(11); - upstream.SendNext(7); - upstream.SendNext(2); - upstream.SendComplete(); - downstream.Request(1); + await downstream.EnsureSubscriptionAsync(); + await upstream.SendNextAsync(1); + await upstream.SendNextAsync(2); + await upstream.SendNextAsync(3); + await upstream.SendNextAsync(4); + await upstream.SendNextAsync(5); + await upstream.SendNextAsync(6); + await upstream.SendNextAsync(11); + await upstream.SendNextAsync(7); + await upstream.SendNextAsync(2); + await upstream.SendCompleteAsync(); + await downstream.RequestAsync(1); // split because of maxNumber: 3 element - downstream.ExpectNext().Should().BeEquivalentTo(new List { 1, 2, 3 }); - downstream.Request(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 1, 2, 3 }); + await downstream.RequestAsync(1); // split because of maxWeight: 9=4+5, one more element did not fit - downstream.ExpectNext().Should().BeEquivalentTo(new List { 4, 5 }); - downstream.Request(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 4, 5 }); + await downstream.RequestAsync(1); // split because of maxWeight: 6, one more element did not fit - downstream.ExpectNext().Should().BeEquivalentTo(new List { 6 }); - downstream.Request(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 6 }); + await downstream.RequestAsync(1); // split because of maxWeight: 11 - downstream.ExpectNext().Should().BeEquivalentTo(new List { 11 }); - downstream.Request(1); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 11 }); + await downstream.RequestAsync(1); // no split - downstream.ExpectNext().Should().BeEquivalentTo(new List { 7, 2 }); - downstream.ExpectComplete(); + (await downstream.ExpectNextAsync()).Should().BeEquivalentTo(new List { 7, 2 }); + await downstream.ExpectCompleteAsync(); } } }