Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Akka.Persistence.TestKit #7324

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
// </copyright>
//-----------------------------------------------------------------------

using System.Threading;
using FluentAssertions.Extensions;

namespace Akka.Persistence.TestKit.Tests
{
using System;
Expand Down Expand Up @@ -58,6 +61,28 @@ public async Task delay_must_call_next_interceptor_after_specified_delay()
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration - epsilon);
}

[Fact]
public async Task cancelable_delay_must_call_next_interceptor_immediately_after_cancellation()
{
var totalDuration = 400.Milliseconds();
var delayDuration = 200.Milliseconds();
var epsilon = TimeSpan.FromMilliseconds(50);
using var cts = new CancellationTokenSource();
var probe = new InterceptorProbe();
var delay = new JournalInterceptors.CancelableDelay(totalDuration, probe, cts.Token);

var startedAt = DateTime.Now;
var task = delay.InterceptAsync(null);
await Task.Delay(delayDuration - epsilon);

probe.WasCalled.Should().BeFalse();
cts.Cancel();
await task;

probe.WasCalled.Should().BeTrue();
probe.CalledAt.Should().BeOnOrAfter(startedAt + delayDuration - epsilon);
}

[Fact]
public async Task on_type_must_call_next_interceptor_when_message_is_exactly_awaited_type()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
// </copyright>
//-----------------------------------------------------------------------

using System.Threading;
using FluentAssertions.Extensions;

namespace Akka.Persistence.TestKit.Tests
{
using System;
Expand Down Expand Up @@ -48,6 +51,28 @@ public async Task delay_must_call_next_interceptor_after_specified_delay()
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration - epsilon);
}

[Fact]
public async Task cancelable_delay_must_call_next_interceptor_immediately_after_cancellation()
{
var totalDuration = 400.Milliseconds();
var delayDuration = 200.Milliseconds();
var epsilon = TimeSpan.FromMilliseconds(50);
using var cts = new CancellationTokenSource();
var probe = new InterceptorProbe();
var delay = new SnapshotStoreInterceptors.CancelableDelay(totalDuration, probe, cts.Token);

var startedAt = DateTime.Now;
var task = delay.InterceptAsync(null, null);
await Task.Delay(delayDuration - epsilon);

probe.WasCalled.Should().BeFalse();
cts.Cancel();
await task;

probe.WasCalled.Should().BeTrue();
probe.CalledAt.Should().BeOnOrAfter(startedAt + delayDuration - epsilon);
}

[Fact]
public async Task on_condition_must_accept_sync_lambda()
{
Expand Down
20 changes: 16 additions & 4 deletions src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace Akka.Persistence.TestKit
/// This class represents an Akka.NET Persistence TestKit that uses <a href="https://xunit.github.io/">xUnit</a>
/// as its testing framework.
/// </summary>
public abstract class PersistenceTestKit : TestKit
public class PersistenceTestKit : TestKit
{
/// <summary>
/// Create a new instance of the <see cref="PersistenceTestKit"/> class.
Expand All @@ -30,7 +30,7 @@ public abstract class PersistenceTestKit : TestKit
/// <param name="setup">Test ActorSystem configuration</param>
/// <param name="actorSystemName">Optional: The name of the actor system</param>
/// <param name="output">TBD</param>
protected PersistenceTestKit(ActorSystemSetup setup, string actorSystemName = null, ITestOutputHelper output = null)
public PersistenceTestKit(ActorSystemSetup setup, string actorSystemName = null, ITestOutputHelper output = null)
: base(GetConfig(setup), actorSystemName, output)
{
var persistenceExtension = Persistence.Instance.Apply(Sys);
Expand All @@ -49,7 +49,7 @@ protected PersistenceTestKit(ActorSystemSetup setup, string actorSystemName = nu
/// <param name="config">Test ActorSystem configuration</param>
/// <param name="actorSystemName">Optional: The name of the actor system</param>
/// <param name="output">TBD</param>
protected PersistenceTestKit(Config config, string actorSystemName = null, ITestOutputHelper output = null)
public PersistenceTestKit(Config config, string actorSystemName = null, ITestOutputHelper output = null)
: base(GetConfig(config), actorSystemName, output)
{
var persistenceExtension = Persistence.Instance.Apply(Sys);
Expand All @@ -61,13 +61,25 @@ protected PersistenceTestKit(Config config, string actorSystemName = null, ITest
Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef);
}

public PersistenceTestKit(ActorSystem actorSystem, ITestOutputHelper output = null)
: base(actorSystem, output)
{
var persistenceExtension = Persistence.Instance.Apply(Sys);

JournalActorRef = persistenceExtension.JournalFor(null);
Journal = TestJournal.FromRef(JournalActorRef);

SnapshotsActorRef = persistenceExtension.SnapshotStoreFor(null);
Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef);
}

/// <summary>
/// Create a new instance of the <see cref="PersistenceTestKit"/> class.
/// A new system with the default configuration will be created.
/// </summary>
/// <param name="actorSystemName">Optional: The name of the actor system</param>
/// <param name="output">TBD</param>
protected PersistenceTestKit(string actorSystemName = null, ITestOutputHelper output = null)
public PersistenceTestKit(string actorSystemName = null, ITestOutputHelper output = null)
: this(Config.Empty, actorSystemName, output)
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=connection/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=journal/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=snapshotstore/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
107 changes: 107 additions & 0 deletions src/core/Akka.Persistence.TestKit/ConnectionInterceptors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// -----------------------------------------------------------------------
// <copyright file="ConnectionInterceptors.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Persistence.TestKit;

public static class ConnectionInterceptors
{
public sealed class Noop : IConnectionInterceptor
{
public static readonly IConnectionInterceptor Instance = new Noop();

public Task InterceptAsync() => Task.FromResult(true);
}

public sealed class Failure : IConnectionInterceptor
{
public static readonly IConnectionInterceptor Instance = new Failure();

public Task InterceptAsync() => throw new TestConnectionException();
}

public sealed class Delay : IConnectionInterceptor
{
public Delay(TimeSpan delay, IConnectionInterceptor next)
{
_delay = delay;
_next = next;
}

private readonly TimeSpan _delay;
private readonly IConnectionInterceptor _next;

public async Task InterceptAsync()
{
await Task.Delay(_delay);
await _next.InterceptAsync();
}
}

public sealed class OnCondition : IConnectionInterceptor
{
public OnCondition(Func<Task<bool>> predicate, IConnectionInterceptor next, bool negate = false)
{
_predicate = predicate;
_next = next;
_negate = negate;
}

public OnCondition(Func<bool> predicate, IConnectionInterceptor next, bool negate = false)
{
_predicate = () => Task.FromResult(predicate());
_next = next;
_negate = negate;
}

private readonly Func<Task<bool>> _predicate;
private readonly IConnectionInterceptor _next;
private readonly bool _negate;

public async Task InterceptAsync()
{
var result = await _predicate();
if ((_negate && !result) || (!_negate && result))
{
await _next.InterceptAsync();
}
}
}

public sealed class CancelableDelay: IConnectionInterceptor
{
public CancelableDelay(TimeSpan delay, IConnectionInterceptor next, CancellationToken cancellationToken)
{
_delay = delay;
_next = next;
_cancellationToken = cancellationToken;
}

private readonly TimeSpan _delay;
private readonly IConnectionInterceptor _next;
private readonly CancellationToken _cancellationToken;

public async Task InterceptAsync()
{
try
{
await Task.Delay(_delay, _cancellationToken);
}
catch (OperationCanceledException)
{
// no-op
}
catch (TimeoutException)
{
// no-op
}
await _next.InterceptAsync();
}
}
}
14 changes: 14 additions & 0 deletions src/core/Akka.Persistence.TestKit/IConnectionInterceptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// -----------------------------------------------------------------------
// <copyright file="IConnectionInterceptor.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Tasks;

namespace Akka.Persistence.TestKit;

public interface IConnectionInterceptor
{
Task InterceptAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// -----------------------------------------------------------------------
// <copyright file="IConnectionBehaviorSetter.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Tasks;

namespace Akka.Persistence.TestKit;

public interface IJournalConnectionBehaviorSetter
{
Task SetInterceptorAsync(IConnectionInterceptor interceptor);
}
2 changes: 2 additions & 0 deletions src/core/Akka.Persistence.TestKit/Journal/ITestJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ public interface ITestJournal
/// List of interceptors to alter recovery behavior of proxied journal.
/// </summary>
JournalRecoveryBehavior OnRecovery { get; }

JournalConnectionBehavior OnConnect { get; }
}
}
Loading