Skip to content

Commit

Permalink
Merge pull request #83 from Arkatufus/#53_Fix_snapshot_store_sequenti…
Browse files Browse the repository at this point in the history
…al_access_is_broken

Fix broken SnapshotStore when sequential-access is turned on
  • Loading branch information
Arkatufus authored Mar 12, 2021
2 parents 399017f + 845d6a0 commit 4631086
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//-----------------------------------------------------------------------
// <copyright file="PostgreSqlSnapshotStoreSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.TCK.Snapshot;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.PostgreSql.Tests
{
[Collection("PostgreSqlSpec")]
public class PostgreSqlSnapshotStoreSequentialAccessSpec : SnapshotStoreSpec
{
private static Config Initialize(PostgresFixture fixture)
{
//need to make sure db is created before the tests start
DbUtils.Initialize(fixture);

var config = @"
akka.persistence {
publish-plugin-commands = on
snapshot-store {
plugin = ""akka.persistence.snapshot-store.postgresql""
postgresql {
class = ""Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = snapshot_store
schema-name = public
auto-initialize = on
connection-string = """ + DbUtils.ConnectionString + @"""
sequential-access = on
}
}
}
akka.test.single-expect-default = 10s";

return ConfigurationFactory.ParseString(config);
}

public PostgreSqlSnapshotStoreSequentialAccessSpec(ITestOutputHelper output, PostgresFixture fixture)
: base(Initialize(fixture), "PostgreSqlSnapshotStoreSpec", output: output)
{
Initialize();
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
DbUtils.Clean();
}

[Fact]
public void SnapshotStore_should_save_and_overwrite_snapshot_with_same_sequence_number_unskipped()
{
TestProbe _senderProbe = CreateTestProbe();
var md = Metadata[4];
SnapshotStore.Tell(new SaveSnapshot(md, "s-5-modified"), _senderProbe.Ref);
var md2 = _senderProbe.ExpectMsg<SaveSnapshotSuccess>().Metadata;
Assert.Equal(md.SequenceNr, md2.SequenceNr);
SnapshotStore.Tell(new LoadSnapshot(Pid, new SnapshotSelectionCriteria(md.SequenceNr), long.MaxValue), _senderProbe.Ref);
var result = _senderProbe.ExpectMsg<LoadSnapshotResult>();
Assert.Equal("s-5-modified", result.Snapshot.Snapshot.ToString());
Assert.Equal(md.SequenceNr, result.Snapshot.Metadata.SequenceNr);
// metadata timestamp may have been changed
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ protected override SelectedSnapshot ReadSnapshot(DbDataReader reader)
var sequenceNr = reader.GetInt64(1);
var timestamp = new DateTime(reader.GetInt64(2));
var manifest = reader.GetString(3);
var payloadObject = reader[4];

int? serializerId = null;
Type type = null;
Expand All @@ -151,7 +152,7 @@ protected override SelectedSnapshot ReadSnapshot(DbDataReader reader)
serializerId = reader.GetInt32(5);
}

var snapshot = _deserialize(type, reader[4], manifest, serializerId);
var snapshot = _deserialize(type, payloadObject, manifest, serializerId);

var metadata = new SnapshotMetadata(persistenceId, sequenceNr, timestamp);
return new SelectedSnapshot(metadata, snapshot);
Expand Down

0 comments on commit 4631086

Please sign in to comment.