Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix persistence liveness check deadlock #269

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using FluentAssertions;
using System;
using System.Threading.Tasks;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static Akka.HealthCheck.Persistence.AkkaPersistenceLivenessProbe;
Expand All @@ -27,7 +28,7 @@ public AkkaPersistenceLivenessProbeNotAvailableDueToSnapshotStoreSpecs(ITestOutp
public void AkkaPersistenceLivenessProbeProvidert_Should_Report_Akka_Persistance_Is_Unavailable_With_Bad_Snapshot_Store_Setup()
{

var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(250))));
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds())));
ProbActor.Tell(new SubscribeToLiveness(TestActor));
ExpectMsg<LivenessStatus>().IsLive.Should().BeFalse("System should not be live");
ExpectMsg<LivenessStatus>(TimeSpan.FromMinutes(1)).IsLive.Should().BeFalse("System should not be live");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Akka.Util.Internal;
using FluentAssertions;
using System;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using static Akka.HealthCheck.Persistence.AkkaPersistenceLivenessProbe;
Expand All @@ -26,7 +27,7 @@ public AkkaPersistenceLivenessProbeNotAvailableDueToJournalSpecs(ITestOutputHelp
public void AkkaPersistenceLivenessProbeProvidert_Should_Report_Akka_Persistance_Is_Unavailable_With_Bad_Journal_Setup()
{

var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(250))));
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds())));
ProbActor.Tell(new SubscribeToLiveness(TestActor));
ExpectMsg<LivenessStatus>().IsLive.Should().BeFalse("System should not be live");
ExpectMsg<LivenessStatus>(TimeSpan.FromMinutes(1)).IsLive.Should().BeFalse("System should not be live");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Akka.Util.Internal;
using FluentAssertions;
using System;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

Expand All @@ -27,7 +28,7 @@ public AkkaPersistenceLivenessProbeSubscriptionTest(ITestOutputHelper helper)
[Fact(DisplayName = "AkkaPersistenceLivenessProbe should correctly handle subscription requests")]
public void AkkaPersistenceLivenessProbe_Should_Handle_Subscriptions_In_Any_State()
{
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(250))));
var ProbActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 3.Seconds())));
ProbActor.Tell(new SubscribeToLiveness(TestActor));
ExpectMsg<LivenessStatus>().IsLive.Should().BeFalse();
AwaitAssert(() => ExpectMsg<LivenessStatus>().IsLive.Should().BeTrue(),TimeSpan.FromSeconds(10));
Expand Down
58 changes: 58 additions & 0 deletions src/Akka.HealthCheck.Persistence.Tests/JournalInterceptors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// -----------------------------------------------------------------------
// <copyright file="JournalInterceptors.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Persistence;
using Akka.Persistence.TestKit;

namespace Akka.HealthCheck.Persistence.Tests;

public static class JournalInterceptors
{
internal class Noop : IJournalInterceptor
{
public static readonly Noop Instance = new ();

private Noop()
{}

public Task InterceptAsync(IPersistentRepresentation message) => Task.FromResult(true);
}

public class CancelableDelay: IJournalInterceptor
{
public CancelableDelay(TimeSpan delay, IJournalInterceptor next, CancellationToken cancellationToken)
{
_delay = delay;
_next = next;
_cancellationToken = cancellationToken;
}

private readonly TimeSpan _delay;
private readonly IJournalInterceptor _next;
private readonly CancellationToken _cancellationToken;

public async Task InterceptAsync(IPersistentRepresentation message)
{
try
{
await Task.Delay(_delay, _cancellationToken);
}
catch (OperationCanceledException)
{
// no-op
}
catch (TimeoutException)
{
// no-op
}
await _next.InterceptAsync(message);
}
}

}
92 changes: 92 additions & 0 deletions src/Akka.HealthCheck.Persistence.Tests/LivenessProbeTimeoutSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// -----------------------------------------------------------------------
// <copyright file="LivenessProbeTimeoutSpec.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.HealthCheck.Liveness;
using Akka.Persistence.TestKit;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.HealthCheck.Persistence.Tests;

public class LivenessProbeTimeoutSpec: PersistenceTestKit
{
public LivenessProbeTimeoutSpec(ITestOutputHelper output) : base(nameof(LivenessProbeTimeoutSpec), output)
{
}

[Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if SaveSnapshot does not respond")]
public async Task SaveSnapshotTimeoutTest()
{
using var cts = new CancellationTokenSource();
var delay = new SnapshotInterceptors.CancelableDelay(30.Minutes(), SnapshotInterceptors.Noop.Instance, cts.Token);

await WithSnapshotSave(
save => save.SetInterceptorAsync(delay),
() => TestTimeout(cts));
}

[Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if snapshot recovery does not respond")]
public async Task SnapshotLoadTimeoutTest()
{
using var cts = new CancellationTokenSource();
var delay = new SnapshotInterceptors.CancelableDelay(30.Minutes(), SnapshotInterceptors.Noop.Instance, cts.Token);

await WithSnapshotLoad(
save => save.SetInterceptorAsync(delay),
() => TestTimeout(cts));
}

[Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if journal Persist does not respond")]
public async Task JournalPersistTimeoutTest()
{
using var cts = new CancellationTokenSource();
var delay = new JournalInterceptors.CancelableDelay(30.Minutes(), JournalInterceptors.Noop.Instance, cts.Token);

await WithJournalWrite(
save => save.SetInterceptorAsync(delay),
() => TestTimeout(cts));
}

[Fact(DisplayName = "AkkaPersistenceLivenessProbe should time out if journal recovery does not respond")]
public async Task JournalRecoveryTimeoutTest()
{
using var cts = new CancellationTokenSource();
var delay = new JournalInterceptors.CancelableDelay(30.Minutes(), JournalInterceptors.Noop.Instance, cts.Token);

await WithJournalRecovery(
save => save.SetInterceptorAsync(delay),
() => TestTimeout(cts));
}

private async Task TestTimeout(CancellationTokenSource cts)
{
var probeActor = Sys.ActorOf(Props.Create(() => new AkkaPersistenceLivenessProbe(true, 250.Milliseconds(), 500.Milliseconds())));
probeActor.Tell(new SubscribeToLiveness(TestActor));
var status = ExpectMsg<LivenessStatus>();
status.IsLive.Should().BeFalse();
status.StatusMessage.Should().StartWith("Warming up probe.");

var timeoutStatusObj = await FishForMessageAsync(
msg => msg is LivenessStatus stat && !stat.StatusMessage.StartsWith("Warming up probe."),
6.Seconds());

var timeoutStatus = (LivenessStatus)timeoutStatusObj;
timeoutStatus.IsLive.Should().BeFalse();
timeoutStatus.StatusMessage.Should().StartWith("Timeout while checking persistence liveness.");

cts.Cancel();

await AwaitAssertAsync(
() => ExpectMsg<LivenessStatus>().IsLive.Should().BeTrue(),
TimeSpan.FromSeconds(10));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ await WithSnapshotLoad(load => load.Fail(), async () =>
{
Sys.EventStream.Subscribe(TestActor, typeof(LogEvent));
var probe = Sys.ActorOf(Props.Create(() =>
new AkkaPersistenceLivenessProbe(true, TimeSpan.FromMilliseconds(400))));
new AkkaPersistenceLivenessProbe(true, 400.Milliseconds(), 3.Seconds())));
await FishForMessageAsync<LogEvent>(e => e.Message.ToString() is "Recreating persistence probe.");

var stopwatch = Stopwatch.StartNew();
Expand Down
58 changes: 58 additions & 0 deletions src/Akka.HealthCheck.Persistence.Tests/SnapshotInterceptors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// -----------------------------------------------------------------------
// <copyright file="FailingTestSnapshotStore.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Akka.Persistence;
using Akka.Persistence.TestKit;

namespace Akka.HealthCheck.Persistence.Tests;

public static class SnapshotInterceptors
{
public class Noop : ISnapshotStoreInterceptor
{
public static readonly Noop Instance = new ();

private Noop()
{
}

public Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria) => Task.FromResult(true);
}

public class CancelableDelay: ISnapshotStoreInterceptor
{
public CancelableDelay(TimeSpan delay, ISnapshotStoreInterceptor next, CancellationToken cancellationToken)
{
_delay = delay;
_next = next;
_cancellationToken = cancellationToken;
}

private readonly TimeSpan _delay;
private readonly ISnapshotStoreInterceptor _next;
private readonly CancellationToken _cancellationToken;

public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria)
{
try
{
await Task.Delay(_delay, _cancellationToken);
}
catch (OperationCanceledException)
{
// no-op
}
catch (TimeoutException)
{
// no-op
}
await _next.InterceptAsync(persistenceId, criteria);
}
}
}
45 changes: 36 additions & 9 deletions src/Akka.HealthCheck.Persistence/AkkaPersistenceLivenessProbe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,41 +75,53 @@ private CreateProbe()
{
}
}

internal sealed class CheckTimeout
{
public static readonly CheckTimeout Instance = new();

private CheckTimeout()
{
}
}

public class AkkaPersistenceLivenessProbe : ActorBase
public class AkkaPersistenceLivenessProbe : ActorBase, IWithTimers
{
private const string TimeoutTimerKey = nameof(TimeoutTimerKey);
private const string CreateProbeTimerKey = nameof(CreateProbeTimerKey);

private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly HashSet<IActorRef> _subscribers = new HashSet<IActorRef>();
private PersistenceLivenessStatus _currentLivenessStatus = new(message: "Warming up probe. Recovery status is still undefined");
private IActorRef? _probe;
private int _probeCounter;
private readonly TimeSpan _delay;
private readonly TimeSpan _timeout;
private readonly string _id;
private readonly Cancelable _shutdownCancellable;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed anymore because we drop the scheduler timer

private readonly bool _logInfo;

public AkkaPersistenceLivenessProbe(bool logInfo, TimeSpan delay)
public AkkaPersistenceLivenessProbe(bool logInfo, TimeSpan delay, TimeSpan timeout)
{
_delay = delay;
_timeout = timeout;
_id = Guid.NewGuid().ToString("N");
_shutdownCancellable = new Cancelable(Context.System.Scheduler);
_logInfo = logInfo;

Become(obj => HandleMessages(obj) || HandleSubscriptions(obj));
}

public static Props PersistentHealthCheckProps(bool logInfo, TimeSpan delay)
public ITimerScheduler Timers { get; set; } = null!;

public static Props PersistentHealthCheckProps(bool logInfo, TimeSpan delay, TimeSpan timeout)
{
// need to use the stopping strategy in case things blow up right away
return Props.Create(() => new AkkaPersistenceLivenessProbe(logInfo, delay))
return Props.Create(() => new AkkaPersistenceLivenessProbe(logInfo, delay, timeout))
.WithSupervisorStrategy(Actor.SupervisorStrategy.StoppingStrategy);
}

protected override void PostStop()
{
_probe?.Tell(PoisonPill.Instance);
_shutdownCancellable.Cancel();
_shutdownCancellable.Dispose();
base.PostStop();
}

Expand Down Expand Up @@ -156,20 +168,35 @@ private bool HandleMessages(object message)
_probe = null;
if(_logInfo)
_log.Debug($"Persistence probe terminated. Recreating in {_delay.TotalSeconds} seconds.");

Timers.CancelAll();
ScheduleProbeRestart();
return true;

case CreateProbe:
if(_logInfo)
_log.Debug("Recreating persistence probe.");

Timers.StartSingleTimer(TimeoutTimerKey, CheckTimeout.Instance, _timeout);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guarding against suicide actor deadlock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're doing it here inside the probe actor instead of inside the suicide actor because mailbox processing can be blocked inside a persistence actor, making timers unreliable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call

_probe = Context.ActorOf(Props.Create(() => new SuicideProbe(Self, _probeCounter == 0, _id)));
Context.Watch(_probe);
_probe.Tell("hit" + _probeCounter);
_probeCounter++;
return true;

case CheckTimeout:
const string errMsg = "Timeout while checking persistence liveness. Recovery status is undefined.";
_log.Warning(errMsg);
_currentLivenessStatus = new PersistenceLivenessStatus(errMsg);
PublishStatusUpdates();

if(_probe is not null)
Context.Stop(_probe);
Comment on lines +187 to +194
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Force stop the suicide actor if it stalls/deadlocks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we also emit a new liveness status with a timeout message to let subscriber know about the problem.


return true;

case PersistenceLivenessStatus status:
Timers.CancelAll();
HandleRecoveryStatus(status);
return true;
}
Expand All @@ -189,7 +216,7 @@ protected override void PreStart()

private void ScheduleProbeRestart()
{
Context.System.Scheduler.ScheduleTellOnce(_delay, Self, CreateProbe.Instance, Self, _shutdownCancellable);
Timers.StartSingleTimer(CreateProbeTimerKey, CreateProbe.Instance, _delay);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace scheduler timer calls with IWithTimer to prevent leaks.

}

private void PublishStatusUpdates()
Expand Down
Loading
Loading