Skip to content

Commit

Permalink
Cancel GroupBy when all substreams cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Apr 23, 2022
1 parent 98be809 commit 4584f59
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 12 deletions.
44 changes: 44 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,50 @@ public void GroupBy_must_work_if_pull_is_exercised_from_multiple_substreams_whil
}, Materializer);
}

[Fact]
public void GroupBy_must_cancel_if_downstream_has_cancelled_and_all_substreams_cancel()
{
this.AssertAllStagesStopped(() =>
{
var upstream = this.CreatePublisherProbe<int>();
var downstreamMaster = this.CreateSubscriberProbe<Source<int, NotUsed>>();

Source.FromPublisher(upstream)
.Via(new GroupBy<int, int>(10, element => element))
.RunWith(Sink.FromSubscriber(downstreamMaster), Materializer);

var substream1 = this.CreateSubscriberProbe<int>();
downstreamMaster.Request(1);
upstream.SendNext(1);
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream1), Materializer);

var substream2 = this.CreateSubscriberProbe<int>();
downstreamMaster.Request(1);
upstream.SendNext(2);
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream2), Materializer);

// Cancel downstream
downstreamMaster.Cancel();

// Both substreams still work
substream1.Request(1);
substream1.ExpectNext(1);
substream2.Request(1);
substream2.ExpectNext(2);

// New keys are ignored
upstream.SendNext(3);
upstream.SendNext(4);

// Cancel all substreams
substream1.Cancel();
substream2.Cancel();

// Upstream gets cancelled
upstream.ExpectCancellation();
}, Materializer);
}

[Fact]
public void GroupBy_must_work_with_random_demand()
{
Expand Down
30 changes: 18 additions & 12 deletions src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using Akka.Streams.Implementation.Stages;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
using Akka.Streams.Util;
using Akka.Util;
using Akka.Util.Internal;

Expand Down Expand Up @@ -464,9 +463,7 @@ public void OnUpstreamFinish()

public void OnDownstreamFinish()
{
if (_activeSubstreams.Count == 0)
CompleteStage();
else
if (!TryCancel())
SetKeepGoing(true);
}

Expand All @@ -493,6 +490,18 @@ private bool TryCompleteAll()
return false;
}

private bool TryCancel()
{
// if there's no active substreams or there's only one but it's not been pushed yet
if (_activeSubstreams.Count == 0 || (_activeSubstreams.Count == 1 && _substreamWaitingToBePushed.HasValue))
{
CompleteStage();
return true;
}

return false;
}

private void Fail(Exception ex)
{
foreach (var value in _activeSubstreams.Values)
Expand Down Expand Up @@ -602,15 +611,12 @@ public void OnPull()

public void OnDownstreamFinish()
{
if(_logic.HasNextElement && _logic._nextElementKey.Equals(Key))
_logic.ClearNextElement();
if (FirstPush)
_logic._firstPushCounter--;
if(_logic.HasNextElement && _logic._nextElementKey.Equals(Key)) _logic.ClearNextElement();
if (FirstPush) _logic._firstPushCounter--;
CompleteSubStream();
if (_logic.IsClosed(_logic._stage.In))
_logic.TryCompleteAll();
else if (_logic.NeedToPull)
_logic.Pull(_logic._stage.In);
if (_logic.IsClosed(_logic._stage.Out)) _logic.TryCancel();
if (_logic.IsClosed(_logic._stage.In)) _logic.TryCompleteAll();
else if (_logic.NeedToPull) _logic.Pull(_logic._stage.In);
}
}
}
Expand Down

0 comments on commit 4584f59

Please sign in to comment.