diff --git a/src/core/Akka.Tests/Routing/ResizerSpec.cs b/src/core/Akka.Tests/Routing/ResizerSpec.cs index c62a1a66c98..c85087ce8d5 100644 --- a/src/core/Akka.Tests/Routing/ResizerSpec.cs +++ b/src/core/Akka.Tests/Routing/ResizerSpec.cs @@ -8,6 +8,7 @@ using System; using System.Linq; using System.Threading; +using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; using Akka.Routing; @@ -65,6 +66,24 @@ public PressureActor() } } + private class PressureAsyncActor : ReceiveActor + { + public PressureAsyncActor() + { + ReceiveAsync(async d => + { + await Task.Delay(d); + Sender.Tell("done"); + }); + + ReceiveAsync(s => s == "echo", s => + { + Sender.Tell("reply"); + return Task.CompletedTask; + }); + } + } + private class BackoffActor : ReceiveActor { private readonly Func _dilated; @@ -266,6 +285,55 @@ public void DefaultResizer_must_grow_as_needed_under_pressure() RouteeSize(router).Should().Be(resizer.UpperBound); } + [Fact] + public async Task DefaultResizer_with_ReceiveAsync_must_grow_as_needed_under_pressure() + { + var resizer = new DefaultResizer( + lower: 3, + upper: 5, + rampupRate: 0.1, + backoffRate: 0.0, + pressureThreshold: 1, + messagesPerResize: 1, + backoffThreshold: 0.0); + + var router = Sys.ActorOf(new RoundRobinPool(0, resizer).Props(Props.Create())); + + // first message should create the minimum number of routees + router.Tell("echo"); + ExpectMsg("reply"); + + RouteeSize(router).Should().Be(resizer.LowerBound); + + Func loop = async (loops, d) => + { + for (var i = 0; i < loops; i++) + { + router.Tell(d); + + //sending too quickly will result in skipped resize due to many ResizeInProgress conflicts + await Task.Delay(Dilated(20.Milliseconds())); + } + + var max = d.TotalMilliseconds * loops / resizer.LowerBound + Dilated(2.Seconds()).TotalMilliseconds; + Within(TimeSpan.FromMilliseconds(max), () => + { + for (var i = 0; i < loops; i++) + { + ExpectMsg("done"); + } + }); + }; + + // 2 more should go through without triggering more + await loop(2, 200.Milliseconds()); + RouteeSize(router).Should().Be(resizer.LowerBound); + + // a whole bunch should max it out + await loop(20, 500.Milliseconds()); + RouteeSize(router).Should().Be(resizer.UpperBound); + } + [Fact(Skip = "Racy due to Resizer / Mailbox impl")] public void DefaultResizer_must_backoff() { diff --git a/src/core/Akka/Routing/Resizer.cs b/src/core/Akka/Routing/Resizer.cs index ee389a73710..e294fe9d349 100644 --- a/src/core/Akka/Routing/Resizer.cs +++ b/src/core/Akka/Routing/Resizer.cs @@ -192,7 +192,7 @@ public int Capacity(IEnumerable currentRoutees) var pressure = Pressure(routees); var delta = Filter(pressure, currentSize); var proposed = currentSize + delta; - + if (proposed < LowerBound) return delta + (LowerBound - proposed); if (proposed > UpperBound) @@ -227,9 +227,9 @@ public int Pressure(IEnumerable currentRoutees) if (underlying is ActorCell cell) { if (PressureThreshold == 1) - return cell.Mailbox.IsScheduled() && cell.Mailbox.HasMessages; + return (cell.Mailbox.IsScheduled() || cell.Mailbox.IsSuspended()) && cell.Mailbox.HasMessages; if (PressureThreshold < 1) - return cell.Mailbox.IsScheduled() && cell.CurrentMessage != null; + return (cell.Mailbox.IsScheduled() || cell.Mailbox.IsSuspended()) && cell.CurrentMessage != null; return cell.Mailbox.NumberOfMessages >= PressureThreshold; }