diff --git a/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/SqliteSnapshotStoreSaveSnapshotSpec.cs b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/SqliteSnapshotStoreSaveSnapshotSpec.cs new file mode 100644 index 00000000000..a5d0932bd94 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.Sqlite.Tests/SqliteSnapshotStoreSaveSnapshotSpec.cs @@ -0,0 +1,45 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Configuration; +using Akka.Persistence.TCK.Snapshot; +using Akka.Util.Internal; +using Xunit.Abstractions; + +namespace Akka.Persistence.Sqlite.Tests; + +public class SqliteSnapshotStoreSaveSnapshotSpec: SnapshotStoreSaveSnapshotSpec +{ + private static readonly AtomicCounter Counter = new(0); + + public SqliteSnapshotStoreSaveSnapshotSpec(ITestOutputHelper output) + : base(CreateSpecConfig("Filename=file:memdb-snapshot-" + Counter.IncrementAndGet() + ".db;Mode=Memory;Cache=Shared"), "SqliteSnapshotStoreSpec", output) + { + SqlitePersistence.Get(Sys); + } + + private static Config CreateSpecConfig(string connectionString) + { + return ConfigurationFactory.ParseString( + $$""" + akka.persistence { + publish-plugin-commands = on + snapshot-store { + plugin = "akka.persistence.snapshot-store.sqlite" + sqlite { + class = "Akka.Persistence.Sqlite.Snapshot.SqliteSnapshotStore, Akka.Persistence.Sqlite" + plugin-dispatcher = "akka.actor.default-dispatcher" + table-name = snapshot_store + auto-initialize = on + connection-string = "{{connectionString}}" + } + } + } + """); + } + +} \ No newline at end of file diff --git a/src/core/Akka.Persistence.TCK/Snapshot/SnapshotStoreSaveSnapshotSpec.cs b/src/core/Akka.Persistence.TCK/Snapshot/SnapshotStoreSaveSnapshotSpec.cs new file mode 100644 index 00000000000..035ae4a1ec4 --- /dev/null +++ b/src/core/Akka.Persistence.TCK/Snapshot/SnapshotStoreSaveSnapshotSpec.cs @@ -0,0 +1,267 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Persistence.Snapshot; +using Akka.Persistence.TCK.Serialization; +using Akka.TestKit; +using FluentAssertions; +using FluentAssertions.Extensions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.TCK.Snapshot; + +/// +/// This spec aims to verify custom SaveSnapshot implementations. +/// This is an optional spec that checks if SaveSnapshot supports: +/// 1. Saving snapshots with the same sequence number (upsert operation) +/// 2. Saving snapshots concurrently and be able to save them in correct order +/// +public class SnapshotStoreSaveSnapshotSpec : PluginSpec +{ + private const int RepeatCount = 200; + + private const string SpecConfigTemplate = """ +akka.persistence.publish-plugin-commands = on +akka.persistence.snapshot-store { + plugin = "akka.persistence.snapshot-store.my" + my { + class = "TestPersistencePlugin.MySnapshotStore, TestPersistencePlugin" + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } +} +akka.actor { + serializers { + persistence-tck-test="Akka.Persistence.TCK.Serialization.TestSerializer, Akka.Persistence.TCK" + } + serialization-bindings { + "Akka.Persistence.TCK.Serialization.TestPayload, Akka.Persistence.TCK" = persistence-tck-test + } +} +"""; + + private static readonly Config Config = + ConfigurationFactory.ParseString(SpecConfigTemplate); + + private string _guid = string.Empty; + + protected override bool SupportsSerialization => true; + protected virtual bool SupportsConcurrentSaves => false; + protected string PersistenceId => $"ac-{_guid}"; + protected readonly TestProbe SenderProbe; + + protected SnapshotStoreSaveSnapshotSpec(Config config = null, string actorSystemName = null, ITestOutputHelper output = null) + : base(FromConfig(config).WithFallback(Config), actorSystemName ?? "SnapshotStoreSpec", output) + { + SenderProbe = CreateTestProbe(); + } + + [Fact(DisplayName = "Rapid multiple SaveSnapshot invocation with no journal persist should only save the latest snapshot")] + public async Task MultipleSnapshotsWithNoPersistTest() + { + if(!SupportsConcurrentSaves) + return; + + foreach (var iteration in Enumerable.Range(1, RepeatCount)) + { + NewPersistenceId(); + + var persistenceActor = CreatePersistenceActor(Sys); + + // No persist call before SaveSnapshot burst + persistenceActor.Tell(new TakeSnapshotsWithValues(new []{ new[]{0}, new[]{1}, new[]{2}, new []{3} }), SenderProbe); + await SenderProbe.ExpectMsgAsync(); + + await StopActorAsync(persistenceActor); + persistenceActor = CreatePersistenceActor(Sys); + + persistenceActor.Tell(GetAll.Instance, SenderProbe); + var result = await SenderProbe.ExpectMsgAsync(); + await StopActorAsync(persistenceActor); + + result.Length.Should().Be(1, $"expecting an array with length 1 (on iteration {iteration}/{RepeatCount})"); + result[0].Should().Be(3, $"recovered snapshot should be the last snapshot (on iteration {iteration}/{RepeatCount})"); + + Output.WriteLine($"Iteration: {iteration}"); + } + } + + [Fact(DisplayName = "Rapid multiple SaveSnapshot invocation with journal persist should only save the latest snapshot")] + public async Task MultipleSnapshotsWithPersistTest() + { + if(!SupportsConcurrentSaves) + return; + + foreach (var iteration in Enumerable.Range(1, RepeatCount)) + { + NewPersistenceId(); + + var persistenceActor = CreatePersistenceActor(Sys); + + // persist 2 events + persistenceActor.Tell(1, SenderProbe); + SenderProbe.ExpectMsg(); + persistenceActor.Tell(2, SenderProbe); + SenderProbe.ExpectMsg(); + + persistenceActor.Tell(new TakeSnapshotsWithValues(new []{ new[]{0}, new[]{1}, new[]{2}, new []{3} }), SenderProbe); + await SenderProbe.ExpectMsgAsync(); + + await StopActorAsync(persistenceActor); + persistenceActor = CreatePersistenceActor(Sys); + + persistenceActor.Tell(GetAll.Instance, SenderProbe); + var result = await SenderProbe.ExpectMsgAsync(); + await StopActorAsync(persistenceActor); + + result.Length.Should().Be(1, $"expecting an array with length 1 (on iteration {iteration}/{RepeatCount})"); + result[0].Should().Be(3, $"recovered snapshot should be the last snapshot (on iteration {iteration}/{RepeatCount})"); + + Output.WriteLine($"Iteration: {iteration}"); + } + } + + [Fact(DisplayName = "Multiple SaveSnapshot invocation with the same sequence number should not throw")] + public async Task MultipleSnapshotsWithSameSeqNo() + { + var persistence = Persistence.Instance.Apply(Sys); + var snapshotStore = persistence.SnapshotStoreFor(null); + var snap = new TestPayload(SenderProbe.Ref); + + var metadata = new SnapshotMetadata(PersistenceId, 3, DateTime.Now); + snapshotStore.Tell(new SaveSnapshot(metadata, snap), SenderProbe); + var success = await SenderProbe.ExpectMsgAsync(10.Minutes()); + success.Metadata.PersistenceId.Should().Be(metadata.PersistenceId); + success.Metadata.Timestamp.Should().Be(metadata.Timestamp); + success.Metadata.SequenceNr.Should().Be(metadata.SequenceNr); + + metadata = new SnapshotMetadata(PersistenceId, 3, DateTime.Now); + snapshotStore.Tell(new SaveSnapshot(metadata, 3), SenderProbe); + success = await SenderProbe.ExpectMsgAsync(); + success.Metadata.PersistenceId.Should().Be(metadata.PersistenceId); + success.Metadata.Timestamp.Should().Be(metadata.Timestamp); + success.Metadata.SequenceNr.Should().Be(metadata.SequenceNr); + } + + #region Utility + + private void NewPersistenceId() + { + _guid = Guid.NewGuid().ToString("N")[^8..]; + } + + private IActorRef CreatePersistenceActor(ActorSystem sys) + => sys.ActorOf(Props.Create(() => new MyPersistenceActor(PersistenceId)), "persistence-actor-1"); + + private async Task StopActorAsync(IActorRef actor) + { + await WatchAsync(actor); + actor.Tell(PoisonPill.Instance); + await ExpectTerminatedAsync(actor); + await UnwatchAsync(actor); + } + #endregion + + #region Classes + private sealed class MyPersistenceActor : ReceivePersistentActor + { + private List _values = new(); + private IActorRef? _sender; + private int _snapshotCount; + private int _savedSnapshotCount; + + public MyPersistenceActor(string persistenceId) + { + PersistenceId = persistenceId; + + Recover( + offer => + { + if (offer.Snapshot is IEnumerable ints) + _values = ints.ToList(); + }); + + Recover(_values.Add); + + Command( i => + { + _sender = Sender; + Persist(i, _ => + { + _values.Add(i); + _sender.Tell(Ack.Instance); + }); + }); + + Command(_ => SaveSnapshot(_values)); + + Command(msg => SaveSnapshots(new []{ msg.Values })); + + Command(msg => SaveSnapshots(msg.Values)); + + Command(_ => Sender.Tell(_values.ToArray())); + + Command( + _ => + { + _savedSnapshotCount++; + if(_savedSnapshotCount == _snapshotCount) + _sender.Tell(SnapshotAck.Instance); + }); + } + + public override string PersistenceId { get; } + + private void SaveSnapshots(int[][] snapshots) + { + _sender = Sender; + _snapshotCount = snapshots.Length; + _savedSnapshotCount = 0; + foreach (var snapshot in snapshots) + { + _values = snapshot.ToList(); + SaveSnapshot(_values); + } + } + } + + private sealed class Ack + { + public static readonly Ack Instance = new Ack(); + private Ack() { } + } + + private sealed class SnapshotAck + { + public static readonly SnapshotAck Instance = new(); + private SnapshotAck() { } + } + + private sealed class GetAll + { + public static readonly GetAll Instance = new(); + private GetAll() { } + } + + private sealed class TakeSnapshot + { + public static readonly TakeSnapshot Instance = new(); + private TakeSnapshot() { } + } + + private sealed record TakeSnapshotWithValue(int[] Values); + + private sealed record TakeSnapshotsWithValues(int[][] Values); + + #endregion +} \ No newline at end of file