Skip to content

Commit

Permalink
Fix Persistence.Healthcheck probe stall because of failed warmup (#275)
Browse files Browse the repository at this point in the history
* Fix Persistence.Healthcheck probe stall because of failed warmup

* Make sure that the probe doesn't stuck in warm up state forever

* Simplify warmup detection

* Simplify warmup detection
  • Loading branch information
Arkatufus authored Mar 25, 2024
1 parent 145aec5 commit 121db73
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 66 deletions.
64 changes: 52 additions & 12 deletions src/Akka.HealthCheck.Persistence.Tests/ProbeFailureSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.HealthCheck.Liveness;
using Akka.Persistence.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -20,7 +22,7 @@ public class ProbeFailureSpec: PersistenceTestKit
private readonly string _id = Guid.NewGuid().ToString("N");
private int _count;

public ProbeFailureSpec(ITestOutputHelper output) : base(nameof(ProbeFailureSpec), output)
public ProbeFailureSpec(ITestOutputHelper output) : base("akka.loglevel = DEBUG", nameof(ProbeFailureSpec), output)
{
}

Expand All @@ -29,9 +31,9 @@ public void SuccessfulFirstProbeTest()
{
var status = PerformProbe();
status.IsLive.Should().BeFalse();
status.JournalRecovered.Should().BeFalse();
status.JournalRecovered.Should().BeNull();
status.JournalPersisted.Should().BeTrue();
status.SnapshotRecovered.Should().BeFalse();
status.SnapshotRecovered.Should().BeNull();
status.SnapshotSaved.Should().BeTrue();
status.StatusMessage.Should().StartWith("Warming up probe.");
status.Failures.Should().BeNull();
Expand All @@ -57,9 +59,9 @@ await WithJournalWrite(write => write.Fail(), () =>
{
var status = PerformProbe();
status.IsLive.Should().BeFalse();
status.JournalRecovered.Should().BeFalse();
status.JournalRecovered.Should().BeNull();
status.JournalPersisted.Should().BeFalse();
status.SnapshotRecovered.Should().BeFalse();
status.SnapshotRecovered.Should().BeNull();
status.SnapshotSaved.Should().BeTrue();
var e = status.Failures!.Flatten().InnerExceptions[0];
e.Should().BeOfType<TestJournalFailureException>();
Expand All @@ -74,9 +76,9 @@ await WithJournalWrite(write => write.Reject(), () =>
{
var status = PerformProbe();
status.IsLive.Should().BeFalse();
status.JournalRecovered.Should().BeFalse();
status.JournalRecovered.Should().BeNull();
status.JournalPersisted.Should().BeFalse();
status.SnapshotRecovered.Should().BeFalse();
status.SnapshotRecovered.Should().BeNull();
status.SnapshotSaved.Should().BeTrue();
var e = status.Failures!.Flatten().InnerExceptions[0];
e.Should().BeOfType<TestJournalRejectionException>();
Expand All @@ -91,9 +93,9 @@ await WithSnapshotSave(save => save.Fail(), () =>
{
var status = PerformProbe();
status.IsLive.Should().BeFalse();
status.JournalRecovered.Should().BeFalse();
status.JournalRecovered.Should().BeNull();
status.JournalPersisted.Should().BeTrue();
status.SnapshotRecovered.Should().BeFalse();
status.SnapshotRecovered.Should().BeNull();
status.SnapshotSaved.Should().BeFalse();
var e = status.Failures!.Flatten().InnerExceptions[0];
e.Should().BeOfType<TestSnapshotStoreFailureException>();
Expand Down Expand Up @@ -223,11 +225,49 @@ await WithJournalDelete(delete => delete.Fail(), () =>
}
*/

[Fact(DisplayName = "AkkaPersistenceLivenessProbe should retry probe if SaveSnapshot failed")]
public async Task SaveSnapshotFailTest()
{
await WithSnapshotSave(
save => save.SetInterceptorAsync(new SnapshotInterceptors.DelayOnce(1.Seconds(), new SnapshotInterceptors.Failure(1))),
async () =>
{
var probeActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 500.Seconds())));
probeActor.Tell(new SubscribeToLiveness(TestActor));

probeActor.Tell(GetCurrentLiveness.Instance);
var status = ExpectMsg<LivenessStatus>();
status.IsLive.Should().BeFalse();
status.StatusMessage.Should().StartWith("Warming up probe.");

var failStatus = await FishForMessageAsync<PersistenceLivenessStatus>(
msg => !msg.StatusMessage.StartsWith("Warming up probe."),
6.Seconds());

failStatus.IsLive.Should().BeFalse();
failStatus.JournalPersisted.Should().BeTrue();
failStatus.JournalRecovered.Should().BeNull();
failStatus.SnapshotSaved.Should().BeFalse();
failStatus.SnapshotRecovered.Should().BeNull();

failStatus = await ExpectMsgAsync<PersistenceLivenessStatus>(6.Seconds());

failStatus.IsLive.Should().BeFalse();
failStatus.JournalPersisted.Should().BeTrue();
failStatus.JournalRecovered.Should().BeTrue();
failStatus.SnapshotSaved.Should().BeTrue();
failStatus.SnapshotRecovered.Should().BeNull();

var successStatus = await ExpectMsgAsync<PersistenceLivenessStatus>(6.Seconds());
successStatus.IsLive.Should().BeTrue();
});
}

private PersistenceLivenessStatus PerformProbe()
{
var first = _count == 0;
_count++;
var liveProbe = ActorOf(() => new SuicideProbe(TestActor, first, _id));
var liveProbe = ActorOf(() => new SuicideProbe(TestActor, first, _id, true));
Watch(liveProbe);
liveProbe.Tell("hit");
var status = ExpectMsg<PersistenceLivenessStatus>();
Expand All @@ -243,9 +283,9 @@ private void AssertFirstProbe()
throw new Exception("Must be called as the first probe!");

var status = PerformProbe();
status.JournalRecovered.Should().BeFalse();
status.JournalRecovered.Should().BeNull();
status.JournalPersisted.Should().BeTrue();
status.SnapshotRecovered.Should().BeFalse();
status.SnapshotRecovered.Should().BeNull();
status.SnapshotSaved.Should().BeTrue();
status.StatusMessage.Should().StartWith("Warming up probe.");
status.Failures.Should().BeNull();
Expand Down
51 changes: 50 additions & 1 deletion src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,54 @@ public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria
}
await _next.InterceptAsync(persistenceId, criteria);
}
}
}

public class DelayOnce: ISnapshotStoreInterceptor
{
public DelayOnce(TimeSpan delay, ISnapshotStoreInterceptor next)
{
_delay = delay;
_next = next;
}

private readonly TimeSpan _delay;
private readonly ISnapshotStoreInterceptor _next;
private bool _delayed;

public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria)
{
if (!_delayed)
{
_delayed = true;
await Task.Delay(_delay);
}
await _next.InterceptAsync(persistenceId, criteria);
}
}


public class Failure : ISnapshotStoreInterceptor
{
public Failure(int times, ISnapshotStoreInterceptor? next = null)
{
_times = times;
_next = next ?? Noop.Instance;
}

private readonly int _times;
private readonly ISnapshotStoreInterceptor _next;
private int _count;

public Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria)
{
if (_count >= _times)
{
_next.InterceptAsync(persistenceId, criteria);
return Task.CompletedTask;
}

_count++;
throw new TestSnapshotStoreFailureException($"Failing snapshot {_count}/{_times}");
}
}
}
Loading

0 comments on commit 121db73

Please sign in to comment.