Skip to content

Commit

Permalink
Add Cluster.Sharding DData backward compatibility wire format mode (#…
Browse files Browse the repository at this point in the history
…6775)

* Add Cluster.Sharding DData backward compatibility wire format mode

* Add unit test

* Update API Verify list
  • Loading branch information
Arkatufus authored May 29, 2023
1 parent 8d550bf commit c0dc716
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// associated with an assembly.
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests")]
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: InternalsVisibleTo("Akka.DistributedData.Tests")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ private static EventSourcedRememberEntitiesShardStore.EntitiesStopped EntitiesSt
}

//
// ShardCoordinator.State
// ShardCoordinator.CoordinatorState
//
private static Proto.Msg.CoordinatorState CoordinatorStateToProto(ShardCoordinator.CoordinatorState state)
{
Expand Down
4 changes: 4 additions & 0 deletions src/contrib/cluster/Akka.Cluster.Sharding/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ akka.cluster.sharding {
# can become to large if including to many in same message. Limit to
# the same number as the number of ORSet per shard.
max-delta-elements = 5

# Turn on backward compatibility wire format mode that allows Akka.Cluster.Sharding
# v1.5.8 distributed data to communicate with v1.4.x
backward-compatible-wire-format = false
}
# The id of the dispatcher to use for ClusterSharding actors.
# If not specified, the internal dispatcher is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Cluster.Sharding\Akka.Cluster.Sharding.csproj" />
<ProjectReference Include="..\Akka.DistributedData.LightningDB\Akka.DistributedData.LightningDB.csproj" />
<ProjectReference Include="..\Akka.DistributedData\Akka.DistributedData.csproj" />
<ProjectReference Include="..\..\..\core\Akka.Tests.Shared.Internals\Akka.Tests.Shared.Internals.csproj" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// //-----------------------------------------------------------------------
// // <copyright file="ReplicatedDataSerializerBackCompatSpec.cs" company="Akka.NET Project">
// // Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// // Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// // </copyright>
// //-----------------------------------------------------------------------

using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.DistributedData.Serialization;
using Akka.DistributedData.Serialization.Proto.Msg;
using FluentAssertions;
using Xunit;
using UniqueAddress = Akka.Cluster.UniqueAddress;
using static FluentAssertions.FluentActions;

namespace Akka.DistributedData.Tests.Serialization;

public class ReplicatedDataSerializerBackCompatSpec
{
private readonly ReplicatedDataSerializer _serializer;
private readonly UniqueAddress _address;

public ReplicatedDataSerializerBackCompatSpec()
{
var sys = ActorSystem.Create("test", @"
akka.actor.provider = cluster
akka.cluster.sharding.distributed-data.backward-compatible-wire-format = true");
_serializer = new ReplicatedDataSerializer((ExtendedActorSystem)sys);
_address = Cluster.Cluster.Get(sys).SelfUniqueAddress;
sys.Terminate();
}

[Fact(DisplayName = "DData replicated data serializer should serialize and deserialize correct backward compatible proto message")]
public void SerializeTest()
{
var lwwReg = new LWWRegister<ShardCoordinator.CoordinatorState>(_address, ShardCoordinator.CoordinatorState.Empty);
var bytes = _serializer.ToBinary(lwwReg);
var proto = LWWRegister.Parser.ParseFrom(bytes);

// Serialized type name should be equal to the old v1.4 coordinator state FQCN
proto.TypeInfo.TypeName.Should().Be("Akka.Cluster.Sharding.PersistentShardCoordinator+State, Akka.Cluster.Sharding");

// Deserializing the same message should succeed
Invoking(() => _serializer.FromBinary(bytes, _serializer.Manifest(lwwReg)))
.Should().NotThrow()
.And.Subject().Should().BeOfType<LWWRegister<ShardCoordinator.CoordinatorState>>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ public sealed class ReplicatedDataSerializer : SerializerWithStringManifest

private readonly byte[] _emptyArray = Array.Empty<byte>();

private readonly bool _backwardCompatWireFormat;

public ReplicatedDataSerializer(ExtendedActorSystem system) : base(system)
{
_ser = new SerializationSupport(system);
_backwardCompatWireFormat =
system.Settings.Config.GetBoolean("akka.cluster.sharding.distributed-data.backward-compatible-wire-format");
}


Expand Down Expand Up @@ -735,6 +739,11 @@ private Proto.Msg.LWWRegister LWWToProto<T>(ILWWRegister r)
pLww.State = _ser.OtherMessageToProto(register.Value);
pLww.Timestamp = register.Timestamp;
pLww.TypeInfo = GetTypeDescriptor(r.RegisterType);

// HACK: Really really ugly hack to make sure that v1.5 DData cluster sharding works with v1.4
if(_backwardCompatWireFormat && pLww.TypeInfo.TypeName == "Akka.Cluster.Sharding.ShardCoordinator+CoordinatorState, Akka.Cluster.Sharding")
pLww.TypeInfo.TypeName = "Akka.Cluster.Sharding.PersistentShardCoordinator+State, Akka.Cluster.Sharding";

return pLww;
}

Expand Down Expand Up @@ -766,9 +775,13 @@ private ILWWRegister LWWRegisterFromProto(Proto.Msg.LWWRegister proto)
}
case ValType.Other:
{
// HACK: Really really ugly hack to make sure that v1.5 DData cluster sharding works with v1.4
var typeName = proto.TypeInfo.TypeName;
if (typeName == "Akka.Cluster.Sharding.PersistentShardCoordinator+State, Akka.Cluster.Sharding")
typeName = "Akka.Cluster.Sharding.ShardCoordinator+CoordinatorState, Akka.Cluster.Sharding";

// runtime type - enter horrible dynamic serialization stuff

var setContentType = Type.GetType(proto.TypeInfo.TypeName);
var setContentType = Type.GetType(typeName);

var setType = LWWRegisterMaker.MakeGenericMethod(setContentType);
return (ILWWRegister)setType.Invoke(this, new object[] { proto });
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.DistributedData.Tests")]
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
[assembly: System.Runtime.InteropServices.GuidAttribute("a05c31e8-0246-46a1-b3bc-4d6fe7a9aa49")]
[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETCoreApp,Version=v6.0", FrameworkDisplayName=".NET 6.0")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.DistributedData.Tests")]
[assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
[assembly: System.Runtime.InteropServices.GuidAttribute("a05c31e8-0246-46a1-b3bc-4d6fe7a9aa49")]
[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName=".NET Standard 2.0")]
Expand Down

0 comments on commit c0dc716

Please sign in to comment.