Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RemoteDaemon bug, not removing children #1068

Merged
merged 5 commits into from
Jun 23, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions src/core/Akka.Persistence/Serialization/MessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,16 @@ private PersistentMessage.Builder PersistentToProto(IPersistentRepresentation p)

private PersistentPayload.Builder PersistentPayloadToProto(object payload)
{
if (TransportInformation != null)
{
Akka.Serialization.Serialization.CurrentTransportInformation = TransportInformation;
}

var serializer = system.Serialization.FindSerializerFor(payload);
var builder = PersistentPayload.CreateBuilder();

if (serializer.IncludeManifest) builder.SetPayloadManifest(ByteString.CopyFromUtf8(payload.GetType().FullName));
if (serializer.IncludeManifest)
builder.SetPayloadManifest(ByteString.CopyFromUtf8(payload.GetType().FullName));

var bytes = serializer.ToBinaryWithAddress(TransportInformation.Address, payload);

builder
.SetPayload(ByteString.CopyFrom(serializer.ToBinary(payload)))
.SetPayload(ByteString.CopyFrom(bytes))
.SetSerializerId(serializer.Identifier);

return builder;
Expand Down
18 changes: 11 additions & 7 deletions src/core/Akka.Persistence/Serialization/SnapshotSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ private SnapshotHeader ParseSnapshotHeader(ArraySegment<byte> headerBytes)

private byte[] SnapshotToBinary(object snapshot)
{
if (TransportInformation != null)
{
Akka.Serialization.Serialization.CurrentTransportInformation = TransportInformation;
}

var serializer = system.Serialization.FindSerializerFor(snapshot);
using (var headerOut = new MemoryStream())
{
Expand All @@ -135,8 +130,17 @@ private byte[] SnapshotToBinary(object snapshot)
{
WriteInt(output, headerBinary.Length);
output.Write(headerBinary, 0, headerBinary.Length);
var snapshotBytes = serializer.ToBinary(snapshot);
output.Write(snapshotBytes, 0, snapshotBytes.Length);

if (TransportInformation != null)
{
var snapshotBytes = serializer.ToBinaryWithAddress(TransportInformation.Address, snapshot);
output.Write(snapshotBytes, 0, snapshotBytes.Length);
}
else
{
var snapshotBytes = serializer.ToBinary(snapshot);
output.Write(snapshotBytes, 0, snapshotBytes.Length);
}

return output.ToArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using Akka.Remote.Serialization;
using Akka.Routing;
using Akka.TestKit;
using Akka.Util.Internal;
using Xunit;

namespace Akka.Remote.Tests.Serialization
Expand All @@ -28,34 +27,38 @@ protected override void OnReceive(object message)
}
}

private Akka.Serialization.Serialization ser;
private IActorRef supervisor;
private readonly Akka.Serialization.Serialization _ser;
private readonly IActorRef _supervisor;

public DaemonMsgCreateSerializerSpec()
: base(@"akka.actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""")
: base(@"
akka.actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
akka.remote.helios.tcp {
hostname = localhost
port = 0
}
")
{
Akka.Serialization.Serialization.CurrentTransportInformation = null;

ser = Sys.Serialization;
supervisor = Sys.ActorOf(Props.Create<MyActor>(), "supervisor");
_ser = Sys.Serialization;
_supervisor = Sys.ActorOf(Props.Create<MyActor>(), "supervisor");
}

[Fact]
public void Serialization_must_resolve_DaemonMsgCreateSerializer()
{
ser.FindSerializerForType(typeof(DaemonMsgCreate)).GetType().ShouldBe(typeof(DaemonMsgCreateSerializer));
_ser.FindSerializerForType(typeof(DaemonMsgCreate)).GetType().ShouldBe(typeof(DaemonMsgCreateSerializer));
}

[Fact]
public void Serialization_must_serialize_and_deserialize_DaemonMsgCreate_with_FromClassCreator()
{
VerifySerialization(new DaemonMsgCreate(Props.Create<MyActor>(), new Deploy(), "foo", supervisor));
VerifySerialization(new DaemonMsgCreate(Props.Create<MyActor>(), new Deploy(), "foo", _supervisor));
}

[Fact]
public void Serialization_must_serialize_and_deserialize_DaemonMsgCreate_with_function_creator()
{
VerifySerialization(new DaemonMsgCreate(Props.Create(() => new MyActor()), new Deploy(), "foo", supervisor));
VerifySerialization(new DaemonMsgCreate(Props.Create(() => new MyActor()), new Deploy(), "foo", _supervisor));
}

[Fact]
Expand All @@ -72,16 +75,18 @@ public void Serialization_must_serialize_and_deserialize_DaemonMsgCreate_with_De
FromConfig.Instance,
new RemoteScope(new Address("akka", "Test", "host2", 1922)),
Deploy.NoDispatcherGiven);
VerifySerialization(new DaemonMsgCreate(Props.Create<MyActor>().WithDispatcher("my-disp").WithDeploy(deploy1), deploy2, "foo", supervisor));
VerifySerialization(new DaemonMsgCreate(Props.Create<MyActor>().WithDispatcher("my-disp").WithDeploy(deploy1), deploy2, "foo", _supervisor));
}

#region Helper methods

private void VerifySerialization(DaemonMsgCreate msg)
{
var daemonMsgSerializer = ser.FindSerializerFor(msg);
AssertDaemonMsgCreate(msg, ser.Deserialize(daemonMsgSerializer.ToBinary(msg),
daemonMsgSerializer.Identifier, typeof(DaemonMsgCreate)).AsInstanceOf<DaemonMsgCreate>());
var daemonMsgSerializer = _ser.FindSerializerFor(msg);
var binary = daemonMsgSerializer.ToBinary(msg);
var actual = (DaemonMsgCreate) _ser.Deserialize(binary, daemonMsgSerializer.Identifier, typeof (DaemonMsgCreate));

AssertDaemonMsgCreate(msg, actual);
}

private void AssertDaemonMsgCreate(DaemonMsgCreate expected, DaemonMsgCreate actual)
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Remote/Akka.Remote.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
<Compile Include="Proto\Containerformats.cs" />
<Compile Include="Proto\Wireformats.cs" />
<Compile Include="RemoteActorRef.cs" />
<Compile Include="RemoteDaemon.cs" />
<Compile Include="RemoteSystemDaemon.cs" />
<Compile Include="RemoteDeployer.cs" />
<Compile Include="RemoteSettings.cs" />
<Compile Include="RemoteTransport.cs" />
Expand Down
3 changes: 1 addition & 2 deletions src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1102,8 +1102,7 @@ private SerializedMessage SerializeMessage(object msg)
throw new EndpointException("Internal error: No handle was present during serialization of outbound message.");
}

Akka.Serialization.Serialization.CurrentTransportInformation = new Information() { Address = _handle.LocalAddress, System = _system };
return MessageSerializer.Serialize(_system, msg);
return MessageSerializer.Serialize(_system, _handle.LocalAddress, msg);
}

private int _writeCount = 0;
Expand Down
5 changes: 3 additions & 2 deletions src/core/Akka.Remote/MessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ public static object Deserialize(ActorSystem system, SerializedMessage messagePr
/// Serializes the specified message.
/// </summary>
/// <param name="system">The system.</param>
/// <param name="address"></param>
/// <param name="message">The message.</param>
/// <returns>SerializedMessage.</returns>
public static SerializedMessage Serialize(ActorSystem system, object message)
public static SerializedMessage Serialize(ActorSystem system,Address address, object message)
{
Serializer serializer = system.Serialization.FindSerializerFor(message);
byte[] messageBytes = serializer.ToBinary(message);
byte[] messageBytes = serializer.ToBinaryWithAddress(address,message);
SerializedMessage.Builder messageBuilder = new SerializedMessage.Builder()
.SetSerializerId(serializer.Identifier);
if (serializer.IncludeManifest)
Expand Down
7 changes: 1 addition & 6 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private Internals RemoteInternals
return _internals ??
(_internals =
new Internals(new Remoting(_system, this), _system.Serialization,
new RemoteDaemon(_system, RootPath / "remote", SystemGuardian, _log)));
new RemoteSystemDaemon(_system, RootPath / "remote", SystemGuardian,this.DeadLetters /* TODO: should be RemoteTerminator*/, _log)));
}
}

Expand Down Expand Up @@ -350,11 +350,6 @@ public Address GetExternalAddressFor(Address address)

public void UseActorOnNode(RemoteActorRef actor, Props props, Deploy deploy, IInternalActorRef supervisor)
{
Akka.Serialization.Serialization.CurrentTransportInformation = new Information
{
System = _system,
Address = actor.LocalAddressToUse,
};
_log.Debug("[{0}] Instantiating Remote Actor [{1}]", RootPath, actor.Path);
IActorRef remoteNode = ResolveActorRef(new RootActorPath(actor.Path.Address) / "remote");
remoteNode.Tell(new DaemonMsgCreate(props, deploy, actor.Path.ToSerializationFormat(), supervisor));
Expand Down
188 changes: 0 additions & 188 deletions src/core/Akka.Remote/RemoteDaemon.cs

This file was deleted.

Loading