From 39028ef92d19b6931b03d632e4c0b3963508aa0d Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 9 Sep 2023 00:35:48 +0700 Subject: [PATCH] Fix suicide probe bug --- .../RegressionProbeFailureSpec.cs | 49 +++++++++++ .../AkkaPersistenceLivenessProbe.cs | 82 +++++++++---------- 2 files changed, 87 insertions(+), 44 deletions(-) create mode 100644 src/Akka.HealthCheck.Persistence.Tests/RegressionProbeFailureSpec.cs diff --git a/src/Akka.HealthCheck.Persistence.Tests/RegressionProbeFailureSpec.cs b/src/Akka.HealthCheck.Persistence.Tests/RegressionProbeFailureSpec.cs new file mode 100644 index 0000000..3df873c --- /dev/null +++ b/src/Akka.HealthCheck.Persistence.Tests/RegressionProbeFailureSpec.cs @@ -0,0 +1,49 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015 - 2023 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Persistence.TestKit; +using FluentAssertions; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; +using LogEvent = Akka.Event.LogEvent; + +namespace Akka.HealthCheck.Persistence.Tests; + +public class RegressionProbeFailureSpec: PersistenceTestKit +{ + public RegressionProbeFailureSpec(ITestOutputHelper output) : base("akka.loglevel = DEBUG", nameof(RegressionProbeFailureSpec), output) + { + } + + [Fact(DisplayName = "Probe should be performed in proper interval with snapshot recovery failure")] + public async Task IntervalTest() + { + await WithSnapshotLoad(load => load.Fail(), async () => + { + Sys.EventStream.Subscribe(TestActor, typeof(LogEvent)); + var probe = Sys.ActorOf(Props.Create(() => + new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(400)))); + await FishForMessageAsync(e => e.Message.ToString() is "Recreating persistence probe."); + + var stopwatch = Stopwatch.StartNew(); + // Default circuit breaker max-failures is 10 + foreach (var _ in Enumerable.Range(0, 15)) + { + stopwatch.Restart(); + await FishForMessageAsync(e => e.Message.ToString() is "Recreating persistence probe."); + stopwatch.Stop(); + // In the original issue, suicide probe is being recreated immediately after failure without waiting + stopwatch.Elapsed.Should().BeGreaterThan(300.Milliseconds()); + } + }); + } +} \ No newline at end of file diff --git a/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs b/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs index 2cc56d0..ebcdc69 100644 --- a/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs +++ b/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs @@ -66,6 +66,15 @@ public override string ToString() $"{nameof(Failures)}={Failures?.ToString() ?? "null"})"; } } + + internal sealed class CreateProbe + { + public static readonly CreateProbe Instance = new(); + + private CreateProbe() + { + } + } public class AkkaPersistenceLivenessProbe : ActorBase { @@ -85,6 +94,8 @@ public AkkaPersistenceLivenessProbe(bool logInfo, TimeSpan delay) _id = Guid.NewGuid().ToString("N"); _shutdownCancellable = new Cancelable(Context.System.Scheduler); _logInfo = logInfo; + + Become(obj => HandleMessages(obj) || HandleSubscriptions(obj)); } public AkkaPersistenceLivenessProbe(bool logInfo) : this(logInfo, TimeSpan.FromSeconds(10)) { @@ -99,6 +110,7 @@ public static Props PersistentHealthCheckProps(bool logInfo) protected override void PostStop() { + _probe?.Tell(PoisonPill.Instance); _shutdownCancellable.Cancel(); _shutdownCancellable.Dispose(); base.PostStop(); @@ -130,25 +142,6 @@ private bool HandleSubscriptions(object msg) return true; } - private bool Started(object message) - { - switch (message) - { - case Terminated t when t.ActorRef.Equals(_probe): - Context.Unwatch(_probe); - if(_logInfo) - _log.Debug("Persistence probe terminated. Recreating..."); - CreateProbe(); - Become(obj => Recreating(obj) || HandleSubscriptions(obj)); - return true; - case PersistenceLivenessStatus status: - HandleRecoveryStatus(status); - return true; - } - - return false; - } - private void HandleRecoveryStatus(PersistenceLivenessStatus livenessStatus) { if(_logInfo) @@ -157,16 +150,28 @@ private void HandleRecoveryStatus(PersistenceLivenessStatus livenessStatus) PublishStatusUpdates(); } - private bool Recreating(object message) + private bool HandleMessages(object message) { switch (message) { case Terminated t when t.ActorRef.Equals(_probe): Context.Unwatch(_probe); + _probe = null; + if(_logInfo) + _log.Debug($"Persistence probe terminated. Recreating in {_delay.TotalSeconds} seconds."); + ScheduleProbeRestart(); + return true; + + case CreateProbe: if(_logInfo) - _log.Debug("Persistence probe terminated. Recreating..."); - CreateProbe(); + _log.Debug("Recreating persistence probe."); + + _probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _probeCounter == 0, _id))); + Context.Watch(_probe); + _probe.Tell("hit" + _probeCounter); + _probeCounter++; return true; + case PersistenceLivenessStatus status: HandleRecoveryStatus(status); return true; @@ -177,30 +182,19 @@ private bool Recreating(object message) protected override bool Receive(object message) { - return Started(message) || HandleSubscriptions(message); + throw new NotImplementedException("Should never hit this line"); } protected override void PreStart() { - CreateProbe(); + Self.Tell(CreateProbe.Instance); } - private void CreateProbe() + private void ScheduleProbeRestart() { - _probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _probeCounter == 0, _id))); - Context.Watch(_probe); - - if(_probeCounter == 0) - { - _probe.Tell("hit" + _probeCounter); - } - else - { - Context.System.Scheduler.ScheduleTellOnce(_delay, _probe, "hit" + _probeCounter, Self, _shutdownCancellable); - } - _probeCounter++; + Context.System.Scheduler.ScheduleTellOnce(_delay, Self, CreateProbe.Instance, Self, _shutdownCancellable); } - + private void PublishStatusUpdates() { foreach (var sub in _subscribers) sub.Tell(_currentLivenessStatus); @@ -223,7 +217,7 @@ internal class SuicideProbe : ReceivePersistentActor private bool? _persistedSnapshotStore; private bool? _deletedJournal; private bool? _deletedSnapshotStore; - private readonly List _failures = new List(); + private readonly List _failures = new (); public SuicideProbe(IActorRef probe, bool firstAttempt, string id) { @@ -231,11 +225,11 @@ public SuicideProbe(IActorRef probe, bool firstAttempt, string id) _firstAttempt = firstAttempt; PersistenceId = $"Akka.HealthCheck-{id}"; - Recover(str => + Recover(_ => { _recoveredJournal = true; }); - Recover(offer => + Recover(_ => { _recoveredSnapshotStore = true; }); @@ -251,11 +245,11 @@ public SuicideProbe(IActorRef probe, bool firstAttempt, string id) SaveSnapshot(str); }); - Command(save => + Command(_ => { _persistedSnapshotStore = true; Persist(_message, - s => + _ => { _persistedJournal = true; SendRecoveryStatusWhenFinished(); @@ -269,7 +263,7 @@ public SuicideProbe(IActorRef probe, bool firstAttempt, string id) _failures.Add(fail.Cause); _persistedSnapshotStore = false; Persist(_message, - s => + _ => { _persistedJournal = true; SendRecoveryStatusWhenFinished();