Skip to content

Commit

Permalink
GroupBy pulls upstream when a substream materialization is waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Apr 23, 2022
1 parent c80c11c commit 98be809
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 19 deletions.
61 changes: 59 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public void GroupBy_must_work_under_fuzzing_stress_test()
}

[Fact]
public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main()
public void GroupBy_must_work_if_pull_is_exercised_from_both_substream_and_main()
{
this.AssertAllStagesStopped(() =>
{
Expand Down Expand Up @@ -517,6 +517,63 @@ public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main(
}, Materializer);
}

[Fact]
public void GroupBy_must_work_if_pull_is_exercised_from_multiple_substreams_while_downstream_is_backpressuring()
{
this.AssertAllStagesStopped(() =>
{
var upstream = this.CreatePublisherProbe<int>();
var downstreamMaster = this.CreateSubscriberProbe<Source<int, NotUsed>>();

Source.FromPublisher(upstream)
.Via(new GroupBy<int, int>(10, element => element))
.RunWith(Sink.FromSubscriber(downstreamMaster), Materializer);

var substream1 = this.CreateSubscriberProbe<int>();
downstreamMaster.Request(1);
upstream.SendNext(1);
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream1), Materializer);

var substream2 = this.CreateSubscriberProbe<int>();
downstreamMaster.Request(1);
upstream.SendNext(2);
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream2), Materializer);

substream1.Request(1);
substream1.ExpectNext(1);
substream2.Request(1);
substream2.ExpectNext(2);

// Both substreams pull
substream1.Request(1);
substream2.Request(1);

// Upstream sends new groups
upstream.SendNext(3);
upstream.SendNext(4);

var substream3 = this.CreateSubscriberProbe<int>();
var substream4 = this.CreateSubscriberProbe<int>();
downstreamMaster.Request(1);
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream3), Materializer);
downstreamMaster.Request(1);
downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream4), Materializer);

substream3.Request(1);
substream3.ExpectNext(3);
substream4.Request(1);
substream4.ExpectNext(4);

// Cleanup, not part of the actual test
substream1.Cancel();
substream2.Cancel();
substream3.Cancel();
substream4.Cancel();
downstreamMaster.Cancel();
upstream.SendComplete();
}, Materializer);
}

[Fact]
public void GroupBy_must_work_with_random_demand()
{
Expand Down Expand Up @@ -681,7 +738,7 @@ private void RandomDemand(Dictionary<int, SubFlowState> map, RandomDemandPropert
map[key] = new SubFlowState(state.Probe, false, null);
}
else if (props.BlockingNextElement == null)
break;
break;
}
}

Expand Down
35 changes: 18 additions & 17 deletions src/core/Akka.Streams/Implementation/Fusing/StreamOfStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public FlattenMerge(int breadth)
internal sealed class PrefixAndTail<T> : GraphStage<FlowShape<T, (IImmutableList<T>, Source<T, NotUsed>)>>
{
#region internal classes

private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler
{
private const string SubscriptionTimer = "SubstreamSubscriptionTimer";
Expand All @@ -205,11 +205,11 @@ public Logic(PrefixAndTail<T> stage) : base(stage.Shape)
Pull(_stage._in);
_tailSource.SetHandler(new LambdaOutHandler(onPull: () => Pull(_stage._in)));
});

SetHandler(_stage._in, this);
SetHandler(_stage._out, this);
}

protected internal override void OnTimer(object timerKey)
{
var materializer = ActorMaterializerHelper.Downcast(Interpreter.Materializer);
Expand Down Expand Up @@ -360,7 +360,7 @@ public PrefixAndTail(int count)
/// <typeparam name="TKey">TBD</typeparam>
internal sealed class GroupBy<T, TKey> : GraphStage<FlowShape<T, Source<T, NotUsed>>>
{
#region Loigc
#region Logic

private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler
{
Expand All @@ -370,7 +370,7 @@ private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler
private readonly HashSet<SubstreamSource> _substreamsJustStarted = new HashSet<SubstreamSource>();
private readonly Lazy<Decider> _decider;
private TimeSpan _timeout;
private SubstreamSource _substreamWaitingToBePushed;
private Option<SubstreamSource> _substreamWaitingToBePushed = Option<SubstreamSource>.None;
private Option<TKey> _nextElementKey = Option<TKey>.None;
private Option<T> _nextElementValue = Option<T>.None;
private long _nextId;
Expand All @@ -379,12 +379,12 @@ private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler
public Logic(GroupBy<T, TKey> stage, Attributes inheritedAttributes) : base(stage.Shape)
{
_stage = stage;

_decider = new Lazy<Decider>(() =>
{
var attribute = inheritedAttributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null);
return attribute != null ? attribute.Decider : Deciders.StoppingDecider;
});
});

SetHandler(_stage.In, this);
SetHandler(_stage.Out, this);
Expand Down Expand Up @@ -431,11 +431,12 @@ public void OnPush()

public void OnPull()
{
if (_substreamWaitingToBePushed != null)
if (_substreamWaitingToBePushed.HasValue)
{
Push(_stage.Out, Source.FromGraph(_substreamWaitingToBePushed.Source));
ScheduleOnce(_substreamWaitingToBePushed.Key.Value, _timeout);
_substreamWaitingToBePushed = null;
var substreamSource = _substreamWaitingToBePushed.Value;
Push(_stage.Out, Source.FromGraph(substreamSource.Source));
ScheduleOnce(substreamSource.Key.Value, _timeout);
_substreamWaitingToBePushed = Option<SubstreamSource>.None;
}
else
{
Expand Down Expand Up @@ -500,7 +501,7 @@ private void Fail(Exception ex)
FailStage(ex);
}

private bool NeedToPull => !(HasBeenPulled(_stage.In) || IsClosed(_stage.In) || HasNextElement);
private bool NeedToPull => !(HasBeenPulled(_stage.In) || IsClosed(_stage.In) || HasNextElement || _substreamWaitingToBePushed.HasValue);

public override void PreStart()
{
Expand Down Expand Up @@ -530,7 +531,7 @@ private void RunSubstream(TKey key, T value)
{
Push(_stage.Out, Source.FromGraph(substreamSource.Source));
ScheduleOnce(key, _timeout);
_substreamWaitingToBePushed = null;
_substreamWaitingToBePushed = Option<SubstreamSource>.None;
}
else
{
Expand Down Expand Up @@ -628,7 +629,7 @@ public GroupBy(int maxSubstreams, Func<T, TKey> keyFor)
{
_maxSubstreams = maxSubstreams;
_keyFor = keyFor;

Shape = new FlowShape<T, Source<T, NotUsed>>(In, Out);
}

Expand Down Expand Up @@ -778,7 +779,7 @@ public override void OnDownstreamFinish()
else
// Start draining
if (!_logic.HasBeenPulled(_inlet))
_logic.Pull(_inlet);
_logic.Pull(_inlet);
}

public override void OnPush()
Expand Down Expand Up @@ -1010,7 +1011,7 @@ protected CommandScheduledBeforeMaterialization(ICommand command)
internal class RequestOneScheduledBeforeMaterialization : CommandScheduledBeforeMaterialization
{
public static readonly RequestOneScheduledBeforeMaterialization Instance = new RequestOneScheduledBeforeMaterialization(RequestOne.Instance);

private RequestOneScheduledBeforeMaterialization(ICommand command) : base(command)
{
}
Expand Down Expand Up @@ -1046,7 +1047,7 @@ private RequestOne()
{
}
}

internal class Cancel : ICommand
{
public static readonly Cancel Instance = new Cancel();
Expand Down

0 comments on commit 98be809

Please sign in to comment.