From c99d3f7cd9bbe24be06e0369c95dbcdbb89bfa3d Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 23 Feb 2024 04:34:17 +0700 Subject: [PATCH] Fix persistence liveness check deadlock --- ...robeNotAvailableDueToSnapshotStoreSpecs.cs | 3 +- ...enessProbeNotAvailableduetoJournalSpecs.cs | 3 +- ...ersistenceLivenessProbeSubscriptionTest.cs | 3 +- .../JournalInterceptors.cs | 58 ++++++++++++ .../LivenessProbeTimeoutSpec.cs | 92 +++++++++++++++++++ .../RegressionProbeFailureSpec.cs | 2 +- .../SnapshotInterceptors.cs | 58 ++++++++++++ .../AkkaPersistenceLivenessProbe.cs | 45 +++++++-- .../AkkaPersistenceLivenessProbeProvider.cs | 9 +- .../Configuration/akka.healthcheck.conf | 9 +- 10 files changed, 263 insertions(+), 19 deletions(-) create mode 100644 src/Akka.HealthCheck.Persistence.Tests/JournalInterceptors.cs create mode 100644 src/Akka.HealthCheck.Persistence.Tests/LivenessProbeTimeoutSpec.cs create mode 100644 src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs diff --git a/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs.cs b/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs.cs index 0166af2..e10483d 100644 --- a/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs.cs +++ b/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs.cs @@ -10,6 +10,7 @@ using FluentAssertions; using System; using System.Threading.Tasks; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; using static Akka.HealthCheck.Persistence.AkkaPersistenceLivenessProbe; @@ -27,7 +28,7 @@ public AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs(ITestOutp public void AkkaPersistenceLivenessProbeProvidert_Should_Report_Akka_Persistance_Is_Unavailable_With_Bad_Snapshot_Store_Setup() { - var ProbActor = Sys.ActorOf(Props.Create(() => new 
AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(250)))); + var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds()))); ProbActor.Tell(new SubscribeToLiveness(TestActor)); ExpectMsg().IsLive.Should().BeFalse("System should not be live"); ExpectMsg(TimeSpan.FromMinutes(1)).IsLive.Should().BeFalse("System should not be live"); diff --git a/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeNotAvailableduetoJournalSpecs.cs b/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeNotAvailableduetoJournalSpecs.cs index b96616b..759eae4 100644 --- a/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeNotAvailableduetoJournalSpecs.cs +++ b/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeNotAvailableduetoJournalSpecs.cs @@ -8,6 +8,7 @@ using Akka.Util.Internal; using FluentAssertions; using System; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; using static Akka.HealthCheck.Persistence.AkkaPersistenceLivenessProbe; @@ -26,7 +27,7 @@ public AkkaPersistenceLivenessProbeNotAvailableDueToJournalSpecs(ITestOutputHelp public void AkkaPersistenceLivenessProbeProvidert_Should_Report_Akka_Persistance_Is_Unavailable_With_Bad_Journal_Setup() { - var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(250)))); + var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds()))); ProbActor.Tell(new SubscribeToLiveness(TestActor)); ExpectMsg().IsLive.Should().BeFalse("System should not be live"); ExpectMsg(TimeSpan.FromMinutes(1)).IsLive.Should().BeFalse("System should not be live"); diff --git a/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeSubscriptionTest.cs b/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeSubscriptionTest.cs index dd37510..2968a4a 100644 --- 
a/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeSubscriptionTest.cs +++ b/src/Akka.HealthCheck.Persistence.Tests/AkkaPersistenceLivenessProbeSubscriptionTest.cs @@ -9,6 +9,7 @@ using Akka.Util.Internal; using FluentAssertions; using System; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; @@ -27,7 +28,7 @@ public AkkaPersistenceLivenessProbeSubscriptionTest(ITestOutputHelper helper) [Fact(DisplayName = "AkkaPersistenceLivenessProbe should correctly handle subscription requests")] public void AkkaPersistenceLivenessProbe_Should_Handle_Subscriptions_In_Any_State() { - var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(250)))); + var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds()))); ProbActor.Tell(new SubscribeToLiveness(TestActor)); ExpectMsg().IsLive.Should().BeFalse(); AwaitAssert(() => ExpectMsg().IsLive.Should().BeTrue(),TimeSpan.FromSeconds(10)); diff --git a/src/Akka.HealthCheck.Persistence.Tests/JournalInterceptors.cs b/src/Akka.HealthCheck.Persistence.Tests/JournalInterceptors.cs new file mode 100644 index 0000000..ad87dc3 --- /dev/null +++ b/src/Akka.HealthCheck.Persistence.Tests/JournalInterceptors.cs @@ -0,0 +1,58 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015 - 2024 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using Akka.Persistence; +using Akka.Persistence.TestKit; + +namespace Akka.HealthCheck.Persistence.Tests; + +public static class JournalInterceptors +{ + internal class Noop : IJournalInterceptor + { + public static readonly Noop Instance = new (); + + private Noop() + {} + + public Task InterceptAsync(IPersistentRepresentation message) => Task.FromResult(true); + } + + public class 
CancelableDelay: IJournalInterceptor + { + public CancelableDelay(TimeSpan delay, IJournalInterceptor next, CancellationToken cancellationToken) + { + _delay = delay; + _next = next; + _cancellationToken = cancellationToken; + } + + private readonly TimeSpan _delay; + private readonly IJournalInterceptor _next; + private readonly CancellationToken _cancellationToken; + + public async Task InterceptAsync(IPersistentRepresentation message) + { + try + { + await Task.Delay(_delay, _cancellationToken); + } + catch (OperationCanceledException) + { + // no-op + } + catch (TimeoutException) + { + // no-op + } + await _next.InterceptAsync(message); + } + } + +} \ No newline at end of file diff --git a/src/Akka.HealthCheck.Persistence.Tests/LivenessProbeTimeoutSpec.cs b/src/Akka.HealthCheck.Persistence.Tests/LivenessProbeTimeoutSpec.cs new file mode 100644 index 0000000..fbe8f7b --- /dev/null +++ b/src/Akka.HealthCheck.Persistence.Tests/LivenessProbeTimeoutSpec.cs @@ -0,0 +1,92 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015 - 2024 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.HealthCheck.Liveness; +using Akka.Persistence.TestKit; +using FluentAssertions; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.HealthCheck.Persistence.Tests; + +public class LivenessProbeTimeoutSpec: PersistenceTestKit +{ + public LivenessProbeTimeoutSpec(ITestOutputHelper output) : base(nameof(LivenessProbeTimeoutSpec), output) + { + } + + [Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if SaveSnapshot does not respond")] + public async Task SaveSnapshotTimeoutTest() + { + using var cts = new CancellationTokenSource(); + var delay = new SnapshotInterceptors.CancelableDelay(30.Minutes(), SnapshotInterceptors.Noop.Instance, 
cts.Token); + + await WithSnapshotSave( + save => save.SetInterceptorAsync(delay), + () => TestTimeout(cts)); + } + + [Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if snapshot recovery does not respond")] + public async Task SnapshotLoadTimeoutTest() + { + using var cts = new CancellationTokenSource(); + var delay = new SnapshotInterceptors.CancelableDelay(30.Minutes(), SnapshotInterceptors.Noop.Instance, cts.Token); + + await WithSnapshotLoad( + save => save.SetInterceptorAsync(delay), + () => TestTimeout(cts)); + } + + [Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if journal Persist does not respond")] + public async Task JournalPersistTimeoutTest() + { + using var cts = new CancellationTokenSource(); + var delay = new JournalInterceptors.CancelableDelay(30.Minutes(), JournalInterceptors.Noop.Instance, cts.Token); + + await WithJournalWrite( + save => save.SetInterceptorAsync(delay), + () => TestTimeout(cts)); + } + + [Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if journal recovery does not respond")] + public async Task JournalRecoveryTimeoutTest() + { + using var cts = new CancellationTokenSource(); + var delay = new JournalInterceptors.CancelableDelay(30.Minutes(), JournalInterceptors.Noop.Instance, cts.Token); + + await WithJournalRecovery( + save => save.SetInterceptorAsync(delay), + () => TestTimeout(cts)); + } + + private async Task TestTimeout(CancellationTokenSource cts) + { + var probeActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 500.Milliseconds()))); + probeActor.Tell(new SubscribeToLiveness(TestActor)); + var status = ExpectMsg(); + status.IsLive.Should().BeFalse(); + status.StatusMessage.Should().StartWith("Warming up probe."); + + var timeoutStatusObj = await FishForMessageAsync( + msg => msg is LivenessStatus stat && !stat.StatusMessage.StartsWith("Warming up probe."), + 6.Seconds()); + + var timeoutStatus = 
(LivenessStatus)timeoutStatusObj; + timeoutStatus.IsLive.Should().BeFalse(); + timeoutStatus.StatusMessage.Should().StartWith("Timeout while checking persistence liveness."); + + cts.Cancel(); + + await AwaitAssertAsync( + () => ExpectMsg().IsLive.Should().BeTrue(), + TimeSpan.FromSeconds(10)); + } +} \ No newline at end of file diff --git a/src/Akka.HealthCheck.Persistence.Tests/RegressionProbeFailureSpec.cs b/src/Akka.HealthCheck.Persistence.Tests/RegressionProbeFailureSpec.cs index 3df873c..ae97b2c 100644 --- a/src/Akka.HealthCheck.Persistence.Tests/RegressionProbeFailureSpec.cs +++ b/src/Akka.HealthCheck.Persistence.Tests/RegressionProbeFailureSpec.cs @@ -31,7 +31,7 @@ await WithSnapshotLoad(load => load.Fail(), async () => { Sys.EventStream.Subscribe(TestActor, typeof(LogEvent)); var probe = Sys.ActorOf(Props.Create(() => - new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(400)))); + new AkkaPersistenceLivenessProbe(true, 400.Milliseconds(), 3.Seconds()))); await FishForMessageAsync(e => e.Message.ToString() is "Recreating persistence probe."); var stopwatch = Stopwatch.StartNew(); diff --git a/src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs b/src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs new file mode 100644 index 0000000..439ccdc --- /dev/null +++ b/src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs @@ -0,0 +1,58 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015 - 2024 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System; +using System.Threading; +using System.Threading.Tasks; +using Akka.Persistence; +using Akka.Persistence.TestKit; + +namespace Akka.HealthCheck.Persistence.Tests; + +public static class SnapshotInterceptors +{ + public class Noop : ISnapshotStoreInterceptor + { + public static readonly Noop Instance = new (); + + private Noop() + { + } + + public Task 
InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria) => Task.FromResult(true); + } + + public class CancelableDelay: ISnapshotStoreInterceptor + { + public CancelableDelay(TimeSpan delay, ISnapshotStoreInterceptor next, CancellationToken cancellationToken) + { + _delay = delay; + _next = next; + _cancellationToken = cancellationToken; + } + + private readonly TimeSpan _delay; + private readonly ISnapshotStoreInterceptor _next; + private readonly CancellationToken _cancellationToken; + + public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria) + { + try + { + await Task.Delay(_delay, _cancellationToken); + } + catch (OperationCanceledException) + { + // no-op + } + catch (TimeoutException) + { + // no-op + } + await _next.InterceptAsync(persistenceId, criteria); + } + } +} diff --git a/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs b/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs index d945fe4..25f8ff7 100644 --- a/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs +++ b/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs @@ -75,41 +75,53 @@ private CreateProbe() { } } + + internal sealed class CheckTimeout + { + public static readonly CheckTimeout Instance = new(); + + private CheckTimeout() + { + } + } - public class AkkaPersistenceLivenessProbe : ActorBase + public class AkkaPersistenceLivenessProbe : ActorBase, IWithTimers { + private const string TimeoutTimerKey = nameof(TimeoutTimerKey); + private const string CreateProbeTimerKey = nameof(CreateProbeTimerKey); + private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly HashSet _subscribers = new HashSet(); private PersistenceLivenessStatus _currentLivenessStatus = new(message: "Warming up probe. Recovery status is still undefined"); private IActorRef? 
_probe; private int _probeCounter; private readonly TimeSpan _delay; + private readonly TimeSpan _timeout; private readonly string _id; - private readonly Cancelable _shutdownCancellable; private readonly bool _logInfo; - public AkkaPersistenceLivenessProbe(bool logInfo, TimeSpan delay) + public AkkaPersistenceLivenessProbe(bool logInfo, TimeSpan delay, TimeSpan timeout) { _delay = delay; + _timeout = timeout; _id = Guid.NewGuid().ToString("N"); - _shutdownCancellable = new Cancelable(Context.System.Scheduler); _logInfo = logInfo; Become(obj => HandleMessages(obj) || HandleSubscriptions(obj)); } - public static Props PersistentHealthCheckProps(bool logInfo, TimeSpan delay) + public ITimerScheduler Timers { get; set; } = null!; + + public static Props PersistentHealthCheckProps(bool logInfo, TimeSpan delay, TimeSpan timeout) { // need to use the stopping strategy in case things blow up right away - return Props.Create(() => new AkkaPersistenceLivenessProbe(logInfo, delay)) + return Props.Create(() => new AkkaPersistenceLivenessProbe(logInfo, delay, timeout)) .WithSupervisorStrategy(Actor.SupervisorStrategy.StoppingStrategy); } protected override void PostStop() { _probe?.Tell(PoisonPill.Instance); - _shutdownCancellable.Cancel(); - _shutdownCancellable.Dispose(); base.PostStop(); } @@ -156,6 +168,8 @@ private bool HandleMessages(object message) _probe = null; if(_logInfo) _log.Debug($"Persistence probe terminated. 
Recreating in {_delay.TotalSeconds} seconds."); + + Timers.CancelAll(); ScheduleProbeRestart(); return true; @@ -163,13 +177,26 @@ private bool HandleMessages(object message) if(_logInfo) _log.Debug("Recreating persistence probe."); + Timers.StartSingleTimer(TimeoutTimerKey, CheckTimeout.Instance, _timeout); _probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _probeCounter == 0, _id))); Context.Watch(_probe); _probe.Tell("hit" + _probeCounter); _probeCounter++; return true; + case CheckTimeout: + const string errMsg = "Timeout while checking persistence liveness. Recovery status is undefined."; + _log.Warning(errMsg); + _currentLivenessStatus = new PersistenceLivenessStatus(errMsg); + PublishStatusUpdates(); + + if(_probe is not null) + Context.Stop(_probe); + + return true; + case PersistenceLivenessStatus status: + Timers.Cancel(TimeoutTimerKey); /* cancel only the check timeout; a pending CreateProbe restart timer must survive a status report */ HandleRecoveryStatus(status); return true; } @@ -189,7 +216,7 @@ protected override void PreStart() private void ScheduleProbeRestart() { - Context.System.Scheduler.ScheduleTellOnce(_delay, Self, CreateProbe.Instance, Self, _shutdownCancellable); + Timers.StartSingleTimer(CreateProbeTimerKey, CreateProbe.Instance, _delay); } private void PublishStatusUpdates() diff --git a/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbeProvider.cs b/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbeProvider.cs index bf199bd..b571b88 100644 --- a/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbeProvider.cs +++ b/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbeProvider.cs @@ -15,15 +15,16 @@ namespace Akka.HealthCheck.Persistence public sealed class AkkaPersistenceLivenessProbeProvider : ProbeProviderBase { private readonly TimeSpan _interval; + private readonly TimeSpan _timeout; public AkkaPersistenceLivenessProbeProvider(ActorSystem system) : base(system) { - _interval = system.Settings.Config.GetTimeSpan( - path: "akka.healthcheck.liveness.persistence.probe-interval", - @default: 
TimeSpan.FromSeconds(10)); + var config = system.Settings.Config.GetConfig("akka.healthcheck.liveness.persistence"); + _interval = config.GetTimeSpan("probe-interval", TimeSpan.FromSeconds(10)); + _timeout = config.GetTimeSpan("timeout", TimeSpan.FromSeconds(3)); } public override Props ProbeProps => - AkkaPersistenceLivenessProbe.PersistentHealthCheckProps(Settings.LogInfoEvents, _interval); + AkkaPersistenceLivenessProbe.PersistentHealthCheckProps(Settings.LogInfoEvents, _interval, _timeout); } } \ No newline at end of file diff --git a/src/Akka.HealthCheck/Configuration/akka.healthcheck.conf b/src/Akka.HealthCheck/Configuration/akka.healthcheck.conf index a3c5e20..598f51c 100644 --- a/src/Akka.HealthCheck/Configuration/akka.healthcheck.conf +++ b/src/Akka.HealthCheck/Configuration/akka.healthcheck.conf @@ -30,8 +30,13 @@ akka.healthcheck{ #persistence = "Akka.HealthCheck.Cluster.AkkaPersistenceLivenessProbeProvider, Akka.HealthCheck.Persistence" } - # Defines the interval for each persistence liveness health check probe refresh - persistence.probe-interval = 10s + persistence { + # Defines the interval for each persistence liveness health check probe refresh + probe-interval = 10s + + # Defines the timeout for each liveness check operation + timeout = 3s + } # Defines the signaling mechanism used to communicate with K8s, AWS, Azure, # or whatever the hosting environment is for the Akka.NET application. The