diff --git a/src/Akka.HealthCheck.Persistence.Tests/ProbeFailureSpec.cs b/src/Akka.HealthCheck.Persistence.Tests/ProbeFailureSpec.cs index 3c1177c..a4555ab 100644 --- a/src/Akka.HealthCheck.Persistence.Tests/ProbeFailureSpec.cs +++ b/src/Akka.HealthCheck.Persistence.Tests/ProbeFailureSpec.cs @@ -7,8 +7,10 @@ using System; using System.Threading.Tasks; using Akka.Actor; +using Akka.HealthCheck.Liveness; using Akka.Persistence.TestKit; using FluentAssertions; +using FluentAssertions.Extensions; using Xunit; using Xunit.Abstractions; @@ -20,7 +22,7 @@ public class ProbeFailureSpec: PersistenceTestKit private readonly string _id = Guid.NewGuid().ToString("N"); private int _count; - public ProbeFailureSpec(ITestOutputHelper output) : base(nameof(ProbeFailureSpec), output) + public ProbeFailureSpec(ITestOutputHelper output) : base("akka.loglevel = DEBUG", nameof(ProbeFailureSpec), output) { } @@ -29,9 +31,9 @@ public void SuccessfulFirstProbeTest() { var status = PerformProbe(); status.IsLive.Should().BeFalse(); - status.JournalRecovered.Should().BeFalse(); + status.JournalRecovered.Should().BeNull(); status.JournalPersisted.Should().BeTrue(); - status.SnapshotRecovered.Should().BeFalse(); + status.SnapshotRecovered.Should().BeNull(); status.SnapshotSaved.Should().BeTrue(); status.StatusMessage.Should().StartWith("Warming up probe."); status.Failures.Should().BeNull(); @@ -57,9 +59,9 @@ await WithJournalWrite(write => write.Fail(), () => { var status = PerformProbe(); status.IsLive.Should().BeFalse(); - status.JournalRecovered.Should().BeFalse(); + status.JournalRecovered.Should().BeNull(); status.JournalPersisted.Should().BeFalse(); - status.SnapshotRecovered.Should().BeFalse(); + status.SnapshotRecovered.Should().BeNull(); status.SnapshotSaved.Should().BeTrue(); var e = status.Failures!.Flatten().InnerExceptions[0]; e.Should().BeOfType(); @@ -74,9 +76,9 @@ await WithJournalWrite(write => write.Reject(), () => { var status = PerformProbe(); status.IsLive.Should().BeFalse(); - status.JournalRecovered.Should().BeFalse(); + status.JournalRecovered.Should().BeNull(); status.JournalPersisted.Should().BeFalse(); - status.SnapshotRecovered.Should().BeFalse(); + status.SnapshotRecovered.Should().BeNull(); status.SnapshotSaved.Should().BeTrue(); var e = status.Failures!.Flatten().InnerExceptions[0]; e.Should().BeOfType(); @@ -91,9 +93,9 @@ await WithSnapshotSave(save => save.Fail(), () => { var status = PerformProbe(); status.IsLive.Should().BeFalse(); - status.JournalRecovered.Should().BeFalse(); + status.JournalRecovered.Should().BeNull(); status.JournalPersisted.Should().BeTrue(); - status.SnapshotRecovered.Should().BeFalse(); + status.SnapshotRecovered.Should().BeNull(); status.SnapshotSaved.Should().BeFalse(); var e = status.Failures!.Flatten().InnerExceptions[0]; e.Should().BeOfType(); @@ -223,11 +225,49 @@ await WithJournalDelete(delete => delete.Fail(), () => } */ + [Fact(DisplayName = "AkkaPersistenceLivenessProbe should retry probe if SaveSnapshot failed")] + public async Task SaveSnapshotFailTest() + { + await WithSnapshotSave( + save => save.SetInterceptorAsync(new SnapshotInterceptors.DelayOnce(1.Seconds(), new SnapshotInterceptors.Failure(1))), + async () => + { + var probeActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 500.Seconds()))); + probeActor.Tell(new SubscribeToLiveness(TestActor)); + + probeActor.Tell(GetCurrentLiveness.Instance); + var status = ExpectMsg(); + status.IsLive.Should().BeFalse(); + status.StatusMessage.Should().StartWith("Warming up probe."); + + var failStatus = await FishForMessageAsync( + msg => !msg.StatusMessage.StartsWith("Warming up probe."), + 6.Seconds()); + + failStatus.IsLive.Should().BeFalse(); + failStatus.JournalPersisted.Should().BeTrue(); + failStatus.JournalRecovered.Should().BeNull(); + failStatus.SnapshotSaved.Should().BeFalse(); + failStatus.SnapshotRecovered.Should().BeNull(); + + failStatus = await ExpectMsgAsync(6.Seconds()); + + failStatus.IsLive.Should().BeFalse(); + failStatus.JournalPersisted.Should().BeTrue(); + failStatus.JournalRecovered.Should().BeTrue(); + failStatus.SnapshotSaved.Should().BeTrue(); + failStatus.SnapshotRecovered.Should().BeNull(); + + var successStatus = await ExpectMsgAsync(6.Seconds()); + successStatus.IsLive.Should().BeTrue(); + }); + } + private PersistenceLivenessStatus PerformProbe() { var first = _count == 0; _count++; - var liveProbe = ActorOf(() => new SuicideProbe(TestActor, first, _id)); + var liveProbe = ActorOf(() => new SuicideProbe(TestActor, first, _id, true)); Watch(liveProbe); liveProbe.Tell("hit"); var status = ExpectMsg(); @@ -243,9 +283,9 @@ private void AssertFirstProbe() throw new Exception("Must be called as the first probe!"); var status = PerformProbe(); - status.JournalRecovered.Should().BeFalse(); + status.JournalRecovered.Should().BeNull(); status.JournalPersisted.Should().BeTrue(); - status.SnapshotRecovered.Should().BeFalse(); + status.SnapshotRecovered.Should().BeNull(); status.SnapshotSaved.Should().BeTrue(); status.StatusMessage.Should().StartWith("Warming up probe."); status.Failures.Should().BeNull(); diff --git a/src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs b/src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs index 439ccdc..ccd6b5c 100644 --- a/src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs +++ b/src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs @@ -54,5 +54,54 @@ public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria } await _next.InterceptAsync(persistenceId, criteria); } - } + } + + public class DelayOnce: ISnapshotStoreInterceptor + { + public DelayOnce(TimeSpan delay, ISnapshotStoreInterceptor next) + { + _delay = delay; + _next = next; + } + + private readonly TimeSpan _delay; + private readonly ISnapshotStoreInterceptor _next; + private bool _delayed; + + public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria) + { + if (!_delayed) + { + _delayed = true; + await Task.Delay(_delay); + } + await _next.InterceptAsync(persistenceId, criteria); + } + } + + + public class Failure : ISnapshotStoreInterceptor + { + public Failure(int times, ISnapshotStoreInterceptor? next = null) + { + _times = times; + _next = next ?? Noop.Instance; + } + + private readonly int _times; + private readonly ISnapshotStoreInterceptor _next; + private int _count; + + public Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria) + { + if (_count >= _times) + { + _next.InterceptAsync(persistenceId, criteria); + return Task.CompletedTask; + } + + _count++; + throw new TestSnapshotStoreFailureException($"Failing snapshot {_count}/{_times}"); + } + } } diff --git a/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs b/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs index 25f8ff7..821cadc 100644 --- a/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs +++ b/src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs @@ -18,16 +18,16 @@ public sealed class PersistenceLivenessStatus: LivenessStatus, INoSerializationV { private readonly string? _message; - public PersistenceLivenessStatus(string message): this(false, false, false, false, Array.Empty(), message) + public PersistenceLivenessStatus(string message): this(null, null, false, false, Array.Empty(), message) { } - + public PersistenceLivenessStatus( - bool journalRecovered, - bool snapshotRecovered, + bool? journalRecovered, + bool? snapshotRecovered, bool journalPersisted, bool snapshotSaved, - IReadOnlyCollection failures, + IReadOnlyCollection failures, string? message = null): base(false) { JournalRecovered = journalRecovered; @@ -38,17 +38,17 @@ public PersistenceLivenessStatus( _message = message; } - public override bool IsLive => JournalRecovered - && SnapshotRecovered + public override bool IsLive => JournalRecovered is true + && SnapshotRecovered is true && JournalPersisted && SnapshotSaved && Failures is null; public override string StatusMessage => _message ?? ToString(); - public bool JournalRecovered { get; } + public bool? JournalRecovered { get; } - public bool SnapshotRecovered { get; } + public bool? SnapshotRecovered { get; } public bool JournalPersisted { get; } @@ -59,8 +59,8 @@ public PersistenceLivenessStatus( public override string ToString() { return $"{nameof(PersistenceLivenessStatus)}(" + - $"{nameof(JournalRecovered)}={JournalRecovered}, " + - $"{nameof(SnapshotRecovered)}={SnapshotRecovered}, " + + $"{nameof(JournalRecovered)}={JournalRecovered?.ToString() ?? "undefined"}, " + + $"{nameof(SnapshotRecovered)}={SnapshotRecovered?.ToString() ?? "undefined"}, " + $"{nameof(JournalPersisted)}={JournalPersisted}, " + $"{nameof(SnapshotSaved)}={SnapshotSaved}, " + $"{nameof(Failures)}={Failures?.ToString() ?? "null"})"; @@ -95,6 +95,7 @@ public class AkkaPersistenceLivenessProbe : ActorBase, IWithTimers private PersistenceLivenessStatus _currentLivenessStatus = new(message: "Warming up probe. Recovery status is still undefined"); private IActorRef? _probe; private int _probeCounter; + private bool _warmup = true; private readonly TimeSpan _delay; private readonly TimeSpan _timeout; private readonly string _id; @@ -154,8 +155,12 @@ private bool HandleSubscriptions(object msg) private void HandleRecoveryStatus(PersistenceLivenessStatus livenessStatus) { if(_logInfo) - _log.Debug("Received recovery status {0} from probe.", livenessStatus); + _log.Debug("Received recovery status {0} from probe", livenessStatus); + _currentLivenessStatus = livenessStatus; + if (_warmup && livenessStatus.SnapshotRecovered is not null && livenessStatus.JournalRecovered is not null) + _warmup = false; + PublishStatusUpdates(); } @@ -178,14 +183,14 @@ private bool HandleMessages(object message) _log.Debug("Recreating persistence probe."); Timers.StartSingleTimer(TimeoutTimerKey, CheckTimeout.Instance, _timeout); - _probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _probeCounter == 0, _id))); + _probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _warmup, _id, _logInfo))); Context.Watch(_probe); _probe.Tell("hit" + _probeCounter); _probeCounter++; return true; case CheckTimeout: - const string errMsg = "Timeout while checking persistence liveness. Recovery status is undefined."; + const string errMsg = "Timeout while checking persistence liveness. Persistence liveness status is undefined."; _log.Warning(errMsg); _currentLivenessStatus = new PersistenceLivenessStatus(errMsg); PublishStatusUpdates(); @@ -228,11 +233,12 @@ private void PublishStatusUpdates() /// /// Validate that the snapshot store and the journal and both working /// - internal class SuicideProbe : ReceivePersistentActor + internal class SuicideProbe : ReceivePersistentActor, IWithStash { private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly IActorRef _probe; private readonly bool _firstAttempt; + private readonly bool _debugLog; private string? _message; private bool? _recoveredJournal; @@ -243,39 +249,116 @@ internal class SuicideProbe : ReceivePersistentActor private bool? _deletedSnapshotStore; private readonly List _failures = new (); - public SuicideProbe(IActorRef probe, bool firstAttempt, string id) + public SuicideProbe(IActorRef probe, bool firstAttempt, string id, bool debugLog) { _probe = probe; _firstAttempt = firstAttempt; + _debugLog = debugLog; PersistenceId = $"Akka.HealthCheck-{id}"; + + Become(AwaitingRecovery); + } + private void AwaitingRecovery() + { Recover(_ => { _recoveredJournal = true; + if(_debugLog) + _log.Debug($"{PersistenceId}: Journal recovered"); }); Recover(_ => { _recoveredSnapshotStore = true; + if(_debugLog) + _log.Debug($"{PersistenceId}: Snapshot recovered"); }); Recover(_ => { + if(_debugLog) + _log.Debug($"{PersistenceId}: Recovery complete"); DeleteMessages(long.MaxValue); DeleteSnapshots(new SnapshotSelectionCriteria(long.MaxValue)); }); + + Command(_ => + { + _deletedJournal = true; + if(_debugLog) + _log.Debug($"{PersistenceId}: Journal events deleted"); + + if(_deletedSnapshotStore is not null) + { + Become(Active); + Stash.UnstashAll(); + } + }); + + Command(fail => + { + _failures.Add(fail.Cause); + _deletedJournal = false; + if(_debugLog) + _log.Debug($"{PersistenceId}: Failed to delete journal events"); + + if(_deletedSnapshotStore is not null) + { + Become(Active); + Stash.UnstashAll(); + } + }); + + Command(_ => + { + _deletedSnapshotStore = true; + if(_debugLog) + _log.Debug($"{PersistenceId}: Snapshot deleted"); + + if(_deletedJournal is not null) + { + Become(Active); + Stash.UnstashAll(); + } + }); + + Command(fail => + { + _failures.Add(fail.Cause); + _deletedSnapshotStore = false; + if(_debugLog) + _log.Debug($"{PersistenceId}: Failed to delete snapshot"); + + if(_deletedJournal is not null) + { + Become(Active); + Stash.UnstashAll(); + } + }); + + CommandAny(_ => Stash.Stash()); + } + private void Active() + { Command(str => { _message = str; + if(_debugLog) + _log.Debug($"{PersistenceId}: Probe started, saving snapshot"); SaveSnapshot(str); }); Command(_ => { _persistedSnapshotStore = true; + if(_debugLog) + _log.Debug($"{PersistenceId}: Snapshot saved"); Persist(_message, _ => { _persistedJournal = true; + if(_debugLog) + _log.Debug($"{PersistenceId}: Journal persisted"); SendRecoveryStatusWhenFinished(); }); }); @@ -290,35 +373,12 @@ public SuicideProbe(IActorRef probe, bool firstAttempt, string id) _ => { _persistedJournal = true; + if(_debugLog) + _log.Debug($"{PersistenceId}: Journal persisted"); SendRecoveryStatusWhenFinished(); }); }); - - Command(_ => - { - _deletedJournal = true; - SendRecoveryStatusWhenFinished(); - }); - - Command(fail => - { - _failures.Add(fail.Cause); - _deletedJournal = false; - SendRecoveryStatusWhenFinished(); - }); - - Command(_ => - { - _deletedSnapshotStore = true; - SendRecoveryStatusWhenFinished(); - }); - - Command(fail => - { - _failures.Add(fail.Cause); - _deletedSnapshotStore = false; - SendRecoveryStatusWhenFinished(); - }); + } public override string PersistenceId { get; } @@ -330,6 +390,10 @@ private void SendRecoveryStatusWhenFinished() { _probe.Tell(CreateStatus()); Context.Stop(Self); + if(_debugLog) + _log.Debug($"{PersistenceId}: First case: " + + $"_persistedJournal:{_persistedJournal} " + + $"_persistedSnapshotStore:{_persistedSnapshotStore}, "); return; } @@ -338,23 +402,36 @@ private void SendRecoveryStatusWhenFinished() && _persistedJournal is { } && _persistedSnapshotStore is { }) { - var msg = _persistedJournal == true && _persistedSnapshotStore == true + var msg = _persistedJournal == true && + _persistedSnapshotStore == true && + (_recoveredJournal is null || _recoveredSnapshotStore is null) ? "Warming up probe. Recovery status is still undefined" : null; _probe.Tell(CreateStatus(msg)); Context.Stop(Self); + if (_debugLog) + _log.Debug($"{PersistenceId}: Second case: " + + $"_persistedJournal:{_persistedJournal} " + + $"_persistedSnapshotStore:{_persistedSnapshotStore}"); + return; } // Third case, all fields should be populated - if (_recoveredJournal is { } - && _recoveredSnapshotStore is { } - && _persistedJournal is { } + if (_persistedJournal is { } && _persistedSnapshotStore is { } && _deletedJournal is { } && _deletedSnapshotStore is { }) { _probe.Tell(CreateStatus()); Context.Stop(Self); + if(_debugLog) + _log.Debug($"{PersistenceId}: Third case: " + + $"_persistedJournal:{_persistedJournal}, " + + $"_persistedSnapshotStore:{_persistedSnapshotStore}, " + + $"_recoveredJournal:{_recoveredJournal?.ToString() ?? "null"} " + + $"_recoveredSnapshotStore:{_recoveredSnapshotStore?.ToString() ?? "null"} " + + $"_deletedJournal:{_deletedJournal} " + + $"_deletedSnapshotStore:{_deletedSnapshotStore}"); } } @@ -387,12 +464,20 @@ protected override void OnRecoveryFailure(Exception reason, object? message = nu } private PersistenceLivenessStatus CreateStatus(string? message = null) - => new PersistenceLivenessStatus( - journalRecovered: _recoveredJournal ?? false, - snapshotRecovered: _recoveredSnapshotStore ?? false, - journalPersisted: _persistedJournal ?? false, - snapshotSaved: _persistedSnapshotStore ?? false, - failures: _failures, - message: message); + => _firstAttempt + ? new PersistenceLivenessStatus( + journalRecovered: _recoveredJournal, + snapshotRecovered: _recoveredSnapshotStore, + journalPersisted: _persistedJournal ?? false, + snapshotSaved: _persistedSnapshotStore ?? false, + failures: _failures, + message: message) + : new PersistenceLivenessStatus( + journalRecovered: _recoveredJournal ?? false, + snapshotRecovered: _recoveredSnapshotStore ?? false, + journalPersisted: _persistedJournal ?? false, + snapshotSaved: _persistedSnapshotStore ?? false, + failures: _failures, + message: message); } } \ No newline at end of file