diff --git a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs index 76db9b76a6b..a0d17bc8861 100644 --- a/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/HubSpec.cs @@ -185,25 +185,17 @@ public async Task MergeHub_must_work_with_long_streams_when_buffer_size_is_1() { await this.AssertAllStagesStoppedAsync(async () => { - var (sink, probe) = MergeHub.Source<int>(1) - .ToMaterialized(this.SinkProbe<int>(), Keep.Both) + var (sink, result) = MergeHub.Source<int>(1) + .Take(20000) + .ToMaterialized(Sink.Seq<int>(), Keep.Both) .Run(Materializer); - + + Source.From(Enumerable.Range(1, 10000)).RunWith(sink, Materializer); Source.From(Enumerable.Range(10001, 10000)).RunWith(sink, Materializer); - - await probe.RequestAsync(int.MaxValue); - var result = new List<int>(); - foreach (var i in Enumerable.Range(1, 20000)) - { - var evt = await probe.ExpectEventAsync(); - if (evt is TestSubscriber.OnNext<int> next) - result.Add(next.Element); - else - throw new Exception($"For element [{i}]: Expected OnNext<int> but received {evt.GetType()}"); - } - result.OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000)); - }, Materializer, 300.Seconds()); + + (await result).OrderBy(x => x).Should().BeEquivalentTo(Enumerable.Range(1, 20000)); + }, Materializer, 3.Seconds()); } [Fact] diff --git a/src/core/Akka.Streams/Dsl/Hub.cs b/src/core/Akka.Streams/Dsl/Hub.cs index 32505ee03c1..b4727239bf8 100644 --- a/src/core/Akka.Streams/Dsl/Hub.cs +++ b/src/core/Akka.Streams/Dsl/Hub.cs @@ -86,7 +86,7 @@ public ProducerFailed(string message, Exception cause) : base(message, cause) /// <summary> /// INTERNAL API /// </summary> - /// <typeparam name="T">TBD</typeparam> + /// <typeparam name="T">The type of element emitted by the MergeHub</typeparam> internal class MergeHub<T> : GraphStageWithMaterializedValue<SourceShape<T>, Sink<T, NotUsed>> { #region Internal classes @@ -426,14 +426,14 @@ public HubSink(MergeHub<T> hub, AtomicCounterLong idCounter, HubLogic logic) /// TBD /// </summary> /// <param name="perProducerBufferSize">TBD</param> - /// <exception cref="ArgumentException"> - /// This exception is thrown when the specified <paramref name="perProducerBufferSize"/> is less than or equal to zero. + /// <exception cref="ArgumentOutOfRangeException"> + /// This exception is thrown when the specified <paramref name="perProducerBufferSize"/>is less than or equal to zero. /// </exception> public MergeHub(int perProducerBufferSize) { if (perProducerBufferSize <= 0) - throw new ArgumentException("Buffer size must be positive", nameof(perProducerBufferSize)); - + throw new ArgumentOutOfRangeException(nameof(perProducerBufferSize), perProducerBufferSize, "Buffer size must be positive"); + _perProducerBufferSize = perProducerBufferSize; DemandThreshold = perProducerBufferSize / 2 + perProducerBufferSize % 2; Shape = new SourceShape<T>(Out);