diff --git a/src/core/Akka.Persistence.TestKit.Tests/JournalInterceptorsSpecs.cs b/src/core/Akka.Persistence.TestKit.Tests/JournalInterceptorsSpecs.cs
index 3bc235d0254..f7638dad520 100644
--- a/src/core/Akka.Persistence.TestKit.Tests/JournalInterceptorsSpecs.cs
+++ b/src/core/Akka.Persistence.TestKit.Tests/JournalInterceptorsSpecs.cs
@@ -5,6 +5,9 @@
//
//-----------------------------------------------------------------------
+using System.Threading;
+using FluentAssertions.Extensions;
+
namespace Akka.Persistence.TestKit.Tests
{
using System;
@@ -58,6 +61,28 @@ public async Task delay_must_call_next_interceptor_after_specified_delay()
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration - epsilon);
}
+ [Fact]
+ public async Task cancelable_delay_must_call_next_interceptor_immediately_after_cancellation()
+ {
+ var totalDuration = 400.Milliseconds();
+ var delayDuration = 200.Milliseconds();
+ var epsilon = TimeSpan.FromMilliseconds(50);
+ using var cts = new CancellationTokenSource();
+ var probe = new InterceptorProbe();
+ var delay = new JournalInterceptors.CancelableDelay(totalDuration, probe, cts.Token);
+
+ var startedAt = DateTime.Now;
+ var task = delay.InterceptAsync(null);
+ await Task.Delay(delayDuration - epsilon);
+
+ probe.WasCalled.Should().BeFalse();
+ cts.Cancel();
+ await task;
+
+ probe.WasCalled.Should().BeTrue();
+ probe.CalledAt.Should().BeOnOrAfter(startedAt + delayDuration - epsilon);
+ }
+
[Fact]
public async Task on_type_must_call_next_interceptor_when_message_is_exactly_awaited_type()
{
diff --git a/src/core/Akka.Persistence.TestKit.Tests/SnapshotStoreInterceptorsSpec.cs b/src/core/Akka.Persistence.TestKit.Tests/SnapshotStoreInterceptorsSpec.cs
index 65d8a0be84c..50820959094 100644
--- a/src/core/Akka.Persistence.TestKit.Tests/SnapshotStoreInterceptorsSpec.cs
+++ b/src/core/Akka.Persistence.TestKit.Tests/SnapshotStoreInterceptorsSpec.cs
@@ -5,6 +5,9 @@
//
//-----------------------------------------------------------------------
+using System.Threading;
+using FluentAssertions.Extensions;
+
namespace Akka.Persistence.TestKit.Tests
{
using System;
@@ -48,6 +51,28 @@ public async Task delay_must_call_next_interceptor_after_specified_delay()
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration - epsilon);
}
+ [Fact]
+ public async Task cancelable_delay_must_call_next_interceptor_immediately_after_cancellation()
+ {
+ var totalDuration = 400.Milliseconds();
+ var delayDuration = 200.Milliseconds();
+ var epsilon = TimeSpan.FromMilliseconds(50);
+ using var cts = new CancellationTokenSource();
+ var probe = new InterceptorProbe();
+ var delay = new SnapshotStoreInterceptors.CancelableDelay(totalDuration, probe, cts.Token);
+
+ var startedAt = DateTime.Now;
+ var task = delay.InterceptAsync(null, null);
+ await Task.Delay(delayDuration - epsilon);
+
+ probe.WasCalled.Should().BeFalse();
+ cts.Cancel();
+ await task;
+
+ probe.WasCalled.Should().BeTrue();
+ probe.CalledAt.Should().BeOnOrAfter(startedAt + delayDuration - epsilon);
+ }
+
[Fact]
public async Task on_condition_must_accept_sync_lambda()
{
diff --git a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs
index f1d21181907..4fbe4d89062 100644
--- a/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs
+++ b/src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs
@@ -21,7 +21,7 @@ namespace Akka.Persistence.TestKit
/// This class represents an Akka.NET Persistence TestKit that uses xUnit
/// as its testing framework.
///
- public abstract class PersistenceTestKit : TestKit
+ public class PersistenceTestKit : TestKit
{
///
/// Create a new instance of the class.
@@ -30,7 +30,7 @@ public abstract class PersistenceTestKit : TestKit
/// Test ActorSystem configuration
/// Optional: The name of the actor system
/// TBD
- protected PersistenceTestKit(ActorSystemSetup setup, string actorSystemName = null, ITestOutputHelper output = null)
+ public PersistenceTestKit(ActorSystemSetup setup, string actorSystemName = null, ITestOutputHelper output = null)
: base(GetConfig(setup), actorSystemName, output)
{
var persistenceExtension = Persistence.Instance.Apply(Sys);
@@ -49,7 +49,7 @@ protected PersistenceTestKit(ActorSystemSetup setup, string actorSystemName = nu
/// Test ActorSystem configuration
/// Optional: The name of the actor system
/// TBD
- protected PersistenceTestKit(Config config, string actorSystemName = null, ITestOutputHelper output = null)
+ public PersistenceTestKit(Config config, string actorSystemName = null, ITestOutputHelper output = null)
: base(GetConfig(config), actorSystemName, output)
{
var persistenceExtension = Persistence.Instance.Apply(Sys);
@@ -61,13 +61,25 @@ protected PersistenceTestKit(Config config, string actorSystemName = null, ITest
Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef);
}
+ public PersistenceTestKit(ActorSystem actorSystem, ITestOutputHelper output = null)
+ : base(actorSystem, output)
+ {
+ var persistenceExtension = Persistence.Instance.Apply(Sys);
+
+ JournalActorRef = persistenceExtension.JournalFor(null);
+ Journal = TestJournal.FromRef(JournalActorRef);
+
+ SnapshotsActorRef = persistenceExtension.SnapshotStoreFor(null);
+ Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef);
+ }
+
///
/// Create a new instance of the class.
/// A new system with the default configuration will be created.
///
/// Optional: The name of the actor system
/// TBD
- protected PersistenceTestKit(string actorSystemName = null, ITestOutputHelper output = null)
+ public PersistenceTestKit(string actorSystemName = null, ITestOutputHelper output = null)
: this(Config.Empty, actorSystemName, output)
{
}
diff --git a/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj.DotSettings b/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj.DotSettings
new file mode 100644
index 00000000000..d96a71b7216
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Akka.Persistence.TestKit.csproj.DotSettings
@@ -0,0 +1,4 @@
+
+ True
+ True
+ True
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/ConnectionInterceptors.cs b/src/core/Akka.Persistence.TestKit/ConnectionInterceptors.cs
new file mode 100644
index 00000000000..f16e060f957
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/ConnectionInterceptors.cs
@@ -0,0 +1,107 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2015 - 2024 Petabridge, LLC
+//
+// -----------------------------------------------------------------------
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Akka.Persistence.TestKit;
+
+public static class ConnectionInterceptors
+{
+ public sealed class Noop : IConnectionInterceptor
+ {
+ public static readonly IConnectionInterceptor Instance = new Noop();
+
+ public Task InterceptAsync() => Task.FromResult(true);
+ }
+
+ public sealed class Failure : IConnectionInterceptor
+ {
+ public static readonly IConnectionInterceptor Instance = new Failure();
+
+ public Task InterceptAsync() => throw new TestConnectionException();
+ }
+
+ public sealed class Delay : IConnectionInterceptor
+ {
+ public Delay(TimeSpan delay, IConnectionInterceptor next)
+ {
+ _delay = delay;
+ _next = next;
+ }
+
+ private readonly TimeSpan _delay;
+ private readonly IConnectionInterceptor _next;
+
+ public async Task InterceptAsync()
+ {
+ await Task.Delay(_delay);
+ await _next.InterceptAsync();
+ }
+ }
+
+ public sealed class OnCondition : IConnectionInterceptor
+ {
+ public OnCondition(Func> predicate, IConnectionInterceptor next, bool negate = false)
+ {
+ _predicate = predicate;
+ _next = next;
+ _negate = negate;
+ }
+
+ public OnCondition(Func predicate, IConnectionInterceptor next, bool negate = false)
+ {
+ _predicate = () => Task.FromResult(predicate());
+ _next = next;
+ _negate = negate;
+ }
+
+ private readonly Func> _predicate;
+ private readonly IConnectionInterceptor _next;
+ private readonly bool _negate;
+
+ public async Task InterceptAsync()
+ {
+ var result = await _predicate();
+ if ((_negate && !result) || (!_negate && result))
+ {
+ await _next.InterceptAsync();
+ }
+ }
+ }
+
+ public sealed class CancelableDelay: IConnectionInterceptor
+ {
+ public CancelableDelay(TimeSpan delay, IConnectionInterceptor next, CancellationToken cancellationToken)
+ {
+ _delay = delay;
+ _next = next;
+ _cancellationToken = cancellationToken;
+ }
+
+ private readonly TimeSpan _delay;
+ private readonly IConnectionInterceptor _next;
+ private readonly CancellationToken _cancellationToken;
+
+ public async Task InterceptAsync()
+ {
+ try
+ {
+ await Task.Delay(_delay, _cancellationToken);
+ }
+ catch (OperationCanceledException)
+ {
+ // no-op
+ }
+ catch (TimeoutException)
+ {
+ // no-op
+ }
+ await _next.InterceptAsync();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/IConnectionInterceptor.cs b/src/core/Akka.Persistence.TestKit/IConnectionInterceptor.cs
new file mode 100644
index 00000000000..18f5b0f4b93
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/IConnectionInterceptor.cs
@@ -0,0 +1,14 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2015 - 2024 Petabridge, LLC
+//
+// -----------------------------------------------------------------------
+
+using System.Threading.Tasks;
+
+namespace Akka.Persistence.TestKit;
+
+public interface IConnectionInterceptor
+{
+ Task InterceptAsync();
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/IJournalConnectionBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/IJournalConnectionBehaviorSetter.cs
new file mode 100644
index 00000000000..62ccba3b7b5
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/IJournalConnectionBehaviorSetter.cs
@@ -0,0 +1,14 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2015 - 2024 Petabridge, LLC
+//
+// -----------------------------------------------------------------------
+
+using System.Threading.Tasks;
+
+namespace Akka.Persistence.TestKit;
+
+public interface IJournalConnectionBehaviorSetter
+{
+ Task SetInterceptorAsync(IConnectionInterceptor interceptor);
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/ITestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/ITestJournal.cs
index 518f3d0b83d..bafdba8ca8f 100644
--- a/src/core/Akka.Persistence.TestKit/Journal/ITestJournal.cs
+++ b/src/core/Akka.Persistence.TestKit/Journal/ITestJournal.cs
@@ -21,5 +21,7 @@ public interface ITestJournal
/// List of interceptors to alter recovery behavior of proxied journal.
///
JournalRecoveryBehavior OnRecovery { get; }
+
+ JournalConnectionBehavior OnConnect { get; }
}
}
diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalConnectionBehavior.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalConnectionBehavior.cs
new file mode 100644
index 00000000000..36d56ab156a
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/JournalConnectionBehavior.cs
@@ -0,0 +1,308 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2023 Lightbend Inc.
+// Copyright (C) 2013-2023 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Threading.Tasks;
+
+namespace Akka.Persistence.TestKit;
+
+///
+/// Built-in Journal interceptors who will alter messages Recovery and/or Write of .
+///
+public class JournalConnectionBehavior
+{
+ internal JournalConnectionBehavior(IJournalConnectionBehaviorSetter setter)
+ {
+ Setter = setter;
+ }
+
+ private IJournalConnectionBehaviorSetter Setter { get; }
+
+ ///
+ /// Use custom, user defined interceptor.
+ ///
+ /// User defined interceptor which implements interface.
+ /// When is null.
+ public Task SetInterceptorAsync(IConnectionInterceptor interceptor)
+ {
+ if (interceptor == null) throw new ArgumentNullException(nameof(interceptor));
+
+ return Setter.SetInterceptorAsync(interceptor);
+ }
+
+ ///
+ /// Pass all messages to journal without interfering.
+ ///
+ ///
+ /// By using this interceptor all journal operations will work like
+ /// in standard .
+ ///
+ public Task Pass() => SetInterceptorAsync(ConnectionInterceptors.Noop.Instance);
+
+ ///
+ /// Delay passing all messages to journal by .
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ /// Time by which recovery operation will be delayed.
+ /// When is less or equal to .
+ public Task PassWithDelay(TimeSpan delay)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Noop.Instance));
+ }
+
+ ///
+ /// Always fail all messages.
+ ///
+ ///
+ ///
+ /// Journal will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ public Task Fail() => SetInterceptorAsync(ConnectionInterceptors.Failure.Instance);
+
+ ///
+ /// Fail message if predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailIf(Func predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(predicate, ConnectionInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message if async predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailIf(Func> predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(predicate, ConnectionInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message unless predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailUnless(Func predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(predicate, ConnectionInterceptors.Failure.Instance, negate: true));
+ }
+
+ ///
+ /// Fail message unless async predicate will return true.
+ ///
+ ///
+ ///
+ /// Journal will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailUnless(Func> predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(predicate, ConnectionInterceptors.Failure.Instance, negate: true));
+ }
+
+ ///
+ /// Fail message after specified delay.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ public Task FailWithDelay(TimeSpan delay)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message after specified delay if async predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailIfWithDelay(TimeSpan delay, Func> predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(
+ predicate,
+ new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Failure.Instance)
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay if predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailIfWithDelay(TimeSpan delay, Func predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(
+ predicate,
+ new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Failure.Instance)
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay unless predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailUnlessWithDelay(TimeSpan delay, Func predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(
+ predicate,
+ new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Failure.Instance),
+ negate: true
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay unless async predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// Journal will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this Journal behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailUnlessWithDelay(TimeSpan delay, Func> predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(
+ predicate,
+ new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Failure.Instance),
+ negate: true
+ ));
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalConnectionBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalConnectionBehaviorSetter.cs
new file mode 100644
index 00000000000..79cead22a67
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/Journal/JournalConnectionBehaviorSetter.cs
@@ -0,0 +1,31 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2023 Lightbend Inc.
+// Copyright (C) 2013-2023 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Threading.Tasks;
+using Akka.Actor;
+
+namespace Akka.Persistence.TestKit;
+
+///
+/// Setter strategy for TestJournal which will set recovery interceptor.
+///
+internal class JournalConnectionBehaviorSetter : IJournalConnectionBehaviorSetter
+{
+ internal JournalConnectionBehaviorSetter(IActorRef journal)
+ {
+ _journal = journal;
+ }
+
+ private readonly IActorRef _journal;
+
+ public Task SetInterceptorAsync(IConnectionInterceptor interceptor)
+ => _journal.Ask(
+ new TestJournal.UseConnectionInterceptor(interceptor),
+ TimeSpan.FromSeconds(3)
+ );
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs b/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs
index 0af81c4d26c..9c9cda53501 100644
--- a/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs
+++ b/src/core/Akka.Persistence.TestKit/Journal/JournalInterceptors.cs
@@ -5,35 +5,37 @@
//
//-----------------------------------------------------------------------
+using System.Threading;
+
namespace Akka.Persistence.TestKit
{
using System;
using System.Threading.Tasks;
- internal static class JournalInterceptors
+ public static class JournalInterceptors
{
- internal class Noop : IJournalInterceptor
+ public sealed class Noop : IJournalInterceptor
{
public static readonly IJournalInterceptor Instance = new Noop();
public Task InterceptAsync(IPersistentRepresentation message) => Task.FromResult(true);
}
- internal class Failure : IJournalInterceptor
+ public sealed class Failure : IJournalInterceptor
{
public static readonly IJournalInterceptor Instance = new Failure();
public Task InterceptAsync(IPersistentRepresentation message) => throw new TestJournalFailureException();
}
- internal class Rejection : IJournalInterceptor
+ public sealed class Rejection : IJournalInterceptor
{
public static readonly IJournalInterceptor Instance = new Rejection();
public Task InterceptAsync(IPersistentRepresentation message) => throw new TestJournalRejectionException();
}
- internal class Delay : IJournalInterceptor
+ public sealed class Delay : IJournalInterceptor
{
public Delay(TimeSpan delay, IJournalInterceptor next)
{
@@ -51,7 +53,7 @@ public async Task InterceptAsync(IPersistentRepresentation message)
}
}
- internal sealed class OnCondition : IJournalInterceptor
+ public sealed class OnCondition : IJournalInterceptor
{
public OnCondition(Func> predicate, IJournalInterceptor next, bool negate = false)
{
@@ -81,7 +83,7 @@ public async Task InterceptAsync(IPersistentRepresentation message)
}
}
- internal class OnType : IJournalInterceptor
+ public sealed class OnType : IJournalInterceptor
{
public OnType(Type messageType, IJournalInterceptor next)
{
@@ -102,5 +104,36 @@ public async Task InterceptAsync(IPersistentRepresentation message)
}
}
}
+
+ public sealed class CancelableDelay: IJournalInterceptor
+ {
+ public CancelableDelay(TimeSpan delay, IJournalInterceptor next, CancellationToken cancellationToken)
+ {
+ _delay = delay;
+ _next = next;
+ _cancellationToken = cancellationToken;
+ }
+
+ private readonly TimeSpan _delay;
+ private readonly IJournalInterceptor _next;
+ private readonly CancellationToken _cancellationToken;
+
+ public async Task InterceptAsync(IPersistentRepresentation message)
+ {
+ try
+ {
+ await Task.Delay(_delay, _cancellationToken);
+ }
+ catch (OperationCanceledException)
+ {
+ // no-op
+ }
+ catch (TimeoutException)
+ {
+ // no-op
+ }
+ await _next.InterceptAsync(message);
+ }
+ }
}
}
diff --git a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs
index ec64ebdaf84..c5e77efca6d 100644
--- a/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs
+++ b/src/core/Akka.Persistence.TestKit/Journal/TestJournal.cs
@@ -22,6 +22,7 @@ public sealed class TestJournal : MemoryJournal
{
private IJournalInterceptor _writeInterceptor = JournalInterceptors.Noop.Instance;
private IJournalInterceptor _recoveryInterceptor = JournalInterceptors.Noop.Instance;
+ private IConnectionInterceptor _connectionInterceptor = ConnectionInterceptors.Noop.Instance;
protected override bool ReceivePluginInternal(object message)
{
@@ -37,6 +38,11 @@ protected override bool ReceivePluginInternal(object message)
Sender.Tell(Ack.Instance);
return true;
+ case UseConnectionInterceptor use:
+ _connectionInterceptor = use.Interceptor;
+ Sender.Tell(Ack.Instance);
+ return true;
+
default:
return base.ReceivePluginInternal(message);
}
@@ -44,6 +50,7 @@ protected override bool ReceivePluginInternal(object message)
protected override async Task> WriteMessagesAsync(IEnumerable messages)
{
+ await _connectionInterceptor.InterceptAsync();
var exceptions = new List();
foreach (var w in messages)
{
@@ -74,6 +81,7 @@ protected override async Task> WriteMessagesAsync(IEnu
public override async Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action recoveryCallback)
{
+ await _connectionInterceptor.InterceptAsync();
var highest = HighestSequenceNr(persistenceId);
if (highest != 0L && max != 0L)
{
@@ -95,6 +103,12 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per
}
}
+ public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
+ {
+ await _connectionInterceptor.InterceptAsync();
+ return await base.ReadHighestSequenceNrAsync(persistenceId, fromSequenceNr);
+ }
+
///
/// Create proxy object from journal actor reference which can alter behavior of journal.
///
@@ -128,6 +142,16 @@ public UseRecoveryInterceptor(IJournalInterceptor interceptor)
public IJournalInterceptor Interceptor { get; }
}
+ public sealed class UseConnectionInterceptor
+ {
+ public UseConnectionInterceptor(IConnectionInterceptor interceptor)
+ {
+ Interceptor = interceptor;
+ }
+
+ public IConnectionInterceptor Interceptor { get; }
+ }
+
public sealed class Ack
{
public static readonly Ack Instance = new();
@@ -145,6 +169,8 @@ public TestJournalWrapper(IActorRef actor)
public JournalWriteBehavior OnWrite => new(new JournalWriteBehaviorSetter(_actor));
public JournalRecoveryBehavior OnRecovery => new(new JournalRecoveryBehaviorSetter(_actor));
+
+ public JournalConnectionBehavior OnConnect => new(new JournalConnectionBehaviorSetter(_actor));
}
}
}
diff --git a/src/core/Akka.Persistence.TestKit/Properties/AssemblyInfo.cs b/src/core/Akka.Persistence.TestKit/Properties/AssemblyInfo.cs
deleted file mode 100644
index 5d3f46b9ab0..00000000000
--- a/src/core/Akka.Persistence.TestKit/Properties/AssemblyInfo.cs
+++ /dev/null
@@ -1,24 +0,0 @@
-//-----------------------------------------------------------------------
-//
-// Copyright (C) 2009-2024 Lightbend Inc.
-// Copyright (C) 2013-2024 .NET Foundation
-//
-//-----------------------------------------------------------------------
-
-using System.Reflection;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-
-// General Information about an assembly is controlled through the following
-// set of attributes. Change these attribute values to modify the information
-// associated with an assembly.
-
-// Setting ComVisible to false makes the types in this assembly not visible
-// to COM components. If you need to access a type in this assembly from
-// COM, set the ComVisible attribute to true on that type.
-[assembly: ComVisible(false)]
-
-// The following GUID is for the ID of the typelib if this project is exposed to COM
-[assembly: Guid("66023c4f-f246-446d-b212-2b8f20755671")]
-
-[assembly: InternalsVisibleTo("Akka.Persistence.TestKit.Tests")]
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/ISnapshotStoreConnectionBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/ISnapshotStoreConnectionBehaviorSetter.cs
new file mode 100644
index 00000000000..6756220c0a5
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/ISnapshotStoreConnectionBehaviorSetter.cs
@@ -0,0 +1,14 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2015 - 2024 Petabridge, LLC
+//
+// -----------------------------------------------------------------------
+
+using System.Threading.Tasks;
+
+namespace Akka.Persistence.TestKit;
+
+public interface ISnapshotStoreConnectionBehaviorSetter
+{
+ Task SetInterceptorAsync(IConnectionInterceptor interceptor);
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/ITestSnapshotStore.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/ITestSnapshotStore.cs
index 7f592c3ae21..120f32f6335 100644
--- a/src/core/Akka.Persistence.TestKit/SnapshotStore/ITestSnapshotStore.cs
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/ITestSnapshotStore.cs
@@ -12,5 +12,6 @@ public interface ITestSnapshotStore
SnapshotStoreSaveBehavior OnSave { get; }
SnapshotStoreLoadBehavior OnLoad { get; }
SnapshotStoreDeleteBehavior OnDelete { get; }
+ SnapshotStoreConnectionBehavior OnConnect { get; }
}
}
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreConnectionBehavior.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreConnectionBehavior.cs
new file mode 100644
index 00000000000..dea49b944ae
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreConnectionBehavior.cs
@@ -0,0 +1,309 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2023 Lightbend Inc.
+// Copyright (C) 2013-2023 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Threading.Tasks;
+using Akka.Persistence.Snapshot;
+
+namespace Akka.Persistence.TestKit;
+
+///
+/// Built-in SnapshotStore interceptors who will alter messages Recovery and/or Write of .
+///
+public class SnapshotStoreConnectionBehavior
+{
+ internal SnapshotStoreConnectionBehavior(ISnapshotStoreConnectionBehaviorSetter setter)
+ {
+ Setter = setter;
+ }
+
+ private ISnapshotStoreConnectionBehaviorSetter Setter { get; }
+
+ ///
+ /// Use custom, user defined interceptor.
+ ///
+ /// User defined interceptor which implements interface.
+ /// When is null.
+ public Task SetInterceptorAsync(IConnectionInterceptor interceptor)
+ {
+ if (interceptor == null) throw new ArgumentNullException(nameof(interceptor));
+
+ return Setter.SetInterceptorAsync(interceptor);
+ }
+
+ ///
+ /// Pass all messages to journal without interfering.
+ ///
+ ///
+ /// By using this interceptor all journal operations will work like
+ /// in standard .
+ ///
+ public Task Pass() => SetInterceptorAsync(ConnectionInterceptors.Noop.Instance);
+
+ ///
+ /// Delay passing all messages to journal by .
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ /// Time by which recovery operation will be delayed.
+ /// When is less or equal to .
+ public Task PassWithDelay(TimeSpan delay)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Noop.Instance));
+ }
+
+ ///
+ /// Always fail all messages.
+ ///
+ ///
+ ///
+ /// SnapshotStore will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this SnapshotStore behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ public Task Fail() => SetInterceptorAsync(ConnectionInterceptors.Failure.Instance);
+
+ ///
+ /// Fail message if predicate will return true.
+ ///
+ ///
+ ///
+ /// SnapshotStore will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this SnapshotStore behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailIf(Func predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(predicate, ConnectionInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message if async predicate will return true.
+ ///
+ ///
+ ///
+ /// SnapshotStore will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this SnapshotStore behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailIf(Func> predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(predicate, ConnectionInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message unless predicate will return true.
+ ///
+ ///
+ ///
+ /// SnapshotStore will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this SnapshotStore behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailUnless(Func predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(predicate, ConnectionInterceptors.Failure.Instance, negate: true));
+ }
+
+ ///
+ /// Fail message unless async predicate will return true.
+ ///
+ ///
+ ///
+ /// SnapshotStore will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this SnapshotStore behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is null.
+ public Task FailUnless(Func> predicate)
+ {
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(predicate, ConnectionInterceptors.Failure.Instance, negate: true));
+ }
+
+ ///
+ /// Fail message after specified delay.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// SnapshotStore will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this SnapshotStore behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ public Task FailWithDelay(TimeSpan delay)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Failure.Instance));
+ }
+
+ ///
+ /// Fail message after specified delay if async predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// SnapshotStore will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this SnapshotStore behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailIfWithDelay(TimeSpan delay, Func> predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(
+ predicate,
+ new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Failure.Instance)
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay if predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// SnapshotStore will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this SnapshotStore behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailIfWithDelay(TimeSpan delay, Func predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(
+ predicate,
+ new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Failure.Instance)
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay unless predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// SnapshotStore will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this SnapshotStore behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailUnlessWithDelay(TimeSpan delay, Func predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(
+ predicate,
+ new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Failure.Instance),
+ negate: true
+ ));
+ }
+
+ ///
+ /// Fail message after specified delay unless async predicate
+ /// will return true.
+ ///
+ ///
+ ///
+ /// Each message will be delayed individually.
+ ///
+ ///
+ /// SnapshotStore will crash and UntypedPersistentActor.OnPersistFailure will be called on persistent actor.
+ ///
+ ///
+ /// Use this SnapshotStore behavior when it is needed to verify how well a persistent actor will handle network problems
+ /// and similar issues with underlying journal.
+ ///
+ ///
+ ///
+ ///
+ /// When is less or equal to .
+ /// When is null.
+ public Task FailUnlessWithDelay(TimeSpan delay, Func> predicate)
+ {
+ if (delay <= TimeSpan.Zero) throw new ArgumentException("Delay must be greater than zero", nameof(delay));
+ if (predicate == null) throw new ArgumentNullException(nameof(predicate));
+
+ return SetInterceptorAsync(new ConnectionInterceptors.OnCondition(
+ predicate,
+ new ConnectionInterceptors.Delay(delay, ConnectionInterceptors.Failure.Instance),
+ negate: true
+ ));
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreConnectionBehaviorSetter.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreConnectionBehaviorSetter.cs
new file mode 100644
index 00000000000..41783ff0941
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreConnectionBehaviorSetter.cs
@@ -0,0 +1,31 @@
+//-----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2023 Lightbend Inc.
+// Copyright (C) 2013-2023 .NET Foundation
+//
+//-----------------------------------------------------------------------
+
+using System;
+using System.Threading.Tasks;
+using Akka.Actor;
+
+namespace Akka.Persistence.TestKit;
+
+///
+/// Setter strategy for TestSnapshotStore which will set recovery interceptor.
+///
+internal class SnapshotStoreConnectionBehaviorSetter : ISnapshotStoreConnectionBehaviorSetter
+{
+ internal SnapshotStoreConnectionBehaviorSetter(IActorRef journal)
+ {
+ _journal = journal;
+ }
+
+ private readonly IActorRef _journal;
+
+ public Task SetInterceptorAsync(IConnectionInterceptor interceptor)
+ => _journal.Ask(
+ new TestSnapshotStore.UseConnectionInterceptor(interceptor),
+ TimeSpan.FromSeconds(3)
+ );
+}
\ No newline at end of file
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreInterceptors.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreInterceptors.cs
index 48d09209a88..5f49a3bee1b 100644
--- a/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreInterceptors.cs
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/SnapshotStoreInterceptors.cs
@@ -5,28 +5,30 @@
//
//-----------------------------------------------------------------------
+using System.Threading;
+
namespace Akka.Persistence.TestKit
{
using System;
using System.Threading.Tasks;
- internal static class SnapshotStoreInterceptors
+ public static class SnapshotStoreInterceptors
{
- internal class Noop : ISnapshotStoreInterceptor
+ public sealed class Noop : ISnapshotStoreInterceptor
{
public static readonly ISnapshotStoreInterceptor Instance = new Noop();
public Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria) => Task.FromResult(true);
}
- internal class Failure : ISnapshotStoreInterceptor
+ public sealed class Failure : ISnapshotStoreInterceptor
{
public static readonly ISnapshotStoreInterceptor Instance = new Failure();
public Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria) => throw new TestSnapshotStoreFailureException();
}
- internal class Delay : ISnapshotStoreInterceptor
+ public sealed class Delay : ISnapshotStoreInterceptor
{
public Delay(TimeSpan delay, ISnapshotStoreInterceptor next)
{
@@ -44,7 +46,7 @@ public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria
}
}
- internal sealed class OnCondition : ISnapshotStoreInterceptor
+ public sealed class OnCondition : ISnapshotStoreInterceptor
{
public OnCondition(Func> predicate, ISnapshotStoreInterceptor next, bool negate = false)
{
@@ -73,5 +75,36 @@ public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria
}
}
}
+
+ public sealed class CancelableDelay: ISnapshotStoreInterceptor
+ {
+ public CancelableDelay(TimeSpan delay, ISnapshotStoreInterceptor next, CancellationToken cancellationToken)
+ {
+ _delay = delay;
+ _next = next;
+ _cancellationToken = cancellationToken;
+ }
+
+ private readonly TimeSpan _delay;
+ private readonly ISnapshotStoreInterceptor _next;
+ private readonly CancellationToken _cancellationToken;
+
+ public async Task InterceptAsync(string persistenceId, SnapshotSelectionCriteria criteria)
+ {
+ try
+ {
+ await Task.Delay(_delay, _cancellationToken);
+ }
+ catch (OperationCanceledException)
+ {
+ // no-op
+ }
+ catch (TimeoutException)
+ {
+ // no-op
+ }
+ await _next.InterceptAsync(persistenceId, criteria);
+ }
+ }
}
}
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs
index 81a411dc0ff..de13fc6de64 100644
--- a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStore.cs
@@ -19,6 +19,7 @@ public class TestSnapshotStore : MemorySnapshotStore
private ISnapshotStoreInterceptor _saveInterceptor = SnapshotStoreInterceptors.Noop.Instance;
private ISnapshotStoreInterceptor _loadInterceptor = SnapshotStoreInterceptors.Noop.Instance;
private ISnapshotStoreInterceptor _deleteInterceptor = SnapshotStoreInterceptors.Noop.Instance;
+ private IConnectionInterceptor _connectionInterceptor = ConnectionInterceptors.Noop.Instance;
protected override bool ReceivePluginInternal(object message)
{
@@ -39,6 +40,11 @@ protected override bool ReceivePluginInternal(object message)
Sender.Tell(Ack.Instance);
return true;
+ case UseConnectionInterceptor use:
+ _connectionInterceptor = use.Interceptor;
+ Sender.Tell(Ack.Instance);
+ return true;
+
default:
return base.ReceivePluginInternal(message);
}
@@ -46,24 +52,28 @@ protected override bool ReceivePluginInternal(object message)
protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
{
+ await _connectionInterceptor.InterceptAsync();
await _saveInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata));
await base.SaveAsync(metadata, snapshot);
}
protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
{
+ await _connectionInterceptor.InterceptAsync();
await _loadInterceptor.InterceptAsync(persistenceId, criteria);
return await base.LoadAsync(persistenceId, criteria);
}
protected override async Task DeleteAsync(SnapshotMetadata metadata)
{
+ await _connectionInterceptor.InterceptAsync();
await _deleteInterceptor.InterceptAsync(metadata.PersistenceId, ToSelectionCriteria(metadata));
await base.DeleteAsync(metadata);
}
protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
{
+ await _connectionInterceptor.InterceptAsync();
await _deleteInterceptor.InterceptAsync(persistenceId, criteria);
await base.DeleteAsync(persistenceId, criteria);
}
@@ -113,7 +123,17 @@ public UseDeleteInterceptor(ISnapshotStoreInterceptor interceptor)
public ISnapshotStoreInterceptor Interceptor { get; }
}
+
+ public sealed class UseConnectionInterceptor
+ {
+ public UseConnectionInterceptor(IConnectionInterceptor interceptor)
+ {
+ Interceptor = interceptor;
+ }
+ public IConnectionInterceptor Interceptor { get; }
+ }
+
public sealed class Ack
{
public static readonly Ack Instance = new();
@@ -131,6 +151,7 @@ public TestSnapshotStoreWrapper(IActorRef actor)
public SnapshotStoreSaveBehavior OnSave => new(new SnapshotStoreSaveBehaviorSetter(_actor));
public SnapshotStoreLoadBehavior OnLoad => new(new SnapshotStoreLoadBehaviorSetter(_actor));
public SnapshotStoreDeleteBehavior OnDelete => new(new SnapshotStoreDeleteBehaviorSetter(_actor));
+ public SnapshotStoreConnectionBehavior OnConnect => new(new SnapshotStoreConnectionBehaviorSetter(_actor));
}
}
}
diff --git a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStoreFailureException.cs b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStoreFailureException.cs
index 786f1f0e483..9a6abcedbf1 100644
--- a/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStoreFailureException.cs
+++ b/src/core/Akka.Persistence.TestKit/SnapshotStore/TestSnapshotStoreFailureException.cs
@@ -7,8 +7,8 @@
namespace Akka.Persistence.TestKit
{
- using System;
- using System.Runtime.Serialization;
+using System;
+using System.Runtime.Serialization;
[Serializable]
public class TestSnapshotStoreFailureException : Exception
diff --git a/src/core/Akka.Persistence.TestKit/TestConnectionException.cs b/src/core/Akka.Persistence.TestKit/TestConnectionException.cs
new file mode 100644
index 00000000000..2fc35f80462
--- /dev/null
+++ b/src/core/Akka.Persistence.TestKit/TestConnectionException.cs
@@ -0,0 +1,18 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2015 - 2024 Petabridge, LLC
+//
+// -----------------------------------------------------------------------
+
+using System;
+using System.Runtime.Serialization;
+
+namespace Akka.Persistence.TestKit;
+
+public class TestConnectionException: Exception
+{
+ public TestConnectionException() { }
+ public TestConnectionException(string message) : base(message) { }
+ public TestConnectionException(string message, Exception inner) : base(message, inner) { }
+ protected TestConnectionException(SerializationInfo info, StreamingContext context) : base(info, context) { }
+}
\ No newline at end of file