Skip to content

Commit

Permalink
Add Snapshot Metadata timestamp support
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Sep 30, 2024
1 parent 5f59273 commit e6f18ed
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// -----------------------------------------------------------------------
// <copyright file="RedisSnapshotStoreSaveSnapshotSpec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Configuration;
using Akka.Persistence.TCK.Snapshot;
using Xunit;
using Xunit.Abstractions;

#nullable enable
namespace Akka.Persistence.Redis.Tests;

[Collection("RedisSpec")]
public class RedisSnapshotStoreSaveSnapshotSpec: SnapshotStoreSaveSnapshotSpec
{
public const int Database = 1;

public static Config Config(RedisFixture fixture, int id)
{
DbUtils.Initialize(fixture);

return ConfigurationFactory.ParseString($@"
akka.test.single-expect-default = 3s
akka.persistence {{
publish-plugin-commands = on
snapshot-store {{
plugin = ""akka.persistence.snapshot-store.redis""
redis {{
class = ""Akka.Persistence.Redis.Snapshot.RedisSnapshotStore, Akka.Persistence.Redis""
configuration-string = ""{fixture.ConnectionString}""
plugin-dispatcher = ""akka.actor.default-dispatcher""
database = ""{id}""
}}
}}
}}
akka.actor {{
serializers {{
persistence-snapshot = ""Akka.Persistence.Redis.Serialization.PersistentSnapshotSerializer, Akka.Persistence.Redis""
}}
serialization-bindings {{
""Akka.Persistence.SelectedSnapshot, Akka.Persistence"" = persistence-snapshot
}}
serialization-identifiers {{
""Akka.Persistence.Redis.Serialization.PersistentSnapshotSerializer, Akka.Persistence.Redis"" = 48
}}
}}").WithFallback(RedisPersistence.DefaultConfig());
}

public RedisSnapshotStoreSaveSnapshotSpec(ITestOutputHelper output, RedisFixture fixture)
: base(Config(fixture, Database), nameof(RedisSnapshotStoreSpec), output)
{
RedisPersistence.Get(Sys);
}

protected override void AfterAll()
{
base.AfterAll();
DbUtils.Clean(Database);
}

}
29 changes: 27 additions & 2 deletions src/Akka.Persistence.Redis/Snapshot/RedisSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,33 @@ protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot)

protected override async Task DeleteAsync(SnapshotMetadata metadata)
{
await Database.SortedSetRemoveRangeByScoreAsync(GetSnapshotKey(metadata.PersistenceId, IsClustered), metadata.SequenceNr,
metadata.SequenceNr);
if(metadata.Timestamp == DateTime.MinValue)
{
await Database.SortedSetRemoveRangeByScoreAsync(
GetSnapshotKey(metadata.PersistenceId, IsClustered),
metadata.SequenceNr,
metadata.SequenceNr);
return;
}

var snapshots = await Database.SortedSetRangeByScoreAsync(
key: GetSnapshotKey(metadata.PersistenceId, IsClustered),
start: metadata.SequenceNr,
stop: 0L,
exclude: Exclude.None,
order: Order.Descending);

var found = snapshots
.Select(c => PersistentFromBytes(c))
.Where(snapshot => snapshot.Metadata.Timestamp <= metadata.Timestamp &&
snapshot.Metadata.SequenceNr == metadata.SequenceNr)
.Select(s => _database.Value.SortedSetRemoveRangeByScoreAsync(
key: GetSnapshotKey(metadata.PersistenceId, IsClustered),
start: s.Metadata.SequenceNr,
stop: s.Metadata.SequenceNr))
.ToArray();

await Task.WhenAll(found);
}

protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
Expand Down

0 comments on commit e6f18ed

Please sign in to comment.