Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GroupBy fixes #5874

Merged
merged 10 commits into from
Apr 25, 2022
12 changes: 11 additions & 1 deletion docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,17 @@ and returns a pair containing a strict sequence of the taken element and a strea

### GroupBy

De-multiplex the incoming stream into separate output streams.
This operation demultiplexes the incoming stream into separate output streams, one for each element key. The
key is computed for each element using the given function. When a new key is encountered for the first time,
a new substream is opened and subsequently fed with all elements belonging to that key.

<!-- markdownlint-disable MD028 -->
> [!NOTE]
> If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming elements are subject to race conditions. If elements arrive for a stream that is in the process of closing, these elements might get lost.

> [!WARNING]
> If `allowClosedSubstreamRecreation` is set to `false` (default behavior), the stage keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys, this can cause memory issues. Elements belonging to those keys are drained directly and not sent to the substream.
<!-- markdownlint-enable MD028 -->

**emits** an element for which the grouping function returns a group that has not yet been created. Emits the new group
there is an element pending for a group whose substream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,10 @@ namespace Akka.Streams
Shaping = 0,
Enforcing = 1,
}
public class TooManySubstreamsOpenException : System.InvalidOperationException
{
public TooManySubstreamsOpenException() { }
}
public abstract class TransformerLikeBase<TIn, TOut> : Akka.Streams.ITransformerLike<TIn, TOut>
{
protected TransformerLikeBase() { }
Expand Down Expand Up @@ -1342,6 +1346,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> DivertTo<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat> that, System.Func<TOut, bool> when) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat3> DivertToMaterialized<TIn, TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> Expand<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n, System.TimeSpan timeout) { }
Expand Down
272 changes: 211 additions & 61 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,63 +38,6 @@ public FlowGroupBySpec(ITestOutputHelper helper) : base(helper)
Materializer = ActorMaterializer.Create(Sys, settings);
}

/// <summary>
/// Test helper that subscribes a manual subscriber probe to an <c>int</c> publisher
/// and forwards demand, cancellation and expectations to that probe and its subscription.
/// </summary>
private sealed class StreamPuppet
{
    private readonly TestSubscriber.ManualProbe<int> _subscriber;
    private readonly ISubscription _upstream;

    /// <summary>
    /// Creates a manual probe through <paramref name="kit"/>, subscribes it to
    /// <paramref name="p"/> and waits for the resulting subscription.
    /// </summary>
    /// <param name="p">Publisher to observe.</param>
    /// <param name="kit">Test kit used to create the manual subscriber probe.</param>
    public StreamPuppet(IPublisher<int> p, TestKitBase kit)
    {
        var manualProbe = kit.CreateManualSubscriberProbe<int>();
        _subscriber = manualProbe;
        p.Subscribe(manualProbe);
        _upstream = manualProbe.ExpectSubscription();
    }

    /// <summary>Requests <paramref name="demand"/> elements on the subscription.</summary>
    public void Request(int demand)
    {
        _upstream.Request(demand);
    }

    /// <summary>Expects <paramref name="element"/> as the next element seen by the probe.</summary>
    public void ExpectNext(int element)
    {
        _subscriber.ExpectNext(element);
    }

    /// <summary>Expects that the probe receives nothing for the duration <paramref name="max"/>.</summary>
    public void ExpectNoMsg(TimeSpan max)
    {
        _subscriber.ExpectNoMsg(max);
    }

    /// <summary>Expects a completion signal on the probe.</summary>
    public void ExpectComplete()
    {
        _subscriber.ExpectComplete();
    }

    /// <summary>Expects an error signal and asserts that it is <paramref name="ex"/>.</summary>
    public void ExpectError(Exception ex)
    {
        var received = _subscriber.ExpectError();
        received.Should().Be(ex);
    }

    /// <summary>Cancels the subscription.</summary>
    public void Cancel()
    {
        _upstream.Cancel();
    }
}

/// <summary>
/// Materializes a stream of the integers <c>1..elementCount</c> grouped by
/// <c>x % groupCount</c>, subscribes a manual probe to the resulting stream of
/// (key, substream) pairs and hands the probe, its subscription and a
/// "fetch next substream" function to <paramref name="run"/>.
/// </summary>
/// <param name="groupCount">Modulus used to compute the grouping key.</param>
/// <param name="elementCount">Number of consecutive integers (starting at 1) to emit.</param>
/// <param name="maxSubstream">
/// Limit passed to <c>GroupBy</c>; when not positive, <paramref name="groupCount"/> is used instead.
/// </param>
/// <param name="run">
/// Test body. Its third argument requests one element from the master probe, asserts that
/// the key of the received pair equals the expected key and returns the substream source.
/// Nothing is invoked when this is <c>null</c>.
/// </param>
private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int maxSubstream = -1,
    Action<TestSubscriber.ManualProbe<(int, Source<int, NotUsed>)>, ISubscription, Func<int, Source<int, NotUsed>>> run = null)
{
    var numbers = Enumerable.Range(1, elementCount);
    var numberPublisher = Source.From(numbers).RunWith(Sink.AsPublisher<int>(false), Materializer);

    // Fall back to the group count when no explicit positive limit was given.
    var substreamLimit = groupCount;
    if (maxSubstream > 0)
    {
        substreamLimit = maxSubstream;
    }

    var groupedPublisher = Source.FromPublisher(numberPublisher)
        .GroupBy(substreamLimit, n => n % groupCount)
        .Lift(n => n % groupCount)
        .RunWith(Sink.AsPublisher<(int, Source<int, NotUsed>)>(false), Materializer);

    var masterProbe = this.CreateManualSubscriberProbe<(int, Source<int, NotUsed>)>();
    groupedPublisher.Subscribe(masterProbe);
    var masterSub = masterProbe.ExpectSubscription();

    if (run == null)
    {
        return;
    }

    // Pulls exactly one (key, substream) pair from the master stream and verifies its key.
    Source<int, NotUsed> NextSubFlow(int expectedKey)
    {
        masterSub.Request(1);
        var pair = masterProbe.ExpectNext();
        pair.Item1.Should().Be(expectedKey);
        return pair.Item2;
    }

    run(masterProbe, masterSub, NextSubFlow);
}

/// <summary>
/// Builds a <c>ByteString</c> from a freshly allocated array of <paramref name="size"/>
/// bytes filled by <c>ThreadLocalRandom.Current.NextBytes</c>.
/// </summary>
/// <param name="size">Length of the byte array to fill.</param>
/// <returns>The <c>ByteString</c> created from the filled array.</returns>
private ByteString RandomByteString(int size)
{
    var randomBytes = new byte[size];
    ThreadLocalRandom.Current.NextBytes(randomBytes);

    var result = ByteString.FromBytes(randomBytes);
    return result;
}

[Fact]
public void GroupBy_must_work_in_the_happy_case()
{
Expand Down Expand Up @@ -167,11 +110,11 @@ public void GroupBy_must_fail_when_key_function_returns_null()
}

[Fact]
public void GroupBy_must_support_cancelling_substreams()
public void GroupBy_must_accept_cancelling_substreams()
{
this.AssertAllStagesStopped(() =>
{
WithSubstreamsSupport(2, run: (masterSubscriber, masterSubscription, getSubFlow) =>
WithSubstreamsSupport(2, maxSubstream: 3, run: (masterSubscriber, masterSubscription, getSubFlow) =>
{
new StreamPuppet(getSubFlow(1).RunWith(Sink.AsPublisher<int>(false), Materializer), this).Cancel();
var substream = new StreamPuppet(getSubFlow(0).RunWith(Sink.AsPublisher<int>(false), Materializer), this);
Expand Down Expand Up @@ -427,6 +370,20 @@ public void GroupBy_must_fail_when_exceeding_maxSubstreams()
}, Materializer);
}

/// <summary>
/// Runs a <c>GroupBy(0, x =&gt; x)</c> flow merged back with <c>MergeSubstreams</c> under
/// <c>Deciders.ResumingDecider</c>, pushes a single element and checks that the
/// downstream probe sees nothing within one second.
/// </summary>
[Fact]
public void GroupBy_must_resume_when_exceeding_maxSubstreams()
{
    // First argument is 0 — presumably the substream limit, so the very first key already exceeds it.
    var f = Flow.Create<int>().GroupBy(0, x => x).MergeSubstreams();
    // The cast narrows the merged result back to a plain Flow so that attributes can be attached and it can be run with probes.
    var (up, down) = ((Flow<int, int, NotUsed>)f)
        .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
        .RunWith(this.SourceProbe<int>(), this.SinkProbe<int>(), Materializer);

    // Signal demand first so that a missing element cannot be explained by absent demand.
    down.Request(1);

    up.SendNext(1);
    // Neither an element nor a failure/completion signal may arrive within the window.
    // NOTE(review): this only shows that nothing is emitted; it does not by itself prove the stage keeps processing afterwards.
    down.ExpectNoMsg(TimeSpan.FromSeconds(1));
}

[Fact]
public void GroupBy_must_emit_subscribe_before_completed()
{
Expand Down Expand Up @@ -483,7 +440,7 @@ public void GroupBy_must_work_under_fuzzing_stress_test()
}

[Fact]
public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main()
public void GroupBy_must_work_if_pull_is_exercised_from_both_substream_and_main()
{
this.AssertAllStagesStopped(() =>
{
Expand Down Expand Up @@ -517,6 +474,142 @@ public void GroupBy_must_Work_if_pull_is_exercised_from_both_substream_and_main(
}, Materializer);
}

/// <summary>
/// Drives a <c>GroupBy&lt;int, int&gt;</c> stage with manual probes: two substreams are opened
/// and each receives its first element, both then request another element, and two
/// elements with new keys are pushed before the master probe has requested anything
/// further. The test then checks that both new substreams are emitted and deliver
/// their first element.
/// </summary>
[Fact]
public void GroupBy_must_work_if_pull_is_exercised_from_multiple_substreams_while_downstream_is_backpressuring()
{
    this.AssertAllStagesStopped(() =>
    {
        var upstream = this.CreatePublisherProbe<int>();
        var downstreamMaster = this.CreateSubscriberProbe<Source<int, NotUsed>>();

        // The lambda returns the element itself, so (presumably as the key function) every distinct value forms its own group.
        Source.FromPublisher(upstream)
            .Via(new GroupBy<int, int>(10, element => element))
            .RunWith(Sink.FromSubscriber(downstreamMaster), Materializer);

        // Open the substream for element 1.
        var substream1 = this.CreateSubscriberProbe<int>();
        downstreamMaster.Request(1);
        upstream.SendNext(1);
        downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream1), Materializer);

        // Open the substream for element 2.
        var substream2 = this.CreateSubscriberProbe<int>();
        downstreamMaster.Request(1);
        upstream.SendNext(2);
        downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream2), Materializer);

        // Each substream receives the element that caused it to be opened.
        substream1.Request(1);
        substream1.ExpectNext(1);
        substream2.Request(1);
        substream2.ExpectNext(2);

        // Both substreams pull
        substream1.Request(1);
        substream2.Request(1);

        // Upstream sends new groups
        // The master probe has no outstanding demand here; it only requests the new substreams further below.
        upstream.SendNext(3);
        upstream.SendNext(4);

        var substream3 = this.CreateSubscriberProbe<int>();
        var substream4 = this.CreateSubscriberProbe<int>();
        downstreamMaster.Request(1);
        downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream3), Materializer);
        downstreamMaster.Request(1);
        downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream4), Materializer);

        // The late-opened substreams must still deliver the elements that created them.
        substream3.Request(1);
        substream3.ExpectNext(3);
        substream4.Request(1);
        substream4.ExpectNext(4);

        // Cleanup, not part of the actual test
        substream1.Cancel();
        substream2.Cancel();
        substream3.Cancel();
        substream4.Cancel();
        downstreamMaster.Cancel();
        upstream.SendComplete();
    }, Materializer);
}

/// <summary>
/// Builds a <c>GroupBy</c> flow with <c>allowClosedSubstreamRecreation: true</c> whose
/// substreams each take a single element, then sends key 1 three times (with key 2 in
/// between) and checks that every element is delivered downstream.
/// </summary>
[Fact]
public void GroupBy_must_allow_to_recreate_an_already_closed_substream()
{
    this.AssertAllStagesStopped(() =>
    {
        var f = Flow.Create<int>()
            .GroupBy(2, x => x, allowClosedSubstreamRecreation: true)
            .Take(1) // close the substream after 1 element
            .MergeSubstreams();

        // The cast narrows the merged result back to a plain Flow so that it can be run with probes.
        var (up, down) = ((Flow<int, int, NotUsed>)f)
            .RunWith(this.SourceProbe<int>(), this.SinkProbe<int>(), Materializer);

        // Demand for all four elements sent below is signalled up front.
        down.Request(4);

        // Creates and closes substream "1"
        up.SendNext(1);
        down.ExpectNext(1);

        // Creates and closes substream "2"
        up.SendNext(2);
        down.ExpectNext(2);

        // Recreates and closes substream "1" twice
        up.SendNext(1);
        down.ExpectNext(1);
        up.SendNext(1);
        down.ExpectNext(1);

        // Cleanup, not part of the actual test
        up.SendComplete();
        down.ExpectComplete();
    }, Materializer);
}

/// <summary>
/// Opens two substreams on a <c>GroupBy&lt;int, int&gt;</c> stage, cancels the master
/// subscriber, checks that both substreams still deliver their element, then cancels
/// both substreams and expects the upstream publisher probe to observe cancellation.
/// </summary>
[Fact]
public void GroupBy_must_cancel_if_downstream_has_cancelled_and_all_substreams_cancel()
{
    this.AssertAllStagesStopped(() =>
    {
        var upstream = this.CreatePublisherProbe<int>();
        var downstreamMaster = this.CreateSubscriberProbe<Source<int, NotUsed>>();

        // The lambda returns the element itself, so (presumably as the key function) every distinct value forms its own group.
        Source.FromPublisher(upstream)
            .Via(new GroupBy<int, int>(10, element => element))
            .RunWith(Sink.FromSubscriber(downstreamMaster), Materializer);

        // Open the substream for element 1.
        var substream1 = this.CreateSubscriberProbe<int>();
        downstreamMaster.Request(1);
        upstream.SendNext(1);
        downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream1), Materializer);

        // Open the substream for element 2.
        var substream2 = this.CreateSubscriberProbe<int>();
        downstreamMaster.Request(1);
        upstream.SendNext(2);
        downstreamMaster.ExpectNext().RunWith(Sink.FromSubscriber(substream2), Materializer);

        // Cancel downstream
        downstreamMaster.Cancel();

        // Both substreams still work
        substream1.Request(1);
        substream1.ExpectNext(1);
        substream2.Request(1);
        substream2.ExpectNext(2);

        // New keys are ignored
        // Nothing is asserted about 3 and 4 directly; the master subscriber that would receive their substreams is already cancelled.
        upstream.SendNext(3);
        upstream.SendNext(4);

        // Cancel all substreams
        substream1.Cancel();
        substream2.Cancel();

        // Upstream gets cancelled
        upstream.ExpectCancellation();
    }, Materializer);
}

[Fact]
public void GroupBy_must_work_with_random_demand()
{
Expand Down Expand Up @@ -590,6 +683,63 @@ public void GroupBy_must_work_with_random_demand()
}, Materializer);
}

/// <summary>
/// Test helper that subscribes a manual subscriber probe to an <c>int</c> publisher
/// and forwards demand, cancellation and expectations to that probe and its subscription.
/// </summary>
private sealed class StreamPuppet
{
    private readonly TestSubscriber.ManualProbe<int> _subscriber;
    private readonly ISubscription _upstream;

    /// <summary>
    /// Creates a manual probe through <paramref name="kit"/>, subscribes it to
    /// <paramref name="p"/> and waits for the resulting subscription.
    /// </summary>
    /// <param name="p">Publisher to observe.</param>
    /// <param name="kit">Test kit used to create the manual subscriber probe.</param>
    public StreamPuppet(IPublisher<int> p, TestKitBase kit)
    {
        var manualProbe = kit.CreateManualSubscriberProbe<int>();
        _subscriber = manualProbe;
        p.Subscribe(manualProbe);
        _upstream = manualProbe.ExpectSubscription();
    }

    /// <summary>Requests <paramref name="demand"/> elements on the subscription.</summary>
    public void Request(int demand)
    {
        _upstream.Request(demand);
    }

    /// <summary>Expects <paramref name="element"/> as the next element seen by the probe.</summary>
    public void ExpectNext(int element)
    {
        _subscriber.ExpectNext(element);
    }

    /// <summary>Expects that the probe receives nothing for the duration <paramref name="max"/>.</summary>
    public void ExpectNoMsg(TimeSpan max)
    {
        _subscriber.ExpectNoMsg(max);
    }

    /// <summary>Expects a completion signal on the probe.</summary>
    public void ExpectComplete()
    {
        _subscriber.ExpectComplete();
    }

    /// <summary>Expects an error signal and asserts that it is <paramref name="ex"/>.</summary>
    public void ExpectError(Exception ex)
    {
        var received = _subscriber.ExpectError();
        received.Should().Be(ex);
    }

    /// <summary>Cancels the subscription.</summary>
    public void Cancel()
    {
        _upstream.Cancel();
    }
}

/// <summary>
/// Materializes a stream of the integers <c>1..elementCount</c> grouped by
/// <c>x % groupCount</c>, subscribes a manual probe to the resulting stream of
/// (key, substream) pairs and hands the probe, its subscription and a
/// "fetch next substream" function to <paramref name="run"/>.
/// </summary>
/// <param name="groupCount">Modulus used to compute the grouping key.</param>
/// <param name="elementCount">Number of consecutive integers (starting at 1) to emit.</param>
/// <param name="maxSubstream">
/// Limit passed to <c>GroupBy</c>; when not positive, <paramref name="groupCount"/> is used instead.
/// </param>
/// <param name="run">
/// Test body. Its third argument requests one element from the master probe, asserts that
/// the key of the received pair equals the expected key and returns the substream source.
/// Nothing is invoked when this is <c>null</c>.
/// </param>
private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int maxSubstream = -1,
    Action<TestSubscriber.ManualProbe<(int, Source<int, NotUsed>)>, ISubscription, Func<int, Source<int, NotUsed>>> run = null)
{
    var numbers = Enumerable.Range(1, elementCount);
    var numberPublisher = Source.From(numbers).RunWith(Sink.AsPublisher<int>(false), Materializer);

    // Fall back to the group count when no explicit positive limit was given.
    var substreamLimit = groupCount;
    if (maxSubstream > 0)
    {
        substreamLimit = maxSubstream;
    }

    var groupedPublisher = Source.FromPublisher(numberPublisher)
        .GroupBy(substreamLimit, n => n % groupCount)
        .Lift(n => n % groupCount)
        .RunWith(Sink.AsPublisher<(int, Source<int, NotUsed>)>(false), Materializer);

    var masterProbe = this.CreateManualSubscriberProbe<(int, Source<int, NotUsed>)>();
    groupedPublisher.Subscribe(masterProbe);
    var masterSub = masterProbe.ExpectSubscription();

    if (run == null)
    {
        return;
    }

    // Pulls exactly one (key, substream) pair from the master stream and verifies its key.
    Source<int, NotUsed> NextSubFlow(int expectedKey)
    {
        masterSub.Request(1);
        var pair = masterProbe.ExpectNext();
        pair.Item1.Should().Be(expectedKey);
        return pair.Item2;
    }

    run(masterProbe, masterSub, NextSubFlow);
}

/// <summary>
/// Builds a <c>ByteString</c> from a freshly allocated array of <paramref name="size"/>
/// bytes filled by <c>ThreadLocalRandom.Current.NextBytes</c>.
/// </summary>
/// <param name="size">Length of the byte array to fill.</param>
/// <returns>The <c>ByteString</c> created from the filled array.</returns>
private ByteString RandomByteString(int size)
{
    var randomBytes = new byte[size];
    ThreadLocalRandom.Current.NextBytes(randomBytes);

    var result = ByteString.FromBytes(randomBytes);
    return result;
}

private sealed class SubFlowState
{
public SubFlowState(TestSubscriber.Probe<ByteString> probe, bool hasDemand, ByteString firstElement)
Expand Down Expand Up @@ -681,7 +831,7 @@ private void RandomDemand(Dictionary<int, SubFlowState> map, RandomDemandPropert
map[key] = new SubFlowState(state.Probe, false, null);
}
else if (props.BlockingNextElement == null)
break;
break;
}
}

Expand Down
Loading