From e461aab35f87521d1fc5c5862a41fe4a50f6af0e Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sat, 13 Mar 2021 03:19:54 +0700 Subject: [PATCH] Fix broken SnapshotStore when sequential-access is turned on --- ...greSqlSnapshotStoreSequentialAccessSpec.cs | 72 +++++++++++++++++++ .../Snapshot/PostgreSqlQueryExecutor.cs | 3 +- 2 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 src/Akka.Persistence.PostgreSql.Tests/PostgreSqlSnapshotStoreSequentialAccessSpec.cs diff --git a/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlSnapshotStoreSequentialAccessSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlSnapshotStoreSequentialAccessSpec.cs new file mode 100644 index 0000000..42e6fb7 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/PostgreSqlSnapshotStoreSequentialAccessSpec.cs @@ -0,0 +1,72 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2016 Lightbend Inc. +// Copyright (C) 2013-2016 Akka.NET project +// +//----------------------------------------------------------------------- + +using Akka.Configuration; +using Akka.Persistence.TCK.Snapshot; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.PostgreSql.Tests +{ + [Collection("PostgreSqlSpec")] + public class PostgreSqlSnapshotStoreSequentialAccessSpec : SnapshotStoreSpec + { + private static Config Initialize(PostgresFixture fixture) + { + //need to make sure db is created before the tests start + DbUtils.Initialize(fixture); + + var config = @" + akka.persistence { + publish-plugin-commands = on + snapshot-store { + plugin = ""akka.persistence.snapshot-store.postgresql"" + postgresql { + class = ""Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + table-name = snapshot_store + schema-name = public + auto-initialize = on + connection-string = """ + DbUtils.ConnectionString + @""" + sequential-access = on + } + } + } + akka.test.single-expect-default = 10s"; + + return ConfigurationFactory.ParseString(config); + } + + public PostgreSqlSnapshotStoreSequentialAccessSpec(ITestOutputHelper output, PostgresFixture fixture) + : base(Initialize(fixture), "PostgreSqlSnapshotStoreSpec", output: output) + { + Initialize(); + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + DbUtils.Clean(); + } + + [Fact] + public void SnapshotStore_should_save_and_overwrite_snapshot_with_same_sequence_number_unskipped() + { + TestProbe _senderProbe = CreateTestProbe(); + var md = Metadata[4]; + SnapshotStore.Tell(new SaveSnapshot(md, "s-5-modified"), _senderProbe.Ref); + var md2 = _senderProbe.ExpectMsg().Metadata; + Assert.Equal(md.SequenceNr, md2.SequenceNr); + SnapshotStore.Tell(new LoadSnapshot(Pid, new SnapshotSelectionCriteria(md.SequenceNr), long.MaxValue), _senderProbe.Ref); + var result = _senderProbe.ExpectMsg(); + Assert.Equal("s-5-modified", result.Snapshot.Snapshot.ToString()); + Assert.Equal(md.SequenceNr, result.Snapshot.Metadata.SequenceNr); + // metadata timestamp may have been changed + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs b/src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs index c0c4018..7fa7e4a 100644 --- a/src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs +++ b/src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs @@ -139,6 +139,7 @@ protected override SelectedSnapshot ReadSnapshot(DbDataReader reader) var sequenceNr = reader.GetInt64(1); var timestamp = new DateTime(reader.GetInt64(2)); var manifest = reader.GetString(3); + var payloadObject = reader[4]; int? serializerId = null; Type type = null; @@ -151,7 +152,7 @@ protected override SelectedSnapshot ReadSnapshot(DbDataReader reader) serializerId = reader.GetInt32(5); } - var snapshot = _deserialize(type, reader[4], manifest, serializerId); + var snapshot = _deserialize(type, payloadObject, manifest, serializerId); var metadata = new SnapshotMetadata(persistenceId, sequenceNr, timestamp); return new SelectedSnapshot(metadata, snapshot);