diff --git a/src/core/Akka.Persistence/Serialization/MessageSerializer.cs b/src/core/Akka.Persistence/Serialization/MessageSerializer.cs index b99207ebf5d..c00c91c8c21 100644 --- a/src/core/Akka.Persistence/Serialization/MessageSerializer.cs +++ b/src/core/Akka.Persistence/Serialization/MessageSerializer.cs @@ -131,18 +131,16 @@ private PersistentMessage.Builder PersistentToProto(IPersistentRepresentation p) private PersistentPayload.Builder PersistentPayloadToProto(object payload) { - if (TransportInformation != null) - { - Akka.Serialization.Serialization.CurrentTransportInformation = TransportInformation; - } - var serializer = system.Serialization.FindSerializerFor(payload); var builder = PersistentPayload.CreateBuilder(); - if (serializer.IncludeManifest) builder.SetPayloadManifest(ByteString.CopyFromUtf8(payload.GetType().FullName)); + if (serializer.IncludeManifest) + builder.SetPayloadManifest(ByteString.CopyFromUtf8(payload.GetType().FullName)); + + var bytes = serializer.ToBinaryWithAddress(TransportInformation.Address, payload); builder - .SetPayload(ByteString.CopyFrom(serializer.ToBinary(payload))) + .SetPayload(ByteString.CopyFrom(bytes)) .SetSerializerId(serializer.Identifier); return builder; diff --git a/src/core/Akka.Persistence/Serialization/SnapshotSerializer.cs b/src/core/Akka.Persistence/Serialization/SnapshotSerializer.cs index 8586528017e..7b341cde447 100644 --- a/src/core/Akka.Persistence/Serialization/SnapshotSerializer.cs +++ b/src/core/Akka.Persistence/Serialization/SnapshotSerializer.cs @@ -114,11 +114,6 @@ private SnapshotHeader ParseSnapshotHeader(ArraySegment headerBytes) private byte[] SnapshotToBinary(object snapshot) { - if (TransportInformation != null) - { - Akka.Serialization.Serialization.CurrentTransportInformation = TransportInformation; - } - var serializer = system.Serialization.FindSerializerFor(snapshot); using (var headerOut = new MemoryStream()) { @@ -135,8 +130,17 @@ private byte[] SnapshotToBinary(object snapshot) { WriteInt(output, headerBinary.Length); output.Write(headerBinary, 0, headerBinary.Length); - var snapshotBytes = serializer.ToBinary(snapshot); - output.Write(snapshotBytes, 0, snapshotBytes.Length); + + if (TransportInformation != null) + { + var snapshotBytes = serializer.ToBinaryWithAddress(TransportInformation.Address, snapshot); + output.Write(snapshotBytes, 0, snapshotBytes.Length); + } + else + { + var snapshotBytes = serializer.ToBinary(snapshot); + output.Write(snapshotBytes, 0, snapshotBytes.Length); + } return output.ToArray(); } diff --git a/src/core/Akka.Remote.Tests/Serialization/DaemonMsgCreateSerializerSpec.cs b/src/core/Akka.Remote.Tests/Serialization/DaemonMsgCreateSerializerSpec.cs index 918529df3fe..90da49148a5 100644 --- a/src/core/Akka.Remote.Tests/Serialization/DaemonMsgCreateSerializerSpec.cs +++ b/src/core/Akka.Remote.Tests/Serialization/DaemonMsgCreateSerializerSpec.cs @@ -13,7 +13,6 @@ using Akka.Remote.Serialization; using Akka.Routing; using Akka.TestKit; -using Akka.Util.Internal; using Xunit; namespace Akka.Remote.Tests.Serialization @@ -28,34 +27,38 @@ protected override void OnReceive(object message) } } - private Akka.Serialization.Serialization ser; - private IActorRef supervisor; + private readonly Akka.Serialization.Serialization _ser; + private readonly IActorRef _supervisor; public DaemonMsgCreateSerializerSpec() - : base(@"akka.actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""") + : base(@" + akka.actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote"" + akka.remote.helios.tcp { + hostname = localhost + port = 0 + } + ") { - Akka.Serialization.Serialization.CurrentTransportInformation = null; - - ser = Sys.Serialization; - supervisor = Sys.ActorOf(Props.Create(), "supervisor"); + _ser = Sys.Serialization; + _supervisor = Sys.ActorOf(Props.Create(), "supervisor"); } [Fact] public void Serialization_must_resolve_DaemonMsgCreateSerializer() { - ser.FindSerializerForType(typeof(DaemonMsgCreate)).GetType().ShouldBe(typeof(DaemonMsgCreateSerializer)); + _ser.FindSerializerForType(typeof(DaemonMsgCreate)).GetType().ShouldBe(typeof(DaemonMsgCreateSerializer)); } [Fact] public void Serialization_must_serialize_and_deserialize_DaemonMsgCreate_with_FromClassCreator() { - VerifySerialization(new DaemonMsgCreate(Props.Create(), new Deploy(), "foo", supervisor)); + VerifySerialization(new DaemonMsgCreate(Props.Create(), new Deploy(), "foo", _supervisor)); } [Fact] public void Serialization_must_serialize_and_deserialize_DaemonMsgCreate_with_function_creator() { - VerifySerialization(new DaemonMsgCreate(Props.Create(() => new MyActor()), new Deploy(), "foo", supervisor)); + VerifySerialization(new DaemonMsgCreate(Props.Create(() => new MyActor()), new Deploy(), "foo", _supervisor)); } [Fact] @@ -72,16 +75,18 @@ public void Serialization_must_serialize_and_deserialize_DaemonMsgCreate_with_De FromConfig.Instance, new RemoteScope(new Address("akka", "Test", "host2", 1922)), Deploy.NoDispatcherGiven); - VerifySerialization(new DaemonMsgCreate(Props.Create().WithDispatcher("my-disp").WithDeploy(deploy1), deploy2, "foo", supervisor)); + VerifySerialization(new DaemonMsgCreate(Props.Create().WithDispatcher("my-disp").WithDeploy(deploy1), deploy2, "foo", _supervisor)); } #region Helper methods private void VerifySerialization(DaemonMsgCreate msg) { - var daemonMsgSerializer = ser.FindSerializerFor(msg); - AssertDaemonMsgCreate(msg, ser.Deserialize(daemonMsgSerializer.ToBinary(msg), - daemonMsgSerializer.Identifier, typeof(DaemonMsgCreate)).AsInstanceOf()); + var daemonMsgSerializer = _ser.FindSerializerFor(msg); + var binary = daemonMsgSerializer.ToBinary(msg); + var actual = (DaemonMsgCreate) _ser.Deserialize(binary, daemonMsgSerializer.Identifier, typeof (DaemonMsgCreate)); + + AssertDaemonMsgCreate(msg, actual); } private void AssertDaemonMsgCreate(DaemonMsgCreate expected, DaemonMsgCreate actual) diff --git a/src/core/Akka.Remote/Akka.Remote.csproj b/src/core/Akka.Remote/Akka.Remote.csproj index 2c3be67746b..7ae108ac855 100644 --- a/src/core/Akka.Remote/Akka.Remote.csproj +++ b/src/core/Akka.Remote/Akka.Remote.csproj @@ -86,7 +86,7 @@ - + diff --git a/src/core/Akka.Remote/Endpoint.cs b/src/core/Akka.Remote/Endpoint.cs index 38955748e12..7d052ffcbab 100644 --- a/src/core/Akka.Remote/Endpoint.cs +++ b/src/core/Akka.Remote/Endpoint.cs @@ -1102,8 +1102,7 @@ private SerializedMessage SerializeMessage(object msg) throw new EndpointException("Internal error: No handle was present during serialization of outbound message."); } - Akka.Serialization.Serialization.CurrentTransportInformation = new Information() { Address = _handle.LocalAddress, System = _system }; - return MessageSerializer.Serialize(_system, msg); + return MessageSerializer.Serialize(_system, _handle.LocalAddress, msg); } private int _writeCount = 0; diff --git a/src/core/Akka.Remote/MessageSerializer.cs b/src/core/Akka.Remote/MessageSerializer.cs index b9793b24d8c..43948b615f6 100644 --- a/src/core/Akka.Remote/MessageSerializer.cs +++ b/src/core/Akka.Remote/MessageSerializer.cs @@ -37,12 +37,13 @@ public static object Deserialize(ActorSystem system, SerializedMessage messagePr /// Serializes the specified message. /// /// The system. + /// /// The message. /// SerializedMessage. - public static SerializedMessage Serialize(ActorSystem system, object message) + public static SerializedMessage Serialize(ActorSystem system,Address address, object message) { Serializer serializer = system.Serialization.FindSerializerFor(message); - byte[] messageBytes = serializer.ToBinary(message); + byte[] messageBytes = serializer.ToBinaryWithAddress(address,message); SerializedMessage.Builder messageBuilder = new SerializedMessage.Builder() .SetSerializerId(serializer.Identifier); if (serializer.IncludeManifest) diff --git a/src/core/Akka.Remote/RemoteActorRefProvider.cs b/src/core/Akka.Remote/RemoteActorRefProvider.cs index bec37a7bff5..d3ad01a7eda 100644 --- a/src/core/Akka.Remote/RemoteActorRefProvider.cs +++ b/src/core/Akka.Remote/RemoteActorRefProvider.cs @@ -50,7 +50,7 @@ private Internals RemoteInternals return _internals ?? (_internals = new Internals(new Remoting(_system, this), _system.Serialization, - new RemoteDaemon(_system, RootPath / "remote", SystemGuardian, _log))); + new RemoteSystemDaemon(_system, RootPath / "remote", SystemGuardian,this.DeadLetters /* TODO: should be RemoteTerminator*/, _log))); } } @@ -350,11 +350,6 @@ public Address GetExternalAddressFor(Address address) public void UseActorOnNode(RemoteActorRef actor, Props props, Deploy deploy, IInternalActorRef supervisor) { - Akka.Serialization.Serialization.CurrentTransportInformation = new Information - { - System = _system, - Address = actor.LocalAddressToUse, - }; _log.Debug("[{0}] Instantiating Remote Actor [{1}]", RootPath, actor.Path); IActorRef remoteNode = ResolveActorRef(new RootActorPath(actor.Path.Address) / "remote"); remoteNode.Tell(new DaemonMsgCreate(props, deploy, actor.Path.ToSerializationFormat(), supervisor)); diff --git a/src/core/Akka.Remote/RemoteDaemon.cs b/src/core/Akka.Remote/RemoteDaemon.cs deleted file mode 100644 index 412117cdc28..00000000000 --- a/src/core/Akka.Remote/RemoteDaemon.cs +++ /dev/null @@ -1,188 +0,0 @@ -//----------------------------------------------------------------------- -// -// Copyright (C) 2009-2015 Typesafe Inc. -// Copyright (C) 2013-2015 Akka.NET project -// -//----------------------------------------------------------------------- - -using System.Collections.Generic; -using System.Linq; -using Akka.Actor; -using Akka.Actor.Internals; -using Akka.Dispatch.SysMsg; -using Akka.Event; -using Akka.Util.Internal; - -namespace Akka.Remote -{ - /// - /// INTERNAL API - /// - internal interface IDaemonMsg { } - - /// - /// INTERNAL API - /// - internal class DaemonMsgCreate : IDaemonMsg - { - /// - /// Initializes a new instance of the class. - /// - /// The props. - /// The deploy. - /// The path. - /// The supervisor. - public DaemonMsgCreate(Props props, Deploy deploy, string path, IActorRef supervisor) - { - Props = props; - Deploy = deploy; - Path = path; - Supervisor = supervisor; - } - - /// - /// Gets the props. - /// - /// The props. - public Props Props { get; private set; } - - /// - /// Gets the deploy. - /// - /// The deploy. - public Deploy Deploy { get; private set; } - - /// - /// Gets the path. - /// - /// The path. - public string Path { get; private set; } - - /// - /// Gets the supervisor. - /// - /// The supervisor. - public IActorRef Supervisor { get; private set; } - } - - /// - /// INTERNAL API - /// - /// Internal system "daemon" actor for remote internal communication. - /// - /// It acts as the brain of the remote that response to system remote messages and executes actions accordingly. - /// - internal class RemoteDaemon : VirtualPathContainer - { - private readonly ActorSystemImpl _system; - - /// - /// Initializes a new instance of the class. - /// - /// The system. - /// The path. - /// The parent. - /// - public RemoteDaemon(ActorSystemImpl system, ActorPath path, IInternalActorRef parent, ILoggingAdapter log) - : base(system.Provider, path, parent, log) - { - _system = system; - AddressTerminatedTopic.Get(system).Subscribe(this); - } - - - /// - /// Called when [receive]. - /// - /// The message. - protected void OnReceive(object message) - { - //note: RemoteDaemon does not handle ActorSelection messages - those are handled directly by the RemoteActorRefProvider. - if (message is IDaemonMsg) - { - Log.Debug("Received command [{0}] to RemoteSystemDaemon on [{1}]", message, Path.Address); - if (message is DaemonMsgCreate) HandleDaemonMsgCreate((DaemonMsgCreate)message); - } - - //Remote ActorSystem on another process / machine has died. - //Need to clean up any references to remote deployments here. - else if (message is AddressTerminated) - { - var addressTerminated = (AddressTerminated) message; - //stop any remote actors that belong to this address - ForEachChild(@ref => - { - if(@ref.Parent.Path.Address == addressTerminated.Address) _system.Stop(@ref); - }); - } - } - - /// - /// Tells the internal. - /// - /// The message. - /// The sender. - protected override void TellInternal(object message, IActorRef sender) - { - OnReceive(message); - } - - /// - /// Handles the daemon MSG create. - /// - /// The message. - private void HandleDaemonMsgCreate(DaemonMsgCreate message) - { - var supervisor = (IInternalActorRef) message.Supervisor; - Props props = message.Props; - ActorPath childPath; - if(ActorPath.TryParse(message.Path, out childPath)) - { - IEnumerable subPath = childPath.Elements.Drop(1); //drop the /remote - ActorPath path = Path/subPath; - var localProps = props; //.WithDeploy(new Deploy(Scope.Local)); - IInternalActorRef actor = _system.Provider.ActorOf(_system, localProps, supervisor, path, false, - message.Deploy, true, false); - string childName = subPath.Join("/"); - AddChild(childName, actor); - actor.Tell(new Watch(actor, this)); - actor.Start(); - } - else - { - Log.Debug("remote path does not match path from message [{0}]", message); - } - } - - /// - /// Gets the child. - /// - /// The name. - /// ActorRef. - public override IActorRef GetChild(IEnumerable name) - { - string[] parts = name.ToArray(); - //TODO: I have no clue what the scala version does - if (!parts.Any()) - return this; - - string n = parts.First(); - if (string.IsNullOrEmpty(n)) - return this; - - for (int i = parts.Length; i >= 0; i--) - { - string joined = string.Join("/", parts, 0, i); - IInternalActorRef child; - if (TryGetChild(joined, out child)) - { - //longest match found - IEnumerable rest = parts.Skip(i); - return child.GetChild(rest); - } - } - return ActorRefs.Nobody; - } - } -} - diff --git a/src/core/Akka.Remote/RemoteSystemDaemon.cs b/src/core/Akka.Remote/RemoteSystemDaemon.cs new file mode 100644 index 00000000000..042653eceff --- /dev/null +++ b/src/core/Akka.Remote/RemoteSystemDaemon.cs @@ -0,0 +1,320 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using Akka.Actor; +using Akka.Actor.Internals; +using Akka.Dispatch.SysMsg; +using Akka.Event; +using Akka.Util; +using Akka.Util.Internal; +using Akka.Util.Internal.Collections; + +namespace Akka.Remote +{ + /// + /// INTERNAL API + /// + internal interface IDaemonMsg { } + + /// + /// INTERNAL API + /// + internal class DaemonMsgCreate : IDaemonMsg + { + /// + /// Initializes a new instance of the class. + /// + /// The props. + /// The deploy. + /// The path. + /// The supervisor. + public DaemonMsgCreate(Props props, Deploy deploy, string path, IActorRef supervisor) + { + Props = props; + Deploy = deploy; + Path = path; + Supervisor = supervisor; + } + + /// + /// Gets the props. + /// + /// The props. + public Props Props { get; private set; } + + /// + /// Gets the deploy. + /// + /// The deploy. + public Deploy Deploy { get; private set; } + + /// + /// Gets the path. + /// + /// The path. + public string Path { get; private set; } + + /// + /// Gets the supervisor. + /// + /// The supervisor. + public IActorRef Supervisor { get; private set; } + } + + /// + /// INTERNAL API + /// + /// Internal system "daemon" actor for remote internal communication. + /// + /// It acts as the brain of the remote that response to system remote messages and executes actions accordingly. + /// + internal class RemoteSystemDaemon : VirtualPathContainer + { + private readonly ActorSystemImpl _system; + private readonly Switch _terminating = new Switch(false); + //private val parent2children = new ConcurrentHashMap[ActorRef, Set[ActorRef]] + private readonly ConcurrentDictionary> _parent2Children = new ConcurrentDictionary>(); + private readonly IActorRef _terminator; + + /// + /// Initializes a new instance of the class. + /// + /// The system. + /// The path. + /// The parent. + /// + /// + public RemoteSystemDaemon(ActorSystemImpl system, ActorPath path, IInternalActorRef parent,IActorRef terminator, ILoggingAdapter log) + : base(system.Provider, path, parent, log) + { + _terminator = terminator; + _system = system; + AddressTerminatedTopic.Get(system).Subscribe(this); + } + + + /// + /// Called when [receive]. + /// + /// The message. + protected void OnReceive(object message) + { + //note: RemoteDaemon does not handle ActorSelection messages - those are handled directly by the RemoteActorRefProvider. + if (message is IDaemonMsg) + { + Log.Debug("Received command [{0}] to RemoteSystemDaemon on [{1}]", message, Path.Address); + if (message is DaemonMsgCreate) HandleDaemonMsgCreate((DaemonMsgCreate)message); + } + + //Remote ActorSystem on another process / machine has died. + //Need to clean up any references to remote deployments here. + else if (message is AddressTerminated) + { + var addressTerminated = (AddressTerminated) message; + //stop any remote actors that belong to this address + ForEachChild(@ref => + { + if(@ref.Parent.Path.Address == addressTerminated.Address) _system.Stop(@ref); + }); + } + else if (message is DeathWatchNotification) + { + var deathWatchNotification = message as DeathWatchNotification; + var child = deathWatchNotification.Actor as ActorRefWithCell; + if (child != null) + { + if (child.IsLocal) + { + // removeChild(child.path.elements.drop(1).mkString("/"), child) + // val parent = child.getParent + // if (removeChildParentNeedsUnwatch(parent, child)) parent.sendSystemMessage(Unwatch(parent, this)) + // terminationHookDoneWhenNoChildren() + + _terminating.Locked(() => + { + var name = child.Path.Elements.Drop(1).Join("/"); + RemoveChild(name,child); + var parent = child.Parent; + if (RemoveChildParentNeedsUnwatch(parent, child)) + { + parent.Tell(new Unwatch(parent, this)); + } + TerminationHookDoneWhenNoChildren(); + + }); + } + } + else + { + //case DeathWatchNotification(parent: ActorRef with ActorRefScope, _, _) if !parent.isLocal ⇒ + // terminating.locked { + // parent2children.remove(parent) match { + // case null ⇒ + // case children ⇒ + // for (c ← children) { + // system.stop(c) + // removeChild(c.path.elements.drop(1).mkString("/"), c) + // } + // terminationHookDoneWhenNoChildren() + // } + // } + var parent = deathWatchNotification.Actor; + var parentWithScope = parent as IActorRefScope; + if (parentWithScope != null && !parentWithScope.IsLocal) + { + _terminating.Locked(() => + { + IImmutableSet children; + if (_parent2Children.TryRemove(parent,out children)) + { + foreach (var c in children) + { + _system.Stop(c); + var name = c.Path.Elements.Drop(1).Join("/"); + RemoveChild(name,c); + } + TerminationHookDoneWhenNoChildren(); + } + }); + } + } + } + } + + private void TerminationHookDoneWhenNoChildren() + { + _terminating.WhileOn(() => + { + if (!HasChildren) + { + _terminator.Tell(TerminationHookDone.Instance, this); + } + }); + } + + // def terminationHookDoneWhenNoChildren(): Unit = terminating.whileOn { + // if (!hasChildren) terminator.tell(TerminationHookDone, this) + //} + + /// + /// Tells the internal. + /// + /// The message. + /// The sender. + protected override void TellInternal(object message, IActorRef sender) + { + OnReceive(message); + } + + /// + /// Handles the daemon MSG create. + /// + /// The message. + private void HandleDaemonMsgCreate(DaemonMsgCreate message) + { + var supervisor = (IInternalActorRef) message.Supervisor; + var parent = supervisor; + Props props = message.Props; + ActorPath childPath; + if(ActorPath.TryParse(message.Path, out childPath)) + { + IEnumerable subPath = childPath.Elements.Drop(1); //drop the /remote + ActorPath path = Path/subPath; + var localProps = props; //.WithDeploy(new Deploy(Scope.Local)); + + bool isTerminating = !_terminating.WhileOff(() => + { + IInternalActorRef actor = _system.Provider.ActorOf(_system, localProps, supervisor, path, false, + message.Deploy, true, false); + string childName = subPath.Join("/"); + AddChild(childName, actor); + actor.Tell(new Watch(actor, this)); + actor.Start(); + if (AddChildParentNeedsWatch(parent, actor)) + { + //TODO: figure out why current transport is not set when this message is sent + //parent.Tell(new Watch(parent, this),this); + } + }); + if (isTerminating) + { + Log.Error("Skipping [{0}] to RemoteSystemDaemon on [{1}] while terminating", message, path.Address); + } + + } + else + { + Log.Debug("remote path does not match path from message [{0}]", message); + } + } + + /// + /// Gets the child. + /// + /// The name. + /// ActorRef. + public override IActorRef GetChild(IEnumerable name) + { + string[] parts = name.ToArray(); + //TODO: I have no clue what the scala version does + if (!parts.Any()) + return this; + + string n = parts.First(); + if (string.IsNullOrEmpty(n)) + return this; + + for (int i = parts.Length; i >= 0; i--) + { + string joined = string.Join("/", parts, 0, i); + IInternalActorRef child; + if (TryGetChild(joined, out child)) + { + //longest match found + IEnumerable rest = parts.Skip(i); + return child.GetChild(rest); + } + } + return ActorRefs.Nobody; + } + + private bool AddChildParentNeedsWatch(IActorRef parent, IActorRef child) + { + const bool weDontHaveTailRecursion = true; + while (weDontHaveTailRecursion) + { + if (_parent2Children.TryAdd(parent, ImmutableTreeSet.Create(child))) + return true; //child was successfully added + + IImmutableSet children; + if (_parent2Children.TryGetValue(parent, out children)) + { + if (_parent2Children.TryUpdate(parent, children.Add(child), children)) + return false; //child successfully added + } + } + } + + private bool RemoveChildParentNeedsUnwatch(IActorRef parent, IActorRef child) + { + const bool weDontHaveTailRecursion = true; + while (weDontHaveTailRecursion) + { + IImmutableSet children; + if (!_parent2Children.TryGetValue(parent, out children)) + return false; //parent is missing, so child does not need to be removed + + if (_parent2Children.TryUpdate(parent, children.Remove(child), children)) + return true; //child was removed + } + } + } +} + diff --git a/src/core/Akka.Remote/Remoting.cs b/src/core/Akka.Remote/Remoting.cs index a9a70b8efed..fb398b15627 100644 --- a/src/core/Akka.Remote/Remoting.cs +++ b/src/core/Akka.Remote/Remoting.cs @@ -290,7 +290,7 @@ private void NotifyError(string msg, Exception cause) public const string EndpointManagerName = "endpointManager"; - internal static Address LocalAddressForRemote( + internal Address LocalAddressForRemote( IDictionary> transportMapping, Address remote) { HashSet transports; diff --git a/src/core/Akka/Actor/ActorRef.cs b/src/core/Akka/Actor/ActorRef.cs index 59b8c4df7fa..e83a9ad8222 100644 --- a/src/core/Akka/Actor/ActorRef.cs +++ b/src/core/Akka/Actor/ActorRef.cs @@ -402,7 +402,7 @@ public void AddChild(string name, IInternalActorRef actor) { _children.AddOrUpdate(name, actor, (k, v) => { - //TODO: log.debug("{} replacing child {} ({} -> {})", path, name, old, ref) + Log.Warning("{0} replacing child {1} ({2} -> {3})", name, actor, v, actor); return v; }); } @@ -412,7 +412,16 @@ public void RemoveChild(string name) IInternalActorRef tmp; if (!_children.TryRemove(name, out tmp)) { - //TODO: log.warning("{} trying to remove non-child {}", path, name) + Log.Warning("{0} trying to remove non-child {1}", Path, name); + } + } + + public void RemoveChild(string name,IActorRef child) + { + IInternalActorRef tmp; + if (!_children.TryRemove(name, out tmp)) + { + Log.Warning("{0} trying to remove non-child {1}",Path,name); } } diff --git a/src/core/Akka/Serialization/NewtonSoftJsonSerializer.cs b/src/core/Akka/Serialization/NewtonSoftJsonSerializer.cs index c318998d84a..d8e58c4750a 100644 --- a/src/core/Akka/Serialization/NewtonSoftJsonSerializer.cs +++ b/src/core/Akka/Serialization/NewtonSoftJsonSerializer.cs @@ -93,7 +93,6 @@ public override bool IncludeManifest /// A byte array containing the serialized object public override byte[] ToBinary(object obj) { - Serialization.CurrentSystem = system; string data = JsonConvert.SerializeObject(obj, Formatting.None, _settings); byte[] bytes = Encoding.Default.GetBytes(data); return bytes; @@ -107,7 +106,6 @@ public override byte[] ToBinary(object obj) /// The object contained in the array public override object FromBinary(byte[] bytes, Type type) { - Serialization.CurrentSystem = system; string data = Encoding.Default.GetString(bytes); object res = JsonConvert.DeserializeObject(data, _settings); diff --git a/src/core/Akka/Serialization/Serialization.cs b/src/core/Akka/Serialization/Serialization.cs index bf124880ba0..f834eb0d874 100644 --- a/src/core/Akka/Serialization/Serialization.cs +++ b/src/core/Akka/Serialization/Serialization.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using Akka.Actor; +using Akka.Actor.Internals; using Akka.Util.Internal; namespace Akka.Serialization @@ -21,9 +22,20 @@ public class Information public class Serialization { - [ThreadStatic] public static Information CurrentTransportInformation; + [ThreadStatic] private static Information _currentTransportInformation; + + public static T SerializeWithTransport(ActorSystem system, Address address, Func action) + { + _currentTransportInformation = new Information() + { + System = system, + Address = address + }; + var res = action(); + _currentTransportInformation = null; + return res; + } - [ThreadStatic] public static ActorSystem CurrentSystem; private readonly Serializer _nullSerializer; private readonly Dictionary _serializerMap = new Dictionary(); @@ -123,42 +135,48 @@ public Serializer FindSerializerForType(Type objectType) throw new Exception("Serializer not found for type " + objectType.Name); } - public static string SerializedActorPath(IActorRef @ref) + public static string SerializedActorPath(IActorRef actorRef) { - if (@ref == ActorRefs.NoSender) return String.Empty; + if (Equals(actorRef, ActorRefs.NoSender)) + return String.Empty; - /* -val path = actorRef.path - val originalSystem: ExtendedActorSystem = actorRef match { - case a: ActorRefWithCell ⇒ a.underlying.system.asInstanceOf[ExtendedActorSystem] - case _ ⇒ null - } - Serialization.currentTransportInformation.value match { - case null ⇒ originalSystem match { - case null ⇒ path.toSerializationFormat - case system ⇒ - try path.toSerializationFormatWithAddress(system.provider.getDefaultAddress) - catch { case NonFatal(_) ⇒ path.toSerializationFormat } - } - case Information(address, system) ⇒ - if (originalSystem == null || originalSystem == system) - path.toSerializationFormatWithAddress(address) - else { - val provider = originalSystem.provider - path.toSerializationFormatWithAddress(provider.getExternalAddressFor(address).getOrElse(provider.getDefaultAddress)) - } - }*/ - ActorSystem originalSystem = null; - if (@ref is ActorRefWithCell) + var path = actorRef.Path; + ExtendedActorSystem originalSystem = null; + if (actorRef is ActorRefWithCell) + { + originalSystem = actorRef.AsInstanceOf().Underlying.System.AsInstanceOf(); + } + + if (_currentTransportInformation == null) { - originalSystem = @ref.AsInstanceOf().Underlying.System; - if (CurrentTransportInformation == null) + if (originalSystem == null) + { + var res = path.ToSerializationFormat(); + return res; + } + else { - return @ref.Path.ToSerializationFormat(); + var defaultAddress = originalSystem.Provider.DefaultAddress; + var res = path.ToStringWithAddress(defaultAddress); + return res; } - return @ref.Path.ToStringWithAddress(CurrentTransportInformation.Address); } - return @ref.Path.ToSerializationFormat(); + + //CurrentTransportInformation exists + var system = _currentTransportInformation.System; + var address = _currentTransportInformation.Address; + if (originalSystem == null || originalSystem == system) + { + var res = path.ToStringWithAddress(address); + return res; + } + else + { + var provider = originalSystem.Provider; + var res = + path.ToStringWithAddress(provider.GetExternalAddressFor(address).GetOrElse(provider.DefaultAddress)); + return res; + } } public Serializer GetSerializerById(int serializerId) diff --git a/src/core/Akka/Serialization/Serializer.cs b/src/core/Akka/Serialization/Serializer.cs index 7f2660baf94..23580d0faae 100644 --- a/src/core/Akka/Serialization/Serializer.cs +++ b/src/core/Akka/Serialization/Serializer.cs @@ -63,6 +63,17 @@ public Serializer(ExtendedActorSystem system) /// A byte array containing the serialized object public abstract byte[] ToBinary(object obj); + /// + /// Serializes the given object into a byte array and uses the given address to decorate serialized ActorRef's + /// + /// The address to use when serializing local ActorRef´s + /// The object to serialize + /// + public byte[] ToBinaryWithAddress(Address address, object obj) + { + return Serialization.SerializeWithTransport(system, address, () => ToBinary(obj)); + } + /// /// Deserializes a byte array into an object of type . /// diff --git a/src/core/Akka/Util/Switch.cs b/src/core/Akka/Util/Switch.cs index 700f2e1782b..5fa1951c6e2 100644 --- a/src/core/Akka/Util/Switch.cs +++ b/src/core/Akka/Util/Switch.cs @@ -177,6 +177,14 @@ public bool IsOff { get { return !_switch.Value; } } + + public void Locked(Action action) + { + lock (_lock) + { + action(); + } + } } }