Skip to content

Commit

Permalink
Avoids memory being retained for GroupBy
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed committed Apr 23, 2022
1 parent 810882b commit fd3fb95
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 60 deletions.
118 changes: 59 additions & 59 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,63 +38,6 @@ public FlowGroupBySpec(ITestOutputHelper helper) : base(helper)
Materializer = ActorMaterializer.Create(Sys, settings);
}

private sealed class StreamPuppet
{
private readonly TestSubscriber.ManualProbe<int> _probe;
private readonly ISubscription _subscription;

public StreamPuppet(IPublisher<int> p, TestKitBase kit)
{
_probe = kit.CreateManualSubscriberProbe<int>();
p.Subscribe(_probe);
_subscription = _probe.ExpectSubscription();
}

public void Request(int demand) => _subscription.Request(demand);

public void ExpectNext(int element) => _probe.ExpectNext(element);

public void ExpectNoMsg(TimeSpan max) => _probe.ExpectNoMsg(max);

public void ExpectComplete() => _probe.ExpectComplete();

public void ExpectError(Exception ex) => _probe.ExpectError().Should().Be(ex);

public void Cancel() => _subscription.Cancel();
}

private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int maxSubstream = -1,
Action<TestSubscriber.ManualProbe<(int, Source<int, NotUsed>)>, ISubscription, Func<int, Source<int, NotUsed>>> run = null)
{

var source = Source.From(Enumerable.Range(1, elementCount)).RunWith(Sink.AsPublisher<int>(false), Materializer);
var max = maxSubstream > 0 ? maxSubstream : groupCount;
var groupStream =
Source.FromPublisher(source)
.GroupBy(max, x => x % groupCount)
.Lift(x => x % groupCount)
.RunWith(Sink.AsPublisher<(int, Source<int, NotUsed>)>(false), Materializer);
var masterSubscriber = this.CreateManualSubscriberProbe<(int, Source<int, NotUsed>)>();

groupStream.Subscribe(masterSubscriber);
var masterSubscription = masterSubscriber.ExpectSubscription();

run?.Invoke(masterSubscriber, masterSubscription, expectedKey =>
{
masterSubscription.Request(1);
var tuple = masterSubscriber.ExpectNext();
tuple.Item1.Should().Be(expectedKey);
return tuple.Item2;
});
}

private ByteString RandomByteString(int size)
{
var a = new byte[size];
ThreadLocalRandom.Current.NextBytes(a);
return ByteString.FromBytes(a);
}

[Fact]
public void GroupBy_must_work_in_the_happy_case()
{
Expand Down Expand Up @@ -167,11 +110,11 @@ public void GroupBy_must_fail_when_key_function_returns_null()
}

[Fact]
public void GroupBy_must_support_cancelling_substreams()
public void GroupBy_must_accept_cancelling_substreams()
{
this.AssertAllStagesStopped(() =>
{
WithSubstreamsSupport(2, run: (masterSubscriber, masterSubscription, getSubFlow) =>
WithSubstreamsSupport(2, maxSubstream: 3, run: (masterSubscriber, masterSubscription, getSubFlow) =>
{
new StreamPuppet(getSubFlow(1).RunWith(Sink.AsPublisher<int>(false), Materializer), this).Cancel();
var substream = new StreamPuppet(getSubFlow(0).RunWith(Sink.AsPublisher<int>(false), Materializer), this);
Expand Down Expand Up @@ -740,6 +683,63 @@ public void GroupBy_must_work_with_random_demand()
}, Materializer);
}

private sealed class StreamPuppet
{
private readonly TestSubscriber.ManualProbe<int> _probe;
private readonly ISubscription _subscription;

public StreamPuppet(IPublisher<int> p, TestKitBase kit)
{
_probe = kit.CreateManualSubscriberProbe<int>();
p.Subscribe(_probe);
_subscription = _probe.ExpectSubscription();
}

public void Request(int demand) => _subscription.Request(demand);

public void ExpectNext(int element) => _probe.ExpectNext(element);

public void ExpectNoMsg(TimeSpan max) => _probe.ExpectNoMsg(max);

public void ExpectComplete() => _probe.ExpectComplete();

public void ExpectError(Exception ex) => _probe.ExpectError().Should().Be(ex);

public void Cancel() => _subscription.Cancel();
}

private void WithSubstreamsSupport(int groupCount = 2, int elementCount = 6, int maxSubstream = -1,
Action<TestSubscriber.ManualProbe<(int, Source<int, NotUsed>)>, ISubscription, Func<int, Source<int, NotUsed>>> run = null)
{

var source = Source.From(Enumerable.Range(1, elementCount)).RunWith(Sink.AsPublisher<int>(false), Materializer);
var max = maxSubstream > 0 ? maxSubstream : groupCount;
var groupStream =
Source.FromPublisher(source)
.GroupBy(max, x => x % groupCount)
.Lift(x => x % groupCount)
.RunWith(Sink.AsPublisher<(int, Source<int, NotUsed>)>(false), Materializer);
var masterSubscriber = this.CreateManualSubscriberProbe<(int, Source<int, NotUsed>)>();

groupStream.Subscribe(masterSubscriber);
var masterSubscription = masterSubscriber.ExpectSubscription();

run?.Invoke(masterSubscriber, masterSubscription, expectedKey =>
{
masterSubscription.Request(1);
var tuple = masterSubscriber.ExpectNext();
tuple.Item1.Should().Be(expectedKey);
return tuple.Item2;
});
}

private ByteString RandomByteString(int size)
{
var a = new byte[size];
ThreadLocalRandom.Current.NextBytes(a);
return ByteString.FromBytes(a);
}

private sealed class SubFlowState
{
public SubFlowState(TestSubscriber.Probe<ByteString> probe, bool hasDemand, ByteString firstElement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ public void OnPush()
}
else
{
if (_activeSubstreams.Count == _stage._maxSubstreams)
if (_activeSubstreams.Count + _closedSubstreams.Count == _stage._maxSubstreams)
throw new TooManySubstreamsOpenException();
else if (_closedSubstreams.Contains(key) && !HasBeenPulled(_stage.In))
Pull(_stage.In);
Expand Down

0 comments on commit fd3fb95

Please sign in to comment.