Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sharding recovery error and WithTransport serialization #3744

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
37660ed
fixed typo in RemoteActorRefProvider comment
Aaronontheweb Mar 22, 2019
531c7eb
Working on #3414 - bringing SerializeWithTransport API up to par with…
Aaronontheweb Mar 22, 2019
0fc478f
added spec to help validate CurrentTransportInformation issues
Aaronontheweb Mar 22, 2019
160db40
working on bringing serialization up to snuff
Aaronontheweb Mar 25, 2019
0b84f3b
brought serialization class up to snuff
Aaronontheweb Mar 25, 2019
b7080e0
wrapping up RmeoteActorRefProvider implementation
Aaronontheweb Mar 25, 2019
c30b68e
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb May 24, 2019
1d4c4f8
WIP
Aaronontheweb May 24, 2019
96238af
cleaning up Serialization class
Aaronontheweb May 25, 2019
84de193
looks like there's a Lazy<SerializationInfo> translation from Scala t…
Aaronontheweb May 25, 2019
c336417
fixed Serialization class
Aaronontheweb May 25, 2019
04fa972
fixed bug with Akka.Remote.Serialization.SerializationTransportInform…
Aaronontheweb May 25, 2019
d7eb903
forced a couple of specs using default akka.remote configs to run seq…
Aaronontheweb May 25, 2019
7258288
added serialization verification to the Akka.Persistence.TCK
Aaronontheweb May 25, 2019
b558492
fixed issues with default Akka.Perisstence.TCK specs
Aaronontheweb May 25, 2019
5827048
fixed IActorRef serialziation support in Akka.Persistence journals an…
Aaronontheweb May 25, 2019
9b02494
fixed compilation issuyes
Aaronontheweb May 25, 2019
17ee26c
fixed Akka.Sql.Common serialization in a backwards-compatible fashion
Aaronontheweb May 25, 2019
22075ca
had to disable serialization specs for Sql Journals
Aaronontheweb May 25, 2019
98ed473
Added API approvals
Aaronontheweb May 25, 2019
bd1df59
updated creator and serialize-all-messages serialization
Aaronontheweb May 25, 2019
eb4597f
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jun 4, 2019
44cdd61
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jun 6, 2019
88ee076
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jun 11, 2019
69244be
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jun 26, 2019
632612e
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jul 1, 2019
f7ba0be
added ITestOutputHelper to Akka.Cluster.Sharding.Tests.SupervisionSpec
Aaronontheweb Jul 1, 2019
54cc1e6
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jul 12, 2019
94eaa40
Merge branch 'fix-3414-sharding-recovery-error' of https://github.com…
Aaronontheweb Jul 12, 2019
6e944d5
made changes to LocalSnapshotSerializer
Aaronontheweb Jul 12, 2019
cb43452
fixed bug in WithTransport method
Aaronontheweb Jul 12, 2019
939aba3
updated Akka.Remote MessageSerializer
Aaronontheweb Jul 12, 2019
cf2dbf4
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jul 14, 2019
4fd4e03
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jul 16, 2019
a4a0bf4
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jul 16, 2019
9814412
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jul 16, 2019
f9c6d86
Merge branch 'dev' into fix-3414-sharding-recovery-error
Aaronontheweb Jul 17, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Akka.Event;
using Akka.Pattern;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Cluster.Sharding.Tests
{
Expand Down Expand Up @@ -93,7 +94,7 @@ protected override void OnReceive(object message)
private readonly ExtractShardId _extractShard = message =>
message is Msg msg ? (msg.Id % 2).ToString(CultureInfo.InvariantCulture) : null;

public SupervisionSpec() : base(GetConfig())
public SupervisionSpec(ITestOutputHelper output) : base(GetConfig(), output: output)
{ }

public static Config GetConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,8 +1099,7 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)

foreach (var envelope in req.Messages)
{
var write = envelope as AtomicWrite;
if (write != null)
if (envelope is AtomicWrite write)
{
var writes = (IImmutableList<IPersistentRepresentation>)write.Payload;
foreach (var unadapted in writes)
Expand All @@ -1111,9 +1110,8 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)
tagBuilder.Clear();

var persistent = AdaptToJournal(unadapted);
if (persistent.Payload is Tagged)
if (persistent.Payload is Tagged tagged)
{
var tagged = (Tagged)persistent.Payload;
if (tagged.Tags.Count != 0)
{
tagBuilder.Append(';');
Expand Down Expand Up @@ -1205,29 +1203,35 @@ protected virtual void WriteEvent(TCommand command, IPersistentRepresentation pe
var payloadType = persistent.Payload.GetType();
var serializer = _serialization.FindSerializerForType(payloadType, Setup.DefaultSerializer);

string manifest = "";
if (serializer is SerializerWithStringManifest)
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
Akka.Serialization.Serialization.WithTransport(_serialization.System, () =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I right that WithTransport sets a specific ambient context for serialization data? If so maybe it would be easier to just pass that context directly into inner function parameter? How application will behave when we'll have multiple actor systems living in the same process, serializing at the same time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't pass it in because other libraries may not directly have access to when it's set via Akka.Remote - rather it gets parked ambiently in the local thread's storage and then unset when it's done being used. Won't be an issue for multiple actor systems due to the latter - the reference always gets unset and not leaked from the thread. There's more comments on the JVM issue that I ported this from: akka/akka#25068

{
manifest = ((SerializerWithStringManifest)serializer).Manifest(persistent.Payload);
}
else
{
if (serializer.IncludeManifest)
string manifest = "";
if (serializer is SerializerWithStringManifest stringManifest)
{
manifest = persistent.Payload.GetType().TypeQualifiedName();
manifest = stringManifest.Manifest(persistent.Payload);
}
}
else
{
if (serializer.IncludeManifest)
{
manifest = persistent.Payload.GetType().TypeQualifiedName();
}
}

var binary = serializer.ToBinary(persistent.Payload);

var binary = serializer.ToBinary(persistent.Payload);
AddParameter(command, "@PersistenceId", DbType.String, persistent.PersistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, persistent.SequenceNr);
AddParameter(command, "@Timestamp", DbType.Int64, 0L);
AddParameter(command, "@IsDeleted", DbType.Boolean, false);
AddParameter(command, "@Manifest", DbType.String, manifest);
AddParameter(command, "@Payload", DbType.Binary, binary);
AddParameter(command, "@Tag", DbType.String, tags);
AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);

AddParameter(command, "@PersistenceId", DbType.String, persistent.PersistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, persistent.SequenceNr);
AddParameter(command, "@Timestamp", DbType.Int64, 0L);
AddParameter(command, "@IsDeleted", DbType.Boolean, false);
AddParameter(command, "@Manifest", DbType.String, manifest);
AddParameter(command, "@Payload", DbType.Binary, binary);
AddParameter(command, "@Tag", DbType.String, tags);
AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
return manifest;
});
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,20 +690,26 @@ protected virtual void WriteEvent(DbCommand command, IPersistentRepresentation e
var payloadType = e.Payload.GetType();
var serializer = Serialization.FindSerializerForType(payloadType, Configuration.DefaultSerializer);

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
string manifest = "";
if (serializer is SerializerWithStringManifest)
var binary = Akka.Serialization.Serialization.WithTransport(Serialization.System, () =>
{
manifest = ((SerializerWithStringManifest)serializer).Manifest(e.Payload);
}
else
{
if (serializer.IncludeManifest)

if (serializer is SerializerWithStringManifest stringManifest)
{
manifest = e.Payload.GetType().TypeQualifiedName();
manifest = stringManifest.Manifest(e.Payload);
}
else
{
if (serializer.IncludeManifest)
{
manifest = e.Payload.GetType().TypeQualifiedName();
}
}
}

var binary = serializer.ToBinary(e.Payload);
return serializer.ToBinary(e.Payload);
});


AddParameter(command, "@PersistenceId", DbType.String, e.PersistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, e.SequenceNr);
Expand Down Expand Up @@ -746,11 +752,13 @@ protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
// Support old writes that did not set the serializer id
var type = Type.GetType(manifest, true);
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
deserialized = deserializer.FromBinary((byte[])payload, type);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
deserialized = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => deserializer.FromBinary((byte[])payload, type) );
}
else
{
var serializerId = reader.GetInt32(SerializerIdIndex);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
deserialized = Serialization.Deserialize((byte[])payload, serializerId, manifest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ protected virtual void SetPayloadParameter(object snapshot, DbCommand command)
{
var snapshotType = snapshot.GetType();
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);

var binary = serializer.ToBinary(snapshot);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var binary = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.ToBinary(snapshot));
AddParameter(command, "@Payload", DbType.Binary, binary);
}

Expand Down Expand Up @@ -585,8 +585,9 @@ protected object GetSnapshot(DbDataReader reader)
if (reader.IsDBNull(5))
{
var type = Type.GetType(manifest, true);
// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
var serializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
obj = serializer.FromBinary(binary, type);
obj = Akka.Serialization.Serialization.WithTransport(Serialization.System, () => serializer.FromBinary(binary, type));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Akka.Event;
using Akka.Persistence.Snapshot;
using Akka.Util;
using Akka.Util.Internal;

namespace Akka.Persistence.Sql.Common.Snapshot
{
Expand All @@ -39,12 +40,15 @@ private Initialized() { }

private readonly SnapshotStoreSettings _settings;

private readonly ExtendedActorSystem _actorSystem;

/// <summary>
/// Initializes a new instance of the <see cref="SqlSnapshotStore"/> class.
/// </summary>
/// <param name="config">The configuration used to configure the snapshot store.</param>
protected SqlSnapshotStore(Config config)
{
_actorSystem = Context.System.AsInstanceOf<ExtendedActorSystem>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the advantage of AsInstanceOf over direct type cast? Honestly in my opinion, this is part which follows Scala over simple logic.

_settings = new SnapshotStoreSettings(config);
_pendingRequestsCancellation = new CancellationTokenSource();
}
Expand Down Expand Up @@ -224,7 +228,9 @@ private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot
var snapshotType = snapshot.GetType();
var serializer = Context.System.Serialization.FindSerializerForType(snapshotType, _settings.DefaultSerializer);

var binary = serializer.ToBinary(snapshot);
var binary = Akka.Serialization.Serialization.WithTransport(_actorSystem,
() => serializer.ToBinary(snapshot));


return new SnapshotEntry(
persistenceId: metadata.PersistenceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<PropertyGroup>
<AssemblyTitle>Akka.Persistence.Sqlite.Tests</AssemblyTitle>
<TargetFrameworks>$(NetCoreTestVersion)</TargetFrameworks>
<RuntimeIdentifier Condition=" '$(TargetFramework)' == '$(NetFrameworkTestVersion)' And '$(OS)' == 'Windows_NT'">win7-x64</RuntimeIdentifier>

</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,8 @@ class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistenc
}
}");
}

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
protected override bool SupportsSerialization => false;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another reason for #3811 - none of the Akka.Persistence.Sql journals support WriterGuid at the moment due to their hand-rolled serialization layers, so the serialization specs will not pass.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public SqliteJournalSpec(ITestOutputHelper output)
Initialize();
}

// TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
protected override bool SupportsSerialization => false;

private static Config CreateSpecConfig(string connectionString)
{
return ConfigurationFactory.ParseString(@"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,7 @@ class = ""Akka.Persistence.Sqlite.Snapshot.SqliteSnapshotStore, Akka.Persistence
}
}");
}

protected override bool SupportsSerialization => true;
}
}
22 changes: 20 additions & 2 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Tools")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.MultiNodeTestRunner.Shared.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Persistence")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Persistence.TCK")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Remote")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Remote.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Remote.Tests.MultiNode")]
Expand Down Expand Up @@ -985,6 +986,8 @@ namespace Akka.Actor
Akka.Actor.LocalActorRef Guardian { get; }
Akka.Actor.IInternalActorRef RootGuardian { get; }
Akka.Actor.ActorPath RootPath { get; }
[Akka.Annotations.InternalApiAttribute()]
Akka.Serialization.Information SerializationInformation { get; }
Akka.Actor.Settings Settings { get; }
Akka.Actor.LocalActorRef SystemGuardian { get; }
Akka.Actor.IInternalActorRef TempContainer { get; }
Expand Down Expand Up @@ -1270,6 +1273,7 @@ namespace Akka.Actor
public Akka.Event.ILoggingAdapter Log { get; }
public Akka.Actor.IInternalActorRef RootGuardian { get; }
public Akka.Actor.ActorPath RootPath { get; }
public Akka.Serialization.Information SerializationInformation { get; }
public Akka.Actor.Settings Settings { get; }
public Akka.Actor.LocalActorRef SystemGuardian { get; }
public Akka.Actor.IInternalActorRef TempContainer { get; }
Expand Down Expand Up @@ -4541,6 +4545,16 @@ namespace Akka.Serialization
public override object FromBinary(byte[] bytes, System.Type type) { }
public override byte[] ToBinary(object obj) { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class Information : System.IEquatable<Akka.Serialization.Information>
{
public Information(Akka.Actor.Address address, Akka.Actor.ActorSystem system) { }
public Akka.Actor.Address Address { get; }
public Akka.Actor.ActorSystem System { get; }
public bool Equals(Akka.Serialization.Information other) { }
public override bool Equals(object obj) { }
public override int GetHashCode() { }
}
public class NewtonSoftJsonSerializer : Akka.Serialization.Serializer
{
public NewtonSoftJsonSerializer(Akka.Actor.ExtendedActorSystem system) { }
Expand Down Expand Up @@ -4572,7 +4586,7 @@ namespace Akka.Serialization
public class Serialization
{
public Serialization(Akka.Actor.ExtendedActorSystem system) { }
public Akka.Actor.ActorSystem System { get; }
public Akka.Actor.ExtendedActorSystem System { get; }
public void AddSerializationMap(System.Type type, Akka.Serialization.Serializer serializer) { }
[System.ObsoleteAttribute("No longer supported. Use the AddSerializer(name, serializer) overload instead.", true)]
public void AddSerializer(Akka.Serialization.Serializer serializer) { }
Expand All @@ -4581,8 +4595,12 @@ namespace Akka.Serialization
public object Deserialize(byte[] bytes, int serializerId, string manifest) { }
public Akka.Serialization.Serializer FindSerializerFor(object obj, string defaultSerializerName = null) { }
public Akka.Serialization.Serializer FindSerializerForType(System.Type objectType, string defaultSerializerName = null) { }
public static Akka.Serialization.Information GetCurrentTransportInformation() { }
public byte[] Serialize(object o) { }
public static string SerializedActorPath(Akka.Actor.IActorRef actorRef) { }
public static T SerializeWithTransport<T>(Akka.Actor.ActorSystem system, Akka.Actor.Address address, System.Func<T> action) { }
[System.ObsoleteAttribute("Obsolete. Use the SerializeWithTransport<T>(ExtendedActorSystem) method instead.")]
public static T WithTransport<T>(Akka.Actor.ActorSystem system, Akka.Actor.Address address, System.Func<T> action) { }
public static T WithTransport<T>(Akka.Actor.ExtendedActorSystem system, System.Func<T> action) { }
}
public abstract class Serializer
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ namespace Akka.Remote
public Akka.Actor.IActorRef RemoteWatcher { get; }
public Akka.Actor.IInternalActorRef RootGuardian { get; }
public Akka.Actor.ActorPath RootPath { get; }
public Akka.Serialization.Information SerializationInformation { get; }
public Akka.Actor.Settings Settings { get; }
public Akka.Actor.LocalActorRef SystemGuardian { get; }
public Akka.Actor.IInternalActorRef TempContainer { get; }
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Persistence.TCK.Tests/LocalSnapshotStoreSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public LocalSnapshotStoreSpec(ITestOutputHelper output)
Initialize();
}

protected override bool SupportsSerialization => true;

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Persistence.TCK.Tests/MemoryJournalSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ public MemoryJournalSpec(ITestOutputHelper output)
}

protected override bool SupportsRejectingNonSerializableObjects { get { return false; } }

protected override bool SupportsSerialization => true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public MemorySnapshotStoreSpec(ITestOutputHelper output)
Initialize();
}

protected override bool SupportsSerialization => true;

[Fact]
public void MemorySnapshotStore_is_threadsafe()
{
Expand Down
Loading