From 3bc0b4657a39ea412c7bd113da1c07934fa64198 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 24 Mar 2023 11:23:22 -0500 Subject: [PATCH 1/3] fix MergeHub_must_work_with_long_streams_when_buffer_size_is_1 99% sure that trying to ReSharper this into a method that doesn't use tail recursion actually created a bug here that can occur for really small partitions - making the `firstAttempt` value mutable across all possible instances results in behavior that actually isn't comparable to using isolated function invocations in all instances. Going to run this through CI and see if that adds up. --- src/core/Akka.Streams/Dsl/Hub.cs | 126 ++++++++++++++++++------------- 1 file changed, 75 insertions(+), 51 deletions(-) diff --git a/src/core/Akka.Streams/Dsl/Hub.cs b/src/core/Akka.Streams/Dsl/Hub.cs index 32505ee03c1..5bc55a95e0b 100644 --- a/src/core/Akka.Streams/Dsl/Hub.cs +++ b/src/core/Akka.Streams/Dsl/Hub.cs @@ -78,7 +78,6 @@ public sealed class ProducerFailed : Exception /// The exception that is the cause of the current exception. public ProducerFailed(string message, Exception cause) : base(message, cause) { - } } } @@ -233,35 +232,29 @@ private bool OnEvent(IEvent e) private void TryProcessNext(bool firstAttempt) { - while (true) + // That we dequeue elements from the queue when there is demand means that Register and Deregister messages + // might be delayed for arbitrary long. This is not a problem as Register is only interesting if it is followed + // by actual elements, which would be delayed anyway by the backpressure. + // Unregister is only used to keep the map growing too large, but otherwise it is not critical to process it + // timely. In fact, the only way the map could keep growing would mean that we dequeue Registers from the + // queue, but then we will eventually reach the Deregister message, too. + if (_queue.TryDequeue(out var nextElement)) { - // That we dequeue elements from the queue when there is demand means that Register and Deregister messages - // might be delayed for arbitrary long. This is not a problem as Register is only interesting if it is followed - // by actual elements, which would be delayed anyway by the backpressure. - // Unregister is only used to keep the map growing too large, but otherwise it is not critical to process it - // timely. In fact, the only way the map could keep growing would mean that we dequeue Registers from the - // queue, but then we will eventually reach the Deregister message, too. - if (_queue.TryDequeue(out var nextElement)) + _needWakeup = false; + if (OnEvent(nextElement)) { - _needWakeup = false; - if (OnEvent(nextElement)) - { - firstAttempt = true; - continue; - } + TryProcessNext(true); } - else + } + else + { + // additional poll to grab any elements that might missed the needWakeup + // and have been enqueued just after it + _needWakeup = true; + if (firstAttempt) { - // additional poll to grab any elements that might missed the needWakeup - // and have been enqueued just after it - _needWakeup = true; - if (firstAttempt) - { - firstAttempt = false; - continue; - } + TryProcessNext(false); } - break; } } @@ -459,7 +452,8 @@ public MergeHub(int perProducerBufferSize) /// /// TBD /// TBD - public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue( + Attributes inheritedAttributes) { var idCounter = new AtomicCounterLong(); var logic = new HubLogic(this, idCounter); @@ -530,7 +524,9 @@ internal class BroadcastHub : GraphStageWithMaterializedValue, S { #region internal classes - private interface IHubEvent { } + private interface IHubEvent + { + } private sealed class RegistrationPending : IHubEvent { @@ -538,7 +534,6 @@ private sealed class RegistrationPending : IHubEvent private RegistrationPending() { - } } @@ -607,12 +602,13 @@ private sealed class Completed private Completed() { - } } - private interface IHubState { } + private interface IHubState + { + } private sealed class Open : IHubState { @@ -638,7 +634,9 @@ public Closed(Exception failure = null) } - private interface IConsumerEvent { } + private interface IConsumerEvent + { + } private sealed class Wakeup : IConsumerEvent { @@ -646,7 +644,6 @@ private sealed class Wakeup : IConsumerEvent private Wakeup() { - } } @@ -672,7 +669,7 @@ public Initialize(int offset) #endregion - #region Logic + #region Logic private sealed class HubLogic : InGraphStageLogic { @@ -784,6 +781,7 @@ private void OnEvent(IHubEvent hubEvent) } else CheckUnblock(unregister.PreviousOffset); + return; } @@ -933,6 +931,7 @@ public override void PostStop() else continue; } + // Already closed, ignore break; } @@ -1024,7 +1023,8 @@ void OnHubReady(Result> result) if (state is Open open) { var newRegistrations = open.Registrations.Insert(0, new Consumer(_id, callback)); - if (_stage._hubLogic.State.CompareAndSet(state, new Open(open.CallbackTask, newRegistrations))) + if (_stage._hubLogic.State.CompareAndSet(state, + new Open(open.CallbackTask, newRegistrations))) { var readyCallback = GetAsyncCallback((Action>>)OnHubReady); open.CallbackTask.ContinueWith(t => readyCallback(Result.FromTask(t))); @@ -1159,7 +1159,8 @@ public BroadcastHub(int bufferSize) /// /// TBD /// TBD - public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue( + Attributes inheritedAttributes) { var idCounter = new AtomicCounterLong(); var logic = new HubLogic(this); @@ -1339,7 +1340,9 @@ public ConsumerQueue(ImmutableQueue queue, int size) private sealed class PartitionQueue : IPartitionQueue { private readonly AtomicCounter _totalSize = new AtomicCounter(); - private readonly ConcurrentDictionary _queues = new ConcurrentDictionary(); + + private readonly ConcurrentDictionary _queues = + new ConcurrentDictionary(); public void Init(long id) => _queues.TryAdd(id, ConsumerQueue.Empty); @@ -1402,20 +1405,26 @@ public void Remove(long id) #region internal classes - private interface IConsumerEvent { } + private interface IConsumerEvent + { + } private sealed class Wakeup : IConsumerEvent { public static Wakeup Instance { get; } = new Wakeup(); - private Wakeup() { } + private Wakeup() + { + } } private sealed class Initialize : IConsumerEvent { public static Initialize Instance { get; } = new Initialize(); - private Initialize() { } + private Initialize() + { + } } private sealed class HubCompleted : IConsumerEvent @@ -1429,13 +1438,17 @@ public HubCompleted(Exception failure) } - private interface IHubEvent { } + private interface IHubEvent + { + } private sealed class RegistrationPending : IHubEvent { public static RegistrationPending Instance { get; } = new RegistrationPending(); - private RegistrationPending() { } + private RegistrationPending() + { + } } private sealed class UnRegister : IHubEvent @@ -1456,7 +1469,6 @@ public NeedWakeup(Consumer consumer) { Consumer = consumer; } - } private sealed class Consumer : IHubEvent @@ -1475,18 +1487,24 @@ private sealed class TryPull : IHubEvent { public static TryPull Instance { get; } = new TryPull(); - private TryPull() { } + private TryPull() + { + } } private sealed class Completed { public static Completed Instance { get; } = new Completed(); - private Completed() { } + private Completed() + { + } } - private interface IHubState { } + private interface IHubState + { + } private sealed class Open : IHubState { @@ -1510,7 +1528,7 @@ public Closed(Exception failure) } } - #endregion + #endregion private sealed class PartitionSinkLogic : InGraphStageLogic { @@ -1540,7 +1558,10 @@ public ConsumerInfo(PartitionSinkLogic partitionSinkLogic, ImmutableList _hub; private readonly int _demandThreshold; private readonly Func _materializedPartitioner; - private readonly TaskCompletionSource> _callbackCompletion = new TaskCompletionSource>(); + + private readonly TaskCompletionSource> _callbackCompletion = + new TaskCompletionSource>(); + private readonly IHubState _noRegistrationsState; private bool _initialized; private readonly IPartitionQueue _queue = new PartitionQueue(); @@ -1651,7 +1672,8 @@ private void OnEvent(IHubEvent e) var o = (Open)State.GetAndSet(_noRegistrationsState); foreach (var consumer in o.Registrations) { - var newConsumers = _consumerInfo.Consumers.Add(consumer).Sort((c1, c2) => c1.Id.CompareTo(c2.Id)); + var newConsumers = _consumerInfo.Consumers.Add(consumer) + .Sort((c1, c2) => c1.Id.CompareTo(c2.Id)); _consumerInfo = new ConsumerInfo(this, newConsumers); _queue.Init(consumer.Id); if (newConsumers.Count >= _hub._startAfterNrOfConsumers) @@ -1850,7 +1872,8 @@ public PartitionSource(AtomicCounterLong counter, PartitionSinkLogic logic) private readonly int _startAfterNrOfConsumers; private readonly int _bufferSize; - public PartitionHub(Func> partitioner, int startAfterNrOfConsumers, int bufferSize) + public PartitionHub(Func> partitioner, int startAfterNrOfConsumers, + int bufferSize) { _partitioner = partitioner; _startAfterNrOfConsumers = startAfterNrOfConsumers; @@ -1862,7 +1885,8 @@ public PartitionHub(Func> partitioner, public override SinkShape Shape { get; } - public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue( + Attributes inheritedAttributes) { var idCounter = new AtomicCounterLong(); var logic = new PartitionSinkLogic(this); @@ -1871,4 +1895,4 @@ public override ILogicAndMaterializedValue> CreateLogicAndMat return new LogicAndMaterializedValue>(logic, Source.FromGraph(source)); } } -} +} \ No newline at end of file From 7f070f139cc21a843a420d76d696e5b3ade12cf7 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 27 Mar 2023 15:40:42 -0500 Subject: [PATCH 2/3] Revert "fix MergeHub_must_work_with_long_streams_when_buffer_size_is_1" This reverts commit 3bc0b4657a39ea412c7bd113da1c07934fa64198. --- src/core/Akka.Streams/Dsl/Hub.cs | 126 +++++++++++++------------------ 1 file changed, 51 insertions(+), 75 deletions(-) diff --git a/src/core/Akka.Streams/Dsl/Hub.cs b/src/core/Akka.Streams/Dsl/Hub.cs index 5bc55a95e0b..32505ee03c1 100644 --- a/src/core/Akka.Streams/Dsl/Hub.cs +++ b/src/core/Akka.Streams/Dsl/Hub.cs @@ -78,6 +78,7 @@ public sealed class ProducerFailed : Exception /// The exception that is the cause of the current exception. public ProducerFailed(string message, Exception cause) : base(message, cause) { + } } } @@ -232,29 +233,35 @@ private bool OnEvent(IEvent e) private void TryProcessNext(bool firstAttempt) { - // That we dequeue elements from the queue when there is demand means that Register and Deregister messages - // might be delayed for arbitrary long. This is not a problem as Register is only interesting if it is followed - // by actual elements, which would be delayed anyway by the backpressure. - // Unregister is only used to keep the map growing too large, but otherwise it is not critical to process it - // timely. In fact, the only way the map could keep growing would mean that we dequeue Registers from the - // queue, but then we will eventually reach the Deregister message, too. - if (_queue.TryDequeue(out var nextElement)) + while (true) { - _needWakeup = false; - if (OnEvent(nextElement)) + // That we dequeue elements from the queue when there is demand means that Register and Deregister messages + // might be delayed for arbitrary long. This is not a problem as Register is only interesting if it is followed + // by actual elements, which would be delayed anyway by the backpressure. + // Unregister is only used to keep the map growing too large, but otherwise it is not critical to process it + // timely. In fact, the only way the map could keep growing would mean that we dequeue Registers from the + // queue, but then we will eventually reach the Deregister message, too. + if (_queue.TryDequeue(out var nextElement)) { - TryProcessNext(true); + _needWakeup = false; + if (OnEvent(nextElement)) + { + firstAttempt = true; + continue; + } } - } - else - { - // additional poll to grab any elements that might missed the needWakeup - // and have been enqueued just after it - _needWakeup = true; - if (firstAttempt) + else { - TryProcessNext(false); + // additional poll to grab any elements that might missed the needWakeup + // and have been enqueued just after it + _needWakeup = true; + if (firstAttempt) + { + firstAttempt = false; + continue; + } } + break; } } @@ -452,8 +459,7 @@ public MergeHub(int perProducerBufferSize) /// /// TBD /// TBD - public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue( - Attributes inheritedAttributes) + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) { var idCounter = new AtomicCounterLong(); var logic = new HubLogic(this, idCounter); @@ -524,9 +530,7 @@ internal class BroadcastHub : GraphStageWithMaterializedValue, S { #region internal classes - private interface IHubEvent - { - } + private interface IHubEvent { } private sealed class RegistrationPending : IHubEvent { @@ -534,6 +538,7 @@ private sealed class RegistrationPending : IHubEvent private RegistrationPending() { + } } @@ -602,13 +607,12 @@ private sealed class Completed private Completed() { + } } - private interface IHubState - { - } + private interface IHubState { } private sealed class Open : IHubState { @@ -634,9 +638,7 @@ public Closed(Exception failure = null) } - private interface IConsumerEvent - { - } + private interface IConsumerEvent { } private sealed class Wakeup : IConsumerEvent { @@ -644,6 +646,7 @@ private sealed class Wakeup : IConsumerEvent private Wakeup() { + } } @@ -669,7 +672,7 @@ public Initialize(int offset) #endregion - #region Logic + #region Logic private sealed class HubLogic : InGraphStageLogic { @@ -781,7 +784,6 @@ private void OnEvent(IHubEvent hubEvent) } else CheckUnblock(unregister.PreviousOffset); - return; } @@ -931,7 +933,6 @@ public override void PostStop() else continue; } - // Already closed, ignore break; } @@ -1023,8 +1024,7 @@ void OnHubReady(Result> result) if (state is Open open) { var newRegistrations = open.Registrations.Insert(0, new Consumer(_id, callback)); - if (_stage._hubLogic.State.CompareAndSet(state, - new Open(open.CallbackTask, newRegistrations))) + if (_stage._hubLogic.State.CompareAndSet(state, new Open(open.CallbackTask, newRegistrations))) { var readyCallback = GetAsyncCallback((Action>>)OnHubReady); open.CallbackTask.ContinueWith(t => readyCallback(Result.FromTask(t))); @@ -1159,8 +1159,7 @@ public BroadcastHub(int bufferSize) /// /// TBD /// TBD - public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue( - Attributes inheritedAttributes) + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) { var idCounter = new AtomicCounterLong(); var logic = new HubLogic(this); @@ -1340,9 +1339,7 @@ public ConsumerQueue(ImmutableQueue queue, int size) private sealed class PartitionQueue : IPartitionQueue { private readonly AtomicCounter _totalSize = new AtomicCounter(); - - private readonly ConcurrentDictionary _queues = - new ConcurrentDictionary(); + private readonly ConcurrentDictionary _queues = new ConcurrentDictionary(); public void Init(long id) => _queues.TryAdd(id, ConsumerQueue.Empty); @@ -1405,26 +1402,20 @@ public void Remove(long id) #region internal classes - private interface IConsumerEvent - { - } + private interface IConsumerEvent { } private sealed class Wakeup : IConsumerEvent { public static Wakeup Instance { get; } = new Wakeup(); - private Wakeup() - { - } + private Wakeup() { } } private sealed class Initialize : IConsumerEvent { public static Initialize Instance { get; } = new Initialize(); - private Initialize() - { - } + private Initialize() { } } private sealed class HubCompleted : IConsumerEvent @@ -1438,17 +1429,13 @@ public HubCompleted(Exception failure) } - private interface IHubEvent - { - } + private interface IHubEvent { } private sealed class RegistrationPending : IHubEvent { public static RegistrationPending Instance { get; } = new RegistrationPending(); - private RegistrationPending() - { - } + private RegistrationPending() { } } private sealed class UnRegister : IHubEvent @@ -1469,6 +1456,7 @@ public NeedWakeup(Consumer consumer) { Consumer = consumer; } + } private sealed class Consumer : IHubEvent @@ -1487,24 +1475,18 @@ private sealed class TryPull : IHubEvent { public static TryPull Instance { get; } = new TryPull(); - private TryPull() - { - } + private TryPull() { } } private sealed class Completed { public static Completed Instance { get; } = new Completed(); - private Completed() - { - } + private Completed() { } } - private interface IHubState - { - } + private interface IHubState { } private sealed class Open : IHubState { @@ -1528,7 +1510,7 @@ public Closed(Exception failure) } } - #endregion + #endregion private sealed class PartitionSinkLogic : InGraphStageLogic { @@ -1558,10 +1540,7 @@ public ConsumerInfo(PartitionSinkLogic partitionSinkLogic, ImmutableList _hub; private readonly int _demandThreshold; private readonly Func _materializedPartitioner; - - private readonly TaskCompletionSource> _callbackCompletion = - new TaskCompletionSource>(); - + private readonly TaskCompletionSource> _callbackCompletion = new TaskCompletionSource>(); private readonly IHubState _noRegistrationsState; private bool _initialized; private readonly IPartitionQueue _queue = new PartitionQueue(); @@ -1672,8 +1651,7 @@ private void OnEvent(IHubEvent e) var o = (Open)State.GetAndSet(_noRegistrationsState); foreach (var consumer in o.Registrations) { - var newConsumers = _consumerInfo.Consumers.Add(consumer) - .Sort((c1, c2) => c1.Id.CompareTo(c2.Id)); + var newConsumers = _consumerInfo.Consumers.Add(consumer).Sort((c1, c2) => c1.Id.CompareTo(c2.Id)); _consumerInfo = new ConsumerInfo(this, newConsumers); _queue.Init(consumer.Id); if (newConsumers.Count >= _hub._startAfterNrOfConsumers) @@ -1872,8 +1850,7 @@ public PartitionSource(AtomicCounterLong counter, PartitionSinkLogic logic) private readonly int _startAfterNrOfConsumers; private readonly int _bufferSize; - public PartitionHub(Func> partitioner, int startAfterNrOfConsumers, - int bufferSize) + public PartitionHub(Func> partitioner, int startAfterNrOfConsumers, int bufferSize) { _partitioner = partitioner; _startAfterNrOfConsumers = startAfterNrOfConsumers; @@ -1885,8 +1862,7 @@ public PartitionHub(Func> partitioner, public override SinkShape Shape { get; } - public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue( - Attributes inheritedAttributes) + public override ILogicAndMaterializedValue> CreateLogicAndMaterializedValue(Attributes inheritedAttributes) { var idCounter = new AtomicCounterLong(); var logic = new PartitionSinkLogic(this); @@ -1895,4 +1871,4 @@ public override ILogicAndMaterializedValue> CreateLogicAndMat return new LogicAndMaterializedValue>(logic, Source.FromGraph(source)); } } -} \ No newline at end of file +} From ec3d50d621ff452937cdd831c06015f41da85b39 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Mon, 27 Mar 2023 16:03:39 -0500 Subject: [PATCH 3/3] tremendously simplified `MergeHub_must_work_with_long_streams_when_buffer_size_is_1` --- src/core/Akka.Streams.Tests/Dsl/HubSpec.cs | 24 ++++++++-------------- src/core/Akka.Streams/Dsl/Hub.cs | 10 ++++----- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs index 76db9b76a6b..a0d17bc8861 100644 --- a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs @@ -185,25 +185,17 @@ public async Task MergeHub_must_work_with_long_streams_when_buffer_size_is_1() { await this.AssertAllStagesStoppedAsync(async () => { - var (sink, probe) = MergeHub.Source(1) - .ToMaterialized(this.SinkProbe(), Keep.Both) + var (sink, result) = MergeHub.Source(1) + .Take(20000) + .ToMaterialized(Sink.Seq(), Keep.Both) .Run(Materializer); - + + Source.From(Enumerable.Range(1, 10000)).RunWith(sink, Materializer); Source.From(Enumerable.Range(10001, 10000)).RunWith(sink, Materializer); - - await probe.RequestAsync(int.MaxValue); - var result = new List(); - foreach (var i in Enumerable.Range(1, 20000)) - { - var evt = await probe.ExpectEventAsync(); - if (evt is TestSubscriber.OnNext next) - result.Add(next.Element); - else - throw new Exception($"For element [{i}]: Expected OnNext but received {evt.GetType()}"); - } - result.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000)); - }, Materializer, 300.Seconds()); + + (await result).OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000)); + }, Materializer, 3.Seconds()); } [Fact] diff --git a/src/core/Akka.Streams/Dsl/Hub.cs b/src/core/Akka.Streams/Dsl/Hub.cs index 32505ee03c1..b4727239bf8 100644 --- a/src/core/Akka.Streams/Dsl/Hub.cs +++ b/src/core/Akka.Streams/Dsl/Hub.cs @@ -86,7 +86,7 @@ public ProducerFailed(string message, Exception cause) : base(message, cause) /// /// INTERNAL API /// - /// TBD + /// The type of element emitted by the MergeHub internal class MergeHub : GraphStageWithMaterializedValue, Sink> { #region Internal classes @@ -426,14 +426,14 @@ public HubSink(MergeHub hub, AtomicCounterLong idCounter, HubLogic logic) /// TBD /// /// TBD - /// - /// This exception is thrown when the specified is less than or equal to zero. + /// + /// This exception is thrown when the specified is less than or equal to zero. /// public MergeHub(int perProducerBufferSize) { if (perProducerBufferSize <= 0) - throw new ArgumentException("Buffer size must be positive", nameof(perProducerBufferSize)); - + throw new ArgumentOutOfRangeException(nameof(perProducerBufferSize), perProducerBufferSize, "Buffer size must be positive"); + _perProducerBufferSize = perProducerBufferSize; DemandThreshold = perProducerBufferSize / 2 + perProducerBufferSize % 2; Shape = new SourceShape(Out);