From 6e3dacb0b13db6f9af4e1c103dcd8fefce491b0d Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 22 Apr 2021 21:56:14 +0700 Subject: [PATCH 1/5] Fix serialization verification problem with Akka.IO messages --- src/core/Akka.Tests/IO/TcpIntegrationSpec.cs | 4 +++- src/core/Akka.Tests/IO/TcpListenerSpec.cs | 8 +++++--- .../Akka.Tests/IO/UdpConnectedIntegrationSpec.cs | 2 ++ src/core/Akka.Tests/IO/UdpIntegrationSpec.cs | 2 ++ src/core/Akka.Tests/IO/UdpListenerSpec.cs | 10 +++++++--- src/core/Akka/Actor/ActorCell.Children.cs | 16 +++++++++++++--- src/core/Akka/Actor/ActorCell.cs | 8 ++++++++ src/core/Akka/IO/Dns.cs | 2 +- src/core/Akka/IO/Tcp.cs | 2 +- src/core/Akka/IO/Udp.cs | 4 ++-- src/core/Akka/IO/UdpConnected.cs | 6 +++--- 11 files changed, 47 insertions(+), 17 deletions(-) diff --git a/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs b/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs index 67937d68e57..cb8abc1c4d3 100644 --- a/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs @@ -45,6 +45,8 @@ class AckWithValue : Tcp.Event public TcpIntegrationSpec(ITestOutputHelper output) : base($@"akka.loglevel = DEBUG + akka.actor.serialize-creators = on + akka.actor.serialize-messages = on akka.io.tcp.trace-logging = true akka.io.tcp.write-commands-queue-max-size = {InternalConnectionActorMaxQueueSize}", output: output) { } @@ -190,7 +192,7 @@ public void The_TCP_transport_implementation_should_properly_support_connecting_ var targetAddress = new DnsEndPoint("localhost", boundMsg.LocalAddress.AsInstanceOf().Port); var clientHandler = CreateTestProbe(); Sys.Tcp().Tell(new Tcp.Connect(targetAddress), clientHandler); - clientHandler.ExpectMsg(TimeSpan.FromMinutes(10)); + clientHandler.ExpectMsg(TimeSpan.FromSeconds(3)); var clientEp = clientHandler.Sender; clientEp.Tell(new Tcp.Register(clientHandler)); serverHandler.ExpectMsg(); diff --git a/src/core/Akka.Tests/IO/TcpListenerSpec.cs b/src/core/Akka.Tests/IO/TcpListenerSpec.cs index e5db0fe2674..02c40edbedb 100644 --- a/src/core/Akka.Tests/IO/TcpListenerSpec.cs +++ b/src/core/Akka.Tests/IO/TcpListenerSpec.cs @@ -19,12 +19,14 @@ namespace Akka.Tests.IO public class TcpListenerSpec : AkkaSpec { public TcpListenerSpec() - : base(@"akka.io.tcp.register-timeout = 500ms + : base(@" + akka.actor.serialize-creators = on + akka.actor.serialize-messages = on + akka.io.tcp.register-timeout = 500ms akka.io.tcp.max-received-message-size = 1024 akka.io.tcp.direct-buffer-size = 512 akka.actor.serialize-creators = on - akka.io.tcp.batch-accept-limit = 2 - ") + akka.io.tcp.batch-accept-limit = 2") { } [Fact] diff --git a/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs b/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs index e948193f219..3f79b8a8d9c 100644 --- a/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs @@ -24,6 +24,8 @@ public class UdpConnectedIntegrationSpec : AkkaSpec public UdpConnectedIntegrationSpec(ITestOutputHelper output) : base(@" + akka.actor.serialize-creators = on + akka.actor.serialize-messages = on akka.io.udp-connected.nr-of-selectors = 1 akka.io.udp-connected.direct-buffer-pool-limit = 100 akka.io.udp-connected.direct-buffer-size = 1024 diff --git a/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs b/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs index a3af3b3d769..a4c69741bdb 100644 --- a/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs @@ -27,6 +27,8 @@ public class UdpIntegrationSpec : AkkaSpec public UdpIntegrationSpec(ITestOutputHelper output) : base(@" + akka.actor.serialize-creators = on + akka.actor.serialize-messages = on akka.io.udp.max-channels = unlimited akka.io.udp.nr-of-selectors = 1 akka.io.udp.direct-buffer-pool-limit = 100 diff --git a/src/core/Akka.Tests/IO/UdpListenerSpec.cs b/src/core/Akka.Tests/IO/UdpListenerSpec.cs index 7895c6b4944..7021126af1d 100644 --- a/src/core/Akka.Tests/IO/UdpListenerSpec.cs +++ b/src/core/Akka.Tests/IO/UdpListenerSpec.cs @@ -13,17 +13,21 @@ using Akka.IO; using Akka.TestKit; using Xunit; +using Xunit.Abstractions; using UdpListener = Akka.IO.UdpListener; namespace Akka.Tests.IO { public class UdpListenerSpec : AkkaSpec { - public UdpListenerSpec() - : base(@"akka.io.udp.max-channels = unlimited + public UdpListenerSpec(ITestOutputHelper output) + : base(@" + akka.actor.serialize-creators = on + akka.actor.serialize-messages = on + akka.io.udp.max-channels = unlimited akka.io.udp.nr-of-selectors = 1 akka.io.udp.direct-buffer-pool-limit = 100 - akka.io.udp.direct-buffer-size = 1024") + akka.io.udp.direct-buffer-size = 1024", output) { } [Fact] diff --git a/src/core/Akka/Actor/ActorCell.Children.cs b/src/core/Akka/Actor/ActorCell.Children.cs index 0affbfe94f6..66532081083 100644 --- a/src/core/Akka/Actor/ActorCell.Children.cs +++ b/src/core/Akka/Actor/ActorCell.Children.cs @@ -10,8 +10,10 @@ using System.Collections.Immutable; using System.Text; using System.Runtime.CompilerServices; +using System.Runtime.Serialization; using System.Threading; using Akka.Actor.Internal; +using Akka.Dispatch.SysMsg; using Akka.Serialization; using Akka.Util; using Akka.Util.Internal; @@ -452,6 +454,7 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s if (_systemImpl.Settings.SerializeAllCreators && !systemService && !(props.Deploy.Scope is LocalScope)) { var oldInfo = Serialization.Serialization.CurrentTransportInformation; + object propArgument = null; try { if (oldInfo == null) @@ -465,17 +468,24 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s { if (argument != null && !(argument is INoSerializationVerificationNeeded)) { + propArgument = argument; var serializer = ser.FindSerializerFor(argument); var bytes = serializer.ToBinary(argument); var ms = Serialization.Serialization.ManifestFor(serializer, argument); - if(ser.Deserialize(bytes, serializer.Identifier, ms) == null) + if (ser.Deserialize(bytes, serializer.Identifier, ms) == null) throw new ArgumentException( - $"Pre-creation serialization check failed at [${_self.Path}/{name}]", - nameof(name)); + $"Pre-creation serialization check failed at [${_self.Path}/{name}]", + nameof(name)); } } } } + catch (Exception e) + { + throw new SerializationException( + $"Failed to serialize and deserialize actor props argument of type {propArgument?.GetType()}", + e); + } finally { Serialization.Serialization.CurrentTransportInformation = oldInfo; diff --git a/src/core/Akka/Actor/ActorCell.cs b/src/core/Akka/Actor/ActorCell.cs index 349ea7420c8..25d834de8e3 100644 --- a/src/core/Akka/Actor/ActorCell.cs +++ b/src/core/Akka/Actor/ActorCell.cs @@ -13,6 +13,7 @@ using Akka.Dispatch.SysMsg; using Akka.Event; using System.Reflection; +using System.Runtime.Serialization; using Akka.Serialization; using Akka.Util; using Assert = System.Diagnostics.Debug; @@ -518,6 +519,9 @@ private Envelope SerializeAndDeserialize(Envelope envelope) if (unwrapped is INoSerializationVerificationNeeded) return envelope; + if(unwrapped.GetType().Namespace?.StartsWith("System.Net.Sockets") ?? false) + return envelope; + var deserializedMsg = SerializeAndDeserializePayload(unwrapped); if (deadLetter != null) return new Envelope(new DeadLetter(deserializedMsg, deadLetter.Sender, deadLetter.Recipient), envelope.Sender); @@ -540,6 +544,10 @@ private object SerializeAndDeserializePayload(object obj) var manifest = Serialization.Serialization.ManifestFor(serializer, obj); return _systemImpl.Serialization.Deserialize(bytes, serializer.Identifier, manifest); } + catch (Exception e) + { + throw new SerializationException($"Failed to serialize and deserialize payload object {obj.GetType()}", e); + } finally { Serialization.Serialization.CurrentTransportInformation = oldInfo; diff --git a/src/core/Akka/IO/Dns.cs b/src/core/Akka/IO/Dns.cs index 143120feedb..b510f51331f 100644 --- a/src/core/Akka/IO/Dns.cs +++ b/src/core/Akka/IO/Dns.cs @@ -59,7 +59,7 @@ public class Dns : ExtensionIdProvider /// /// TBD /// - public abstract class Command + public abstract class Command : INoSerializationVerificationNeeded { } /// diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index f5efd617c8d..99695646f44 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -53,7 +53,7 @@ public override TcpExt CreateExtension(ExtendedActorSystem system) #region internal connection messages - internal abstract class SocketCompleted { } + internal abstract class SocketCompleted : INoSerializationVerificationNeeded { } internal sealed class SocketSent : SocketCompleted { diff --git a/src/core/Akka/IO/Udp.cs b/src/core/Akka/IO/Udp.cs index b1aea2c2bff..213994ab542 100644 --- a/src/core/Akka/IO/Udp.cs +++ b/src/core/Akka/IO/Udp.cs @@ -34,7 +34,7 @@ public class Udp : ExtensionIdProvider { #region internal connection messages - internal abstract class SocketCompleted { } + internal abstract class SocketCompleted : INoSerializationVerificationNeeded { } internal sealed class SocketSent : SocketCompleted { @@ -104,7 +104,7 @@ public override UdpExt CreateExtension(ExtendedActorSystem system) } /// The common interface for and . - public abstract class Message { } + public abstract class Message : INoSerializationVerificationNeeded { } /// The common type of all commands supported by the UDP implementation. public abstract class Command : Message diff --git a/src/core/Akka/IO/UdpConnected.cs b/src/core/Akka/IO/UdpConnected.cs index 351648c6b77..30b9c213c64 100644 --- a/src/core/Akka/IO/UdpConnected.cs +++ b/src/core/Akka/IO/UdpConnected.cs @@ -34,7 +34,7 @@ public class UdpConnected : ExtensionIdProvider { #region internal connection messages - internal abstract class SocketCompleted + internal abstract class SocketCompleted : INoSerializationVerificationNeeded { public readonly SocketAsyncEventArgs EventArgs; @@ -92,7 +92,7 @@ public override UdpConnectedExt CreateExtension(ExtendedActorSystem system) /// /// The common interface for and . /// - public abstract class Message { } + public abstract class Message : INoSerializationVerificationNeeded { } /// /// The common type of all commands supported by the UDP implementation. @@ -372,7 +372,7 @@ private Disconnected() /// /// TBD /// - public class UdpConnectedExt : IOExtension + public class UdpConnectedExt : IOExtension, INoSerializationVerificationNeeded { public UdpConnectedExt(ExtendedActorSystem system) : this(system, UdpSettings.Create(system.Settings.Config.GetConfig("akka.io.udp-connected"))) From bed4022b41acfc6adb1f90094a721ac5e9637f0c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 22 Apr 2021 23:05:46 +0700 Subject: [PATCH 2/5] Wrap naked SocketAsyncEventArgs in a struct that inherits INoSerializationVerificationNeeded --- src/core/Akka/Actor/ActorCell.cs | 16 ++++---- src/core/Akka/IO/TcpListener.cs | 64 ++++++++++++++++++-------------- 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/src/core/Akka/Actor/ActorCell.cs b/src/core/Akka/Actor/ActorCell.cs index 25d834de8e3..cd58404aad2 100644 --- a/src/core/Akka/Actor/ActorCell.cs +++ b/src/core/Akka/Actor/ActorCell.cs @@ -519,10 +519,16 @@ private Envelope SerializeAndDeserialize(Envelope envelope) if (unwrapped is INoSerializationVerificationNeeded) return envelope; - if(unwrapped.GetType().Namespace?.StartsWith("System.Net.Sockets") ?? false) - return envelope; + object deserializedMsg; + try + { + deserializedMsg = SerializeAndDeserializePayload(unwrapped); + } + catch (Exception e) + { + throw new SerializationException($"Failed to serialize and deserialize payload object [{unwrapped.GetType()}]. Envelope: [{envelope}]", e); + } - var deserializedMsg = SerializeAndDeserializePayload(unwrapped); if (deadLetter != null) return new Envelope(new DeadLetter(deserializedMsg, deadLetter.Sender, deadLetter.Recipient), envelope.Sender); return new Envelope(deserializedMsg, envelope.Sender); @@ -544,10 +550,6 @@ private object SerializeAndDeserializePayload(object obj) var manifest = Serialization.Serialization.ManifestFor(serializer, obj); return _systemImpl.Serialization.Deserialize(bytes, serializer.Identifier, manifest); } - catch (Exception e) - { - throw new SerializationException($"Failed to serialize and deserialize payload object {obj.GetType()}", e); - } finally { Serialization.Serialization.CurrentTransportInformation = oldInfo; diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index 9407c1c153c..50769ab88cb 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -71,9 +71,9 @@ private IEnumerable Accept(int limit) { var self = Self; var saea = new SocketAsyncEventArgs(); - saea.Completed += (s, e) => self.Tell(e); + saea.Completed += (s, e) => self.Tell(new SocketEvent(e)); if (!_socket.AcceptAsync(saea)) - Self.Tell(saea); + Self.Tell(new SocketEvent(saea)); yield return saea; } } @@ -85,34 +85,34 @@ protected override SupervisorStrategy SupervisorStrategy() protected override bool Receive(object message) { - if (message is SocketAsyncEventArgs) + switch (message) { - var saea = message as SocketAsyncEventArgs; - if (saea.SocketError == SocketError.Success) - Context.ActorOf(Props.Create(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local)); - saea.AcceptSocket = null; + case SocketEvent evt: + var saea = evt.Args; + if (saea.SocketError == SocketError.Success) + Context.ActorOf(Props.Create(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local)); + saea.AcceptSocket = null; - if (!_socket.AcceptAsync(saea)) - Self.Tell(saea); - return true; - } - var resumeAccepting = message as Tcp.ResumeAccepting; - if (resumeAccepting != null) - { - _acceptLimit = resumeAccepting.BatchSize; - _saeas = Accept(_acceptLimit).ToArray(); - return true; - } - if (message is Tcp.Unbind) - { - _log.Debug("Unbinding endpoint {0}", _bind.LocalAddress); - _socket.Dispose(); - Sender.Tell(Tcp.Unbound.Instance); - _log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress); - Context.Stop(Self); - return true; + if (!_socket.AcceptAsync(saea)) + Self.Tell(new SocketEvent(saea)); + return true; + + case Tcp.ResumeAccepting resumeAccepting: + _acceptLimit = resumeAccepting.BatchSize; + _saeas = Accept(_acceptLimit).ToArray(); + return true; + + case Tcp.Unbind _: + _log.Debug("Unbinding endpoint {0}", _bind.LocalAddress); + _socket.Dispose(); + Sender.Tell(Tcp.Unbound.Instance); + _log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress); + Context.Stop(Self); + return true; + + default: + return false; } - return false; } /// @@ -130,5 +130,15 @@ protected override void PostStop() _log.Debug("Error closing ServerSocketChannel: {0}", e); } } + + private struct SocketEvent : INoSerializationVerificationNeeded + { + public SocketAsyncEventArgs Args; + + public SocketEvent(SocketAsyncEventArgs args) + { + Args = args; + } + } } } From cf23a2d5171e3d25b0a4210ee68700444928d784 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 22 Apr 2021 23:11:06 +0700 Subject: [PATCH 3/5] Make the wrapper struct readonly --- src/core/Akka/IO/TcpListener.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index 50769ab88cb..a26d8f80592 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -131,9 +131,9 @@ protected override void PostStop() } } - private struct SocketEvent : INoSerializationVerificationNeeded + private readonly struct SocketEvent : INoSerializationVerificationNeeded { - public SocketAsyncEventArgs Args; + public readonly SocketAsyncEventArgs Args; public SocketEvent(SocketAsyncEventArgs args) { From bbcb13271518ed5b4256b5202162aaeb4f57def2 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Thu, 22 Apr 2021 23:43:49 +0700 Subject: [PATCH 4/5] Expand exception message with their actor types --- src/core/Akka/Actor/ActorCell.Children.cs | 2 +- src/core/Akka/Actor/ActorCell.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/Akka/Actor/ActorCell.Children.cs b/src/core/Akka/Actor/ActorCell.Children.cs index 66532081083..cc38364269d 100644 --- a/src/core/Akka/Actor/ActorCell.Children.cs +++ b/src/core/Akka/Actor/ActorCell.Children.cs @@ -483,7 +483,7 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s catch (Exception e) { throw new SerializationException( - $"Failed to serialize and deserialize actor props argument of type {propArgument?.GetType()}", + $"Failed to serialize and deserialize actor props argument of type {propArgument?.GetType()} for actor type [{props.Type}].", e); } finally diff --git a/src/core/Akka/Actor/ActorCell.cs b/src/core/Akka/Actor/ActorCell.cs index cd58404aad2..13b5a00e6b7 100644 --- a/src/core/Akka/Actor/ActorCell.cs +++ b/src/core/Akka/Actor/ActorCell.cs @@ -526,7 +526,7 @@ private Envelope SerializeAndDeserialize(Envelope envelope) } catch (Exception e) { - throw new SerializationException($"Failed to serialize and deserialize payload object [{unwrapped.GetType()}]. Envelope: [{envelope}]", e); + throw new SerializationException($"Failed to serialize and deserialize payload object [{unwrapped.GetType()}]. Envelope: [{envelope}], Actor type: [{Actor.GetType()}]", e); } if (deadLetter != null) From aae8b4a4802d72176dd898e9ecfe9a008fe27284 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 23 Apr 2021 01:39:51 +0700 Subject: [PATCH 5/5] Update API Approver list --- .../Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 1a310d07e9c..2def445471a 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -3256,7 +3256,7 @@ namespace Akka.IO public static Akka.IO.Dns.Resolved Cached(string name, Akka.Actor.ActorSystem system) { } public override Akka.IO.DnsExt CreateExtension(Akka.Actor.ExtendedActorSystem system) { } public static Akka.IO.Dns.Resolved ResolveName(string name, Akka.Actor.ActorSystem system, Akka.Actor.IActorRef sender) { } - public abstract class Command + public abstract class Command : Akka.Actor.INoSerializationVerificationNeeded { protected Command() { } } @@ -3685,7 +3685,7 @@ namespace Akka.IO { protected Event() { } } - public abstract class Message + public abstract class Message : Akka.Actor.INoSerializationVerificationNeeded { protected Message() { } } @@ -3792,7 +3792,7 @@ namespace Akka.IO { protected Event() { } } - public abstract class Message + public abstract class Message : Akka.Actor.INoSerializationVerificationNeeded { protected Message() { } } @@ -3827,7 +3827,7 @@ namespace Akka.IO public static readonly Akka.IO.UdpConnected.SuspendReading Instance; } } - public class UdpConnectedExt : Akka.IO.IOExtension + public class UdpConnectedExt : Akka.IO.IOExtension, Akka.Actor.INoSerializationVerificationNeeded { public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system) { } public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system, Akka.IO.UdpSettings settings) { }