Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make persistence probe return degraded during warmup #282

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Akka.HealthCheck.Cluster/ClusterLivenessProbe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Akka.HealthCheck.Cluster
public sealed class ClusterLivenessProbe : ReceiveActor
{
public static readonly LivenessStatus DefaultClusterLivenessStatus =
new LivenessStatus(true, "not yet joined cluster");
LivenessStatus.Degraded("not yet joined cluster");

private readonly Akka.Cluster.Cluster _cluster = Akka.Cluster.Cluster.Get(Context.System);
private readonly ILoggingAdapter _log = Context.GetLogger();
Expand Down Expand Up @@ -64,9 +64,9 @@ protected override void PreStart()
{
var self = Self;

_cluster.RegisterOnMemberUp(() => { self.Tell(new LivenessStatus(true)); });
_cluster.RegisterOnMemberUp(() => { self.Tell(LivenessStatus.Healthy()); });

_cluster.RegisterOnMemberRemoved(() => { self.Tell(new LivenessStatus(false)); });
_cluster.RegisterOnMemberRemoved(() => { self.Tell(LivenessStatus.Unhealthy()); });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class AkkaPersistenceLivenessProbe : IAkkaHealthcheck
{
private const string Healthy = "Akka.NET persistence is alive";
private const string UnHealthy = "Akka.NET persistence is not alive";
private const string Degraded = "Akka.NET persistence liveness is degraded";
private const string Exception = "Exception occured when processing cluster liveness";

private readonly IActorRef _probe;
Expand All @@ -47,23 +48,35 @@ public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context
var status = await _probe.Ask<PersistenceLivenessStatus>(
message: GetCurrentLiveness.Instance,
cancellationToken: cancellationToken);
return status.IsLive
? HealthCheckResult.Healthy(Healthy, new Dictionary<string, object>
{
["journal-recovered"] = status.JournalRecovered,
["snapshot-recovered"] = status.SnapshotRecovered,
["journal-persisted"] = status.JournalPersisted,
["snapshot-persisted"] = status.SnapshotSaved,
["message"] = status.StatusMessage
})
: HealthCheckResult.Unhealthy(UnHealthy, status.Failures, new Dictionary<string, object>
{
["journal-recovered"] = status.JournalRecovered,
["snapshot-recovered"] = status.SnapshotRecovered,
["journal-persisted"] = status.JournalPersisted,
["snapshot-persisted"] = status.SnapshotSaved,
["message"] = status.StatusMessage
});
return status.Status switch
{
AkkaHealthStatus.Healthy => HealthCheckResult.Healthy(Healthy, new Dictionary<string, object>
{
["journal-recovered"] = status.JournalRecovered,
["snapshot-recovered"] = status.SnapshotRecovered,
["journal-persisted"] = status.JournalPersisted,
["snapshot-persisted"] = status.SnapshotSaved,
["message"] = status.StatusMessage
}),
AkkaHealthStatus.Unhealthy => HealthCheckResult.Unhealthy(UnHealthy, status.Failures,
new Dictionary<string, object>
{
["journal-recovered"] = status.JournalRecovered,
["snapshot-recovered"] = status.SnapshotRecovered,
["journal-persisted"] = status.JournalPersisted,
["snapshot-persisted"] = status.SnapshotSaved,
["message"] = status.StatusMessage
}),
_ => HealthCheckResult.Degraded(Degraded, status.Failures,
new Dictionary<string, object>
{
["journal-recovered"] = status.JournalRecovered,
["snapshot-recovered"] = status.SnapshotRecovered,
["journal-persisted"] = status.JournalPersisted,
["snapshot-persisted"] = status.SnapshotSaved,
["message"] = status.StatusMessage
})
};
}
catch (Exception e)
{
Expand Down
6 changes: 3 additions & 3 deletions src/Akka.HealthCheck.Persistence.Tests/ProbeFailureSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ await WithJournalRecovery(recover => recover.Fail(), () =>
{
var status = PerformProbe();
status.IsLive.Should().BeFalse();
status.JournalRecovered.Should().BeFalse();
status.JournalRecovered.Should().BeNull();
status.JournalPersisted.Should().BeFalse();
status.SnapshotRecovered.Should().BeTrue();
status.SnapshotSaved.Should().BeFalse();
Expand All @@ -128,9 +128,9 @@ await WithSnapshotLoad(load => load.Fail(), () =>
{
var status = PerformProbe();
status.IsLive.Should().BeFalse();
status.JournalRecovered.Should().BeFalse();
status.JournalRecovered.Should().BeNull();
status.JournalPersisted.Should().BeFalse();
status.SnapshotRecovered.Should().BeFalse();
status.SnapshotRecovered.Should().BeNull();
status.SnapshotSaved.Should().BeFalse();
var e = status.Failures!.Flatten().InnerExceptions[0];
e.Should().BeOfType<TestSnapshotStoreFailureException>();
Expand Down
137 changes: 77 additions & 60 deletions src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@ public sealed class PersistenceLivenessStatus: LivenessStatus, INoSerializationV
{
private readonly string? _message;

public PersistenceLivenessStatus(string message): this(null, null, false, false, Array.Empty<Exception>(), message)
public PersistenceLivenessStatus(AkkaHealthStatus status, string message)
: this(status, null, null, false, false, Array.Empty<Exception>(), message)
{
}

public PersistenceLivenessStatus(
AkkaHealthStatus status,
bool? journalRecovered,
bool? snapshotRecovered,
bool journalPersisted,
bool snapshotSaved,
IReadOnlyCollection<Exception> failures,
string? message = null): base(false)
string? message = null): base(status)
{
JournalRecovered = journalRecovered;
SnapshotRecovered = snapshotRecovered;
Expand All @@ -38,7 +40,8 @@ public PersistenceLivenessStatus(
_message = message;
}

public override bool IsLive => JournalRecovered is true
public override bool IsLive => base.IsLive &&
JournalRecovered is true
&& SnapshotRecovered is true
&& JournalPersisted
&& SnapshotSaved
Expand Down Expand Up @@ -92,7 +95,7 @@ public class AkkaPersistenceLivenessProbe : ActorBase, IWithTimers

private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly HashSet<IActorRef> _subscribers = new HashSet<IActorRef>();
private PersistenceLivenessStatus _currentLivenessStatus = new(message: "Warming up probe. Recovery status is still undefined");
private PersistenceLivenessStatus _currentLivenessStatus = new(AkkaHealthStatus.Degraded, "Warming up probe. Recovery status is still undefined");
private IActorRef? _probe;
private int _probeCounter;
private bool _warmup = true;
Expand Down Expand Up @@ -183,7 +186,8 @@ private bool HandleMessages(object message)
_log.Debug("Recreating persistence probe.");

Timers.StartSingleTimer(TimeoutTimerKey, CheckTimeout.Instance, _timeout);
_probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _warmup, _id, _logInfo)));
_probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _warmup, _id, _logInfo))
.WithSupervisorStrategy(Actor.SupervisorStrategy.StoppingStrategy));
Context.Watch(_probe);
_probe.Tell("hit" + _probeCounter);
_probeCounter++;
Expand All @@ -192,7 +196,7 @@ private bool HandleMessages(object message)
case CheckTimeout:
const string errMsg = "Timeout while checking persistence liveness. Persistence liveness status is undefined.";
_log.Warning(errMsg);
_currentLivenessStatus = new PersistenceLivenessStatus(errMsg);
_currentLivenessStatus = new PersistenceLivenessStatus(AkkaHealthStatus.Unhealthy, errMsg);
PublishStatusUpdates();

if(_probe is not null)
Expand Down Expand Up @@ -277,21 +281,23 @@ private void AwaitingRecovery()
{
if(_debugLog)
_log.Debug($"{PersistenceId}: Recovery complete");
DeleteMessages(long.MaxValue);
DeleteSnapshots(new SnapshotSelectionCriteria(long.MaxValue));
Become(CleanupMessages);
});

CommandAny(_ => Stash.Stash());
}

private void CleanupMessages()
{
DeleteMessages(long.MaxValue);

Command<DeleteMessagesSuccess>(_ =>
{
_deletedJournal = true;
if(_debugLog)
_log.Debug($"{PersistenceId}: Journal events deleted");

if(_deletedSnapshotStore is not null)
{
Become(Active);
Stash.UnstashAll();
}
Become(CleanupSnapshot);
});

Command<DeleteMessagesFailure>(fail =>
Expand All @@ -301,24 +307,24 @@ private void AwaitingRecovery()
if(_debugLog)
_log.Debug($"{PersistenceId}: Failed to delete journal events");

if(_deletedSnapshotStore is not null)
{
Become(Active);
Stash.UnstashAll();
}
Become(CleanupSnapshot);
});

CommandAny(_ => Stash.Stash());
}

private void CleanupSnapshot()
{
DeleteSnapshots(new SnapshotSelectionCriteria(long.MaxValue));

Command<DeleteSnapshotsSuccess>(_ =>
{
_deletedSnapshotStore = true;
if(_debugLog)
_log.Debug($"{PersistenceId}: Snapshot deleted");

if(_deletedJournal is not null)
{
Become(Active);
Stash.UnstashAll();
}
Become(Active);
Stash.UnstashAll();
});

Command<DeleteSnapshotsFailure>(fail =>
Expand All @@ -328,11 +334,8 @@ private void AwaitingRecovery()
if(_debugLog)
_log.Debug($"{PersistenceId}: Failed to delete snapshot");

if(_deletedJournal is not null)
{
Become(Active);
Stash.UnstashAll();
}
Become(Active);
Stash.UnstashAll();
});

CommandAny(_ => Stash.Stash());
Expand All @@ -345,48 +348,55 @@ private void Active()
_message = str;
if(_debugLog)
_log.Debug($"{PersistenceId}: Probe started, saving snapshot");
SaveSnapshot(str);
Become(SavingSnapshot(str));
});

Command<SaveSnapshotSuccess>(_ =>
}

private Action SavingSnapshot(string msg)
{
SaveSnapshot(msg);
return () =>
{
_persistedSnapshotStore = true;
if(_debugLog)
_log.Debug($"{PersistenceId}: Snapshot saved");
Persist(_message,
_ =>
{
_persistedJournal = true;
if(_debugLog)
_log.Debug($"{PersistenceId}: Journal persisted");
SendRecoveryStatusWhenFinished();
});
});
Command<SaveSnapshotSuccess>(_ =>
{
_persistedSnapshotStore = true;
if(_debugLog)
_log.Debug($"{PersistenceId}: Snapshot saved");
Become(PersistMessage(msg));
});

Command<SaveSnapshotFailure>(fail =>
{
_log.Error(fail.Cause,"Failed to save snapshot store");
Command<SaveSnapshotFailure>(fail =>
{
_log.Error(fail.Cause,"Failed to save snapshot store");

_failures.Add(fail.Cause);
_persistedSnapshotStore = false;
Persist(_message,
_ =>
{
_persistedJournal = true;
if(_debugLog)
_log.Debug($"{PersistenceId}: Journal persisted");
SendRecoveryStatusWhenFinished();
});
});
_failures.Add(fail.Cause);
_persistedSnapshotStore = false;
Become(PersistMessage(msg));
});
};
}

private Action PersistMessage(string msg)
{
Persist(msg,
_ =>
{
_persistedJournal = true;
if(_debugLog)
_log.Debug($"{PersistenceId}: Journal persisted");
SendRecoveryStatusWhenFinished();
});

return () => { };
}

public override string PersistenceId { get; }

private void SendRecoveryStatusWhenFinished()
{
// First case, snapshot failed to save or journal write was rejected, there will be no deletion.
if( (_persistedSnapshotStore is false && _persistedJournal is { }) || (_persistedJournal is false && _persistedSnapshotStore is { }))
if( (_persistedSnapshotStore is false && _persistedJournal is not null) ||
(_persistedJournal is false && _persistedSnapshotStore is not null))
{
_probe.Tell(CreateStatus());
Context.Stop(Self);
Expand Down Expand Up @@ -462,19 +472,26 @@ protected override void OnRecoveryFailure(Exception reason, object? message = nu
_probe.Tell(CreateStatus(msg));
Context.Stop(Self);
}

private bool IsHealthy => _recoveredJournal is true &&
_recoveredSnapshotStore is true &&
_persistedJournal is true &&
_persistedSnapshotStore is true;

private PersistenceLivenessStatus CreateStatus(string? message = null)
=> _firstAttempt
? new PersistenceLivenessStatus(
status: AkkaHealthStatus.Degraded,
journalRecovered: _recoveredJournal,
snapshotRecovered: _recoveredSnapshotStore,
journalPersisted: _persistedJournal ?? false,
snapshotSaved: _persistedSnapshotStore ?? false,
failures: _failures,
message: message)
: new PersistenceLivenessStatus(
journalRecovered: _recoveredJournal ?? false,
snapshotRecovered: _recoveredSnapshotStore ?? false,
status: IsHealthy ? AkkaHealthStatus.Healthy : AkkaHealthStatus.Unhealthy,
journalRecovered: _recoveredJournal,
snapshotRecovered: _recoveredSnapshotStore,
journalPersisted: _persistedJournal ?? false,
snapshotSaved: _persistedSnapshotStore ?? false,
failures: _failures,
Expand Down
2 changes: 1 addition & 1 deletion src/Akka.HealthCheck.Tests/AkkaHealthCheckSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ private class CustomProbe : ReceiveActor
private readonly LivenessStatus _livenessStatus;
private readonly ReadinessStatus _readinessStatus;

public CustomProbe() : this(new LivenessStatus(true), new ReadinessStatus(true))
public CustomProbe() : this(LivenessStatus.Healthy(), new ReadinessStatus(true))
{
}

Expand Down
Loading