diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index 186fe7399ea..5891e573814 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -38,63 +38,6 @@ public FlowGroupBySpec(ITestOutputHelper helper) : base(helper) Materializer = ActorMaterializer.Create(Sys, settings); } - private sealed class StreamPuppet - { - private readonly TestSubscriber.ManualProbe _probe; - private readonly ISubscription _subscription; - - public StreamPuppet(IPublisher p, TestKitBase kit) - { - _probe = kit.CreateManualSubscriberProbe(); - p.Subscribe(_probe); - _subscription = _probe.ExpectSubscription(); - } - - public void Request(int demand) => _subscription.Request(demand); - - public void ExpectNext(int element) => _probe.ExpectNext(element); - - public void ExpectNoMsg(TimeSpan max) => _probe.ExpectNoMsg(max); - - public void ExpectComplete() => _probe.ExpectComplete(); - - public void ExpectError(Exception ex) => _probe.ExpectError().Should().Be(ex); - - public void Cancel() => _subscription.Cancel(); - } - - private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int maxSubstream = -1, - Action)>, ISubscription, Func>> run = null) - { - - var source = Source.From(Enumerable.Range(1, elementCount)).RunWith(Sink.AsPublisher(false), Materializer); - var max = maxSubstream > 0 ? maxSubstream : groupCount; - var groupStream = - Source.FromPublisher(source) - .GroupBy(max, x => x % groupCount) - .Lift(x => x % groupCount) - .RunWith(Sink.AsPublisher<(int, Source)>(false), Materializer); - var masterSubscriber = this.CreateManualSubscriberProbe<(int, Source)>(); - - groupStream.Subscribe(masterSubscriber); - var masterSubscription = masterSubscriber.ExpectSubscription(); - - run?.Invoke(masterSubscriber, masterSubscription, expectedKey => - { - masterSubscription.Request(1); - var tuple = masterSubscriber.ExpectNext(); - tuple.Item1.Should().Be(expectedKey); - return tuple.Item2; - }); - } - - private ByteString RandomByteString(int size) - { - var a = new byte[size]; - ThreadLocalRandom.Current.NextBytes(a); - return ByteString.FromBytes(a); - } - [Fact] public void GroupBy_must_work_in_the_happy_case() { @@ -167,11 +110,11 @@ public void GroupBy_must_fail_when_key_function_returns_null() } [Fact] - public void GroupBy_must_support_cancelling_substreams() + public void GroupBy_must_accept_cancelling_substreams() { this.AssertAllStagesStopped(() => { - WithSubstreamsSupport(2, run: (masterSubscriber, masterSubscription, getSubFlow) => + WithSubstreamsSupport(2, maxSubstream: 3, run: (masterSubscriber, masterSubscription, getSubFlow) => { new StreamPuppet(getSubFlow(1).RunWith(Sink.AsPublisher(false), Materializer), this).Cancel(); var substream = new StreamPuppet(getSubFlow(0).RunWith(Sink.AsPublisher(false), Materializer), this); @@ -740,6 +683,63 @@ public void GroupBy_must_work_with_random_demand() }, Materializer); } + private sealed class StreamPuppet + { + private readonly TestSubscriber.ManualProbe _probe; + private readonly ISubscription _subscription; + + public StreamPuppet(IPublisher p, TestKitBase kit) + { + _probe = kit.CreateManualSubscriberProbe(); + p.Subscribe(_probe); + _subscription = _probe.ExpectSubscription(); + } + + public void Request(int demand) => _subscription.Request(demand); + + public void ExpectNext(int element) => _probe.ExpectNext(element); + + public void ExpectNoMsg(TimeSpan max) => _probe.ExpectNoMsg(max); + + public void ExpectComplete() => _probe.ExpectComplete(); + + public void ExpectError(Exception ex) => _probe.ExpectError().Should().Be(ex); + + public void Cancel() => _subscription.Cancel(); + } + + private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int maxSubstream = -1, + Action)>, ISubscription, Func>> run = null) + { + + var source = Source.From(Enumerable.Range(1, elementCount)).RunWith(Sink.AsPublisher(false), Materializer); + var max = maxSubstream > 0 ? maxSubstream : groupCount; + var groupStream = + Source.FromPublisher(source) + .GroupBy(max, x => x % groupCount) + .Lift(x => x % groupCount) + .RunWith(Sink.AsPublisher<(int, Source)>(false), Materializer); + var masterSubscriber = this.CreateManualSubscriberProbe<(int, Source)>(); + + groupStream.Subscribe(masterSubscriber); + var masterSubscription = masterSubscriber.ExpectSubscription(); + + run?.Invoke(masterSubscriber, masterSubscription, expectedKey => + { + masterSubscription.Request(1); + var tuple = masterSubscriber.ExpectNext(); + tuple.Item1.Should().Be(expectedKey); + return tuple.Item2; + }); + } + + private ByteString RandomByteString(int size) + { + var a = new byte[size]; + ThreadLocalRandom.Current.NextBytes(a); + return ByteString.FromBytes(a); + } + private sealed class SubFlowState { public SubFlowState(TestSubscriber.Probe probe, bool hasDemand, ByteString firstElement) diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index 2f0161f9550..eb620ce9d1f 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -410,7 +410,7 @@ public void OnPush() } else { - if (_activeSubstreams.Count == _stage._maxSubstreams) + if (_activeSubstreams.Count + _closedSubstreams.Count == _stage._maxSubstreams) throw new TooManySubstreamsOpenException(); else if (_closedSubstreams.Contains(key) && !HasBeenPulled(_stage.In)) Pull(_stage.In);