From 9942ad95ec20a3cb3a190385a5f8824c2a2f7c2b Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 17 Aug 2023 23:41:29 +0700 Subject: [PATCH] Improve Streams Throttle. Log errors, improve tests, and add supervisor strategy support (#6886) * Improve Streams Throttle. Log errors and improve tests * code cleanup * Apply supervision strategy only on calculateCost --------- Co-authored-by: Aaron Stannard --- .../Dsl/FlowThrottleSpec.cs | 299 +++++++++++------- .../Dsl/Internal/InternalFlowOperations.cs | 4 +- .../Akka.Streams/Implementation/Throttle.cs | 37 ++- 3 files changed, 216 insertions(+), 124 deletions(-) diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs index 18edebd45ed..2dcec1432b8 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs @@ -13,14 +13,17 @@ using Akka.IO; using Akka.Streams.Actors; using Akka.Streams.Dsl; +using Akka.Streams.Supervision; using Akka.Streams.TestKit; using Akka.TestKit; +using Akka.TestKit.Extensions; using Akka.TestKit.Xunit2.Attributes; using Akka.Util.Internal; using FluentAssertions; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; -// ReSharper disable InvokeAsExtensionMethod +using static FluentAssertions.FluentActions; namespace Akka.Streams.Tests.Dsl { @@ -46,16 +49,42 @@ private static ByteString GenerateByteString(int length) return ByteString.FromBytes(bytes); } + [Fact(DisplayName = "Throttle with delegate calculateCost must resume when delegate throws")] + public async Task ThrottleCostExceptionTest() + { + await this.AssertAllStagesStoppedAsync(async () => + { + var probe = Source.From(new[] { 1, 2, 3, 4, 5 }) + .Throttle( + cost: 1, + per: 1.Seconds(), + maximumBurst: 10, + calculateCost: e => e == 3 ? throw new TestException("err1") : 0, + mode: ThrottleMode.Shaping) + .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider)) + .RunWith(this.SinkProbe(), Materializer); + await probe.AsyncBuilder() + .Request(5) + .ExpectNext(1, 2, 4, 5) + .ExpectComplete() + .ExecuteAsync() + .ShouldCompleteWithin(RemainingOrDefault); + }, Materializer); + } + [Fact] public async Task Throttle_for_single_cost_elements_must_work_for_the_happy_case() { await this.AssertAllStagesStoppedAsync(async() => { - await Source.From(Enumerable.Range(1, 5)) - .Throttle(1, TimeSpan.FromMilliseconds(100), 0, ThrottleMode.Shaping) - .RunWith(this.SinkProbe(), Materializer) - .Request(5) - .ExpectNext(1, 2, 3, 4, 5) - .ExpectCompleteAsync(); + var probe = Source.From(Enumerable.Range(1, 5)) + .Throttle(1, TimeSpan.FromMilliseconds(100), 0, ThrottleMode.Shaping) + .RunWith(this.SinkProbe(), Materializer); + await probe.AsyncBuilder() + .Request(5) + .ExpectNext(1, 2, 3, 4, 5) + .ExpectComplete() + .ExecuteAsync() + .ShouldCompleteWithin(RemainingOrDefault); }, Materializer); } @@ -63,12 +92,15 @@ await Source.From(Enumerable.Range(1, 5)) public async Task Throttle_for_single_cost_elements_must_accept_very_high_rates() { await this.AssertAllStagesStoppedAsync(async() => { - await Source.From(Enumerable.Range(1, 5)) - .Throttle(1, TimeSpan.FromTicks(1), 0, ThrottleMode.Shaping) - .RunWith(this.SinkProbe(), Materializer) - .Request(5) - .ExpectNext(1, 2, 3, 4, 5) - .ExpectCompleteAsync(); + var probe = Source.From(Enumerable.Range(1, 5)) + .Throttle(1, TimeSpan.FromTicks(1), 0, ThrottleMode.Shaping) + .RunWith(this.SinkProbe(), Materializer); + await probe.AsyncBuilder() + .Request(5) + .ExpectNext(1, 2, 3, 4, 5) + .ExpectComplete() + .ExecuteAsync() + .ShouldCompleteWithin(RemainingOrDefault); }, Materializer); } @@ -76,36 +108,42 @@ await Source.From(Enumerable.Range(1, 5)) public async Task Throttle_for_single_cost_elements_must_accept_very_low_rates() { await this.AssertAllStagesStoppedAsync(async() => { - var probe = Source.From(Enumerable.Range(1, 5)) - .Throttle(1, TimeSpan.FromDays(100), 1, ThrottleMode.Shaping) - .RunWith(this.SinkProbe(), Materializer); - await probe.Request(5) + var probe = Source.From(Enumerable.Range(1, 5)) + .Throttle(1, TimeSpan.FromDays(100), 1, ThrottleMode.Shaping) + .RunWith(this.SinkProbe(), Materializer); + await probe.AsyncBuilder() + .Request(5) .ExpectNext(1) - .ExpectNoMsgAsync(TimeSpan.FromMilliseconds(100)); + .ExpectNoMsg(TimeSpan.FromMilliseconds(100)) + .ExecuteAsync() + .ShouldCompleteWithin(RemainingOrDefault); probe.Cancel(); }, Materializer); } [Fact] - public void Throttle_for_single_cost_elements_must_() + public async Task Throttle_for_single_cost_elements_must_work() { - var sharedThrottle = Flow.Create().Throttle(1, TimeSpan.FromDays(1), 1, ThrottleMode.Enforcing); - - // If there is accidental shared state then we would not be able to pass through the single element - var t = Source.Single(1) - .Via(sharedThrottle) - .Via(sharedThrottle) - .RunWith(Sink.First(), Materializer); - t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - t.Result.Should().Be(1); - - // It works with a new stream, too - t = Source.Single(2) - .Via(sharedThrottle) - .Via(sharedThrottle) - .RunWith(Sink.First(), Materializer); - t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - t.Result.Should().Be(2); + await this.AssertAllStagesStoppedAsync(async () => + { + var sharedThrottle = Flow.Create().Throttle(1, TimeSpan.FromDays(1), 1, ThrottleMode.Enforcing); + + // If there is accidental shared state then we would not be able to pass through the single element + var t = await Source.Single(1) + .Via(sharedThrottle) + .Via(sharedThrottle) + .RunWith(Sink.First(), Materializer) + .ShouldCompleteWithin(RemainingOrDefault); + t.Should().Be(1); + + // It works with a new stream, too + t = await Source.Single(2) + .Via(sharedThrottle) + .Via(sharedThrottle) + .RunWith(Sink.First(), Materializer) + .ShouldCompleteWithin(RemainingOrDefault); + t.Should().Be(2); + }, Materializer); } [LocalFact(SkipLocal = "Racy on Azure DevOps")] @@ -160,13 +198,14 @@ await this.AssertAllStagesStoppedAsync(async() => { [Fact] public async Task Throttle_for_single_cost_elements_must_cancel_when_downstream_cancels() { - await this.AssertAllStagesStoppedAsync(() => { - var downstream = this.CreateSubscriberProbe(); - Source.From(Enumerable.Range(1, 10)) + await this.AssertAllStagesStoppedAsync(async () => { + var probe = Source.From(Enumerable.Range(1, 10)) .Throttle(1, TimeSpan.FromMilliseconds(300), 0, ThrottleMode.Shaping) - .RunWith(Sink.FromSubscriber(downstream), Materializer); + .RunWith(this.SinkProbe(), Materializer); + var downstream = await probe.ExpectSubscriptionAsync(); + downstream.Request(5); downstream.Cancel(); - return Task.CompletedTask; + await probe.ExpectNoMsgAsync(200.Milliseconds()); }, Materializer); } @@ -174,16 +213,19 @@ await this.AssertAllStagesStoppedAsync(() => { public async Task Throttle_for_single_cost_elements_must_send_elements_downstream_as_soon_as_time_comes() { await this.AssertAllStagesStoppedAsync(async() => { - var probe = - Source.From(Enumerable.Range(1, 10)) - .Throttle(2, TimeSpan.FromMilliseconds(750), 0, ThrottleMode.Shaping) - .RunWith(this.SinkProbe(), Materializer); + var probe = Source.From(Enumerable.Range(1, 10)) + .Throttle(2, TimeSpan.FromMilliseconds(750), 0, ThrottleMode.Shaping) + .RunWith(this.SinkProbe(), Materializer); + probe.Request(5); var result = probe.ReceiveWhile(TimeSpan.FromMilliseconds(900), filter: x => x); - await probe.ExpectNoMsg(TimeSpan.FromMilliseconds(150)) + await probe.AsyncBuilder() + .ExpectNoMsg(TimeSpan.FromMilliseconds(150)) .ExpectNext(3) .ExpectNoMsg(TimeSpan.FromMilliseconds(150)) - .ExpectNextAsync(4); + .ExpectNext(4) + .ExecuteAsync() + .ShouldCompleteWithin(RemainingOrDefault); probe.Cancel(); // assertion may take longer then the throttle and therefore the next assertion fails result.Should().BeEquivalentTo(new[] { new OnNext(1), new OnNext(2) }); @@ -277,20 +319,21 @@ await this.AssertAllStagesStoppedAsync(async() => { [Fact] public async Task Throttle_for_single_cost_elements_must_throw_exception_when_exceeding_throughtput_in_enforced_mode() { - await this.AssertAllStagesStoppedAsync(() => { - var t1 = - Source.From(Enumerable.Range(1, 5)) - .Throttle(1, TimeSpan.FromMilliseconds(200), 5, ThrottleMode.Enforcing) - .RunWith(Sink.Seq(), Materializer); // Burst is 5 so this will not fail - t1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - t1.Result.Should().BeEquivalentTo(Enumerable.Range(1, 5)); - - var t2 = - Source.From(Enumerable.Range(1, 6)) - .Throttle(1, TimeSpan.FromMilliseconds(200), 5, ThrottleMode.Enforcing) - .RunWith(Sink.Ignore(), Materializer); - t2.Invoking(task => task.Wait(TimeSpan.FromSeconds(2))).Should().Throw(); - return Task.CompletedTask; + await this.AssertAllStagesStoppedAsync(async () => + { + var t1 = await Source.From(Enumerable.Range(1, 5)) + .Throttle(1, TimeSpan.FromMilliseconds(200), 5, ThrottleMode.Enforcing) + .RunWith(Sink.Seq(), Materializer) // Burst is 5 so this will not fail + .ShouldCompleteWithin(RemainingOrDefault); + t1.Should().BeEquivalentTo(Enumerable.Range(1, 5)); + + await Awaiting(async () => + { + await Source.From(Enumerable.Range(1, 6)) + .Throttle(1, TimeSpan.FromMilliseconds(200), 5, ThrottleMode.Enforcing) + .RunWith(Sink.Ignore(), Materializer); + }).Should().ThrowAsync() + .ShouldCompleteWithin(RemainingOrDefault); }, Materializer); } @@ -298,28 +341,33 @@ await this.AssertAllStagesStoppedAsync(() => { public async Task Throttle_for_single_cost_elements_must_properly_combine_shape_and_throttle_modes() { await this.AssertAllStagesStoppedAsync(async() => { - await Source.From(Enumerable.Range(1, 5)) - .Throttle(1, TimeSpan.FromMilliseconds(100), 5, ThrottleMode.Shaping) - .Throttle(1, TimeSpan.FromMilliseconds(100), 5, ThrottleMode.Enforcing) - .RunWith(this.SinkProbe(), Materializer) - .Request(5) - .ExpectNext(1, 2, 3, 4, 5) - .ExpectCompleteAsync(); + var probe = Source.From(Enumerable.Range(1, 5)) + .Throttle(1, TimeSpan.FromMilliseconds(100), 5, ThrottleMode.Shaping) + .Throttle(1, TimeSpan.FromMilliseconds(100), 5, ThrottleMode.Enforcing) + .RunWith(this.SinkProbe(), Materializer); + + await probe.AsyncBuilder() + .Request(5) + .ExpectNext(1, 2, 3, 4, 5) + .ExpectComplete() + .ExecuteAsync() + .ShouldCompleteWithin(RemainingOrDefault); }, Materializer); } - - [Fact] public async Task Throttle_for_various_cost_elements_must_work_for_the_happy_case() { - await this.AssertAllStagesStoppedAsync(async() => { - await Source.From(Enumerable.Range(1, 5)) - .Throttle(1, TimeSpan.FromMilliseconds(100), 0, _ => 1, ThrottleMode.Shaping) - .RunWith(this.SinkProbe(), Materializer) - .Request(5) - .ExpectNext(1, 2, 3, 4, 5) - .ExpectCompleteAsync(); + await this.AssertAllStagesStoppedAsync(async () => { + var probe = Source.From(Enumerable.Range(1, 5)) + .Throttle(1, TimeSpan.FromMilliseconds(100), 0, _ => 1, ThrottleMode.Shaping) + .RunWith(this.SinkProbe(), Materializer); + await probe.AsyncBuilder() + .Request(5) + .ExpectNext(1, 2, 3, 4, 5) + .ExpectComplete() + .ExecuteAsync() + .ShouldCompleteWithin(RemainingOrDefault); }, Materializer); } @@ -329,9 +377,10 @@ public async Task Throttle_for_various_cost_elements_must_emit_elements_accordin await this.AssertAllStagesStoppedAsync(async() => { var list = Enumerable.Range(1, 4).Select(x => x * 2).Select(GenerateByteString).ToList(); - await Source.From(list) + var probe = Source.From(list) .Throttle(2, TimeSpan.FromMilliseconds(200), 0, x => x.Count, ThrottleMode.Shaping) - .RunWith(this.SinkProbe(), Materializer) + .RunWith(this.SinkProbe(), Materializer); + await probe.AsyncBuilder() .Request(4) .ExpectNext(list[0]) .ExpectNoMsg(TimeSpan.FromMilliseconds(300)) @@ -340,7 +389,9 @@ await Source.From(list) .ExpectNext(list[2]) .ExpectNoMsg(TimeSpan.FromMilliseconds(700)) .ExpectNext(list[3]) - .ExpectCompleteAsync(); + .ExpectComplete() + .ExecuteAsync() + .ShouldCompleteWithin(RemainingOrDefault); }, Materializer); } @@ -371,13 +422,14 @@ await this.AssertAllStagesStoppedAsync(async() => { [Fact] public async Task Throttle_for_various_cost_elements_must_cancel_when_downstream_cancels() { - await this.AssertAllStagesStoppedAsync(() => { - var downstream = this.CreateSubscriberProbe(); - Source.From(Enumerable.Range(1, 10)) + await this.AssertAllStagesStoppedAsync(async () => { + var probe = Source.From(Enumerable.Range(1, 10)) .Throttle(2, TimeSpan.FromMilliseconds(200), 0, x => x, ThrottleMode.Shaping) - .RunWith(Sink.FromSubscriber(downstream), Materializer); + .RunWith(this.SinkProbe(), Materializer); + var downstream = await probe.ExpectSubscriptionAsync(); + downstream.Request(5); downstream.Cancel(); - return Task.CompletedTask; + await probe.ExpectNoMsgAsync(200.Milliseconds()); }, Materializer); } @@ -385,19 +437,24 @@ await this.AssertAllStagesStoppedAsync(() => { public async Task Throttle_for_various_cost_elements_must_send_elements_downstream_as_soon_as_time_comes() { await this.AssertAllStagesStoppedAsync(async() => { - var probe = - Source.From(Enumerable.Range(1, 10)) - .Throttle(4, TimeSpan.FromMilliseconds(500), 0, _ => 2, ThrottleMode.Shaping) - .RunWith(this.SinkProbe(), Materializer); + var probe = Source.From(Enumerable.Range(1, 10)) + .Throttle(4, TimeSpan.FromMilliseconds(500), 0, _ => 2, ThrottleMode.Shaping) + .RunWith(this.SinkProbe(), Materializer); + probe.Request(5); - var result = probe.ReceiveWhile(TimeSpan.FromMilliseconds(600), filter: x => x); - await probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)) + var result = await probe.ReceiveWithinAsync(600.Milliseconds(), 2).ToListAsync(); + + await probe.AsyncBuilder() + .ExpectNoMsg(TimeSpan.FromMilliseconds(100)) .ExpectNext(3) .ExpectNoMsg(TimeSpan.FromMilliseconds(100)) - .ExpectNextAsync(4); + .ExpectNext(4) + .ExecuteAsync() + .ShouldCompleteWithin(RemainingOrDefault); probe.Cancel(); + // assertion may take longer then the throttle and therefore the next assertion fails - result.Should().BeEquivalentTo(new[] { new OnNext(1), new OnNext(2) }); + result.Should().BeEquivalentTo( 1, 2 ); }, Materializer); } @@ -479,20 +536,20 @@ await this.AssertAllStagesStoppedAsync(async() => { [Fact] public async Task Throttle_for_various_cost_elements_must_throw_exception_when_exceeding_throughtput_in_enforced_mode() { - await this.AssertAllStagesStoppedAsync(() => { - var t1 = - Source.From(Enumerable.Range(1, 4)) - .Throttle(2, TimeSpan.FromMilliseconds(200), 10, x => x, ThrottleMode.Enforcing) - .RunWith(Sink.Seq(), Materializer); - t1.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - t1.Result.Should().BeEquivalentTo(Enumerable.Range(1, 4)); // Burst is 10 so this will not fail - - var t2 = - Source.From(Enumerable.Range(1, 6)) - .Throttle(2, TimeSpan.FromMilliseconds(200), 5, x => x, ThrottleMode.Enforcing) - .RunWith(Sink.Ignore(), Materializer); - t2.Invoking(task => task.Wait(TimeSpan.FromSeconds(2))).Should().Throw(); - return Task.CompletedTask; + await this.AssertAllStagesStoppedAsync(async () => { + var t1 = await Source.From(Enumerable.Range(1, 4)) + .Throttle(2, TimeSpan.FromMilliseconds(200), 10, x => x, ThrottleMode.Enforcing) + .RunWith(Sink.Seq(), Materializer) + .ShouldCompleteWithin(RemainingOrDefault); + t1.Should().BeEquivalentTo(Enumerable.Range(1, 4)); // Burst is 10 so this will not fail + + await Awaiting(async () => + { + await Source.From(Enumerable.Range(1, 6)) + .Throttle(2, TimeSpan.FromMilliseconds(200), 5, x => x, ThrottleMode.Enforcing) + .RunWith(Sink.Ignore(), Materializer); + }).Should().ThrowAsync() + .ShouldCompleteWithin(RemainingOrDefault); }, Materializer); } @@ -500,28 +557,32 @@ await this.AssertAllStagesStoppedAsync(() => { public async Task Throttle_for_various_cost_elements_must_properly_combine_shape_and_enforce_modes() { await this.AssertAllStagesStoppedAsync(async() => { - await Source.From(Enumerable.Range(1, 5)) - .Throttle(2, TimeSpan.FromMilliseconds(200), 0, x => x, ThrottleMode.Shaping) - .Throttle(1, TimeSpan.FromMilliseconds(100), 5, ThrottleMode.Enforcing) - .RunWith(this.SinkProbe(), Materializer) - .Request(5) - .ExpectNext(1, 2, 3, 4, 5) - .ExpectCompleteAsync(); + var probe = Source.From(Enumerable.Range(1, 5)) + .Throttle(2, TimeSpan.FromMilliseconds(200), 0, x => x, ThrottleMode.Shaping) + .Throttle(1, TimeSpan.FromMilliseconds(100), 5, ThrottleMode.Enforcing) + .RunWith(this.SinkProbe(), Materializer); + await probe.AsyncBuilder() + .Request(5) + .ExpectNext(1, 2, 3, 4, 5) + .ExpectComplete() + .ExecuteAsync() + .ShouldCompleteWithin(RemainingOrDefault); }, Materializer); } [Fact] public async Task Throttle_for_various_cost_elements_must_handle_rate_calculation_function_exception() { - await this.AssertAllStagesStoppedAsync(() => { + await this.AssertAllStagesStoppedAsync(async () => { var ex = new Exception(); - Source.From(Enumerable.Range(1, 5)) - .Throttle(2, TimeSpan.FromMilliseconds(200), 0, _ => { throw ex; }, ThrottleMode.Shaping) + var exception = await Source.From(Enumerable.Range(1, 5)) + .Throttle(2, TimeSpan.FromMilliseconds(200), 0, _ => throw ex, ThrottleMode.Shaping) .Throttle(1, TimeSpan.FromMilliseconds(100), 5, ThrottleMode.Enforcing) .RunWith(this.SinkProbe(), Materializer) + .AsyncBuilder() .Request(5) - .ExpectError().Should().Be(ex); - return Task.CompletedTask; + .ExpectErrorAsync(); + exception.Should().Be(ex); }, Materializer); } } diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index 1ecee2a75a6..7259a033808 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -1893,7 +1893,7 @@ public static IFlow Throttle(this IFlow flow, int ele int maximumBurst, ThrottleMode mode) { if (elements <= 0) throw new ArgumentException("Throttle elements must be > 0", nameof(elements)); - if (per == TimeSpan.Zero) throw new ArgumentException("Throttle per timeout must not be zero", nameof(per)); + if (per <= TimeSpan.Zero) throw new ArgumentException("Throttle per timeout must not be less than or equal to zero", nameof(per)); if (mode == ThrottleMode.Enforcing && maximumBurst < 0) throw new ArgumentException("Throttle maximumBurst must be > 0 in Enforcing mode", nameof(maximumBurst)); if (per.Ticks < elements) @@ -1950,7 +1950,7 @@ public static IFlow Throttle(this IFlow flow, int cos int maximumBurst, Func calculateCost, ThrottleMode mode) { if (cost <= 0) throw new ArgumentException("cost must be > 0", nameof(cost)); - if (per == TimeSpan.Zero) throw new ArgumentException("Throttle per timeout must not be zero", nameof(per)); + if (per <= TimeSpan.Zero) throw new ArgumentException("Throttle per timeout must not be less than or equal to zero", nameof(per)); if (mode == ThrottleMode.Enforcing && maximumBurst < 0) throw new ArgumentException("Throttle maximumBurst must be > 0 in Enforcing mode", nameof(maximumBurst)); if (per.Ticks < cost) diff --git a/src/core/Akka.Streams/Implementation/Throttle.cs b/src/core/Akka.Streams/Implementation/Throttle.cs index 127780d40dc..b98911543be 100644 --- a/src/core/Akka.Streams/Implementation/Throttle.cs +++ b/src/core/Akka.Streams/Implementation/Throttle.cs @@ -7,8 +7,10 @@ using System; using Akka.Annotations; +using Akka.Event; using Akka.Streams.Implementation.Fusing; using Akka.Streams.Stage; +using Akka.Streams.Supervision; using Akka.Streams.Util; using Akka.Util; @@ -29,16 +31,20 @@ private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler private readonly Throttle _stage; private readonly TickTimeTokenBucket _tokenBucket; private readonly bool _enforcing; + private readonly Decider _decider; private bool _willStop; private Option _currentElement; - public Logic(Throttle stage) : base(stage.Shape) + public Logic(Throttle stage, Attributes inheritedAttributes) : base(stage.Shape) { _stage = stage; _tokenBucket = new TickTimeTokenBucket(stage._maximumBurst, stage._ticksBetweenTokens); _enforcing = stage._mode == ThrottleMode.Enforcing; + var attr = inheritedAttributes.GetAttribute(null); + _decider = attr != null ? attr.Decider : Deciders.StoppingDecider; + SetHandler(_stage.Inlet, this); SetHandler(_stage.Outlet, this); } @@ -46,7 +52,32 @@ public Logic(Throttle stage) : base(stage.Shape) public void OnPush() { var element = Grab(_stage.Inlet); - var cost = _stage._costCalculation(element); + int cost; + try + { + cost = _stage._costCalculation(element); + } + catch (Exception e) + { + var strategy = _decider(e); + Log.Error(e, "An exception occured inside Throttle while calculating cost for element [{0}]. Supervision strategy: {1}", element, strategy); + switch (strategy) + { + case Directive.Stop: + FailStage(e); + return; + + case Directive.Resume: + case Directive.Restart: + if (!HasBeenPulled(_stage.Inlet)) + TryPull(_stage.Inlet); + return; + + default: + throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", e); + } + } + var delayTicks = _tokenBucket.Offer(cost); if (delayTicks == 0) @@ -119,7 +150,7 @@ public Throttle(int cost, TimeSpan per, int maximumBurst, Func costCalcu /// /// TBD /// TBD - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this, inheritedAttributes); /// /// TBD