Skip to content

Commit

Permalink
Fix persistence liveness check deadlock (#269)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Feb 23, 2024
1 parent fffaa11 commit b6e3aa1
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using FluentAssertions;
using System;
using System.Threading.Tasks;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static Akka.HealthCheck.Persistence.AkkaPersistenceLivenessProbe;
Expand All @@ -27,7 +28,7 @@ public AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs(ITestOutp
public void AkkaPersistenceLivenessProbeProvidert_Should_Report_Akka_Persistance_Is_Unavailable_With_Bad_Snapshot_Store_Setup()
{

var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(250))));
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds())));
ProbActor.Tell(new SubscribeToLiveness(TestActor));
ExpectMsg<LivenessStatus>().IsLive.Should().BeFalse("System should not be live");
ExpectMsg<LivenessStatus>(TimeSpan.FromMinutes(1)).IsLive.Should().BeFalse("System should not be live");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Akka.Util.Internal;
using FluentAssertions;
using System;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static Akka.HealthCheck.Persistence.AkkaPersistenceLivenessProbe;
Expand All @@ -26,7 +27,7 @@ public AkkaPersistenceLivenessProbeNotAvailableDueToJournalSpecs(ITestOutputHelp
public void AkkaPersistenceLivenessProbeProvidert_Should_Report_Akka_Persistance_Is_Unavailable_With_Bad_Journal_Setup()
{

var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(250))));
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds())));
ProbActor.Tell(new SubscribeToLiveness(TestActor));
ExpectMsg<LivenessStatus>().IsLive.Should().BeFalse("System should not be live");
ExpectMsg<LivenessStatus>(TimeSpan.FromMinutes(1)).IsLive.Should().BeFalse("System should not be live");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Akka.Util.Internal;
using FluentAssertions;
using System;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -27,7 +28,7 @@ public AkkaPersistenceLivenessProbeSubscriptionTest(ITestOutputHelper helper)
[Fact(DisplayName = "AkkaPersistenceLivenessProbe should correctly handle subscription requests")]
public void AkkaPersistenceLivenessProbe_Should_Handle_Subscriptions_In_Any_State()
{
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(250))));
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds())));
ProbActor.Tell(new SubscribeToLiveness(TestActor));
ExpectMsg<LivenessStatus>().IsLive.Should().BeFalse();
AwaitAssert(() => ExpectMsg<LivenessStatus>().IsLive.Should().BeTrue(),TimeSpan.FromSeconds(10));
Expand Down
58 changes: 58 additions & 0 deletions src/Akka.HealthCheck.Persistence.Tests/JournalInterceptors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// -----------------------------------------------------------------------
// <copyright file="JournalInterceptors.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Persistence;
using Akka.Persistence.TestKit;

namespace Akka.HealthCheck.Persistence.Tests;

/// <summary>
/// Journal interceptors used by the persistence liveness probe tests.
/// </summary>
public static class JournalInterceptors
{
    /// <summary>
    /// Interceptor that completes immediately without touching the message.
    /// Intended as the terminal link of an interceptor chain.
    /// </summary>
    public class Noop : IJournalInterceptor
    {
        public static readonly Noop Instance = new ();

        private Noop()
        {}

        /// <summary>Returns an already-completed task.</summary>
        public Task InterceptAsync(IPersistentRepresentation message) => Task.CompletedTask;
    }

    /// <summary>
    /// Interceptor that waits for a fixed delay, or until the supplied token is
    /// cancelled, and then hands the message to the next interceptor.
    /// </summary>
    public class CancelableDelay: IJournalInterceptor
    {
        /// <param name="delay">How long to hold the message before forwarding it.</param>
        /// <param name="next">Interceptor invoked once the delay has elapsed or was cancelled.</param>
        /// <param name="cancellationToken">Token that cuts the delay short when cancelled.</param>
        public CancelableDelay(TimeSpan delay, IJournalInterceptor next, CancellationToken cancellationToken)
        {
            _delay = delay;
            _next = next;
            _cancellationToken = cancellationToken;
        }

        private readonly TimeSpan _delay;
        private readonly IJournalInterceptor _next;
        private readonly CancellationToken _cancellationToken;

        /// <summary>
        /// Delays, then forwards <paramref name="message"/> to the next interceptor.
        /// Cancellation of the delay is not an error: the message is still forwarded.
        /// </summary>
        public async Task InterceptAsync(IPersistentRepresentation message)
        {
            try
            {
                await Task.Delay(_delay, _cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Cancellation only shortens the wait; fall through to the next interceptor.
                // (Task.Delay signals cancellation with TaskCanceledException, which derives
                // from OperationCanceledException; it never throws TimeoutException.)
            }

            await _next.InterceptAsync(message);
        }
    }

}
92 changes: 92 additions & 0 deletions src/Akka.HealthCheck.Persistence.Tests/LivenessProbeTimeoutSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// -----------------------------------------------------------------------
// <copyright file="LivenessProbeTimeoutSpec.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.HealthCheck.Liveness;
using Akka.Persistence.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.HealthCheck.Persistence.Tests;

/// <summary>
/// Checks that <see cref="AkkaPersistenceLivenessProbe"/> publishes a timeout status when a
/// journal or snapshot store operation is held up by a delaying interceptor, and that it
/// reports live again once the delay is cancelled.
/// </summary>
public class LivenessProbeTimeoutSpec: PersistenceTestKit
{
    // Status message prefixes the probe is expected to publish.
    private const string WarmingUpPrefix = "Warming up probe.";
    private const string TimeoutPrefix = "Timeout while checking persistence liveness.";

    public LivenessProbeTimeoutSpec(ITestOutputHelper output) : base(nameof(LivenessProbeTimeoutSpec), output)
    {
    }

    [Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if SaveSnapshot does not respond")]
    public async Task SaveSnapshotTimeoutTest()
    {
        using var cts = new CancellationTokenSource();
        var delay = new SnapshotInterceptors.CancelableDelay(30.Minutes(), SnapshotInterceptors.Noop.Instance, cts.Token);

        await WithSnapshotSave(
            save => save.SetInterceptorAsync(delay),
            () => TestTimeout(cts));
    }

    [Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if snapshot recovery does not respond")]
    public async Task SnapshotLoadTimeoutTest()
    {
        using var cts = new CancellationTokenSource();
        var delay = new SnapshotInterceptors.CancelableDelay(30.Minutes(), SnapshotInterceptors.Noop.Instance, cts.Token);

        await WithSnapshotLoad(
            load => load.SetInterceptorAsync(delay),
            () => TestTimeout(cts));
    }

    [Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if journal Persist does not respond")]
    public async Task JournalPersistTimeoutTest()
    {
        using var cts = new CancellationTokenSource();
        var delay = new JournalInterceptors.CancelableDelay(30.Minutes(), JournalInterceptors.Noop.Instance, cts.Token);

        await WithJournalWrite(
            write => write.SetInterceptorAsync(delay),
            () => TestTimeout(cts));
    }

    [Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if journal recovery does not respond")]
    public async Task JournalRecoveryTimeoutTest()
    {
        using var cts = new CancellationTokenSource();
        var delay = new JournalInterceptors.CancelableDelay(30.Minutes(), JournalInterceptors.Noop.Instance, cts.Token);

        await WithJournalRecovery(
            recovery => recovery.SetInterceptorAsync(delay),
            () => TestTimeout(cts));
    }

    /// <summary>
    /// Shared scenario: start a probe with a 250 ms delay and a 500 ms timeout, expect the
    /// initial warm-up status, then a timeout status, then cancel the interceptor delay and
    /// expect the probe to report live.
    /// </summary>
    /// <param name="cts">Source whose token releases the delaying interceptor.</param>
    private async Task TestTimeout(CancellationTokenSource cts)
    {
        try
        {
            var probeActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 500.Milliseconds())));
            probeActor.Tell(new SubscribeToLiveness(TestActor));
            var status = ExpectMsg<LivenessStatus>();
            status.IsLive.Should().BeFalse();
            status.StatusMessage.Should().StartWith(WarmingUpPrefix);

            // Skip any further warm-up notifications; the first other status must be the timeout.
            var timeoutStatusObj = await FishForMessageAsync(
                msg => msg is LivenessStatus stat && !stat.StatusMessage.StartsWith(WarmingUpPrefix, StringComparison.Ordinal),
                6.Seconds());

            var timeoutStatus = (LivenessStatus)timeoutStatusObj;
            timeoutStatus.IsLive.Should().BeFalse();
            timeoutStatus.StatusMessage.Should().StartWith(TimeoutPrefix);
        }
        finally
        {
            // Always release the delayed interceptor, even when an assertion above fails,
            // so a 30 minute Task.Delay is not left pending behind a failed test.
            cts.Cancel();
        }

        await AwaitAssertAsync(
            () => ExpectMsg<LivenessStatus>().IsLive.Should().BeTrue(),
            TimeSpan.FromSeconds(10));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ await WithSnapshotLoad(load => load.Fail(), async () =>
{
Sys.EventStream.Subscribe(TestActor, typeof(LogEvent));
var probe = Sys.ActorOf(Props.Create(() =>
new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(400))));
new AkkaPersistenceLivenessProbe(true, 400.Milliseconds(), 3.Seconds())));
await FishForMessageAsync<LogEvent>(e => e.Message.ToString() is "Recreating persistence probe.");

var stopwatch = Stopwatch.StartNew();
Expand Down
58 changes: 58 additions & 0 deletions src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// -----------------------------------------------------------------------
// <copyright file="SnapshotInterceptors.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Persistence;
using Akka.Persistence.TestKit;

namespace Akka.HealthCheck.Persistence.Tests;

/// <summary>
/// Snapshot store interceptors used by the persistence liveness probe tests.
/// </summary>
public static class SnapshotInterceptors
{
    /// <summary>
    /// Interceptor that completes immediately without doing anything.
    /// Intended as the terminal link of an interceptor chain.
    /// </summary>
    public class Noop : ISnapshotStoreInterceptor
    {
        public static readonly Noop Instance = new ();

        private Noop()
        {
        }

        /// <summary>Returns an already-completed task.</summary>
        public Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria) => Task.CompletedTask;
    }

    /// <summary>
    /// Interceptor that waits for a fixed delay, or until the supplied token is
    /// cancelled, and then invokes the next interceptor.
    /// </summary>
    public class CancelableDelay: ISnapshotStoreInterceptor
    {
        /// <param name="delay">How long to hold the operation before forwarding it.</param>
        /// <param name="next">Interceptor invoked once the delay has elapsed or was cancelled.</param>
        /// <param name="cancellationToken">Token that cuts the delay short when cancelled.</param>
        public CancelableDelay(TimeSpan delay, ISnapshotStoreInterceptor next, CancellationToken cancellationToken)
        {
            _delay = delay;
            _next = next;
            _cancellationToken = cancellationToken;
        }

        private readonly TimeSpan _delay;
        private readonly ISnapshotStoreInterceptor _next;
        private readonly CancellationToken _cancellationToken;

        /// <summary>
        /// Delays, then forwards the call to the next interceptor.
        /// Cancellation of the delay is not an error: the call is still forwarded.
        /// </summary>
        public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria)
        {
            try
            {
                await Task.Delay(_delay, _cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Cancellation only shortens the wait; fall through to the next interceptor.
                // (Task.Delay signals cancellation with TaskCanceledException, which derives
                // from OperationCanceledException; it never throws TimeoutException.)
            }

            await _next.InterceptAsync(persistenceId, criteria);
        }
    }
}
45 changes: 36 additions & 9 deletions src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,41 +75,53 @@ private CreateProbe()
{
}
}

/// <summary>
/// Stateless marker message exposed as a single shared <see cref="Instance"/>.
/// The private constructor prevents any other instance from being created.
/// </summary>
internal sealed class CheckTimeout
{
    private CheckTimeout() { }

    /// <summary>The only instance of this message.</summary>
    public static readonly CheckTimeout Instance = new CheckTimeout();
}

public class AkkaPersistenceLivenessProbe : ActorBase
public class AkkaPersistenceLivenessProbe : ActorBase, IWithTimers
{
private const string TimeoutTimerKey = nameof(TimeoutTimerKey);
private const string CreateProbeTimerKey = nameof(CreateProbeTimerKey);

private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly HashSet<IActorRef> _subscribers = new HashSet<IActorRef>();
private PersistenceLivenessStatus _currentLivenessStatus = new(message: "Warming up probe. Recovery status is still undefined");
private IActorRef? _probe;
private int _probeCounter;
private readonly TimeSpan _delay;
private readonly TimeSpan _timeout;
private readonly string _id;
private readonly Cancelable _shutdownCancellable;
private readonly bool _logInfo;

public AkkaPersistenceLivenessProbe(bool logInfo, TimeSpan delay)
public AkkaPersistenceLivenessProbe(bool logInfo, TimeSpan delay, TimeSpan timeout)
{
_delay = delay;
_timeout = timeout;
_id = Guid.NewGuid().ToString("N");
_shutdownCancellable = new Cancelable(Context.System.Scheduler);
_logInfo = logInfo;

Become(obj => HandleMessages(obj) || HandleSubscriptions(obj));
}

public static Props PersistentHealthCheckProps(bool logInfo, TimeSpan delay)
public ITimerScheduler Timers { get; set; } = null!;

public static Props PersistentHealthCheckProps(bool logInfo, TimeSpan delay, TimeSpan timeout)
{
// need to use the stopping strategy in case things blow up right away
return Props.Create(() => new AkkaPersistenceLivenessProbe(logInfo, delay))
return Props.Create(() => new AkkaPersistenceLivenessProbe(logInfo, delay, timeout))
.WithSupervisorStrategy(Actor.SupervisorStrategy.StoppingStrategy);
}

protected override void PostStop()
{
_probe?.Tell(PoisonPill.Instance);
_shutdownCancellable.Cancel();
_shutdownCancellable.Dispose();
base.PostStop();
}

Expand Down Expand Up @@ -156,20 +168,35 @@ private bool HandleMessages(object message)
_probe = null;
if(_logInfo)
_log.Debug($"Persistence probe terminated. Recreating in {_delay.TotalSeconds} seconds.");

Timers.CancelAll();
ScheduleProbeRestart();
return true;

case CreateProbe:
if(_logInfo)
_log.Debug("Recreating persistence probe.");

Timers.StartSingleTimer(TimeoutTimerKey, CheckTimeout.Instance, _timeout);
_probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _probeCounter == 0, _id)));
Context.Watch(_probe);
_probe.Tell("hit" + _probeCounter);
_probeCounter++;
return true;

case CheckTimeout:
const string errMsg = "Timeout while checking persistence liveness. Recovery status is undefined.";
_log.Warning(errMsg);
_currentLivenessStatus = new PersistenceLivenessStatus(errMsg);
PublishStatusUpdates();

if(_probe is not null)
Context.Stop(_probe);

return true;

case PersistenceLivenessStatus status:
Timers.CancelAll();
HandleRecoveryStatus(status);
return true;
}
Expand All @@ -189,7 +216,7 @@ protected override void PreStart()

private void ScheduleProbeRestart()
{
Context.System.Scheduler.ScheduleTellOnce(_delay, Self, CreateProbe.Instance, Self, _shutdownCancellable);
Timers.StartSingleTimer(CreateProbeTimerKey, CreateProbe.Instance, _delay);
}

private void PublishStatusUpdates()
Expand Down
Loading

0 comments on commit b6e3aa1

Please sign in to comment.