From 389c000ec7bf3e39c36fdd0b47362a29db1a8b6e Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Sat, 23 Apr 2022 10:05:35 +0200 Subject: [PATCH 1/6] GroupBy pulls upstream when a substream materialization is waiting (cherry picked from commit 98be809b66114f49b2b77e14ba05ac8262bce545) --- .../Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs | 61 ++++++++++++++++++- .../Implementation/Fusing/StreamOfStreams.cs | 35 +++++------ 2 files changed, 77 insertions(+), 19 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index 2609e557762..dd3d0dc420f 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -483,7 +483,7 @@ public void GroupBy_must_work_under_fuzzing_stress_test() } [Fact] - public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main() + public void GroupBy_must_work_if_pull_is_exercised_from_both_substream_and_main() { this.AssertAllStagesStopped(() => { @@ -517,6 +517,63 @@ public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main( }, Materializer); } + [Fact] + public void GroupBy_must_work_if_pull_is_exercised_from_multiple_substreams_while_downstream_is_backpressuring() + { + this.AssertAllStagesStopped(() => + { + var upstream = this.CreatePublisherProbe(); + var downstreamMaster = this.CreateSubscriberProbe>(); + + Source.FromPublisher(upstream) + .Via(new GroupBy(10, element => element)) + .RunWith(Sink.FromSubscriber(downstreamMaster), Materializer); + + var substream1 = this.CreateSubscriberProbe(); + downstreamMaster.Request(1); + upstream.SendNext(1); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream1), Materializer); + + var substream2 = this.CreateSubscriberProbe(); + downstreamMaster.Request(1); + upstream.SendNext(2); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream2), 
Materializer); + + substream1.Request(1); + substream1.ExpectNext(1); + substream2.Request(1); + substream2.ExpectNext(2); + + // Both substreams pull + substream1.Request(1); + substream2.Request(1); + + // Upstream sends new groups + upstream.SendNext(3); + upstream.SendNext(4); + + var substream3 = this.CreateSubscriberProbe(); + var substream4 = this.CreateSubscriberProbe(); + downstreamMaster.Request(1); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream3), Materializer); + downstreamMaster.Request(1); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream4), Materializer); + + substream3.Request(1); + substream3.ExpectNext(3); + substream4.Request(1); + substream4.ExpectNext(4); + + // Cleanup, not part of the actual test + substream1.Cancel(); + substream2.Cancel(); + substream3.Cancel(); + substream4.Cancel(); + downstreamMaster.Cancel(); + upstream.SendComplete(); + }, Materializer); + } + [Fact] public void GroupBy_must_work_with_random_demand() { @@ -681,7 +738,7 @@ private void RandomDemand(Dictionary map, RandomDemandPropert map[key] = new SubFlowState(state.Probe, false, null); } else if (props.BlockingNextElement == null) - break; + break; } } diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index 87d08789836..8cc294577fa 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -181,7 +181,7 @@ public FlattenMerge(int breadth) internal sealed class PrefixAndTail : GraphStage, Source)>> { #region internal classes - + private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler { private const string SubscriptionTimer = "SubstreamSubscriptionTimer"; @@ -205,11 +205,11 @@ public Logic(PrefixAndTail stage) : base(stage.Shape) Pull(_stage._in); _tailSource.SetHandler(new LambdaOutHandler(onPull: () => Pull(_stage._in))); }); - + 
SetHandler(_stage._in, this); SetHandler(_stage._out, this); } - + protected internal override void OnTimer(object timerKey) { var materializer = ActorMaterializerHelper.Downcast(Interpreter.Materializer); @@ -360,7 +360,7 @@ public PrefixAndTail(int count) /// TBD internal sealed class GroupBy : GraphStage>> { - #region Loigc + #region Logic private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler { @@ -370,7 +370,7 @@ private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler private readonly HashSet _substreamsJustStarted = new HashSet(); private readonly Lazy _decider; private TimeSpan _timeout; - private SubstreamSource _substreamWaitingToBePushed; + private Option _substreamWaitingToBePushed = Option.None; private Option _nextElementKey = Option.None; private Option _nextElementValue = Option.None; private long _nextId; @@ -379,12 +379,12 @@ private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler public Logic(GroupBy stage, Attributes inheritedAttributes) : base(stage.Shape) { _stage = stage; - + _decider = new Lazy(() => { var attribute = inheritedAttributes.GetAttribute(null); return attribute != null ? 
attribute.Decider : Deciders.StoppingDecider; - }); + }); SetHandler(_stage.In, this); SetHandler(_stage.Out, this); @@ -431,11 +431,12 @@ public void OnPush() public void OnPull() { - if (_substreamWaitingToBePushed != null) + if (_substreamWaitingToBePushed.HasValue) { - Push(_stage.Out, Source.FromGraph(_substreamWaitingToBePushed.Source)); - ScheduleOnce(_substreamWaitingToBePushed.Key.Value, _timeout); - _substreamWaitingToBePushed = null; + var substreamSource = _substreamWaitingToBePushed.Value; + Push(_stage.Out, Source.FromGraph(substreamSource.Source)); + ScheduleOnce(substreamSource.Key.Value, _timeout); + _substreamWaitingToBePushed = Option.None; } else { @@ -500,7 +501,7 @@ private void Fail(Exception ex) FailStage(ex); } - private bool NeedToPull => !(HasBeenPulled(_stage.In) || IsClosed(_stage.In) || HasNextElement); + private bool NeedToPull => !(HasBeenPulled(_stage.In) || IsClosed(_stage.In) || HasNextElement || _substreamWaitingToBePushed.HasValue); public override void PreStart() { @@ -530,7 +531,7 @@ private void RunSubstream(TKey key, T value) { Push(_stage.Out, Source.FromGraph(substreamSource.Source)); ScheduleOnce(key, _timeout); - _substreamWaitingToBePushed = null; + _substreamWaitingToBePushed = Option.None; } else { @@ -628,7 +629,7 @@ public GroupBy(int maxSubstreams, Func keyFor) { _maxSubstreams = maxSubstreams; _keyFor = keyFor; - + Shape = new FlowShape>(In, Out); } @@ -778,7 +779,7 @@ public override void OnDownstreamFinish() else // Start draining if (!_logic.HasBeenPulled(_inlet)) - _logic.Pull(_inlet); + _logic.Pull(_inlet); } public override void OnPush() @@ -1010,7 +1011,7 @@ protected CommandScheduledBeforeMaterialization(ICommand command) internal class RequestOneScheduledBeforeMaterialization : CommandScheduledBeforeMaterialization { public static readonly RequestOneScheduledBeforeMaterialization Instance = new RequestOneScheduledBeforeMaterialization(RequestOne.Instance); - + private 
RequestOneScheduledBeforeMaterialization(ICommand command) : base(command) { } @@ -1046,7 +1047,7 @@ private RequestOne() { } } - + internal class Cancel : ICommand { public static readonly Cancel Instance = new Cancel(); From 14562378ba9b5330530d0907ee55c02bc3314a91 Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Sat, 23 Apr 2022 10:25:26 +0200 Subject: [PATCH 2/6] Cancel GroupBy when all substreams cancel (cherry picked from commit 4584f596e599b55e002b9f58f1b02029933a8c3a) --- .../Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs | 44 +++++++++++++++++++ .../Implementation/Fusing/StreamOfStreams.cs | 30 ++++++++----- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index dd3d0dc420f..f098ad318f0 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -574,6 +574,50 @@ public void GroupBy_must_work_if_pull_is_exercised_from_multiple_substreams_whil }, Materializer); } + [Fact] + public void GroupBy_must_cancel_if_downstream_has_cancelled_and_all_substreams_cancel() + { + this.AssertAllStagesStopped(() => + { + var upstream = this.CreatePublisherProbe(); + var downstreamMaster = this.CreateSubscriberProbe>(); + + Source.FromPublisher(upstream) + .Via(new GroupBy(10, element => element)) + .RunWith(Sink.FromSubscriber(downstreamMaster), Materializer); + + var substream1 = this.CreateSubscriberProbe(); + downstreamMaster.Request(1); + upstream.SendNext(1); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream1), Materializer); + + var substream2 = this.CreateSubscriberProbe(); + downstreamMaster.Request(1); + upstream.SendNext(2); + downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream2), Materializer); + + // Cancel downstream + downstreamMaster.Cancel(); + + // Both substreams still work + substream1.Request(1); 
+ substream1.ExpectNext(1); + substream2.Request(1); + substream2.ExpectNext(2); + + // New keys are ignored + upstream.SendNext(3); + upstream.SendNext(4); + + // Cancel all substreams + substream1.Cancel(); + substream2.Cancel(); + + // Upstream gets cancelled + upstream.ExpectCancellation(); + }, Materializer); + } + [Fact] public void GroupBy_must_work_with_random_demand() { diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index 8cc294577fa..7b91f644c04 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -15,7 +15,6 @@ using Akka.Streams.Implementation.Stages; using Akka.Streams.Stage; using Akka.Streams.Supervision; -using Akka.Streams.Util; using Akka.Util; using Akka.Util.Internal; @@ -464,9 +463,7 @@ public void OnUpstreamFinish() public void OnDownstreamFinish() { - if (_activeSubstreams.Count == 0) - CompleteStage(); - else + if (!TryCancel()) SetKeepGoing(true); } @@ -493,6 +490,18 @@ private bool TryCompleteAll() return false; } + private bool TryCancel() + { + // if there's no active substreams or there's only one but it's not been pushed yet + if (_activeSubstreams.Count == 0 || (_activeSubstreams.Count == 1 && _substreamWaitingToBePushed.HasValue)) + { + CompleteStage(); + return true; + } + + return false; + } + private void Fail(Exception ex) { foreach (var value in _activeSubstreams.Values) @@ -602,15 +611,12 @@ public void OnPull() public void OnDownstreamFinish() { - if(_logic.HasNextElement && _logic._nextElementKey.Equals(Key)) - _logic.ClearNextElement(); - if (FirstPush) - _logic._firstPushCounter--; + if(_logic.HasNextElement && _logic._nextElementKey.Equals(Key)) _logic.ClearNextElement(); + if (FirstPush) _logic._firstPushCounter--; CompleteSubStream(); - if (_logic.IsClosed(_logic._stage.In)) - _logic.TryCompleteAll(); - else if (_logic.NeedToPull) - 
_logic.Pull(_logic._stage.In); + if (_logic.IsClosed(_logic._stage.Out)) _logic.TryCancel(); + if (_logic.IsClosed(_logic._stage.In)) _logic.TryCompleteAll(); + else if (_logic.NeedToPull) _logic.Pull(_logic._stage.In); } } } From a36765badee545d9eef526c2f8480e15e7305e2a Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Sat, 23 Apr 2022 12:19:09 +0200 Subject: [PATCH 3/6] Allow GroupBy to recreate already closed substreams (cherry picked from commit b77b68e4ccb7c4b1b11155c074dd724e6fa3e1dc) --- docs/articles/streams/builtinstages.md | 10 +- .../CoreAPISpec.ApproveStreams.verified.txt | 1 + .../Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs | 35 +++++++ src/core/Akka.Streams/Dsl/FlowOperations.cs | 83 ++++++++++------ .../Dsl/Internal/InternalFlowOperations.cs | 99 ++++++++++++++----- .../Implementation/Fusing/StreamOfStreams.cs | 20 ++-- 6 files changed, 184 insertions(+), 64 deletions(-) diff --git a/docs/articles/streams/builtinstages.md b/docs/articles/streams/builtinstages.md index 8c827766fa9..900dbf1e4fc 100644 --- a/docs/articles/streams/builtinstages.md +++ b/docs/articles/streams/builtinstages.md @@ -1000,7 +1000,15 @@ and returns a pair containing a strict sequence of the taken element and a strea ### GroupBy -De-multiplex the incoming stream into separate output streams. +This operation demultiplexes the incoming stream into separate output streams, one for each element key. The +key is computed for each element using the given function. When a new key is encountered for the first time +a new substream is opened and subsequently fed with all elements belonging to that key. + +> [!NOTE] +> If `allowClosedSubstreamRecreation` is set to `true` substream completion and incoming elements are subject to race-conditions. If elements arrive for a stream that is in the process of closing these elements might get lost. 
+ +> [!WARNING] +> If `allowClosedSubstreamRecreation` is set to `false` (default behavior) the stage keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys this can cause memory issues. Elements belonging to those keys are drained directly and not send to the substream. **emits** an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt index aca25f5b718..8eeba130ac5 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt @@ -1342,6 +1342,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Flow DivertTo(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat> that, System.Func when) { } public static Akka.Streams.Dsl.Flow DivertToMaterialized(this Akka.Streams.Dsl.Flow flow, Akka.Streams.IGraph, TMat2> that, System.Func when, System.Func materializerFunction) { } public static Akka.Streams.Dsl.Flow Expand(this Akka.Streams.Dsl.Flow flow, System.Func> extrapolate) { } + public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc, bool allowClosedSubstreamRecreation) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Flow, TMat> Grouped(this Akka.Streams.Dsl.Flow flow, int n) { } public static Akka.Streams.Dsl.Flow, TMat> GroupedWithin(this Akka.Streams.Dsl.Flow flow, int n, System.TimeSpan timeout) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index f098ad318f0..f7193503b1e 100644 --- 
a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -574,6 +574,41 @@ public void GroupBy_must_work_if_pull_is_exercised_from_multiple_substreams_whil }, Materializer); } + [Fact] + public void GroupBy_must_allow_to_recreate_an_already_closed_substream() + { + this.AssertAllStagesStopped(() => + { + var f = Flow.Create() + .GroupBy(2, x => x, allowClosedSubstreamRecreation: true) + .Take(1) // close the substream after 1 element + .MergeSubstreams(); + + var (up, down) = ((Flow)f) + .RunWith(this.SourceProbe(), this.SinkProbe(), Materializer); + + down.Request(4); + + // Creates and closes substream "1" + up.SendNext(1); + down.ExpectNext(1); + + // Creates and closes substream "2" + up.SendNext(2); + down.ExpectNext(2); + + // Recreates and closes substream "1" twice + up.SendNext(1); + down.ExpectNext(1); + up.SendNext(1); + down.ExpectNext(1); + + // Cleanup, not part of the actual test + up.SendComplete(); + down.ExpectComplete(); + }, Materializer); + } + [Fact] public void GroupBy_must_cancel_if_downstream_has_cancelled_and_all_substreams_cancel() { diff --git a/src/core/Akka.Streams/Dsl/FlowOperations.cs b/src/core/Akka.Streams/Dsl/FlowOperations.cs index b8359b7beea..32fe8a69631 100644 --- a/src/core/Akka.Streams/Dsl/FlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/FlowOperations.cs @@ -1315,48 +1315,75 @@ public static Flow Transform(this Flo /// This operation demultiplexes the incoming stream into separate output /// streams, one for each element key. The key is computed for each element /// using the given function. When a new key is encountered for the first time - /// it is emitted to the downstream subscriber together with a fresh - /// flow that will eventually produce all the elements of the substream - /// for that key. 
Not consuming the elements from the created streams will - /// stop this processor from processing more elements, therefore you must take - /// care to unblock (or cancel) all of the produced streams even if you want - /// to consume only one of them. - /// + /// a new substream is opened and subsequently fed with all elements belonging to + /// that key. + /// + /// WARNING: If is set to false (default behavior) the operator + /// keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys this + /// can cause memory issues. Elements belonging to those keys are drained directly and not send to the substream. + /// + /// + /// Note: If is set to true substream completion and incoming + /// elements are subject to race-conditions. If elements arrive for a stream that is in the process of closing + /// these elements might get lost. + /// + /// + /// The object returned from this method is not a normal , it is a + /// . This means that after this operator + /// all transformations are applied to all encountered substreams in the same fashion. + /// Substream mode is exited either by closing the substream (i.e. connecting it to a ) + /// or by merging the substreams back together; see the To and MergeBack methods + /// on for more information. + /// + /// + /// It is important to note that the substreams also propagate back-pressure as any other stream, which means + /// that blocking one substream will block the GroupBy operator itself —and thereby all substreams— once all + /// internal or explicit buffers are filled. + /// + /// /// If the group by function throws an exception and the supervision decision - /// is the stream and substreams will be completed - /// with failure. - /// + /// is the stream and substreams will be completed with failure. + /// + /// /// If the group by throws an exception and the supervision decision /// is or /// the element is dropped and the stream and substreams continue. 
- /// - /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. - /// - /// Adheres to the attribute. - /// - /// - /// Emits when an element for which the grouping function returns a group that has not yet been created. - /// Emits the new group /// - /// Backpressures when there is an element pending for a group whose substream backpressures /// - /// Completes when upstream completes + /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. /// - /// Cancels when downstream cancels and all substreams cancel + /// **Emits when** an element for which the grouping function returns a group that has not yet been created. Emits the new group. + /// **Backpressures when** there is an element pending for a group whose substream backpressures + /// **Completes when** upstream completes + /// **Cancels when** downstream cancels and all substreams cancel /// /// TBD /// TBD /// TBD /// TBD /// TBD - /// TBD - /// TBD + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails + /// Computes the key for each element + /// Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion /// TBD - public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc) - { - return flow.GroupBy(maxSubstreams, groupingFunc, - (f, s) => ((Flow, TMat>) f).To(s)); - } + public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc, bool allowClosedSubstreamRecreation) => + flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow, TMat>)f).To(s), allowClosedSubstreamRecreation); + + /// + /// This operation demultiplexes the incoming stream into separate output + /// streams, one for each element key. The key is computed for each element + /// using the given function. 
When a new key is encountered for the first time + /// a new substream is opened and subsequently fed with all elements belonging to + /// that key. + /// + /// WARNING: The stage keeps track of all keys of streams that have already been closed. + /// If you expect an infinite number of keys this can cause memory issues. Elements belonging + /// to those keys are drained directly and not send to the substream. + /// + /// See + /// + public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc) => + flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow, TMat>)f).To(s), false); /// /// This operation applies the given predicate to all incoming elements and diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index 70900d5d5c2..e0224cb6e41 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -1269,59 +1269,103 @@ public static IFlow Transform(this IFlow /// This operation demultiplexes the incoming stream into separate output /// streams, one for each element key. The key is computed for each element /// using the given function. When a new key is encountered for the first time - /// it is emitted to the downstream subscriber together with a fresh - /// flow that will eventually produce all the elements of the substream - /// for that key. Not consuming the elements from the created streams will - /// stop this processor from processing more elements, therefore you must take - /// care to unblock (or cancel) all of the produced streams even if you want - /// to consume only one of them. - /// + /// a new substream is opened and subsequently fed with all elements belonging to + /// that key. + /// + /// WARNING: If is set to false (default behavior) the operator + /// keeps track of all keys of streams that have already been closed. 
If you expect an infinite number of keys this + /// can cause memory issues. Elements belonging to those keys are drained directly and not send to the substream. + /// + /// + /// Note: If is set to true substream completion and incoming + /// elements are subject to race-conditions. If elements arrive for a stream that is in the process of closing + /// these elements might get lost. + /// + /// + /// The object returned from this method is not a normal or , it is a + /// . This means that after this operator + /// all transformations are applied to all encountered substreams in the same fashion. + /// Substream mode is exited either by closing the substream (i.e. connecting it to a ) + /// or by merging the substreams back together; see the To and MergeBack methods + /// on for more information. + /// + /// + /// It is important to note that the substreams also propagate back-pressure as any other stream, which means + /// that blocking one substream will block the GroupBy operator itself —and thereby all substreams— once all + /// internal or explicit buffers are filled. + /// + /// /// If the group by function throws an exception and the supervision decision - /// is the stream and substreams will be completed - /// with failure. - /// + /// is the stream and substreams will be completed with failure. + /// + /// /// If the group by throws an exception and the supervision decision /// is or /// the element is dropped and the stream and substreams continue. - /// - /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. - /// - /// Emits when an element for which the grouping function returns a group that has not yet been created. - /// Emits the new group /// - /// Backpressures when there is an element pending for a group whose substream backpressures /// - /// Completes when upstream completes + /// Function MUST NOT return null. This will throw exception and trigger supervision decision mechanism. 
/// - /// Cancels when downstream cancels and all substreams cancel + /// **Emits when** an element for which the grouping function returns a group that has not yet been created. Emits the new group. + /// **Backpressures when** there is an element pending for a group whose substream backpressures + /// **Completes when** upstream completes + /// **Cancels when** downstream cancels and all substreams cancel /// /// TBD /// TBD /// TBD /// TBD /// TBD - /// TBD - /// TBD + /// Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered then the stream fails + /// Computes the key for each element /// TBD + /// Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion /// TBD public static SubFlow GroupBy( this IFlow flow, int maxSubstreams, Func groupingFunc, - Func, TMat>, Sink, Task>, TClosed> toFunc) + Func, TMat>, Sink, Task>, TClosed> toFunc, + bool allowClosedSubstreamRecreation) { - var merge = new GroupByMergeBack(flow, maxSubstreams, groupingFunc); + var merge = new GroupByMergeBack(flow, maxSubstreams, groupingFunc, allowClosedSubstreamRecreation); Func, TClosed> finish = s => { return toFunc( - flow.Via(new Fusing.GroupBy(maxSubstreams, groupingFunc)), + flow.Via(new Fusing.GroupBy(maxSubstreams, groupingFunc, allowClosedSubstreamRecreation)), Sink.ForEach>(e => e.RunWith(s, Fusing.GraphInterpreter.Current.Materializer))); }; return new SubFlowImpl(Flow.Create(), merge, finish); } + /// + /// This operation demultiplexes the incoming stream into separate output streams, one for each element key. + /// The key is computed for each element using the given function. When a new key is encountered for the first + /// time a new substream is opened and subsequently fed with all elements belonging to that key. + /// + /// WARNING: The operator keeps track of all keys of streams that have already been closed. 
If you expect an + /// infinite number of keys this can cause memory issues. Elements belonging to those keys are drained directly + /// and not send to the substream. + /// + /// See also + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public static SubFlow GroupBy( + this IFlow flow, + int maxSubstreams, + Func groupingFunc, + Func, TMat>, Sink, Task>, TClosed> toFunc) => GroupBy(flow, maxSubstreams, groupingFunc, toFunc, allowClosedSubstreamRecreation: false); + /// /// TBD /// @@ -1333,6 +1377,7 @@ internal class GroupByMergeBack : IMergeBack private readonly IFlow _self; private readonly int _maxSubstreams; private readonly Func _groupingFunc; + private readonly bool _allowClosedSubstreamRecreation; /// /// TBD @@ -1340,13 +1385,13 @@ internal class GroupByMergeBack : IMergeBack /// TBD /// TBD /// TBD - public GroupByMergeBack(IFlow self, - int maxSubstreams, - Func groupingFunc) + /// TBD + public GroupByMergeBack(IFlow self, int maxSubstreams, Func groupingFunc, bool allowClosedSubstreamRecreation = false) { _self = self; _maxSubstreams = maxSubstreams; _groupingFunc = groupingFunc; + _allowClosedSubstreamRecreation = allowClosedSubstreamRecreation; } /// @@ -1358,7 +1403,7 @@ public GroupByMergeBack(IFlow self, /// TBD public IFlow Apply(Flow flow, int breadth) { - return _self.Via(new Fusing.GroupBy(_maxSubstreams, _groupingFunc)) + return _self.Via(new Fusing.GroupBy(_maxSubstreams, _groupingFunc, _allowClosedSubstreamRecreation)) .Select(f => f.Via(flow)) .Via(new Fusing.FlattenMerge, T, NotUsed>(breadth)); } diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index 7b91f644c04..5b8833c2d25 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -520,14 +520,15 @@ public override void PreStart() protected internal override void OnTimer(object timerKey) { - var 
key = (TKey) timerKey; - if (_activeSubstreams.TryGetValue(key, out var substreamSource)) + var key = (TKey)timerKey; + if (_activeSubstreams.ContainsKey(key)) { - substreamSource.Timeout(_timeout); - _closedSubstreams.Add(key); + if (!_stage._allowClosedSubstreamRecreation) + { + _closedSubstreams.Add(key); + } _activeSubstreams.Remove(key); - if (IsClosed(_stage.In)) - TryCompleteAll(); + if (IsClosed(_stage.In)) TryCompleteAll(); } } @@ -574,7 +575,7 @@ private void CompleteSubStream() { Complete(); _logic._activeSubstreams.Remove(Key.Value); - _logic._closedSubstreams.Add(Key.Value); + if (!_logic._stage._allowClosedSubstreamRecreation) _logic._closedSubstreams.Add(Key.Value); } private void TryCompleteHandler() @@ -625,16 +626,19 @@ public void OnDownstreamFinish() private readonly int _maxSubstreams; private readonly Func _keyFor; + private readonly bool _allowClosedSubstreamRecreation; /// /// TBD /// /// TBD /// TBD - public GroupBy(int maxSubstreams, Func keyFor) + /// TBD + public GroupBy(int maxSubstreams, Func keyFor, bool allowClosedSubstreamRecreation = false) { _maxSubstreams = maxSubstreams; _keyFor = keyFor; + _allowClosedSubstreamRecreation = allowClosedSubstreamRecreation; Shape = new FlowShape>(In, Out); } From a4f72e97c26c29d0e7503beeb428ed8c2f2dd8d2 Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Sat, 23 Apr 2022 12:44:25 +0200 Subject: [PATCH 4/6] Fixes GroupBy does not invoke decider (cherry picked from commit 810882bd08f8de1953398d6826f89dd70949b1f3) --- .../CoreAPISpec.ApproveStreams.verified.txt | 4 ++++ .../Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs | 14 ++++++++++++ .../Implementation/Fusing/StreamOfStreams.cs | 2 +- .../TooManySubstreamsOpenException.cs | 22 +++++++++++++++++++ 4 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 src/core/Akka.Streams/TooManySubstreamsOpenException.cs diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt 
b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt index 8eeba130ac5..f5ea9a0ead2 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt @@ -1012,6 +1012,10 @@ namespace Akka.Streams Shaping = 0, Enforcing = 1, } + public class TooManySubstreamsOpenException : System.InvalidOperationException + { + public TooManySubstreamsOpenException() { } + } public abstract class TransformerLikeBase : Akka.Streams.ITransformerLike { protected TransformerLikeBase() { } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index f7193503b1e..186fe7399ea 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -427,6 +427,20 @@ public void GroupBy_must_fail_when_exceeding_maxSubstreams() }, Materializer); } + [Fact] + public void GroupBy_must_resume_when_exceeding_maxSubstreams() + { + var f = Flow.Create().GroupBy(0, x => x).MergeSubstreams(); + var (up, down) = ((Flow)f) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(this.SourceProbe(), this.SinkProbe(), Materializer); + + down.Request(1); + + up.SendNext(1); + down.ExpectNoMsg(TimeSpan.FromSeconds(1)); + } + [Fact] public void GroupBy_must_emit_subscribe_before_completed() { diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index 5b8833c2d25..2f0161f9550 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -411,7 +411,7 @@ public void OnPush() else { if (_activeSubstreams.Count == _stage._maxSubstreams) - Fail(new IllegalStateException($"Cannot open substream for key {key}: too many substreams open")); + throw new TooManySubstreamsOpenException(); else if 
(_closedSubstreams.Contains(key) && !HasBeenPulled(_stage.In)) Pull(_stage.In); else diff --git a/src/core/Akka.Streams/TooManySubstreamsOpenException.cs b/src/core/Akka.Streams/TooManySubstreamsOpenException.cs new file mode 100644 index 00000000000..cbb9ab20f65 --- /dev/null +++ b/src/core/Akka.Streams/TooManySubstreamsOpenException.cs @@ -0,0 +1,22 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; + +namespace Akka.Streams +{ + /// + /// This exception signals that the maximum number of substreams declared has been exceeded. + /// A finite limit is imposed so that memory usage is controlled. + /// + public class TooManySubstreamsOpenException : InvalidOperationException + { + public TooManySubstreamsOpenException() : + base("Cannot open a new substream as there are too many substreams open") + { } + } +} From 327be46700f3b33ad1217b9f96988e2344959ac8 Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Sat, 23 Apr 2022 14:42:13 +0200 Subject: [PATCH 5/6] Avoids memory being retained for GroupBy (cherry picked from commit fd3fb951eba334cfed0aa58df3a2171bbafc7335) --- .../Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs | 118 +++++++++--------- .../Implementation/Fusing/StreamOfStreams.cs | 2 +- 2 files changed, 60 insertions(+), 60 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs index 186fe7399ea..5891e573814 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs @@ -38,63 +38,6 @@ public FlowGroupBySpec(ITestOutputHelper helper) : base(helper) Materializer = ActorMaterializer.Create(Sys, settings); } - private sealed class StreamPuppet - { - private readonly 
TestSubscriber.ManualProbe _probe; - private readonly ISubscription _subscription; - - public StreamPuppet(IPublisher p, TestKitBase kit) - { - _probe = kit.CreateManualSubscriberProbe(); - p.Subscribe(_probe); - _subscription = _probe.ExpectSubscription(); - } - - public void Request(int demand) => _subscription.Request(demand); - - public void ExpectNext(int element) => _probe.ExpectNext(element); - - public void ExpectNoMsg(TimeSpan max) => _probe.ExpectNoMsg(max); - - public void ExpectComplete() => _probe.ExpectComplete(); - - public void ExpectError(Exception ex) => _probe.ExpectError().Should().Be(ex); - - public void Cancel() => _subscription.Cancel(); - } - - private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int maxSubstream = -1, - Action)>, ISubscription, Func>> run = null) - { - - var source = Source.From(Enumerable.Range(1, elementCount)).RunWith(Sink.AsPublisher(false), Materializer); - var max = maxSubstream > 0 ? maxSubstream : groupCount; - var groupStream = - Source.FromPublisher(source) - .GroupBy(max, x => x % groupCount) - .Lift(x => x % groupCount) - .RunWith(Sink.AsPublisher<(int, Source)>(false), Materializer); - var masterSubscriber = this.CreateManualSubscriberProbe<(int, Source)>(); - - groupStream.Subscribe(masterSubscriber); - var masterSubscription = masterSubscriber.ExpectSubscription(); - - run?.Invoke(masterSubscriber, masterSubscription, expectedKey => - { - masterSubscription.Request(1); - var tuple = masterSubscriber.ExpectNext(); - tuple.Item1.Should().Be(expectedKey); - return tuple.Item2; - }); - } - - private ByteString RandomByteString(int size) - { - var a = new byte[size]; - ThreadLocalRandom.Current.NextBytes(a); - return ByteString.FromBytes(a); - } - [Fact] public void GroupBy_must_work_in_the_happy_case() { @@ -167,11 +110,11 @@ public void GroupBy_must_fail_when_key_function_returns_null() } [Fact] - public void GroupBy_must_support_cancelling_substreams() + public void 
GroupBy_must_accept_cancelling_substreams() { this.AssertAllStagesStopped(() => { - WithSubstreamsSupport(2, run: (masterSubscriber, masterSubscription, getSubFlow) => + WithSubstreamsSupport(2, maxSubstream: 3, run: (masterSubscriber, masterSubscription, getSubFlow) => { new StreamPuppet(getSubFlow(1).RunWith(Sink.AsPublisher(false), Materializer), this).Cancel(); var substream = new StreamPuppet(getSubFlow(0).RunWith(Sink.AsPublisher(false), Materializer), this); @@ -740,6 +683,63 @@ public void GroupBy_must_work_with_random_demand() }, Materializer); } + private sealed class StreamPuppet + { + private readonly TestSubscriber.ManualProbe _probe; + private readonly ISubscription _subscription; + + public StreamPuppet(IPublisher p, TestKitBase kit) + { + _probe = kit.CreateManualSubscriberProbe(); + p.Subscribe(_probe); + _subscription = _probe.ExpectSubscription(); + } + + public void Request(int demand) => _subscription.Request(demand); + + public void ExpectNext(int element) => _probe.ExpectNext(element); + + public void ExpectNoMsg(TimeSpan max) => _probe.ExpectNoMsg(max); + + public void ExpectComplete() => _probe.ExpectComplete(); + + public void ExpectError(Exception ex) => _probe.ExpectError().Should().Be(ex); + + public void Cancel() => _subscription.Cancel(); + } + + private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int maxSubstream = -1, + Action)>, ISubscription, Func>> run = null) + { + + var source = Source.From(Enumerable.Range(1, elementCount)).RunWith(Sink.AsPublisher(false), Materializer); + var max = maxSubstream > 0 ? 
maxSubstream : groupCount; + var groupStream = + Source.FromPublisher(source) + .GroupBy(max, x => x % groupCount) + .Lift(x => x % groupCount) + .RunWith(Sink.AsPublisher<(int, Source)>(false), Materializer); + var masterSubscriber = this.CreateManualSubscriberProbe<(int, Source)>(); + + groupStream.Subscribe(masterSubscriber); + var masterSubscription = masterSubscriber.ExpectSubscription(); + + run?.Invoke(masterSubscriber, masterSubscription, expectedKey => + { + masterSubscription.Request(1); + var tuple = masterSubscriber.ExpectNext(); + tuple.Item1.Should().Be(expectedKey); + return tuple.Item2; + }); + } + + private ByteString RandomByteString(int size) + { + var a = new byte[size]; + ThreadLocalRandom.Current.NextBytes(a); + return ByteString.FromBytes(a); + } + private sealed class SubFlowState { public SubFlowState(TestSubscriber.Probe probe, bool hasDemand, ByteString firstElement) diff --git a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs index 2f0161f9550..eb620ce9d1f 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs @@ -410,7 +410,7 @@ public void OnPush() } else { - if (_activeSubstreams.Count == _stage._maxSubstreams) + if (_activeSubstreams.Count + _closedSubstreams.Count == _stage._maxSubstreams) throw new TooManySubstreamsOpenException(); else if (_closedSubstreams.Contains(key) && !HasBeenPulled(_stage.In)) Pull(_stage.In); From d538a8490d1c010c82e6a7d8fc48422ac94b83b2 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 25 Apr 2022 23:04:43 +0700 Subject: [PATCH 6/6] Fix markdownlint error (cherry picked from commit 1663e384f6a3d1594f74ba93e840676e1a466813) --- docs/articles/streams/builtinstages.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/articles/streams/builtinstages.md b/docs/articles/streams/builtinstages.md index 900dbf1e4fc..5ac675b47b1 
100644 --- a/docs/articles/streams/builtinstages.md +++ b/docs/articles/streams/builtinstages.md @@ -1004,11 +1004,13 @@ This operation demultiplexes the incoming stream into separate output streams, o key is computed for each element using the given function. When a new key is encountered for the first time a new substream is opened and subsequently fed with all elements belonging to that key. + > [!NOTE] > If `allowClosedSubstreamRecreation` is set to `true` substream completion and incoming elements are subject to race-conditions. If elements arrive for a stream that is in the process of closing these elements might get lost. > [!WARNING] > If `allowClosedSubstreamRecreation` is set to `false` (default behavior) the stage keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys this can cause memory issues. Elements belonging to those keys are drained directly and not sent to the substream. + **emits** an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures