diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index a227772..e590b96 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,6 +1,2 @@ -#### 1.3.14 October 04 2019 #### -You can see [the full set of changes for Akka.Persistence.MongoDb v1.3.14 here](https://github.com/akkadotnet/Akka.Persistence.MongoDB/milestone/2). - -This PR fixes a number of problems stemming from implementations of Akka.Persistence.Query implementations that were incorrect. - -**Note: we're working on [adding support for future versions of Akka.Persistence.MongoDB and you should read them here if you plan on continuing to use the plugin](https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/72).** \ No newline at end of file +#### 1.4.0-beta1 October 30 2019 #### +Beta release of Akka.Persistence.MongoDB which implements the [new standardized Akka.Persistence serialization paradigm](https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/72) going forward. Has full backwards compatibility for reading events which were written in 1.3.[0-14] style serialization. \ No newline at end of file diff --git a/build-system/windows-release.yaml b/build-system/windows-release.yaml index a54ca91..6927c56 100644 --- a/build-system/windows-release.yaml +++ b/build-system/windows-release.yaml @@ -26,7 +26,7 @@ steps: displayName: 'FAKE Build' inputs: filename: build.cmd - arguments: 'nuget nugetpublishurl=https://www.nuget.org/api/v2/package nugetkey=$(nugetKey)' + arguments: 'nuget nugetpublishurl=https://www.nuget.org/api/v2/package nugetkey=$(nugetKey) nugetprerelease=dev' - task: GitHubRelease@0 displayName: 'GitHub release (create)' diff --git a/build.fsx b/build.fsx index 720d15a..ae2608b 100644 --- a/build.fsx +++ b/build.fsx @@ -26,8 +26,9 @@ let preReleaseVersionSuffix = "beta" + (if (not (buildNumber = "0")) then (build let versionSuffix = match (getBuildParam "nugetprerelease") with | "dev" -> preReleaseVersionSuffix - | _ -> "" - + | "" -> "" + | str -> str + let releaseNotes = File.ReadLines "./RELEASE_NOTES.md" |> ReleaseNotesHelper.parseReleaseNotes diff --git a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj index 9ae4ee6..9d35353 100644 --- a/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj +++ b/src/Akka.Persistence.MongoDb.Tests/Akka.Persistence.MongoDb.Tests.csproj @@ -2,8 +2,7 @@ - netcoreapp2.0 - + netcoreapp2.1 diff --git a/src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs b/src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs index 25e39db..a20624d 100644 --- a/src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/Bug61FixSpec.cs @@ -206,4 +206,4 @@ class = ""Akka.Persistence.MongoDb.Query.MongoDbReadJournalProvider, Akka.Persis return ConfigurationFactory.ParseString(specString); } } -} +} \ No newline at end of file diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs index 876657b..38f059a 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbJournalSpec.cs @@ -41,36 +41,4 @@ class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.Mong return ConfigurationFactory.ParseString(specString); } } - - [Collection("MongoDbSpec")] - public class MongoDbBinaryJournalSpec : JournalSpec, IClassFixture - { - protected override bool SupportsRejectingNonSerializableObjects { get; } = false; - - public MongoDbBinaryJournalSpec(DatabaseFixture databaseFixture) : base(CreateSpecConfig(databaseFixture), "MongoDbJournalSpec") - { - Initialize(); - } - - private static Config CreateSpecConfig(DatabaseFixture databaseFixture) - { - var specString = @" - akka.test.single-expect-default = 3s - akka.persistence { - publish-plugin-commands = on - journal { - plugin = ""akka.persistence.journal.mongodb"" - mongodb { - class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" - connection-string = """ + databaseFixture.ConnectionString + @""" - auto-initialize = on - collection = ""EventJournal"" - stored-as = binary - } - } - }"; - - return ConfigurationFactory.ParseString(specString); - } - } } diff --git a/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs b/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs index 51409ab..ce5f864 100644 --- a/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs +++ b/src/Akka.Persistence.MongoDb.Tests/MongoDbSettingsSpec.cs @@ -22,7 +22,6 @@ public void Mongo_JournalSettings_must_have_default_values() mongoPersistence.JournalSettings.AutoInitialize.Should().BeFalse(); mongoPersistence.JournalSettings.Collection.Should().Be("EventJournal"); mongoPersistence.JournalSettings.MetadataCollection.Should().Be("Metadata"); - mongoPersistence.JournalSettings.StoredAs.Should().BeOfType(); } [Fact] @@ -33,7 +32,6 @@ public void Mongo_SnapshotStoreSettingsSettings_must_have_default_values() mongoPersistence.SnapshotStoreSettings.ConnectionString.Should().Be(string.Empty); mongoPersistence.SnapshotStoreSettings.AutoInitialize.Should().BeFalse(); mongoPersistence.SnapshotStoreSettings.Collection.Should().Be("SnapshotStore"); - mongoPersistence.SnapshotStoreSettings.StoredAs.Should().BeOfType(); } } } diff --git a/src/Akka.Persistence.MongoDb.Tests/Serialization/MongoDbJournalSerializationSpec.cs b/src/Akka.Persistence.MongoDb.Tests/Serialization/MongoDbJournalSerializationSpec.cs new file mode 100644 index 0000000..2f7455a --- /dev/null +++ b/src/Akka.Persistence.MongoDb.Tests/Serialization/MongoDbJournalSerializationSpec.cs @@ -0,0 +1,51 @@ +using System.Collections.Generic; +using Akka.Configuration; +using Akka.Persistence.TCK.Serialization; +using Akka.Util.Internal; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.MongoDb.Tests.Serialization +{ + [Collection("MongoDbSpec")] + public class MongoDbJournalSerializationSpec : JournalSerializationSpec, IClassFixture + { + public static readonly AtomicCounter Counter = new AtomicCounter(0); + private readonly ITestOutputHelper _output; + + public MongoDbJournalSerializationSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), nameof(MongoDbJournalSerializationSpec), output) + { + _output = output; + output.WriteLine(databaseFixture.ConnectionString + Counter.Current); + } + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + { + var specString = @" + akka.test.single-expect-default = 3s + akka.persistence { + publish-plugin-commands = on + journal { + plugin = ""akka.persistence.journal.mongodb"" + mongodb { + class = ""Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"" + connection-string = """ + databaseFixture.ConnectionString + @""" + auto-initialize = on + collection = ""EventJournal"" + stored-as = object + } + } + }"; + + return ConfigurationFactory.ParseString(specString); + } + + + [Fact(Skip = "Waiting on better error messages")] + public override void Journal_should_serialize_Persistent_with_EventAdapter_manifest() + { + base.Journal_should_serialize_Persistent_with_EventAdapter_manifest(); + } + } +} diff --git a/src/Akka.Persistence.MongoDb.Tests/Serialization/MongoDbSnapshotStoreSerializationSpec.cs b/src/Akka.Persistence.MongoDb.Tests/Serialization/MongoDbSnapshotStoreSerializationSpec.cs new file mode 100644 index 0000000..72b77eb --- /dev/null +++ b/src/Akka.Persistence.MongoDb.Tests/Serialization/MongoDbSnapshotStoreSerializationSpec.cs @@ -0,0 +1,42 @@ +using Akka.Configuration; +using Akka.Persistence.TCK.Serialization; +using Akka.Util.Internal; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.MongoDb.Tests.Serialization +{ + [Collection("MongoDbSpec")] + public class MongoDbSnapshotStoreSerializationSpec : SnapshotStoreSerializationSpec, IClassFixture + { + public static readonly AtomicCounter Counter = new AtomicCounter(0); + private readonly ITestOutputHelper _output; + + public MongoDbSnapshotStoreSerializationSpec(ITestOutputHelper output, DatabaseFixture databaseFixture) + : base(CreateSpecConfig(databaseFixture, Counter.GetAndIncrement()), nameof(MongoDbSnapshotStoreSerializationSpec), output) + { + _output = output; + output.WriteLine(databaseFixture.ConnectionString + Counter.Current); + } + + private static Config CreateSpecConfig(DatabaseFixture databaseFixture, int id) + { + var specString = @" + akka.test.single-expect-default = 3s + akka.persistence { + publish-plugin-commands = on + snapshot-store { + plugin = ""akka.persistence.snapshot-store.mongodb"" + mongodb { + class = ""Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"" + connection-string = """ + databaseFixture.ConnectionString + id + @""" + auto-initialize = on + collection = ""SnapshotStore"" + } + } + }"; + + return ConfigurationFactory.ParseString(specString); + } + } +} diff --git a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj index 85184ab..5d2e157 100644 --- a/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj +++ b/src/Akka.Persistence.MongoDb/Akka.Persistence.MongoDb.csproj @@ -2,7 +2,7 @@ - net45;netstandard1.6 + netstandard2.0 @@ -10,6 +10,6 @@ - + \ No newline at end of file diff --git a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs index 5f521ae..c7972ed 100644 --- a/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs +++ b/src/Akka.Persistence.MongoDb/Journal/MongoDbJournal.cs @@ -37,51 +37,19 @@ public class MongoDbJournal : AsyncWriteJournal private readonly HashSet _allPersistenceIds = new HashSet(); private readonly HashSet _allPersistenceIdSubscribers = new HashSet(); - private readonly Dictionary> _tagSubscribers = + private readonly Dictionary> _tagSubscribers = new Dictionary>(); - private readonly Dictionary> _persistenceIdSubscribers + private readonly Dictionary> _persistenceIdSubscribers = new Dictionary>(); - private readonly Func _serialize; - private readonly Func _deserialize; - + private Akka.Serialization.Serialization _serialization; public MongoDbJournal() { _settings = MongoDbPersistence.Get(Context.System).JournalSettings; - var serialization = Context.System.Serialization; - switch (_settings.StoredAs) - { - case StoredAsType.Binary: - _serialize = representation => - { - var serializer = serialization.FindSerializerFor(representation); - return new SerializationResult(serializer.ToBinary(representation), serializer); - }; - _deserialize = (type, serialized, manifest, serializerId) => - { - if (serializerId.HasValue) - { - /* - * Backwards compat: check to see if manifest is populated before using it. - * Otherwise, fall back to using the stored type data instead. - * Per: https://github.com/AkkaNetContrib/Akka.Persistence.MongoDB/issues/57 - */ - if (string.IsNullOrEmpty(manifest)) - return serialization.Deserialize((byte[]) serialized, serializerId.Value, type); - return serialization.Deserialize((byte[])serialized, serializerId.Value, manifest); - } + _serialization = Context.System.Serialization; - var deserializer = serialization.FindSerializerForType(type); - return deserializer.FromBinary((byte[])serialized, type); - }; - break; - default: - _serialize = representation => new SerializationResult(representation, null); - _deserialize = (type, serialized, manifest, serializerId) => serialized; - break; - } } protected override void PreStart() @@ -108,7 +76,7 @@ protected override void PreStart() .Descending(entry => entry.SequenceNr)); collection.Indexes - .CreateOneAsync(modelForEntryAndSequenceNr, cancellationToken:CancellationToken.None) + .CreateOneAsync(modelForEntryAndSequenceNr, cancellationToken: CancellationToken.None) .Wait(); var modelWithOrdering = new CreateIndexModel( @@ -135,7 +103,7 @@ protected override void PreStart() .Ascending(entry => entry.PersistenceId)); collection.Indexes - .CreateOneAsync(modelWithAscendingPersistenceId, cancellationToken:CancellationToken.None) + .CreateOneAsync(modelWithAscendingPersistenceId, cancellationToken: CancellationToken.None) .Wait(); } @@ -169,7 +137,8 @@ public override async Task ReplayMessagesAsync(IActorContext context, string per .Limit(limitValue) .ToListAsync(); - collections.ForEach(doc => { + collections.ForEach(doc => + { recoveryCallback(ToPersistenceRepresentation(doc, context.Sender)); }); } @@ -198,7 +167,7 @@ private async Task ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) if (toSequenceNr != long.MaxValue) seqNoFilter &= builder.Lte(x => x.Ordering, new BsonTimestamp(toSequenceNr)); - + // Need to know what the highest seqNo of this query will be // and return that as part of the RecoverySuccess message var maxSeqNoEntry = await _journalCollection.Value.Find(seqNoFilter) @@ -223,10 +192,11 @@ await _journalCollection.Value .Find(readFilter) .Sort(sort) .Limit(limitValue) - .ForEachAsync(entry => { + .ForEachAsync(entry => + { var persistent = ToPersistenceRepresentation(entry, ActorRefs.NoSender); foreach (var adapted in AdaptFromJournal(persistent)) - replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, tag, entry.Ordering.Value), + replay.ReplyTo.Tell(new ReplayedTaggedMessage(adapted, tag, entry.Ordering.Value), ActorRefs.NoSender); }); @@ -251,7 +221,8 @@ protected override async Task> WriteMessagesAsync(IEnu var persistentIds = new HashSet(); var messageList = messages.ToList(); - var writeTasks = messageList.Select(async message => { + var writeTasks = messageList.Select(async message => + { var persistentMessages = ((IImmutableList)message.Payload); if (HasTagSubscribers) @@ -287,8 +258,10 @@ protected override async Task> WriteMessagesAsync(IEnu } } - if (HasTagSubscribers && allTags.Count != 0) { - foreach (var tag in allTags) { + if (HasTagSubscribers && allTags.Count != 0) + { + foreach (var tag in allTags) + { NotifyTagChange(tag); } } @@ -313,33 +286,27 @@ private JournalEntry ToJournalEntry(IPersistentRepresentation message) { object payload = message.Payload; if (message.Payload is Tagged tagged) + { payload = tagged.Payload; + message = message.WithPayload(payload); // need to update the internal payload when working with tags + } - var serializationResult = _serialize(payload); - var serializer = serializationResult.Serializer; - var hasSerializer = serializer != null; - var manifest = ""; - if (hasSerializer && serializer is SerializerWithStringManifest stringManifest) - manifest = stringManifest.Manifest(message.Payload); - else if (hasSerializer && serializer.IncludeManifest) - manifest = message.GetType().TypeQualifiedName(); - else - manifest = string.IsNullOrEmpty(message.Manifest) - ? message.GetType().TypeQualifiedName() - : message.Manifest; + var serializer = _serialization.FindSerializerFor(message); + var binary = serializer.ToBinary(message); + return new JournalEntry { Id = message.PersistenceId + "_" + message.SequenceNr, Ordering = new BsonTimestamp(0), // Auto-populates with timestamp IsDeleted = message.IsDeleted, - Payload = serializationResult.Payload, + Payload = binary, PersistenceId = message.PersistenceId, SequenceNr = message.SequenceNr, - Manifest = manifest, + Manifest = string.Empty, // don't need a manifest here - it's embedded inside the PersistentMessage Tags = tagged.Tags?.ToList(), - SerializerId = serializer?.Identifier + SerializerId = null // don't need a serializer ID here either; only for backwards-comat }; } @@ -347,14 +314,44 @@ private Persistent ToPersistenceRepresentation(JournalEntry entry, IActorRef sen { int? serializerId = null; Type type = null; + + var legacy = entry.SerializerId.HasValue || !string.IsNullOrEmpty(entry.Manifest); + if (!legacy) + { + var ser = _serialization.FindSerializerForType(typeof(Persistent)); + return ser.FromBinary((byte[]) entry.Payload); + } + + // legacy serialization if (!entry.SerializerId.HasValue && !string.IsNullOrEmpty(entry.Manifest)) type = Type.GetType(entry.Manifest, true); else serializerId = entry.SerializerId; - var deserialized = _deserialize(type, entry.Payload, entry.Manifest, serializerId); + if (entry.Payload is byte[] bytes) + { + object deserialized = null; + if (serializerId.HasValue) + { + deserialized = _serialization.Deserialize(bytes, serializerId.Value, entry.Manifest); + } + else + { + var deserializer = _serialization.FindSerializerForType(type); + deserialized = deserializer.FromBinary(bytes, type); + } + + if (deserialized is Persistent p) + return p; + + return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender); + } + else // backwards compat for object serialization - Payload was already deserialized by BSON + { + return new Persistent(entry.Payload, entry.SequenceNr, entry.PersistenceId, entry.Manifest, + entry.IsDeleted, sender); + } - return new Persistent(deserialized, entry.SequenceNr, entry.PersistenceId, entry.Manifest, entry.IsDeleted, sender); } private async Task SetHighSequenceId(IList messages) @@ -406,7 +403,8 @@ protected override bool ReceivePluginInternal(object message) private void AddAllPersistenceIdSubscriber(IActorRef subscriber) { - lock (_allPersistenceIdSubscribers) { + lock (_allPersistenceIdSubscribers) + { _allPersistenceIdSubscribers.Add(subscriber); } subscriber.Tell(new CurrentPersistenceIds(GetAllPersistenceIds())); @@ -414,7 +412,8 @@ private void AddAllPersistenceIdSubscriber(IActorRef subscriber) private void AddTagSubscriber(IActorRef subscriber, string tag) { - if (!_tagSubscribers.TryGetValue(tag, out var subscriptions)) { + if (!_tagSubscribers.TryGetValue(tag, out var subscriptions)) + { subscriptions = new HashSet(); _tagSubscribers.Add(tag, subscriptions); } @@ -432,7 +431,8 @@ private IEnumerable GetAllPersistenceIds() private void AddPersistenceIdSubscriber(IActorRef subscriber, string persistenceId) { - if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions)) { + if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions)) + { subscriptions = new HashSet(); _persistenceIdSubscribers.Add(persistenceId, subscriptions); } @@ -460,7 +460,8 @@ private void RemoveSubscriber(IActorRef subscriber) private void NotifyNewPersistenceIdAdded(string persistenceId) { var isNew = TryAddPersistenceId(persistenceId); - if (isNew && HasAllPersistenceIdSubscribers) { + if (isNew && HasAllPersistenceIdSubscribers) + { var added = new PersistenceIdAdded(persistenceId); foreach (var subscriber in _allPersistenceIdSubscribers) subscriber.Tell(added); @@ -469,14 +470,16 @@ private void NotifyNewPersistenceIdAdded(string persistenceId) private bool TryAddPersistenceId(string persistenceId) { - lock (_allPersistenceIds) { + lock (_allPersistenceIds) + { return _allPersistenceIds.Add(persistenceId); } } private void NotifyPersistenceIdChange(string persistenceId) { - if (_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscribers)) { + if (_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscribers)) + { var changed = new EventAppended(persistenceId); foreach (var subscriber in subscribers) subscriber.Tell(changed); @@ -485,7 +488,8 @@ private void NotifyPersistenceIdChange(string persistenceId) private void NotifyTagChange(string tag) { - if (_tagSubscribers.TryGetValue(tag, out var subscribers)) { + if (_tagSubscribers.TryGetValue(tag, out var subscribers)) + { var changed = new TaggedEventAppended(tag); foreach (var subscriber in subscribers) subscriber.Tell(changed); diff --git a/src/Akka.Persistence.MongoDb/MongoDbSettings.cs b/src/Akka.Persistence.MongoDb/MongoDbSettings.cs index f725537..a3c7a9c 100644 --- a/src/Akka.Persistence.MongoDb/MongoDbSettings.cs +++ b/src/Akka.Persistence.MongoDb/MongoDbSettings.cs @@ -10,12 +10,6 @@ namespace Akka.Persistence.MongoDb { - public enum StoredAsType - { - Object, - Binary - } - /// /// Settings for the MongoDB persistence implementation, parsed from HOCON configuration. /// @@ -36,21 +30,11 @@ public abstract class MongoDbSettings /// public string Collection { get; private set; } - /// - /// Specifies data type for payload column. - /// - public StoredAsType StoredAs { get; private set; } - protected MongoDbSettings(Config config) { ConnectionString = config.GetString("connection-string"); Collection = config.GetString("collection"); AutoInitialize = config.GetBoolean("auto-initialize"); - - StoredAs = StoredAsType.Object; - - if (Enum.TryParse(config.GetString("stored-as"), true, out StoredAsType storedAs)) - StoredAs = storedAs; } } diff --git a/src/Akka.Persistence.MongoDb/SerializationResult.cs b/src/Akka.Persistence.MongoDb/SerializationResult.cs deleted file mode 100644 index f779644..0000000 --- a/src/Akka.Persistence.MongoDb/SerializationResult.cs +++ /dev/null @@ -1,16 +0,0 @@ -using Akka.Serialization; - -namespace Akka.Persistence.MongoDb -{ - internal class SerializationResult - { - public SerializationResult(object payload, Serializer serializer) - { - Payload = payload; - Serializer = serializer; - } - - public object Payload { get; } - public Serializer Serializer { get; } - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs b/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs index d775e00..506853c 100644 --- a/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs +++ b/src/Akka.Persistence.MongoDb/Snapshot/MongoDbSnapshotStore.cs @@ -24,45 +24,14 @@ public class MongoDbSnapshotStore : SnapshotStore private Lazy> _snapshotCollection; - private readonly Func _serialize; - private readonly Func _deserialize; + + private readonly Akka.Serialization.Serialization _serialization; public MongoDbSnapshotStore() { _settings = MongoDbPersistence.Get(Context.System).SnapshotStoreSettings; - var serialization = Context.System.Serialization; - switch (_settings.StoredAs) - { - case StoredAsType.Binary: - _serialize = o => - { - var serializer = serialization.FindSerializerFor(o); - return new SerializationResult(serializer.ToBinary(o), serializer); - }; - _deserialize = (type, serialized, manifest, serializerId) => - { - if (serializerId.HasValue) - { - /* - * Backwards compat: check to see if manifest is populated before using it. - * Otherwise, fall back to using the stored type data instead. - * Per: https://github.com/AkkaNetContrib/Akka.Persistence.MongoDB/issues/57 - */ - if (string.IsNullOrEmpty(manifest)) - return serialization.Deserialize((byte[])serialized, serializerId.Value, type); - return serialization.Deserialize((byte[])serialized, serializerId.Value, manifest); - } - - var deserializer = serialization.FindSerializerForType(type); - return deserializer.FromBinary((byte[]) serialized, type); - }; - break; - default: - _serialize = o => new SerializationResult(o, null); - _deserialize = (type, serialized, manifest, serializerId) => serialized; - break; - } + _serialization = Context.System.Serialization; } protected override void PreStart() @@ -84,7 +53,7 @@ protected override void PreStart() .Descending(entry => entry.SequenceNr)); collection.Indexes - .CreateOneAsync(modelWithAscendingPersistenceIdAndDescendingSequenceNr, cancellationToken:CancellationToken.None) + .CreateOneAsync(modelWithAscendingPersistenceIdAndDescendingSequenceNr, cancellationToken: CancellationToken.None) .Wait(); } @@ -161,24 +130,22 @@ private static FilterDefinition CreateRangeFilter(string persiste private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot) { - var serializationResult = _serialize(snapshot); - var serializer = serializationResult.Serializer; - var hasSerializer = serializer != null; + var snapshotRep = new Akka.Persistence.Serialization.Snapshot(snapshot); + var serializer = _serialization.FindSerializerFor(snapshotRep); + var binary = serializer.ToBinary(snapshotRep); var manifest = ""; - if (hasSerializer && serializer is SerializerWithStringManifest) - manifest = ((SerializerWithStringManifest)serializer).Manifest(snapshot); - else if (hasSerializer && serializer.IncludeManifest) - manifest = snapshot.GetType().TypeQualifiedName(); + if (serializer is SerializerWithStringManifest stringManifest) + manifest = stringManifest.Manifest(snapshotRep); else - manifest = snapshot.GetType().TypeQualifiedName(); + manifest = snapshotRep.GetType().TypeQualifiedName(); return new SnapshotEntry { Id = metadata.PersistenceId + "_" + metadata.SequenceNr, PersistenceId = metadata.PersistenceId, SequenceNr = metadata.SequenceNr, - Snapshot = serializationResult.Payload, + Snapshot = binary, Timestamp = metadata.Timestamp.Ticks, Manifest = manifest, SerializerId = serializer?.Identifier @@ -187,15 +154,38 @@ private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot private SelectedSnapshot ToSelectedSnapshot(SnapshotEntry entry) { - Type type = null; - if (!string.IsNullOrEmpty(entry.Manifest)) - type = Type.GetType(entry.Manifest, throwOnError: true); - var snapshot = _deserialize(type, entry.Snapshot, entry.Manifest, entry.SerializerId); + if (entry.Snapshot is byte[] bytes) + { + Type type = null; + + if (string.IsNullOrEmpty(entry.Manifest)) + type = Type.GetType(entry.Manifest, throwOnError: true); + + object dSnapshot; + if (entry.SerializerId.HasValue) + { + dSnapshot = type == null ? _serialization.Deserialize(bytes, entry.SerializerId.Value, entry.Manifest) + : _serialization.Deserialize(bytes, entry.SerializerId.Value, type); + } + else + { + var deserializer = _serialization.FindSerializerForType(type); + dSnapshot = deserializer.FromBinary(bytes, type); + } + + if (dSnapshot is Serialization.Snapshot snap) + return new SelectedSnapshot( + new SnapshotMetadata(entry.PersistenceId, entry.SequenceNr, new DateTime(entry.Timestamp)), snap.Data); + + return new SelectedSnapshot( + new SnapshotMetadata(entry.PersistenceId, entry.SequenceNr, new DateTime(entry.Timestamp)), dSnapshot); + } + // backwards compat - loaded an old snapshot using BSON serialization. No need to deserialize via Akka.NET return new SelectedSnapshot( - new SnapshotMetadata(entry.PersistenceId, entry.SequenceNr, new DateTime(entry.Timestamp)), snapshot); + new SnapshotMetadata(entry.PersistenceId, entry.SequenceNr, new DateTime(entry.Timestamp)), entry.Snapshot); } } } diff --git a/src/Akka.Persistence.MongoDb/reference.conf b/src/Akka.Persistence.MongoDb/reference.conf index f0e3a68..51cf5c2 100644 --- a/src/Akka.Persistence.MongoDb/reference.conf +++ b/src/Akka.Persistence.MongoDb/reference.conf @@ -18,9 +18,6 @@ # metadata collection metadata-collection = "Metadata" - - # MongoDb type for payload field. Allowed: object, binary, default : object - stored-as = object } } @@ -40,9 +37,6 @@ # MongoDb collection corresponding with persistent snapshot store collection = "SnapshotStore" - - # MongoDb type for payload field. Allowed: object, binary, default : object - stored-as = object } } diff --git a/src/common.props b/src/common.props index edbfe49..fcc8659 100644 --- a/src/common.props +++ b/src/common.props @@ -2,20 +2,18 @@ Copyright © 2013-2019 Akka.NET Project Akka.NET Contrib - 1.3.14 + 1.4.0 http://getakka.net/images/akkalogo.png https://github.com/akkadotnet/Akka.Persistence.MongoDB https://github.com/akkadotnet/Akka.Persistence.MongoDB/blob/master/LICENSE.md - You can see [the full set of changes for Akka.Persistence.MongoDb v1.3.14 here](https://github.com/akkadotnet/Akka.Persistence.MongoDB/milestone/2). -This PR fixes a number of problems stemming from implementations of Akka.Persistence.Query implementations that were incorrect. -Note: we're working on [adding support for future versions of Akka.Persistence.MongoDB and you should read them here if you plan on continuing to use the plugin](https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/72).** + Beta release of Akka.Persistence.MongoDB which implements the [new standardized Akka.Persistence serialization paradigm](https://github.com/akkadotnet/Akka.Persistence.MongoDB/issues/72) going forward. Has full backwards compatibility for reading events which were written in 1.3.[0-14] style serialization. true Akka Persistence journal and snapshot store backed by MongoDB database. $(NoWarn);CS1591 2.4.1 - 15.3.0 - 1.3.13 + 16.3.0 + 1.4.0-beta* \ No newline at end of file