From b642984c7fdb26f11107b17aa9481d76454c72ab Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 18 Dec 2015 11:03:34 -0800 Subject: [PATCH] close #1586 brings Akka.Remote up to code with latest JVM stable converted all Akka.Remote system actors to ReceiveActor fixed race condition in Akka.Remote.Tests.Performance --- .../InboundMessageDispatcherSpec.cs | 11 +- .../RemoteMessagingThroughputSpecBase.cs | 5 +- .../TestTransportAssociationStressSpec.cs | 32 +- ...tTransportRemoteMessagingThroughputSpec.cs | 35 +- .../Akka.Remote.Tests/EndpointRegistrySpec.cs | 24 +- .../Akka.Remote.Tests/RemoteConfigSpec.cs | 9 +- .../ThrottlerTransportAdapterSpec.cs | 15 +- .../Akka.Remote/Configuration/Remote.conf | 25 +- src/core/Akka.Remote/Endpoint.cs | 818 ++++++++---------- src/core/Akka.Remote/EndpointManager.cs | 650 ++++++++------ src/core/Akka.Remote/EndpointRegistry.cs | 124 +-- src/core/Akka.Remote/RemoteSettings.cs | 6 + src/core/Akka.Remote/Remoting.cs | 59 +- .../Transport/AkkaProtocolTransport.cs | 4 +- .../Internal/TimeSpanExtensions.cs | 11 - src/core/Akka/Actor/ActorSelection.cs | 6 +- src/core/Akka/Actor/Exceptions.cs | 7 + .../Akka/Pattern/IllegalStateException.cs | 10 + src/core/Akka/Util/Internal/Extensions.cs | 5 + 19 files changed, 973 insertions(+), 883 deletions(-) diff --git a/src/core/Akka.Remote.Tests.Performance/InboundMessageDispatcherSpec.cs b/src/core/Akka.Remote.Tests.Performance/InboundMessageDispatcherSpec.cs index 056e5323788..10821aed2ed 100644 --- a/src/core/Akka.Remote.Tests.Performance/InboundMessageDispatcherSpec.cs +++ b/src/core/Akka.Remote.Tests.Performance/InboundMessageDispatcherSpec.cs @@ -32,9 +32,11 @@ private class BenchmarkActorRef : MinimalActorRef { private readonly Counter _counter; - public BenchmarkActorRef(Counter counter) + public BenchmarkActorRef(Counter counter, RemoteActorRefProvider provider) { _counter = counter; + Provider = provider; + Path = new RootActorPath(Provider.DefaultAddress) / "user" / "tempRef"; } protected override void TellInternal(object message, IActorRef sender) @@ -42,8 +44,9 @@ protected override void TellInternal(object message, IActorRef sender) _counter.Increment(); } - public override ActorPath Path { get { return null; } } - public override IActorRefProvider Provider { get { return null; } } + public override ActorPath Path { get; } + + public override IActorRefProvider Provider { get; } } private static readonly Config RemoteHocon = ConfigurationFactory.ParseString(@" @@ -76,7 +79,7 @@ public void Setup(BenchmarkContext context) _inboundMessageDispatcherCounter = context.GetCounter(MessageDispatcherThroughputCounterName); _message = SerializedMessage.CreateBuilder().SetSerializerId(0).SetMessage(ByteString.CopyFromUtf8("foo")).Build(); _dispatcher = new DefaultMessageDispatcher(_actorSystem, RARP.For(_actorSystem).Provider, _actorSystem.Log); - _targetActorRef = new BenchmarkActorRef(_inboundMessageDispatcherCounter); + _targetActorRef = new BenchmarkActorRef(_inboundMessageDispatcherCounter, RARP.For(_actorSystem).Provider); } [PerfBenchmark(Description = "Tests the performance of the Default", RunMode = RunMode.Throughput, NumberOfIterations = 13, TestMode = TestMode.Measurement)] diff --git a/src/core/Akka.Remote.Tests.Performance/Transports/RemoteMessagingThroughputSpecBase.cs b/src/core/Akka.Remote.Tests.Performance/Transports/RemoteMessagingThroughputSpecBase.cs index 3c36ec3ad1c..3655f77b7b2 100644 --- a/src/core/Akka.Remote.Tests.Performance/Transports/RemoteMessagingThroughputSpecBase.cs +++ b/src/core/Akka.Remote.Tests.Performance/Transports/RemoteMessagingThroughputSpecBase.cs @@ -86,7 +86,10 @@ public void Setup(BenchmarkContext context) var system1EchoActorPath = new RootActorPath(system1Address) / "user" / "echo"; var system2RemoteActorPath = new RootActorPath(system2Address) / "user" / "benchmark"; - _remoteReceiver = System1.ActorSelection(system2RemoteActorPath).ResolveOne(TimeSpan.FromSeconds(2)).Result; + // set the timeout high here to avoid timeouts + // TL;DR; - on slow machines it can take longer than 2 seconds to form the association, do the handshake, and reply back + // using the in-memory transport. + _remoteReceiver = System1.ActorSelection(system2RemoteActorPath).ResolveOne(TimeSpan.FromSeconds(30)).Result; _remoteEcho = System2.ActorSelection(system1EchoActorPath).ResolveOne(TimeSpan.FromSeconds(2)).Result; } diff --git a/src/core/Akka.Remote.Tests.Performance/Transports/TestTransportAssociationStressSpec.cs b/src/core/Akka.Remote.Tests.Performance/Transports/TestTransportAssociationStressSpec.cs index b286286d38c..e0141aea027 100644 --- a/src/core/Akka.Remote.Tests.Performance/Transports/TestTransportAssociationStressSpec.cs +++ b/src/core/Akka.Remote.Tests.Performance/Transports/TestTransportAssociationStressSpec.cs @@ -14,23 +14,23 @@ public override string CreateRegistryKey() public override Config CreateActorSystemConfig(string actorSystemName, string ipOrHostname, int port, string registryKey = null) { var baseConfig = ConfigurationFactory.ParseString(@" - akka { - actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote"" - - remote { - log-remote-lifecycle-events = off - - enabled-transports = [ - ""akka.remote.test"", - ] - - test { - transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote"" - applied-adapters = [] - maximum-payload-bytes = 128000b - scheme-identifier = test + akka { + actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote"" + + remote { + log-remote-lifecycle-events = off + enabled-transports = [ + ""akka.remote.test"", + ] + + test { + transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote"" + applied-adapters = [] + maximum-payload-bytes = 128000b + scheme-identifier = test + } + } } - } "); port = 10; //BUG: setting the port to 0 causes the DefaultAddress to report the port as -1 diff --git a/src/core/Akka.Remote.Tests.Performance/Transports/TestTransportRemoteMessagingThroughputSpec.cs b/src/core/Akka.Remote.Tests.Performance/Transports/TestTransportRemoteMessagingThroughputSpec.cs index 08f92772c56..e43620f1e9b 100644 --- a/src/core/Akka.Remote.Tests.Performance/Transports/TestTransportRemoteMessagingThroughputSpec.cs +++ b/src/core/Akka.Remote.Tests.Performance/Transports/TestTransportRemoteMessagingThroughputSpec.cs @@ -12,22 +12,27 @@ public override Config CreateActorSystemConfig(string actorSystemName, string ip { var baseConfig = ConfigurationFactory.ParseString(@" akka { - actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote"" - - remote { - log-remote-lifecycle-events = off - - enabled-transports = [ - ""akka.remote.test"", - ] - - test { - transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote"" - applied-adapters = [] - maximum-payload-bytes = 128000b - scheme-identifier = test + loglevel = ""WARNING"" + stdout-loglevel = ""WARNING"" + actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote"" + + remote { + log-received-messages = off + log-sent-messages = off + log-remote-lifecycle-events = off + + enabled-transports = [ + ""akka.remote.test"", + ] + + test { + transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote"" + applied-adapters = [] + maximum-payload-bytes = 128000b + scheme-identifier = test + } + } } - } "); port = 10; //BUG: setting the port to 0 causes the DefaultAddress to report the port as -1 diff --git a/src/core/Akka.Remote.Tests/EndpointRegistrySpec.cs b/src/core/Akka.Remote.Tests/EndpointRegistrySpec.cs index ba0ba2ae0ff..f4f266cb04b 100644 --- a/src/core/Akka.Remote.Tests/EndpointRegistrySpec.cs +++ b/src/core/Akka.Remote.Tests/EndpointRegistrySpec.cs @@ -34,7 +34,7 @@ public void EndpointRegistry_must_be_able_to_register_a_writeable_endpoint_and_p var reg = new EndpointRegistry(); Assert.Null(reg.WritableEndpointWithPolicyFor(address1)); - Assert.Equal(actorA, reg.RegisterWritableEndpoint(address1, actorA)); + Assert.Equal(actorA, reg.RegisterWritableEndpoint(address1, actorA,null,null)); Assert.IsType(reg.WritableEndpointWithPolicyFor(address1)); Assert.Equal(actorA, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf().Endpoint); @@ -52,8 +52,8 @@ public void EndpointRegistry_must_be_able_to_register_a_readonly_endpoint() var reg = new EndpointRegistry(); Assert.Null(reg.ReadOnlyEndpointFor(address1)); - Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA)); - Assert.Equal(actorA, reg.ReadOnlyEndpointFor(address1)); + Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA, 0)); + Assert.Equal(Tuple.Create(actorA, 0), reg.ReadOnlyEndpointFor(address1)); Assert.Null(reg.WritableEndpointWithPolicyFor(address1)); Assert.False(reg.IsWritable(actorA)); Assert.True(reg.IsReadOnly(actorA)); @@ -67,10 +67,10 @@ public void EndpointRegistry_must_be_able_to_register_writable_and_readonly_endp Assert.Null(reg.ReadOnlyEndpointFor(address1)); Assert.Null(reg.WritableEndpointWithPolicyFor(address1)); - Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA)); - Assert.Equal(actorB, reg.RegisterWritableEndpoint(address1, actorB)); + Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA, 1)); + Assert.Equal(actorB, reg.RegisterWritableEndpoint(address1, actorB, null,null)); - Assert.Equal(actorA, reg.ReadOnlyEndpointFor(address1)); + Assert.Equal(Tuple.Create(actorA,1), reg.ReadOnlyEndpointFor(address1)); Assert.Equal(actorB, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf().Endpoint); Assert.False(reg.IsWritable(actorA)); @@ -85,7 +85,7 @@ public void EndpointRegistry_must_be_able_to_register_Gated_policy_for_an_addres { var reg = new EndpointRegistry(); Assert.Null(reg.WritableEndpointWithPolicyFor(address1)); - reg.RegisterWritableEndpoint(address1, actorA); + reg.RegisterWritableEndpoint(address1, actorA, null, null); var deadline = Deadline.Now; reg.MarkAsFailed(actorA, deadline); Assert.Equal(deadline, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf().TimeOfRelease); @@ -97,7 +97,7 @@ public void EndpointRegistry_must_be_able_to_register_Gated_policy_for_an_addres public void EndpointRegistry_must_remove_readonly_endpoints_if_marked_as_failed() { var reg = new EndpointRegistry(); - reg.RegisterReadOnlyEndpoint(address1, actorA); + reg.RegisterReadOnlyEndpoint(address1, actorA, 2); reg.MarkAsFailed(actorA, Deadline.Now); Assert.Null(reg.ReadOnlyEndpointFor(address1)); } @@ -106,8 +106,8 @@ public void EndpointRegistry_must_remove_readonly_endpoints_if_marked_as_failed( public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint() { var reg = new EndpointRegistry(); - reg.RegisterWritableEndpoint(address1, actorA); - reg.RegisterWritableEndpoint(address2, actorB); + reg.RegisterWritableEndpoint(address1, actorA, null, null); + reg.RegisterWritableEndpoint(address2, actorB, null, null); var deadline = Deadline.Now; reg.MarkAsFailed(actorA, deadline); reg.MarkAsQuarantined(address2, 42, deadline); @@ -124,8 +124,8 @@ public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint() public void EndpointRegistry_should_prune_outdated_Gated_directives_properly() { var reg = new EndpointRegistry(); - reg.RegisterWritableEndpoint(address1, actorA); - reg.RegisterWritableEndpoint(address2, actorB); + reg.RegisterWritableEndpoint(address1, actorA, null, null); + reg.RegisterWritableEndpoint(address2, actorB, null, null); reg.MarkAsFailed(actorA, Deadline.Now); var farIntheFuture = Deadline.Now + TimeSpan.FromSeconds(60); reg.MarkAsFailed(actorB, farIntheFuture); diff --git a/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs b/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs index 64ce815245d..1a2045d83a7 100644 --- a/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs +++ b/src/core/Akka.Remote.Tests/RemoteConfigSpec.cs @@ -37,14 +37,15 @@ public void Remoting_should_contain_correct_configuration_values_in_ReferenceCon Assert.Equal(TimeSpan.FromSeconds(2), remoteSettings.FlushWait); Assert.Equal(TimeSpan.FromSeconds(10), remoteSettings.StartupTimeout); Assert.Equal(TimeSpan.FromSeconds(5), remoteSettings.RetryGateClosedFor); - //Assert.Equal("akka.remote.default-remote-dispatcher", remoteSettings.Dispatcher); //TODO: add RemoteDispatcher support + Assert.Equal("akka.remote.default-remote-dispatcher", remoteSettings.Dispatcher); Assert.True(remoteSettings.UsePassiveConnections); Assert.Equal(TimeSpan.FromMilliseconds(50), remoteSettings.BackoffPeriod); Assert.Equal(TimeSpan.FromSeconds(0.3d), remoteSettings.SysMsgAckTimeout); Assert.Equal(TimeSpan.FromSeconds(2), remoteSettings.SysResendTimeout); - Assert.Equal(1000, remoteSettings.SysMsgBufferSize); + Assert.Equal(20000, remoteSettings.SysMsgBufferSize); Assert.Equal(TimeSpan.FromMinutes(3), remoteSettings.InitialSysMsgDeliveryTimeout); Assert.Equal(TimeSpan.FromDays(5), remoteSettings.QuarantineDuration); + Assert.Equal(TimeSpan.FromDays(5), remoteSettings.QuarantineSilentSystemTimeout); Assert.Equal(TimeSpan.FromSeconds(30), remoteSettings.CommandAckTimeout); Assert.Equal(1, remoteSettings.Transports.Length); Assert.Equal(typeof(HeliosTcpTransport), Type.GetType(remoteSettings.Transports.Head().TransportClass)); @@ -58,6 +59,10 @@ public void Remoting_should_contain_correct_configuration_values_in_ReferenceCon Assert.Equal(TimeSpan.FromMilliseconds(100), remoteSettings.WatchFailureDetectorConfig.GetTimeSpan("min-std-deviation")); //TODO add adapter support + + // TODO add WatchFailureDetectorConfig assertions + + remoteSettings.Config.GetString("akka.remote.log-frame-size-exceeding").ShouldBe("off"); } [Fact] diff --git a/src/core/Akka.Remote.Tests/Transport/ThrottlerTransportAdapterSpec.cs b/src/core/Akka.Remote.Tests/Transport/ThrottlerTransportAdapterSpec.cs index 303c768a9ca..0dac969fc84 100644 --- a/src/core/Akka.Remote.Tests/Transport/ThrottlerTransportAdapterSpec.cs +++ b/src/core/Akka.Remote.Tests/Transport/ThrottlerTransportAdapterSpec.cs @@ -30,7 +30,9 @@ public static Config ThrottlerTransportAdapterSpecConfig { return ConfigurationFactory.ParseString(@" akka { - akka.test.single-expect-default = 6s #to help overcome issues with gated connections + loglevel = ""DEBUG"" + stdout-loglevel = ""DEBUG"" + test.single-expect-default = 6s #to help overcome issues with gated connections actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote"" remote.helios.tcp.hostname = ""localhost"" remote.log-remote-lifecycle-events = off @@ -90,7 +92,7 @@ public Lost(string msg) Msg = msg; } - public string Msg { get; private set; } + public string Msg { get; } public bool Equals(Lost other) { @@ -108,7 +110,12 @@ public override bool Equals(object obj) public override int GetHashCode() { - return (Msg != null ? Msg.GetHashCode() : 0); + return Msg?.GetHashCode() ?? 0; + } + + public override string ToString() + { + return GetType() + ": " + Msg; } } } @@ -200,7 +207,7 @@ public void ThrottlerTransportAdapter_must_survive_blackholing() here.Tell(new ThrottlingTester.Lost("BlackHole 2")); ExpectNoMsg(TimeSpan.FromSeconds(1)); Disassociate().ShouldBeTrue(); - ExpectNoMsg(TimeSpan.FromSeconds(3)); + ExpectNoMsg(TimeSpan.FromSeconds(1)); Throttle(ThrottleTransportAdapter.Direction.Both, Unthrottled.Instance).ShouldBeTrue(); diff --git a/src/core/Akka.Remote/Configuration/Remote.conf b/src/core/Akka.Remote/Configuration/Remote.conf index 364f6567f8a..4cfb341d5a5 100644 --- a/src/core/Akka.Remote/Configuration/Remote.conf +++ b/src/core/Akka.Remote/Configuration/Remote.conf @@ -237,10 +237,25 @@ akka { # the affected systems after lifting the quarantine is undefined. prune-quarantine-marker-after = 5 d + # If system messages have been exchanged between two systems (i.e. remote death + # watch or remote deployment has been used) a remote system will be marked as + # quarantined after the two system has no active association, and no + # communication happens during the time configured here. + # The only purpose of this setting is to avoid storing system message redelivery + # data (sequence number state, etc.) for an undefined amount of time leading to long + # term memory leak. Instead, if a system has been gone for this period, + # or more exactly + # - there is no association between the two systems (TCP connection, if TCP transport is used) + # - neither side has been attempting to communicate with the other + # - there are no pending system messages to deliver + # for the amount of time configured here, the remote system will be quarantined and all state + # associated with it will be dropped. + quarantine-after-silence = 5 d + # This setting defines the maximum number of unacknowledged system messages # allowed for a remote system. If this limit is reached the remote system is # declared to be dead and its UID marked as tainted. - system-message-buffer-size = 1000 + system-message-buffer-size = 20000 # This setting defines the maximum idle time after an individual # acknowledgement for system messages is sent. System message delivery @@ -257,6 +272,14 @@ akka { # resent. resend-interval = 2 s + # Maximum number of unacknowledged system messages that will be resent + # each 'resend-interval'. If you watch many (> 1000) remote actors you can + # increase this value to for example 600, but a too large limit (e.g. 10000) + # may flood the connection and might cause false failure detection to trigger. + # Test such a configuration by watching all actors at the same time and stop + # all watched actors at the same time. + resend-limit = 200 + # WARNING: this setting should not be not changed unless all of its consequences # are properly understood which assumes experience with remoting internals # or expert advice. diff --git a/src/core/Akka.Remote/Endpoint.cs b/src/core/Akka.Remote/Endpoint.cs index 0faa1576f54..fdbf41ab295 100644 --- a/src/core/Akka.Remote/Endpoint.cs +++ b/src/core/Akka.Remote/Endpoint.cs @@ -13,8 +13,10 @@ using System.Threading; using System.Threading.Tasks; using Akka.Actor; +using Akka.Dispatch; using Akka.Dispatch.SysMsg; using Akka.Event; +using Akka.Pattern; using Akka.Remote.Transport; using Akka.Serialization; using Akka.Util; @@ -63,7 +65,7 @@ public void Dispatch(IInternalActorRef recipient, Address recipientAddress, Seri // message is intended for the RemoteDaemon, usually a command to create a remote actor - if (recipient == remoteDaemon) + if (recipient.Equals(remoteDaemon)) { if (settings.UntrustedMode) log.Debug("dropping daemon message in untrusted mode"); else @@ -92,7 +94,7 @@ public void Dispatch(IInternalActorRef recipient, Address recipientAddress, Seri if (settings.UntrustedMode && (!settings.TrustedSelectionPaths.Contains(actorPath) || sel.Message is IPossiblyHarmful - || recipient != provider.Guardian)) + || !recipient.Equals(provider.Guardian))) { log.Debug( "operating in UntrustedMode, dropping inbound actor selection to [{0}], allow it" + @@ -171,9 +173,9 @@ internal interface IAssociationProblem { } /// /// INTERNAL API /// - internal sealed class ShutDownAssociationException : EndpointException, IAssociationProblem + internal sealed class ShutDownAssociation : EndpointException, IAssociationProblem { - public ShutDownAssociationException(Address localAddress, Address remoteAddress, Exception cause = null) + public ShutDownAssociation(Address localAddress, Address remoteAddress, Exception cause = null) : base(string.Format("Shut down address: {0}", remoteAddress), cause) { RemoteAddress = remoteAddress; @@ -185,26 +187,29 @@ public ShutDownAssociationException(Address localAddress, Address remoteAddress, public Address RemoteAddress { get; private set; } } - internal sealed class InvalidAddressAssociationException : EndpointException, IAssociationProblem + internal sealed class InvalidAssociation : EndpointException, IAssociationProblem { - public InvalidAddressAssociationException(Address localAddress, Address remoteAddress, Exception cause = null) + public InvalidAssociation(Address localAddress, Address remoteAddress, Exception cause = null, DisassociateInfo? disassociateInfo = null) : base(string.Format("Invalid address: {0}", remoteAddress), cause) { RemoteAddress = remoteAddress; LocalAddress = localAddress; + DisassociationInfo = disassociateInfo; } public Address LocalAddress { get; private set; } public Address RemoteAddress { get; private set; } + + public DisassociateInfo? DisassociationInfo { get; private set; } } /// /// INTERNAL API /// - internal sealed class HopelessAssociationException : EndpointException, IAssociationProblem + internal sealed class HopelessAssociation : EndpointException, IAssociationProblem { - public HopelessAssociationException(Address localAddress, Address remoteAddress, int? uid = null, Exception cause = null) + public HopelessAssociation(Address localAddress, Address remoteAddress, int? uid = null, Exception cause = null) : base("Catastrophic association error.", cause) { RemoteAddress = remoteAddress; @@ -239,6 +244,8 @@ public EndpointAssociationException(string msg) : base(msg) { } + + public EndpointAssociationException(string msg, Exception inner) : base(msg, inner) { } } /// @@ -256,59 +263,71 @@ public OversizedPayloadException(string msg) /// /// INTERNAL API - /// - /// - /// [Aaronontheweb] so this class is responsible for maintaining a buffer of retriable messages in - /// Akka and it expects an ACK / NACK response pattern before it considers a message to be sent or received. - /// - /// Currently AkkaDotNet does not have any form of guaranteed message delivery in the stack, since that was - /// considered outside the scope of V1. However, this class needs to be revisited and updated to support it, - /// along with others. - /// - /// For the time being, the class remains just a proxy for spawning actors and - /// forming any outbound associations. - /// /// - internal class ReliableDeliverySupervisor : UntypedActor + internal class ReliableDeliverySupervisor : ReceiveActor { + #region Internal message classes + + public class IsIdle + { + public static readonly IsIdle Instance = new IsIdle(); + private IsIdle() { } + } + + public class Idle + { + public static readonly Idle Instance = new Idle(); + private Idle() { } + } + + public class TooLongIdle + { + public static readonly TooLongIdle Instance = new TooLongIdle(); + private TooLongIdle() { } + } + + #endregion + private readonly ILoggingAdapter _log = Context.GetLogger(); - private AkkaProtocolHandle handleOrActive; private readonly Address _localAddress; private readonly Address _remoteAddress; private readonly int? _refuseUid; private readonly AkkaProtocolTransport _transport; private readonly RemoteSettings _settings; - private AkkaPduCodec codec; + private AkkaPduCodec _codec; private AkkaProtocolHandle _currentHandle; private readonly ConcurrentDictionary _receiveBuffers; - private EndpointRegistry _endpoints = new EndpointRegistry(); public ReliableDeliverySupervisor( - AkkaProtocolHandle handleOrActive, - Address localAddress, + AkkaProtocolHandle handleOrActive, + Address localAddress, Address remoteAddress, - int? refuseUid, - AkkaProtocolTransport transport, - RemoteSettings settings, - AkkaPduCodec codec, + int? refuseUid, + AkkaProtocolTransport transport, + RemoteSettings settings, + AkkaPduCodec codec, ConcurrentDictionary receiveBuffers) { - this.handleOrActive = handleOrActive; _localAddress = localAddress; _remoteAddress = remoteAddress; _refuseUid = refuseUid; _transport = transport; _settings = settings; - this.codec = codec; + _codec = codec; _currentHandle = handleOrActive; _receiveBuffers = receiveBuffers; - Reset(); - _writer = CreateWriter(); + Reset(); // needs to be called at startup + _writer = CreateWriter(); // need to create writer at startup Uid = handleOrActive != null ? (int?)handleOrActive.HandshakeInfo.Uid : null; UidConfirmed = Uid.HasValue; + Receiving(); + _autoResendTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.SysResendTimeout, _settings.SysResendTimeout, Self, new AttemptSysMsgRedelivery(), + Self); } + private readonly ICancelable _autoResendTimer; + public int? Uid { get; set; } /// @@ -320,44 +339,48 @@ public ReliableDeliverySupervisor( /// If we already have an inbound handle then UID is initially confirmed. /// (This actor is never restarted.) /// - public bool UidConfirmed { get; set; } + public bool UidConfirmed { get; private set; } - public Deadline BailoutAt + private Deadline _bailoutAt = null; + + protected override SupervisorStrategy SupervisorStrategy() { - get { return Deadline.Now + _settings.InitialSysMsgDeliveryTimeout; } + return new OneForOneStrategy(ex => + { + if (ex is IAssociationProblem) + return Directive.Escalate; + + _log.Warning("Association with remote system {0} has failed; address is now gated for {1} ms. Reason is: [{2}]", _remoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds, ex); + UidConfirmed = false; // Need confirmation of UID again + if (_bufferWasInUse) + { + if ((_resendBuffer.Nacked.Any() || _resendBuffer.NonAcked.Any()) && _bailoutAt == null) + _bailoutAt = Deadline.Now + _settings.InitialSysMsgDeliveryTimeout; + Become(() => Gated(writerTerminated: false, earlyUngateRequested: false)); + _currentHandle = null; + Context.Parent.Tell(new EndpointWriter.StoppedReading(Self)); + return Directive.Stop; + } + + return Directive.Escalate; + }); } - private ICancelable _autoResendTimer; + + private ICancelable _maxSilenceTimer = null; private AckedSendBuffer _resendBuffer; - private SeqNo _lastCumulativeAck; private long _seqCounter; - private List _pendingAcks; + + private bool _bufferWasInUse; private IActorRef _writer; private void Reset() { _resendBuffer = new AckedSendBuffer(_settings.SysMsgBufferSize); - ScheduleAutoResend(); - _lastCumulativeAck = new SeqNo(-1); _seqCounter = 0L; - _pendingAcks = new List(); - } - - private void ScheduleAutoResend() - { - if (_autoResendTimer == null) - { - _autoResendTimer = Context.System.Scheduler.ScheduleTellOnceCancelable(_settings.SysResendTimeout, Self, new AttemptSysMsgRedelivery(), - Self); - } - } - - private void RescheduleAutoResend() - { - _autoResendTimer.Cancel(); - _autoResendTimer = null; - ScheduleAutoResend(); + _bailoutAt = null; + _bufferWasInUse = false; } private SeqNo NextSeq() @@ -367,43 +390,24 @@ private SeqNo NextSeq() return new SeqNo(tmp); } - private void UnstashAcks() - { - _pendingAcks.ForEach(ack => Self.Tell(ack)); - _pendingAcks = new List(); - } - #region ActorBase methods and Behaviors - protected override SupervisorStrategy SupervisorStrategy() - { - return new OneForOneStrategy(ex => - { - var directive = Directive.Stop; - ex.Match() - .With(problem => directive = Directive.Escalate) - .Default(e => - { - _log.Warning("Association with remote system {0} has failed; address is now gated for {1} ms. Reason is: [{2}]", _remoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds, ex.Message); - UidConfirmed = false; - Context.Become(Gated); - _currentHandle = null; - Context.Parent.Tell(new EndpointWriter.StoppedReading(Self)); - directive = Directive.Stop; - }); - - return directive; - }); - } - protected override void PostStop() { + // All remaining messages in the buffer has to be delivered to dead letters. It is important to clear the sequence + // number otherwise deadLetters will ignore it to avoid reporting system messages as dead letters while they are + // still possibly retransmitted. + // Such a situation may arise when the EndpointWriter is shut down, and all of its mailbox contents are delivered + // to dead letters. These messages should be ignored, as they still live in resendBuffer and might be delivered to + // the remote system later. foreach (var msg in _resendBuffer.Nacked.Concat(_resendBuffer.NonAcked)) { Context.System.DeadLetters.Tell(msg.Copy(opt: null)); } EndpointManager.ResendState value; _receiveBuffers.TryRemove(new EndpointManager.Link(_localAddress, _remoteAddress), out value); + _autoResendTimer.Cancel(); + _maxSilenceTimer?.Cancel(); } protected override void PostRestart(Exception reason) @@ -411,147 +415,174 @@ protected override void PostRestart(Exception reason) throw new IllegalActorStateException("BUG: ReliableDeliverySupervisor has been attempted to be restarted. This must not happen."); } - protected override void OnReceive(object message) + protected void Receiving() { - message.Match() - .With(flush => - { - //Trying to serve untilour last breath - ResendAll(); - _writer.Tell(EndpointWriter.FlushAndStop.Instance); - Context.Become(FlushWait); - }) - .With(HandleSend) - .With(ack => + Receive(flush => + { + //Trying to serve until our last breath + ResendAll(); + _writer.Tell(EndpointWriter.FlushAndStop.Instance); + Become(FlushWait); + }); + Receive(idle => { }); // Do not reply, we will Terminate soon, or send a GotUid + Receive(send => HandleSend(send)); + Receive(ack => + { + // If we are not sure about the UID just ignore the ack. Ignoring is fine. + if (UidConfirmed) { - if (!UidConfirmed) _pendingAcks.Add(ack); - else + try { - try - { - _resendBuffer = _resendBuffer.Acknowledge(ack); - } - catch (Exception ex) - { - throw new InvalidAssociationException( - string.Format( - "Error encountered while processing system message acknowledgement {0} {1}", - _resendBuffer, ack), ex); - } + _resendBuffer = _resendBuffer.Acknowledge(ack); + } + catch (Exception ex) + { + throw new HopelessAssociation(_localAddress, _remoteAddress, Uid, + new IllegalStateException($"Error encountered while processing system message acknowledgement buffer: {_resendBuffer} ack: {ack}", ex)); + } - if (_lastCumulativeAck < ack.CumulativeAck) - { - _lastCumulativeAck = ack.CumulativeAck; - // Cumulative ack is progressing, we might not need to resend non-acked messages yet. - // If this progression stops, the timer will eventually kick in, since scheduleAutoResend - // does not cancel existing timers (see the "else" case). - RescheduleAutoResend(); - } - else - { - ScheduleAutoResend(); - } + ResendNacked(); + } + }); + Receive(sysmsg => + { + if (UidConfirmed) ResendAll(); + }); + Receive(terminated => + { + _currentHandle = null; + Context.Parent.Tell(new EndpointWriter.StoppedReading(Self)); + if (_resendBuffer.NonAcked.Any() || _resendBuffer.Nacked.Any()) + Context.System.Scheduler.ScheduleTellOnce(_settings.SysResendTimeout, Self, + new AttemptSysMsgRedelivery(), Self); + GoToIdle(); + }); + Receive(g => + { + _bailoutAt = null; + Context.Parent.Tell(g); + //New system that has the same address as the old - need to start from fresh state + UidConfirmed = true; + if (Uid.HasValue && Uid.Value != g.Uid) Reset(); + Uid = g.Uid; + ResendAll(); + }); + Receive(stopped => + { + _writer.Forward(stopped); //forward the request + }); + } - ResendNacked(); - } - }) - .With(sysmsg => - { - if (UidConfirmed) ResendAll(); - }) - .With(terminated => - { - _currentHandle = null; - Context.Parent.Tell(new EndpointWriter.StoppedReading(Self)); - if (_resendBuffer.NonAcked.Count > 0 || _resendBuffer.Nacked.Count > 0) - Context.System.Scheduler.ScheduleTellOnce(_settings.SysResendTimeout, Self, - new AttemptSysMsgRedelivery(), Self); - Context.Become(Idle); - }) - .With(g => - { - Context.Parent.Tell(g); - //New system that has the same address as the old - need to start from fresh state - UidConfirmed = true; - if (Uid.HasValue && Uid.Value != g.Uid) Reset(); - else UnstashAcks(); - Uid = _refuseUid; - }) - .With(stopped => - { - _writer.Forward(stopped); //forward the request - }); + private void GoToIdle() + { + if (_bufferWasInUse && _maxSilenceTimer == null) + _maxSilenceTimer = + Context.System.Scheduler.ScheduleTellOnceCancelable(_settings.QuarantineSilentSystemTimeout, Self, + TooLongIdle.Instance, Self); + Become(IdleBehavior); } - protected void Gated(object message) + private void GoToActive() { - message.Match() - .With( - terminated => Context.System.Scheduler.ScheduleTellOnce(_settings.RetryGateClosedFor, Self, new Ungate(), Self)) - .With(ungate => - { - if (_resendBuffer.NonAcked.Count > 0 || _resendBuffer.Nacked.Count > 0) - { - // If we talk to a system we have not talked to before (or has given up talking to in the past) stop - // system delivery attempts after the specified time. This act will drop the pending system messages and gate the - // remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable - // again it will be immediately quarantined due to out-of-sync system message buffer and becomes quarantined. - // In other words, this action is safe. - if (!UidConfirmed && BailoutAt.IsOverdue) - { - throw new InvalidAddressAssociationException(_localAddress, _remoteAddress, - new TimeoutException("Delivery of system messages timed out and they were dropped")); - } + _maxSilenceTimer?.Cancel(); + _maxSilenceTimer = null; + Become(Receiving); + } - _writer = CreateWriter(); - //Resending will be triggered by the incoming GotUid message after the connection finished - Context.Become(OnReceive); - } - else - { - Context.Become(Idle); - } - }) - .With(send => + protected void Gated(bool writerTerminated, bool earlyUngateRequested) + { + Receive(terminated => + { + if (!writerTerminated) { - if (send.Message is ISystemMessage) - { - TryBuffer(send.Copy(NextSeq())); - } - else + if (earlyUngateRequested) + Self.Tell(new Ungate()); + } + else + Context.System.Scheduler.ScheduleTellOnce(_settings.RetryGateClosedFor, Self, new Ungate(), Self); + Become(() => Gated(true, earlyUngateRequested)); + }); + Receive(idle => Sender.Tell(Idle.Instance)); + Receive(ungate => + { + if (!writerTerminated) + { + // Ungate was sent from EndpointManager, but we must wait for Terminated first. + Become(() => Gated(false, true)); + } + else if (_resendBuffer.NonAcked.Any() || _resendBuffer.Nacked.Any()) + { + // If we talk to a system we have not talked to before (or has given up talking to in the past) stop + // system delivery attempts after the specified time. This act will drop the pending system messages and gate the + // remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable + // again it will be immediately quarantined due to out-of-sync system message buffer and becomes quarantined. + // In other words, this action is safe. + if (_bailoutAt != null && _bailoutAt.IsOverdue) { - Context.System.DeadLetters.Tell(send); + throw new HopelessAssociation(_localAddress, _remoteAddress, Uid, + new TimeoutException("Delivery of system messages timed out and they were dropped")); } - }) - .With(flush => Context.Stop(Self)) - .With(stop => + + _writer = CreateWriter(); + //Resending will be triggered by the incoming GotUid message after the connection finished + GoToActive(); + } + else { - stop.ReplyTo.Tell(new EndpointWriter.StoppedReading(stop.Writer)); - Sender.Tell(new EndpointWriter.StoppedReading(stop.Writer)); - }); + GoToIdle(); + } + }); + Receive(redelivery => { }); // Ignore + Receive(send => send.Message is ISystemMessage, send => TryBuffer(send.Copy(NextSeq()))); + Receive(send => Context.System.DeadLetters.Tell(send)); + Receive(flush => Context.Stop(Self)); + Receive(stop => + { + stop.ReplyTo.Tell(new EndpointWriter.StoppedReading(stop.Writer)); + Sender.Tell(new EndpointWriter.StoppedReading(stop.Writer)); + }); } - protected void Idle(object message) + protected void IdleBehavior() { - message.Match() - .With(send => - { - _writer = CreateWriter(); - //Resending will be triggered by the incoming GotUid message after the connection finished - HandleSend(send); - Context.Become(OnReceive); - }) - .With(sys => + Receive(idle => Sender.Tell(Idle.Instance)); + Receive(send => + { + _writer = CreateWriter(); + //Resending will be triggered by the incoming GotUid message after the connection finished + HandleSend(send); + GoToActive(); + }); + + Receive(sys => + { + if (_resendBuffer.Nacked.Any() || _resendBuffer.NonAcked.Any()) { _writer = CreateWriter(); //Resending will be triggered by the incoming GotUid message after the connection finished - Context.Become(OnReceive); - }) - .With(stop => Context.Stop(Self)) - .With(stop => stop.ReplyTo.Tell(new EndpointWriter.StoppedReading(stop.Writer))); + GoToActive(); + } + }); + Receive(idle => + { + HandleTooLongIdle(); + }); + Receive(stop => Context.Stop(Self)); + Receive(stop => stop.ReplyTo.Tell(new EndpointWriter.StoppedReading(stop.Writer))); } - + protected void FlushWait() + { + Receive(idle => { }); // Do not reply, we will Terminate soon, which will do the inbound connection unstashing + Receive(terminated => + { + //Clear buffer to prevent sending system messages to dead letters -- at this point we are shutting down and + //don't know if they were properly delivered or not + _resendBuffer = new AckedSendBuffer(0); + Context.Stop(Self); + }); + ReceiveAny(o => { }); // ignore + } #endregion @@ -572,13 +603,13 @@ public GotUid(int uid) } public static Props ReliableDeliverySupervisorProps( - AkkaProtocolHandle handleOrActive, - Address localAddress, + AkkaProtocolHandle handleOrActive, + Address localAddress, Address remoteAddress, - int? refuseUid, - AkkaProtocolTransport transport, - RemoteSettings settings, - AkkaPduCodec codec, + int? refuseUid, + AkkaProtocolTransport transport, + RemoteSettings settings, + AkkaPduCodec codec, ConcurrentDictionary receiveBuffers, string dispatcher) { @@ -587,20 +618,16 @@ public static Props ReliableDeliverySupervisorProps( () => new ReliableDeliverySupervisor(handleOrActive, localAddress, remoteAddress, refuseUid, transport, settings, codec, receiveBuffers)) - .WithDispatcher(dispatcher); + .WithDispatcher(dispatcher); } #endregion - protected void FlushWait(object message) + // Extracted this method to solve a compiler issue with `Receive` + private void HandleTooLongIdle() { - if (message is Terminated) - { - //Clear buffer to prevent sending system messages to dead letters -- at this point we are shutting down and - //don't know if they were properly delivered or not - _resendBuffer = new AckedSendBuffer(0); - Context.Stop(Self); - } + throw new HopelessAssociation(_localAddress, _remoteAddress, Uid, + new TimeoutException("Delivery of system messages timed out and they were dropped")); } private void HandleSend(EndpointManager.Send send) @@ -609,9 +636,10 @@ private void HandleSend(EndpointManager.Send send) { var sequencedSend = send.Copy(NextSeq()); TryBuffer(sequencedSend); - //If we have not confirmed the remote UID we cannot transfer the system message at this point, so just buffer it. - // GotUid will kick ResendAll causing the messages to be properly written. - if (UidConfirmed) _writer.Tell(sequencedSend); + // If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it. + // GotUid will kick ResendAll() causing the messages to be properly written. + // Flow control by not sending more when we already have many outstanding. + if (UidConfirmed && _resendBuffer.NonAcked.Count <= _settings.SysResendLimit) _writer.Tell(sequencedSend); } else { @@ -627,8 +655,7 @@ private void ResendNacked() private void ResendAll() { ResendNacked(); - _resendBuffer.NonAcked.ForEach(nonacked => _writer.Tell(nonacked)); - RescheduleAutoResend(); + _resendBuffer.NonAcked.Take(_settings.SysResendLimit).ForEach(nonacked => _writer.Tell(nonacked)); } private void TryBuffer(EndpointManager.Send s) @@ -636,10 +663,11 @@ private void TryBuffer(EndpointManager.Send s) try { _resendBuffer = _resendBuffer.Buffer(s); + _bufferWasInUse = true; } catch (Exception ex) { - throw new HopelessAssociationException(_localAddress, _remoteAddress, Uid, ex); + throw new HopelessAssociation(_localAddress, _remoteAddress, Uid, ex); } } @@ -649,10 +677,10 @@ private IActorRef CreateWriter() { var writer = Context.ActorOf(RARP.For(Context.System) - .ConfigureDispatcher( - EndpointWriter.EndpointWriterProps(_currentHandle, _localAddress, _remoteAddress, _refuseUid, _transport, - _settings, new AkkaPduProtobuffCodec(), _receiveBuffers, Self) - .WithDeploy(Deploy.Local)), + .ConfigureDispatcher( + EndpointWriter.EndpointWriterProps(_currentHandle, _localAddress, _remoteAddress, _refuseUid, _transport, + _settings, new AkkaPduProtobuffCodec(), _receiveBuffers, Self) + .WithDeploy(Deploy.Local)), "endpointWriter"); Context.Watch(writer); return writer; @@ -665,7 +693,7 @@ private IActorRef CreateWriter() /// /// Abstract base class for classes /// - internal abstract class EndpointActor : UntypedActor + internal abstract class EndpointActor : ReceiveActor { protected readonly Address LocalAddress; protected Address RemoteAddress; @@ -715,83 +743,23 @@ private void TryPublish(RemotingLifecycleEvent ev) } - /// - /// INTERNAL API. - /// - /// Abstract base class for Endpoint writers that require a implementation. - /// - internal abstract class EndpointActor : FSM - { - private readonly ILoggingAdapter _log = Context.GetLogger(); - - protected readonly Address LocalAddress; - protected Address RemoteAddress; - protected RemoteSettings Settings; - protected AkkaProtocolTransport Transport; - - protected readonly EventPublisher EventPublisher; - - protected bool Inbound { get; set; } - - protected EndpointActor( - Address localAddress, - Address remoteAddress, - AkkaProtocolTransport transport, - RemoteSettings settings) - { - EventPublisher = new EventPublisher(Context.System, _log, Logging.LogLevelFor(settings.RemoteLifecycleEventsLogLevel)); - LocalAddress = localAddress; - RemoteAddress = remoteAddress; - Transport = transport; - Settings = settings; - } - - #region Event publishing methods - - protected void PublishError(Exception ex, LogLevel level) - { - TryPublish(new AssociationErrorEvent(ex, LocalAddress, RemoteAddress, Inbound, level)); - } - - protected void PublishDisassociated() - { - TryPublish(new DisassociatedEvent(LocalAddress, RemoteAddress, Inbound)); - } - - private void TryPublish(RemotingLifecycleEvent ev) - { - try - { - EventPublisher.NotifyListeners(ev); - } - catch (Exception ex) - { - _log.Error(ex, "Unable to publish error event to EventStream"); - } - } - - #endregion - - } - /// /// INTERNAL API /// internal class EndpointWriter : EndpointActor { public EndpointWriter( - AkkaProtocolHandle handleOrActive, - Address localAddress, + AkkaProtocolHandle handleOrActive, + Address localAddress, Address remoteAddress, - int? refuseUid, - AkkaProtocolTransport transport, + int? refuseUid, + AkkaProtocolTransport transport, RemoteSettings settings, - AkkaPduCodec codec, + AkkaPduCodec codec, ConcurrentDictionary receiveBuffers, IActorRef reliableDeliverySupervisor = null) : - base(localAddress, remoteAddress, transport, settings) + base(localAddress, remoteAddress, transport, settings) { - _handleOrActive = handleOrActive; _refuseUid = refuseUid; _codec = codec; _reliableDeliverySupervisor = reliableDeliverySupervisor; @@ -805,16 +773,15 @@ public EndpointWriter( if (_handle == null) { - Context.Become(Initializing); + Initializing(); } else { - Context.Become(Writing); + Writing(); } } private readonly ILoggingAdapter _log = Context.GetLogger(); - private AkkaProtocolHandle _handleOrActive; private readonly int? _refuseUid; private readonly AkkaPduCodec _codec; private readonly IActorRef _reliableDeliverySupervisor; @@ -835,7 +802,7 @@ public EndpointWriter( // Use an internal buffer instead of Stash for efficiency // stash/unstashAll is slow when many messages are stashed - // IMPORTANT: sender is not stored, so sender() and forward must not be used in EndpointWriter + // IMPORTANT: sender is not stored, so .Sender and forward must not be used in EndpointWriter private readonly LinkedList _buffer = new LinkedList(); //buffer for IPriorityMessages - ensures that heartbeats get delivered before user-defined messages @@ -869,12 +836,11 @@ protected override void PreStart() { if (handle.IsFaulted) { - var inner = handle.Exception.Flatten().InnerException; + var inner = handle.Exception?.Flatten().InnerException; return (object)new Status.Failure(new InvalidAssociationException("Association failure", inner)); } return new Handle(handle.Result); - }, - TaskContinuationOptions.ExecuteSynchronously) + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default) .PipeTo(Self); } else @@ -910,24 +876,14 @@ protected override void PostStop() #region Receives - protected override void OnReceive(object message) + private void Initializing() { - //This should never be hit. - Unhandled(message); - } - - private void Initializing(object message) - { - if (message is EndpointManager.Send) + Receive(send => EnqueueInBuffer(send)); + Receive(failure => { - EnqueueInBuffer(message); - } - else if (message is Status.Failure) - { - var failure = message as Status.Failure; if (failure.Cause is InvalidAssociationException) { - PublishAndThrow(new InvalidAddressAssociationException(LocalAddress, RemoteAddress, failure.Cause), + PublishAndThrow(new InvalidAssociation(LocalAddress, RemoteAddress, failure.Cause), LogLevel.WarningLevel); } else @@ -936,92 +892,64 @@ private void Initializing(object message) new EndpointAssociationException(string.Format("Association failed with {0}", RemoteAddress)), LogLevel.DebugLevel); } - } - else if (message is Handle) + }); + Receive(handle => { - var inboundHandle = message as Handle; + // Assert handle == None? Context.Parent.Tell( - new ReliableDeliverySupervisor.GotUid((int)inboundHandle.ProtocolHandle.HandshakeInfo.Uid)); - _handle = inboundHandle.ProtocolHandle; + new ReliableDeliverySupervisor.GotUid((int)handle.ProtocolHandle.HandshakeInfo.Uid)); + _handle = handle.ProtocolHandle; _reader = StartReadEndpoint(_handle); + EventPublisher.NotifyListeners(new AssociatedEvent(LocalAddress, RemoteAddress, Inbound)); BecomeWritingOrSendBufferedMessages(); - } - else - { - Unhandled(message); - } + }); } - private void Buffering(object message) + private void Buffering() { - if (message is EndpointManager.Send) - { - EnqueueInBuffer(message); - } - else if (message is BackoffTimer) - { - SendBufferedMessages(); - } - else if (message is FlushAndStop) + Receive(send => EnqueueInBuffer(send)); + Receive(backoff => SendBufferedMessages()); + Receive(stop => { - _buffer.AddLast(message); //Flushing is postponed after the pending writes + _buffer.AddLast(stop); //Flushing is postponed after the pending writes Context.System.Scheduler.ScheduleTellOnce(Settings.FlushWait, Self, FlushAndStopTimeout.Instance, Self); - } - else if (message is FlushAndStopTimeout) + }); + Receive(timeout => { - //enough + // enough, ready to flush DoFlushAndStop(); - } - else - { - Unhandled(message); - } + }); } - private void Writing(object message) + private void Writing() { - if (message is EndpointManager.Send) + Receive(s => { - var s = message as EndpointManager.Send; if (!WriteSend(s)) { if (s.Seq == null) EnqueueInBuffer(s); ScheduleBackoffTimer(); - Context.Become(Buffering); + Become(Buffering); } - } - else if (message is FlushAndStop) - { - DoFlushAndStop(); - } - else if (message is AckIdleCheckTimer) + }); + Receive(flush => DoFlushAndStop()); + Receive(ack => { if (_ackDeadline.IsOverdue) { TrySendPureAck(); } - } - else - { - Unhandled(message); - } + }); } - private void Handoff(object message) + private void Handoff() { - if (message is Terminated) + Receive(terminated => { _reader = StartReadEndpoint(_handle); BecomeWritingOrSendBufferedMessages(); - } - else if (message is EndpointManager.Send) - { - EnqueueInBuffer(message); - } - else - { - Unhandled(message); - } + }); + Receive(send => EnqueueInBuffer(send)); } protected override void Unhandled(object message) @@ -1055,7 +983,7 @@ protected override void Unhandled(object message) _handle.Disassociate(); _handle = takeover.ProtocolHandle; takeover.ReplyTo.Tell(new TookOver(Self, _handle)); - Context.Become(Handoff); + Become(Handoff); } else if (message is FlushAndStop) { @@ -1100,23 +1028,28 @@ private IActorRef StartReadEndpoint(AkkaProtocolHandle handle) { var newReader = Context.ActorOf(RARP.For(Context.System) - .ConfigureDispatcher( - EndpointReader.ReaderProps(LocalAddress, RemoteAddress, Transport, Settings, _codec, _msgDispatcher, - Inbound, (int)handle.HandshakeInfo.Uid, _receiveBuffers, _reliableDeliverySupervisor) - .WithDeploy(Deploy.Local)), + .ConfigureDispatcher( + EndpointReader.ReaderProps(LocalAddress, RemoteAddress, Transport, Settings, _codec, _msgDispatcher, + Inbound, (int)handle.HandshakeInfo.Uid, _receiveBuffers, _reliableDeliverySupervisor) + .WithDeploy(Deploy.Local)), string.Format("endpointReader-{0}-{1}", AddressUrlEncoder.Encode(RemoteAddress), _readerId.Next())); Context.Watch(newReader); handle.ReadHandlerSource.SetResult(new ActorHandleEventListener(newReader)); return newReader; } + /// + /// Serializes the outbound message going onto the wire. + /// + /// The C# object we intend to serialize. + /// The Akka.NET envelope containing the serialized message and addressing information. + /// Differs from JVM implementation due to Scala implicits. private SerializedMessage SerializeMessage(object msg) { if (_handle == null) { throw new EndpointException("Internal error: No handle was present during serialization of outbound message."); } - return MessageSerializer.Serialize(_system, _handle.LocalAddress, msg); } @@ -1140,7 +1073,7 @@ private void AdjustAdaptiveBackup() } else if (_writeCount >= _maxWriteCount * 0.6) { - _adaptiveBackoffNanos = Math.Max(Convert.ToInt64(_adaptiveBackoffNanos * 0.9), MaxAdaptiveBackoffNanos); + _adaptiveBackoffNanos = Math.Max(Convert.ToInt64(_adaptiveBackoffNanos * 0.9), MinAdaptiveBackoffNanos); } else if (_writeCount <= _maxWriteCount * 0.2) { @@ -1160,24 +1093,9 @@ private void ScheduleBackoffTimer() else { _smallBackoffCount += 1; - var s = Self; - var backoffDeadlineNanoTime = MonotonicClock.GetNanos() + _adaptiveBackoffNanos; + var backoffDeadlineNanoTime = TimeSpan.FromTicks((MonotonicClock.GetNanos() + _adaptiveBackoffNanos).ToTicks()); - Task.Run(() => - { - Action backoff = null; - backoff = () => - { - var backOffNanos = backoffDeadlineNanoTime - MonotonicClock.GetNanos(); - if (backOffNanos > 0) - { - Thread.Sleep(new TimeSpan(backOffNanos.ToTicks())); - backoff(); - } - }; - backoff(); - s.Tell(BackoffTimer.Instance, ActorRefs.NoSender); - }); + Context.System.Scheduler.ScheduleTellOnce(backoffDeadlineNanoTime, Self, BackoffTimer.Instance, Self); } } @@ -1221,11 +1139,11 @@ private void BecomeWritingOrSendBufferedMessages() { if (!_buffer.Any()) { - Context.Become(Writing); + Become(Writing); } else { - Context.Become(Buffering); + Become(Buffering); SendBufferedMessages(); } } @@ -1324,7 +1242,7 @@ private void SendBufferedMessages() } return false; } - + return true; }); @@ -1350,14 +1268,14 @@ private void SendBufferedMessages() { _log.Debug("Drained buffer with maxWriteCount: {0}, fullBackoffCount: {1}," + "smallBackoffCount: {2}, noBackoffCount: {3}," + - "adaptiveBackoff: {4}", _maxWriteCount, _fullBackoffCount, _smallBackoffCount, _noBackoffCount, _adaptiveBackoffNanos / 100); + "adaptiveBackoff: {4}", _maxWriteCount, _fullBackoffCount, _smallBackoffCount, _noBackoffCount, _adaptiveBackoffNanos / 1000); } _fullBackoffCount = 1; _smallBackoffCount = 0; _noBackoffCount = 0; _writeCount = 0; _maxWriteCount = MaxWriteCount; - Context.Become(Writing); + Become(Writing); } else if (ok) { @@ -1460,7 +1378,7 @@ private AckIdleCheckTimer() { } public static AckIdleCheckTimer Instance { get { return _instance; } } } - public sealed class FlushAndStopTimeout + private sealed class FlushAndStopTimeout { private FlushAndStopTimeout() { } private static readonly FlushAndStopTimeout _instance = new FlushAndStopTimeout(); @@ -1541,6 +1459,7 @@ public EndpointReader( _reliableDeliverySupervisor = reliableDeliverySupervisor; _codec = codec; _provider = RARP.For(Context.System).Provider; + Reading(); } private readonly ILoggingAdapter _log = Context.GetLogger(); @@ -1570,15 +1489,12 @@ protected override void PostStop() SaveState(); } - protected override void OnReceive(object message) + private void Reading() { - if (message is Disassociated) + Receive(disassociated => HandleDisassociated(disassociated.Info)); + Receive(inbound => { - HandleDisassociated((message as Disassociated).Info); - } - else if (message is InboundPayload) - { - var payload = ((InboundPayload)message).Payload; + var payload = inbound.Payload; if (payload.Length > Transport.MaximumPayloadBytes) { var reason = new OversizedPayloadException( @@ -1608,41 +1524,26 @@ protected override void OnReceive(object message) } } } - } - else if (message is EndpointWriter.StopReading) + }); + Receive(stop => { - var stop = message as EndpointWriter.StopReading; SaveState(); - Context.Become(NotReading); + Become(NotReading); stop.ReplyTo.Tell(new EndpointWriter.StoppedReading(stop.Writer)); - } - else - { - Unhandled(message); - } + }); } - protected void NotReading(object message) + private void NotReading() { - if (message is Disassociated) - { - HandleDisassociated((message as Disassociated).Info); - } - else if (message is EndpointWriter.StopReading) + Receive(disassociated => HandleDisassociated(disassociated.Info)); + Receive(stop => stop.ReplyTo.Tell(new EndpointWriter.StoppedReading(stop.Writer))); + Receive(payload => { - var stop = message as EndpointWriter.StopReading; - Sender.Tell(new EndpointWriter.StoppedReading(stop.Writer)); - } else if (message is InboundPayload) - { - var payload = message as InboundPayload; var ackAndMessage = TryDecodeMessageAndAck(payload.Payload); if (ackAndMessage.AckOption != null && _reliableDeliverySupervisor != null) _reliableDeliverySupervisor.Tell(ackAndMessage.AckOption); - } - else - { - Unhandled(message); - } + }); + ReceiveAny(o => {}); // ignore } #endregion @@ -1668,29 +1569,34 @@ private EndpointManager.ResendState Merge(EndpointManager.ResendState current, private void UpdateSavedState(EndpointManager.Link key, EndpointManager.ResendState expectedState) { - if (expectedState == null) - { - if (_receiveBuffers.ContainsKey(key)) - { - var updatedValue = new EndpointManager.ResendState(_uid, _ackedReceiveBuffer); - _receiveBuffers.AddOrUpdate(key, updatedValue, (link, state) => updatedValue); - UpdateSavedState(key, updatedValue); - } - } - else + while (true) { - var canReplace = _receiveBuffers.ContainsKey(key) && _receiveBuffers[key].Equals(expectedState); - if (canReplace) + if (expectedState == null) { - _receiveBuffers[key] = Merge(new EndpointManager.ResendState(_uid, _ackedReceiveBuffer), - expectedState); + if (_receiveBuffers.ContainsKey(key)) + { + var updatedValue = new EndpointManager.ResendState(_uid, _ackedReceiveBuffer); + _receiveBuffers.AddOrUpdate(key, updatedValue, (link, state) => updatedValue); + expectedState = updatedValue; + continue; + } } else { - EndpointManager.ResendState previousValue; - _receiveBuffers.TryGetValue(key, out previousValue); - UpdateSavedState(key, previousValue); + var canReplace = _receiveBuffers.ContainsKey(key) && _receiveBuffers[key].Equals(expectedState); + if (canReplace) + { + _receiveBuffers[key] = Merge(new EndpointManager.ResendState(_uid, _ackedReceiveBuffer), expectedState); + } + else + { + EndpointManager.ResendState previousValue; + _receiveBuffers.TryGetValue(key, out previousValue); + expectedState = previousValue; + continue; + } } + break; } } @@ -1699,10 +1605,10 @@ private void HandleDisassociated(DisassociateInfo info) switch (info) { case DisassociateInfo.Quarantined: - throw new InvalidAddressAssociationException(LocalAddress, RemoteAddress, new InvalidAssociationException("The remote system has quarantined this system. No further associations " + - "to the remote system are possible until this system is restarted.")); + throw new InvalidAssociation(LocalAddress, RemoteAddress, new InvalidAssociationException("The remote system has quarantined this system. No further associations " + + "to the remote system are possible until this system is restarted."), DisassociateInfo.Quarantined); case DisassociateInfo.Shutdown: - throw new ShutDownAssociationException(LocalAddress, RemoteAddress, new InvalidAssociationException("The remote system terminated the association because it is shutting down.")); + throw new ShutDownAssociation(LocalAddress, RemoteAddress, new InvalidAssociationException("The remote system terminated the association because it is shutting down.")); case DisassociateInfo.Unknown: default: Context.Stop(Self); diff --git a/src/core/Akka.Remote/EndpointManager.cs b/src/core/Akka.Remote/EndpointManager.cs index 9c3695f0a4d..6b13f09d394 100644 --- a/src/core/Akka.Remote/EndpointManager.cs +++ b/src/core/Akka.Remote/EndpointManager.cs @@ -9,6 +9,7 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; @@ -21,7 +22,7 @@ namespace Akka.Remote /// /// INTERNAL API /// - internal class EndpointManager : UntypedActor + internal class EndpointManager : ReceiveActor { #region Policy definitions @@ -40,20 +41,23 @@ protected EndpointPolicy(bool isTombstone) } /// - /// We will always accept a + /// We will always accept a connection from this remote node. /// public class Pass : EndpointPolicy { - public Pass(IActorRef endpoint, int? uid) + public Pass(IActorRef endpoint, int? uid, int? refuseUid) : base(false) { Uid = uid; Endpoint = endpoint; + RefuseUid = refuseUid; } public IActorRef Endpoint { get; private set; } public int? Uid { get; private set; } + + public int? RefuseUid { get; private set; } } /// @@ -76,14 +80,14 @@ public Gated(Deadline deadline) /// public class Quarantined : EndpointPolicy { - public Quarantined(long uid, Deadline deadline) + public Quarantined(int uid, Deadline deadline) : base(true) { Uid = uid; Deadline = deadline; } - public long Uid { get; private set; } + public int Uid { get; private set; } public Deadline Deadline { get; private set; } } @@ -228,9 +232,9 @@ public Link(Address localAddress, Address remoteAddress) LocalAddress = localAddress; } - public Address LocalAddress { get; private set; } + public Address LocalAddress { get; } - public Address RemoteAddress { get; private set; } + public Address RemoteAddress { get; } /// /// Overrode this to make sure that the can correctly store @@ -266,22 +270,24 @@ public ResendState(int uid, AckedReceiveBuffer buffer) public EndpointManager(Config config, ILoggingAdapter log) { - conf = config; - settings = new RemoteSettings(conf); - this.log = log; - eventPublisher = new EventPublisher(Context.System, log, Logging.LogLevelFor(settings.RemoteLifecycleEventsLogLevel)); + _conf = config; + _settings = new RemoteSettings(_conf); + _log = log; + _eventPublisher = new EventPublisher(Context.System, log, Logging.LogLevelFor(_settings.RemoteLifecycleEventsLogLevel)); + + Receiving(); } /// /// Mapping between addresses and endpoint actors. If passive connections are turned off, incoming connections /// will not be part of this map! /// - private readonly EndpointRegistry endpoints = new EndpointRegistry(); - private readonly RemoteSettings settings; - private readonly Config conf; - private AtomicCounterLong endpointId = new AtomicCounterLong(0L); - private ILoggingAdapter log; - private EventPublisher eventPublisher; + private readonly EndpointRegistry _endpoints = new EndpointRegistry(); + private readonly RemoteSettings _settings; + private readonly Config _conf; + private readonly AtomicCounterLong _endpointId = new AtomicCounterLong(0L); + private readonly ILoggingAdapter _log; + private readonly EventPublisher _eventPublisher; /// /// Used to indicate when an abrupt shutdown occurs @@ -294,11 +300,11 @@ public EndpointManager(Config config, ILoggingAdapter log) private Dictionary _transportMapping = new Dictionary(); - private ConcurrentDictionary _receiveBuffers = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _receiveBuffers = new ConcurrentDictionary(); private bool RetryGateEnabled { - get { return settings.RetryGateClosedFor > TimeSpan.Zero; } + get { return _settings.RetryGateClosedFor > TimeSpan.Zero; } } private TimeSpan PruneInterval @@ -306,7 +312,7 @@ private TimeSpan PruneInterval get { //PruneInterval = 2x the RetryGateClosedFor value, if available - if (RetryGateEnabled) return settings.RetryGateClosedFor.Add(settings.RetryGateClosedFor); + if (RetryGateEnabled) return _settings.RetryGateClosedFor.Add(_settings.RetryGateClosedFor).Max(TimeSpan.FromSeconds(1)).Min(TimeSpan.FromSeconds(10)); else return TimeSpan.Zero; } } @@ -324,12 +330,39 @@ private ICancelable PruneTimerCancelleable { return _pruneTimeCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(PruneInterval, PruneInterval, Self, new Prune(), Self); } - return null; + return _pruneTimeCancelable; + } + } + + private Dictionary _pendingReadHandoffs = new Dictionary(); + private Dictionary> _stashedInbound = new Dictionary>(); + + + private void HandleStashedInbound(IActorRef endpoint, bool writerIsIdle) + { + var stashed = _stashedInbound.GetOrElse(endpoint, new List()); + _stashedInbound.Remove(endpoint); + foreach (var ia in stashed) + HandleInboundAssociation(ia, writerIsIdle); + } + + private void KeepQuarantinedOr(Address remoteAddress, Action body) + { + var uid = _endpoints.RefuseUid(remoteAddress); + if (uid.HasValue) + { + _log.Info( + "Quarantined address [{0}] is still unreachable or has not been restarted. Keeping it quarantined.", + remoteAddress); + // Restoring Quarantine marker overwritten by a Pass(endpoint, refuseUid) pair while probing remote system. + _endpoints.MarkAsQuarantined(remoteAddress, uid.Value, Deadline.Now + _settings.QuarantineDuration); + } + else + { + body(); } } - private Dictionary pendingReadHandoffs = new Dictionary(); - private Dictionary> stashedInbound = new Dictionary>(); #region ActorBase overrides @@ -340,44 +373,62 @@ protected override SupervisorStrategy SupervisorStrategy() var directive = Directive.Stop; ex.Match() - .With(ia => + .With(ia => { - log.Warning("Tried to associate with unreachable remote address [{0}]. Address is now gated for {1} ms, all messages to this address will be delivered to dead letters. Reason: [{2}]", - ia.RemoteAddress, settings.RetryGateClosedFor.TotalMilliseconds, ia.Message); - endpoints.MarkAsFailed(Sender, Deadline.Now + settings.RetryGateClosedFor); - AddressTerminatedTopic.Get(Context.System).Publish(new AddressTerminated(ia.RemoteAddress)); + KeepQuarantinedOr(ia.RemoteAddress, () => + { + var causedBy = ia.InnerException == null + ? "" + : string.Format("Caused by: [{0}]", ia.InnerException); + _log.Warning("Tried to associate with unreachable remote address [{0}]. Address is now gated for {1} ms, all messages to this address will be delivered to dead letters. Reason: [{2}] {3}", + ia.RemoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds, ia.Message, causedBy); + _endpoints.MarkAsFailed(Sender, Deadline.Now + _settings.RetryGateClosedFor); + }); + + if (ia.DisassociationInfo.HasValue && ia.DisassociationInfo == DisassociateInfo.Quarantined) + { + //TODO: add context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress, remoteAddress)) + } directive = Directive.Stop; }) - .With(shutdown => + .With(shutdown => { - log.Debug("Remote system with address [{0}] has shut down. Address is now gated for {1}ms, all messages to this address will be delivered to dead letters.", - shutdown.RemoteAddress, settings.RetryGateClosedFor.TotalMilliseconds); - endpoints.MarkAsFailed(Sender, Deadline.Now + settings.RetryGateClosedFor); - AddressTerminatedTopic.Get(Context.System).Publish(new AddressTerminated(shutdown.RemoteAddress)); + KeepQuarantinedOr(shutdown.RemoteAddress, () => + { + _log.Debug("Remote system with address [{0}] has shut down. Address is now gated for {1}ms, all messages to this address will be delivered to dead letters.", + shutdown.RemoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds); + _endpoints.MarkAsFailed(Sender, Deadline.Now + _settings.RetryGateClosedFor); + }); directive = Directive.Stop; }) - .With(hopeless => + .With(hopeless => { - if (settings.QuarantineDuration.HasValue && hopeless.Uid.HasValue) + if (hopeless.Uid.HasValue) { - endpoints.MarkAsQuarantined(hopeless.RemoteAddress, hopeless.Uid.Value, - Deadline.Now + settings.QuarantineDuration.Value); - eventPublisher.NotifyListeners(new QuarantinedEvent(hopeless.RemoteAddress, - hopeless.Uid.Value)); + _log.Error("Association to [{0}] with UID [{1}] is irrecoverably failed. Quarantining address.", + hopeless.RemoteAddress, hopeless.Uid); + if (_settings.QuarantineDuration.HasValue) + { + _endpoints.MarkAsQuarantined(hopeless.RemoteAddress, hopeless.Uid.Value, + Deadline.Now + _settings.QuarantineDuration.Value); + _eventPublisher.NotifyListeners(new QuarantinedEvent(hopeless.RemoteAddress, + hopeless.Uid.Value)); + } } else { - log.Warning("Association to [{0}] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for {1} ms.", - hopeless.RemoteAddress, settings.RetryGateClosedFor.TotalMilliseconds); - endpoints.MarkAsFailed(Sender, Deadline.Now + settings.RetryGateClosedFor); + _log.Warning("Association to [{0}] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for {1} ms.", + hopeless.RemoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds); + _endpoints.MarkAsFailed(Sender, Deadline.Now + _settings.RetryGateClosedFor); } - AddressTerminatedTopic.Get(Context.System).Publish(new AddressTerminated(hopeless.RemoteAddress)); directive = Directive.Stop; }) .Default(msg => { if (msg is EndpointDisassociatedException || msg is EndpointAssociationException) { } //no logging - else { log.Error(ex, ex.Message); } + else { _log.Error(ex, ex.Message); } + _endpoints.MarkAsFailed(Sender, Deadline.Now + _settings.RetryGateClosedFor); + directive = Directive.Stop; }); return directive; @@ -386,16 +437,16 @@ protected override SupervisorStrategy SupervisorStrategy() protected override void PreStart() { - if(PruneTimerCancelleable != null) - log.Debug("Starting prune timer for endpoint manager..."); + if (PruneTimerCancelleable != null) + _log.Debug("Starting prune timer for endpoint manager..."); base.PreStart(); } protected override void PostStop() { - if(PruneTimerCancelleable != null) + if (PruneTimerCancelleable != null) _pruneTimeCancelable.Cancel(); - foreach(var h in pendingReadHandoffs.Values) + foreach (var h in _pendingReadHandoffs.Values) h.Disassociate(DisassociateInfo.Shutdown); if (!_normalShutdown) @@ -404,82 +455,82 @@ protected override void PostStop() // We still need to clean up any remaining transports because handles might be in mailboxes, and for example // Netty is not part of the actor hierarchy, so its handles will not be cleaned up if no actor is taking // responsibility of them (because they are sitting in a mailbox). - log.Error("Remoting system has been terminated abrubtly. Attempting to shut down transports"); + _log.Error("Remoting system has been terminated abrubtly. Attempting to shut down transports"); foreach (var t in _transportMapping.Values) t.Shutdown(); } } - protected override void OnReceive(object message) + private void Receiving() { - message.Match() - /* - * the first command the EndpointManager receives. - * instructs the EndpointManager to fire off its "Listens" command, which starts - * up all inbound transports and binds them to specific addresses via configuration. - * those results will then be piped back to Remoting, who waits for the results of - * listen.AddressPromise. - * */ - .With(listen => Listens.ContinueWith(listens => + /* + * the first command the EndpointManager receives. + * instructs the EndpointManager to fire off its "Listens" command, which starts + * up all inbound transports and binds them to specific addresses via configuration. + * those results will then be piped back to Remoting, who waits for the results of + * listen.AddressPromise. + * */ + Receive(listen => Listens.ContinueWith(listens => + { + if (listens.IsFaulted) { - if (listens.IsFaulted) - { - return new ListensFailure(listen.AddressesPromise, listens.Exception); - } - else - { - return new ListensResult(listen.AddressesPromise, listens.Result); - } - }, TaskContinuationOptions.ExecuteSynchronously) - .PipeTo(Self)) - .With(listens => + return new ListensFailure(listen.AddressesPromise, listens.Exception); + } + else { - _transportMapping = (from mapping in listens.Results - group mapping by mapping.Item1.Address - into g - select new { address = g.Key, transports = g.ToList() }).Select(x => - { - if (x.transports.Count > 1) - { - throw new RemoteTransportException( - string.Format("There are more than one transports listening on local address {0}", - x.address)); - } - return new KeyValuePair(x.address, - x.transports.Head().Item1.ProtocolTransport); - }).ToDictionary(x => x.Key, v => v.Value); - - //Register a listener to each transport and collect mapping to addresses - var transportsAndAddresses = listens.Results.Select(x => - { - x.Item2.SetResult(new ActorAssociationEventListener(Self)); - return x.Item1; - }).ToList(); + return new ListensResult(listen.AddressesPromise, listens.Result); + } + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default) + .PipeTo(Self)); - listens.AddressesPromise.SetResult(transportsAndAddresses); - }) - .With(failure => failure.AddressesPromise.SetException(failure.Cause)) - // defer the inbound association until we can enter "Accepting" behavior - .With(ia => Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromMilliseconds(10), Self, ia, Self)) - .With(mc => Sender.Tell(new ManagementCommandAck(status:false))) - // transports are all started. Ready to start accepting inbound associations. - .With(sf => Context.Become(Accepting)) - .With(sf => + Receive(listens => + { + _transportMapping = (from mapping in listens.Results + group mapping by mapping.Item1.Address + into g + select new { address = g.Key, transports = g.ToList() }).Select(x => + { + if (x.transports.Count > 1) + { + throw new RemoteTransportException( + string.Format("There are more than one transports listening on local address {0}", + x.address)); + } + return new KeyValuePair(x.address, + x.transports.Head().Item1.ProtocolTransport); + }).ToDictionary(x => x.Key, v => v.Value); + + //Register a listener to each transport and collect mapping to addresses + var transportsAndAddresses = listens.Results.Select(x => { - Sender.Tell(true); - Context.Stop(Self); - }); + x.Item2.SetResult(new ActorAssociationEventListener(Self)); + return x.Item1; + }).ToList(); + + listens.AddressesPromise.SetResult(transportsAndAddresses); + }); + Receive(failure => failure.AddressesPromise.SetException(failure.Cause)); + + // defer the inbound association until we can enter "Accepting" behavior + + Receive( + ia => Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromMilliseconds(10), Self, ia, Self)); + Receive(mc => Sender.Tell(new ManagementCommandAck(status: false))); + Receive(sf => Become(Accepting)); + Receive(sf => + { + Sender.Tell(true); + Context.Stop(Self); + }); } /// /// Message-processing behavior when the is able to accept /// inbound association requests. /// - /// Messages from local system and the network. - protected void Accepting(object message) + protected void Accepting() { - message.Match() - .With(mc => + Receive(mc => { /* * applies a management command to all available transports. @@ -492,196 +543,248 @@ protected void Accepting(object message) .ContinueWith(x => { return new ManagementCommandAck(x.Result.All(y => y)); - }, - TaskContinuationOptions.ExecuteSynchronously) + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default) .PipeTo(sender); - }) - .With(quarantine => - { + }); + + Receive(quarantine => + { //Stop writers - if (endpoints.WritableEndpointWithPolicyFor(quarantine.RemoteAddress) is Pass) - { - var pass = (Pass) endpoints.WritableEndpointWithPolicyFor(quarantine.RemoteAddress); + var policy = + Tuple.Create(_endpoints.WritableEndpointWithPolicyFor(quarantine.RemoteAddress), quarantine.Uid); + if (policy.Item1 is Pass && policy.Item2 == null) + { + var endpoint = policy.Item1.AsInstanceOf().Endpoint; + Context.Stop(endpoint); + _log.Warning("Association to [{0}] with unknown UID is reported as quarantined, but " + + "address cannot be quarantined without knowing the UID, gating instead for {1} ms.", quarantine.RemoteAddress, _settings.RetryGateClosedFor.TotalMilliseconds); + _endpoints.MarkAsFailed(endpoint, Deadline.Now + _settings.RetryGateClosedFor); + } + else if (policy.Item1 is Pass && policy.Item2 != null) + { + var pass = policy.Item1 as Pass; + if (pass.Uid == quarantine.Uid) Context.Stop(pass.Endpoint); - if (!pass.Uid.HasValue) - { - log.Warning("Association to [{0}] with unknown UID is reported as quarantined, but address cannot be quarantined without knowing the UID, gated instead for {0} ms", - quarantine.RemoteAddress, settings.RetryGateClosedFor.TotalMilliseconds); - endpoints.MarkAsFailed(pass.Endpoint, Deadline.Now + settings.RetryGateClosedFor); - } + } + else + { + // Do nothing, because either: + // A: we don't know yet the UID of the writer, it will be checked against current quarantine state later + // B: we know the UID, but it does not match with the UID to be quarantined } - //Stop inbound read-only association - var read = endpoints.ReadOnlyEndpointFor(quarantine.RemoteAddress); - if (read != null) + // Stop inbound read-only associations + var readPolicy = Tuple.Create(_endpoints.ReadOnlyEndpointFor(quarantine.RemoteAddress), quarantine.Uid); + if (readPolicy.Item1?.Item1 != null && quarantine.Uid == null) + Context.Stop(readPolicy.Item1.Item1); + else if (readPolicy.Item1?.Item1 != null && quarantine.Uid != null && readPolicy.Item1?.Item2 == quarantine.Uid) { Context.Stop(readPolicy.Item1.Item1); } + else { } // nothing to stop + + Func matchesQuarantine = handle => handle.RemoteAddress.Equals(quarantine.RemoteAddress) && + quarantine.Uid == handle.HandshakeInfo.Uid; + + // Stop all matching pending read handoffs + _pendingReadHandoffs = _pendingReadHandoffs.Where(x => + { + var drop = matchesQuarantine(x.Value); + // Side-effecting here + if (drop) { - Context.Stop((IInternalActorRef)read); + x.Value.Disassociate(); + Context.Stop(x.Key); } + return !drop; + }).ToDictionary(key => key.Key, value => value.Value); - if (quarantine.Uid.HasValue) + // Stop all matching stashed connections + _stashedInbound = _stashedInbound.Select(x => + { + var associations = x.Value.Where(assoc => { - endpoints.MarkAsQuarantined(quarantine.RemoteAddress, quarantine.Uid.Value, Deadline.Now + settings.QuarantineDuration); - eventPublisher.NotifyListeners(new QuarantinedEvent(quarantine.RemoteAddress, quarantine.Uid.Value)); - } - }) - .With(send => + var handle = assoc.Association.AsInstanceOf(); + var drop = matchesQuarantine(handle); + if (drop) + handle.Disassociate(); + return !drop; + }).ToList(); + return new KeyValuePair>(x.Key, associations); + }).ToDictionary(k => k.Key, v => v.Value); + + if (quarantine.Uid.HasValue) { - var recipientAddress = send.Recipient.Path.Address; - Func createAndRegisterWritingEndpoint = refuseUid => endpoints.RegisterWritableEndpoint(recipientAddress, - CreateEndpoint(recipientAddress, send.Recipient.LocalAddressToUse, - _transportMapping[send.Recipient.LocalAddressToUse], settings, writing: true, - handleOption: null, refuseUid: refuseUid), refuseUid); - - endpoints.WritableEndpointWithPolicyFor(recipientAddress).Match() - .With( - pass => - { - pass.Endpoint.Tell(send); - }) - .With(gated => + _endpoints.MarkAsQuarantined(quarantine.RemoteAddress, quarantine.Uid.Value, Deadline.Now + _settings.QuarantineDuration); + _eventPublisher.NotifyListeners(new QuarantinedEvent(quarantine.RemoteAddress, quarantine.Uid.Value)); + } + }); + + Receive(send => + { + var recipientAddress = send.Recipient.Path.Address; + Func createAndRegisterWritingEndpoint = refuseUid => _endpoints.RegisterWritableEndpoint(recipientAddress, + CreateEndpoint(recipientAddress, send.Recipient.LocalAddressToUse, + _transportMapping[send.Recipient.LocalAddressToUse], _settings, writing: true, + handleOption: null, refuseUid: refuseUid), uid: null, refuseUid: refuseUid); + + _endpoints.WritableEndpointWithPolicyFor(recipientAddress).Match() + .With( + pass => { - if(gated.TimeOfRelease.IsOverdue) createAndRegisterWritingEndpoint(null).Tell(send); - else Context.System.DeadLetters.Tell(send); + pass.Endpoint.Tell(send); }) - .With(quarantined => - { + .With(gated => + { + if (gated.TimeOfRelease.IsOverdue) createAndRegisterWritingEndpoint(null).Tell(send); + else Context.System.DeadLetters.Tell(send); + }) + .With(quarantined => + { // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have // the Quarantined tombstone and we know what UID we don't want to accept, so use it. - createAndRegisterWritingEndpoint((int)quarantined.Uid).Tell(send); - }) - .Default(msg => createAndRegisterWritingEndpoint(null).Tell(send)); - }) - .With(HandleInboundAssociation) - .With(endpoint => AcceptPendingReader(endpoint.Writer)) - .With(terminated => - { - AcceptPendingReader(terminated.ActorRef); - endpoints.UnregisterEndpoint(terminated.ActorRef); - HandleStashedInbound(terminated.ActorRef); - }) - .With(tookover => RemovePendingReader(tookover.Writer, tookover.ProtocolHandle)) - .With(gotuid => - { - endpoints.RegisterWritableEndpointUid(Sender, gotuid.Uid); - HandleStashedInbound(Sender); - }) - .With(prune => endpoints.Prune()) - .With(shutdown => - { + createAndRegisterWritingEndpoint(quarantined.Uid).Tell(send); + }) + .Default(msg => createAndRegisterWritingEndpoint(null).Tell(send)); + }); + Receive(ia => HandleInboundAssociation(ia, false)); + Receive(endpoint => AcceptPendingReader(endpoint.Writer)); + Receive(terminated => + { + AcceptPendingReader(terminated.ActorRef); + _endpoints.UnregisterEndpoint(terminated.ActorRef); + HandleStashedInbound(terminated.ActorRef, writerIsIdle: false); + }); + Receive(tookover => RemovePendingReader(tookover.Writer, tookover.ProtocolHandle)); + Receive(gotuid => + { + _endpoints.RegisterWritableEndpointUid(Sender, gotuid.Uid); + HandleStashedInbound(Sender, writerIsIdle: false); + }); + Receive(idle => + { + HandleStashedInbound(Sender, writerIsIdle: true); + }); + Receive(prune => _endpoints.Prune()); + Receive(shutdown => + { //Shutdown all endpoints and signal to Sender when ready (and whether all endpoints were shutdown gracefully) var sender = Sender; // The construction of the Task for shutdownStatus has to happen after the flushStatus future has been finished // so that endpoints are shut down before transports. - var shutdownStatus = Task.WhenAll(endpoints.AllEndpoints.Select( - x => x.GracefulStop(settings.FlushWait, EndpointWriter.FlushAndStop.Instance))).ContinueWith( - result => + var shutdownStatus = Task.WhenAll(_endpoints.AllEndpoints.Select( + x => x.GracefulStop(_settings.FlushWait, EndpointWriter.FlushAndStop.Instance))).ContinueWith( + result => + { + if (result.IsFaulted || result.IsCanceled) { - if (result.IsFaulted || result.IsCanceled) - { - if (result.Exception != null) - result.Exception.Handle(e => true); - return false; - } - return result.Result.All(x => x); - }, TaskContinuationOptions.ExecuteSynchronously); - - shutdownStatus.ContinueWith(tr => Task.WhenAll(_transportMapping.Values.Select(x => x.Shutdown())).ContinueWith( - result => + if (result.Exception != null) + result.Exception.Handle(e => true); + return false; + } + return result.Result.All(x => x); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); + + shutdownStatus.ContinueWith(tr => Task.WhenAll(_transportMapping.Values.Select(x => x.Shutdown())).ContinueWith( + result => + { + if (result.IsFaulted || result.IsCanceled) { - if (result.IsFaulted || result.IsCanceled) - { - if (result.Exception != null) - result.Exception.Handle(e => true); - return false; - } - return result.Result.All(x => x) && tr.Result; - }, TaskContinuationOptions.ExecuteSynchronously)).Unwrap().PipeTo(sender); + if (result.Exception != null) + result.Exception.Handle(e => true); + return false; + } + return result.Result.All(x => x) && tr.Result; + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default)).Unwrap().PipeTo(sender); - foreach (var handoff in pendingReadHandoffs.Values) - { - handoff.Disassociate(DisassociateInfo.Shutdown); - } - + foreach (var handoff in _pendingReadHandoffs.Values) + { + handoff.Disassociate(DisassociateInfo.Shutdown); + } + //Ignore all other writes _normalShutdown = true; - Context.Become(Flushing); - }); + Become(Flushing); + }); } - protected void Flushing(object message) + protected void Flushing() { - message.Match() - .With(send => Context.System.DeadLetters.Tell(send)) - .With( - ia => ia.Association.AsInstanceOf().Disassociate(DisassociateInfo.Shutdown)) - .With(terminated => { }); + Receive(send => Context.System.DeadLetters.Tell(send)); + Receive( + ia => ia.Association.AsInstanceOf().Disassociate(DisassociateInfo.Shutdown)); + Receive(terminated => { }); // why should we care now? } #endregion #region Internal methods - private void HandleInboundAssociation(InboundAssociation ia) + private void HandleInboundAssociation(InboundAssociation ia, bool writerIsIdle) { - var readonlyEndpoint = endpoints.ReadOnlyEndpointFor(ia.Association.RemoteAddress); - var handle = ((AkkaProtocolHandle) ia.Association); + var readonlyEndpoint = _endpoints.ReadOnlyEndpointFor(ia.Association.RemoteAddress); + var handle = ((AkkaProtocolHandle)ia.Association); if (readonlyEndpoint != null) { - if (pendingReadHandoffs.ContainsKey(readonlyEndpoint)) pendingReadHandoffs[readonlyEndpoint].Disassociate(); - pendingReadHandoffs.AddOrSet(readonlyEndpoint, handle); - readonlyEndpoint.Tell(new EndpointWriter.TakeOver(handle, Self)); + var endpoint = readonlyEndpoint.Item1; + if (_pendingReadHandoffs.ContainsKey(endpoint)) _pendingReadHandoffs[endpoint].Disassociate(); + _pendingReadHandoffs.AddOrSet(endpoint, handle); + endpoint.Tell(new EndpointWriter.TakeOver(handle, Self)); + _endpoints.WritableEndpointWithPolicyFor(handle.RemoteAddress).Match() + .With(pass => + { + pass.Endpoint.Tell(new ReliableDeliverySupervisor.Ungate()); + }); } else { - if (endpoints.IsQuarantined(handle.RemoteAddress, (int)handle.HandshakeInfo.Uid)) + if (_endpoints.IsQuarantined(handle.RemoteAddress, handle.HandshakeInfo.Uid)) handle.Disassociate(DisassociateInfo.Quarantined); else { - if (endpoints.WritableEndpointWithPolicyFor(handle.RemoteAddress) is Pass) + var policy = _endpoints.WritableEndpointWithPolicyFor(handle.RemoteAddress); + var pass = policy as Pass; + if (pass != null && !pass.Uid.HasValue) { - var pass = (Pass) endpoints.WritableEndpointWithPolicyFor(handle.RemoteAddress); - if (!pass.Uid.HasValue) + // Idle writer will never send a GotUid or a Terminated so we need to "provoke it" + // to get an unstash event + if (!writerIsIdle) { - if (stashedInbound.ContainsKey(pass.Endpoint)) stashedInbound[pass.Endpoint].Add(ia); - else stashedInbound.AddOrSet(pass.Endpoint, new List() {ia}); + pass.Endpoint.Tell(ReliableDeliverySupervisor.IsIdle.Instance); + var stashedInboundForEp = _stashedInbound.GetOrElse(pass.Endpoint, + new List()); + stashedInboundForEp.Add(ia); + _stashedInbound[pass.Endpoint] = stashedInboundForEp; } else { - if (handle.HandshakeInfo.Uid == pass.Uid) - { - if (pendingReadHandoffs.ContainsKey(pass.Endpoint)) - pendingReadHandoffs[pass.Endpoint].Disassociate(); - pendingReadHandoffs.AddOrSet(pass.Endpoint, handle); - pass.Endpoint.Tell(new EndpointWriter.StopReading(pass.Endpoint, Self)); - } - else - { - Context.Stop(pass.Endpoint); - endpoints.UnregisterEndpoint(pass.Endpoint); - pendingReadHandoffs.Remove(pass.Endpoint); - CreateAndRegisterEndpoint(handle, pass.Uid); - } + CreateAndRegisterEndpoint(handle, _endpoints.RefuseUid(handle.RemoteAddress)); + } + } + else if (pass != null) // has a UID value + { + if (handle.HandshakeInfo.Uid == pass.Uid) + { + _pendingReadHandoffs.GetOrElse(pass.Endpoint, null)?.Disassociate(); + _pendingReadHandoffs.AddOrSet(pass.Endpoint, handle); + pass.Endpoint.Tell(new EndpointWriter.StopReading(pass.Endpoint, Self)); + pass.Endpoint.Tell(new ReliableDeliverySupervisor.Ungate()); + } + else + { + Context.Stop(pass.Endpoint); + _endpoints.UnregisterEndpoint(pass.Endpoint); + _pendingReadHandoffs.Remove(pass.Endpoint); + CreateAndRegisterEndpoint(handle, pass.Uid); } } else { - var state = endpoints.WritableEndpointWithPolicyFor(handle.RemoteAddress); - CreateAndRegisterEndpoint(handle, null); + CreateAndRegisterEndpoint(handle, _endpoints.RefuseUid(handle.RemoteAddress)); } } } } - private void HandleStashedInbound(IActorRef endpoint) - { - var stashed = stashedInbound.GetOrElse(endpoint, new List()); - stashedInbound.Remove(endpoint); - foreach(var ia in stashed) - HandleInboundAssociation(ia); - } - private Task>>> _listens; private Task>>> @@ -699,7 +802,7 @@ private Task(); - foreach (var transportSettings in settings.Transports) + foreach (var transportSettings in _settings.Transports) { var args = new object[] { Context.System, transportSettings.Config }; @@ -710,17 +813,17 @@ private Task internal class EndpointRegistry { - private readonly Dictionary addressToReadonly = new Dictionary(); + private readonly Dictionary> _addressToReadonly = new Dictionary>(); - private Dictionary addressToWritable = + private Dictionary _addressToWritable = new Dictionary(); - private readonly Dictionary readonlyToAddress = new Dictionary(); - private readonly Dictionary writableToAddress = new Dictionary(); - public IActorRef RegisterWritableEndpoint(Address address, IActorRef endpoint, int? uid = null) + private readonly Dictionary _readonlyToAddress = new Dictionary(); + private readonly Dictionary _writableToAddress = new Dictionary(); + public IActorRef RegisterWritableEndpoint(Address address, IActorRef endpoint, int? uid, int? refuseUid) { EndpointManager.EndpointPolicy existing; - if (addressToWritable.TryGetValue(address, out existing)) + _addressToWritable.TryGetValue(address, out existing); + + var pass = existing as EndpointManager.Pass; + if (pass != null) // if we already have a writable endpoint.... { - var gated = existing as EndpointManager.Gated; - if(gated != null && !gated.TimeOfRelease.IsOverdue) //don't throw if the prune timer didn't get a chance to run first - throw new ArgumentException("Attempting to overwrite existing endpoint " + existing + " with " + endpoint); + var e = pass.Endpoint; + throw new ArgumentException("Attempting to overwrite existing endpoint " + e + " with " + endpoint); } - addressToWritable.AddOrSet(address, new EndpointManager.Pass(endpoint, uid)); - writableToAddress.AddOrSet(endpoint, address); + + _addressToWritable.AddOrSet(address, new EndpointManager.Pass(endpoint, uid, refuseUid)); + _writableToAddress.AddOrSet(endpoint, address); return endpoint; } public void RegisterWritableEndpointUid(IActorRef writer, int uid) { - var address = writableToAddress[writer]; - if (addressToWritable[address] is EndpointManager.Pass) + var address = _writableToAddress[writer]; + if (_addressToWritable[address] is EndpointManager.Pass) { - var pass = (EndpointManager.Pass) addressToWritable[address]; - addressToWritable[address] = new EndpointManager.Pass(pass.Endpoint, uid); + var pass = (EndpointManager.Pass)_addressToWritable[address]; + _addressToWritable[address] = new EndpointManager.Pass(pass.Endpoint, uid, pass.RefuseUid); } + + // if the policy is not Pass, then the GotUid might have lost the race with some failure + } - public IActorRef RegisterReadOnlyEndpoint(Address address, IActorRef endpoint) + public IActorRef RegisterReadOnlyEndpoint(Address address, IActorRef endpoint, int uid) { - addressToReadonly.Add(address, endpoint); - readonlyToAddress.Add(endpoint, address); + _addressToReadonly.Add(address, Tuple.Create(endpoint, uid)); + _readonlyToAddress.Add(endpoint, address); return endpoint; } @@ -60,30 +66,36 @@ public void UnregisterEndpoint(IActorRef endpoint) { if (IsWritable(endpoint)) { - var address = writableToAddress[endpoint]; - if (addressToWritable[address] is EndpointManager.EndpointPolicy) + var address = _writableToAddress[endpoint]; + var policy = _addressToWritable[address]; + if (policy.IsTombstone) { - var policy = addressToWritable[address]; //if there is already a tombstone directive, leave it there - //otherwise, remove this address from the writeable address range - if (!policy.IsTombstone) - { - addressToWritable.Remove(address); - } } - writableToAddress.Remove(endpoint); + else + { + _addressToWritable.Remove(address); + } + _writableToAddress.Remove(endpoint); } - else if(IsReadOnly(endpoint)) + else if (IsReadOnly(endpoint)) { - addressToReadonly.Remove(readonlyToAddress[endpoint]); - readonlyToAddress.Remove(endpoint); + _addressToReadonly.Remove(_readonlyToAddress[endpoint]); + _readonlyToAddress.Remove(endpoint); } } - public IActorRef ReadOnlyEndpointFor(Address address) + public Address AddressForWriter(IActorRef writer) { - IActorRef tmp; - if (addressToReadonly.TryGetValue(address, out tmp)) + // Needs to return null if the key is not in the dictionary, instead of throwing. + Address value; + return _writableToAddress.TryGetValue(writer, out value) ? value : null; + } + + public Tuple ReadOnlyEndpointFor(Address address) + { + Tuple tmp; + if (_addressToReadonly.TryGetValue(address, out tmp)) { return tmp; } @@ -92,32 +104,38 @@ public IActorRef ReadOnlyEndpointFor(Address address) public bool IsWritable(IActorRef endpoint) { - return writableToAddress.ContainsKey(endpoint); + return _writableToAddress.ContainsKey(endpoint); } public bool IsReadOnly(IActorRef endpoint) { - return readonlyToAddress.ContainsKey(endpoint); + return _readonlyToAddress.ContainsKey(endpoint); } public bool IsQuarantined(Address address, int uid) { - var rvalue = false; - WritableEndpointWithPolicyFor(address).Match() - .With(q => - { - if (q.Uid == uid) - rvalue = q.Deadline.HasTimeLeft; - }) - .Default(msg => rvalue = false); + // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the + // known fact that it is quarantined. + var policy = WritableEndpointWithPolicyFor(address) as EndpointManager.Quarantined; + return policy?.Uid == uid; + } - return rvalue; + public int? RefuseUid(Address address) + { + // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the + // known fact that it is quarantined. + var policy = WritableEndpointWithPolicyFor(address); + var q = policy as EndpointManager.Quarantined; + var p = policy as EndpointManager.Pass; + if (q != null) return q.Uid; + if (p != null) return p.RefuseUid; + return null; } public EndpointManager.EndpointPolicy WritableEndpointWithPolicyFor(Address address) { EndpointManager.EndpointPolicy tmp; - if (addressToWritable.TryGetValue(address, out tmp)) + if (_addressToWritable.TryGetValue(address, out tmp)) { return tmp; } @@ -137,34 +155,34 @@ public void MarkAsFailed(IActorRef endpoint, Deadline timeOfRelease) { if (IsWritable(endpoint)) { - addressToWritable.AddOrSet(writableToAddress[endpoint], new EndpointManager.Gated(timeOfRelease)); - writableToAddress.Remove(endpoint); + _addressToWritable.AddOrSet(_writableToAddress[endpoint], new EndpointManager.Gated(timeOfRelease)); + _writableToAddress.Remove(endpoint); } else if (IsReadOnly(endpoint)) { - addressToReadonly.Remove(readonlyToAddress[endpoint]); - readonlyToAddress.Remove(endpoint); + _addressToReadonly.Remove(_readonlyToAddress[endpoint]); + _readonlyToAddress.Remove(endpoint); } } public void MarkAsQuarantined(Address address, int uid, Deadline timeOfRelease) { - addressToWritable.AddOrSet(address, new EndpointManager.Quarantined(uid, timeOfRelease)); + _addressToWritable.AddOrSet(address, new EndpointManager.Quarantined(uid, timeOfRelease)); } public void RemovePolicy(Address address) { - addressToWritable.Remove(address); + _addressToWritable.Remove(address); } public IList AllEndpoints { - get { return writableToAddress.Keys.Concat(readonlyToAddress.Keys).ToList(); } + get { return _writableToAddress.Keys.Concat(_readonlyToAddress.Keys).ToList(); } } public void Prune() { - addressToWritable = addressToWritable.Where( + _addressToWritable = _addressToWritable.Where( x => PruneFilterFunction(x.Value)).ToDictionary(key => key.Key, value => value.Value); } diff --git a/src/core/Akka.Remote/RemoteSettings.cs b/src/core/Akka.Remote/RemoteSettings.cs index 624a98bac74..055d38a6988 100644 --- a/src/core/Akka.Remote/RemoteSettings.cs +++ b/src/core/Akka.Remote/RemoteSettings.cs @@ -17,6 +17,7 @@ public class RemoteSettings { public RemoteSettings(Config config) { + //TODO: need to add value validation for each field Config = config; LogReceive = config.GetBoolean("akka.remote.log-received-messages"); LogSend = config.GetBoolean("akka.remote.log-sent-messages"); @@ -50,9 +51,12 @@ public RemoteSettings(Config config) UsePassiveConnections = config.GetBoolean("akka.remote.use-passive-connections"); SysMsgBufferSize = config.GetInt("akka.remote.system-message-buffer-size"); SysResendTimeout = config.GetTimeSpan("akka.remote.resend-interval"); + SysResendLimit = config.GetInt("akka.remote.resend-limit"); InitialSysMsgDeliveryTimeout = config.GetTimeSpan("akka.remote.initial-system-message-delivery-timeout"); + QuarantineSilentSystemTimeout = config.GetTimeSpan("akka.remote.quarantine-after-silence"); SysMsgAckTimeout = config.GetTimeSpan("akka.remote.system-message-ack-piggyback-timeout"); QuarantineDuration = config.GetTimeSpan("akka.remote.prune-quarantine-marker-after"); + StartupTimeout = config.GetTimeSpan("akka.remote.startup-timeout"); CommandAckTimeout = config.GetTimeSpan("akka.remote.command-ack-timeout"); @@ -97,8 +101,10 @@ public RemoteSettings(Config config) public TimeSpan RetryGateClosedFor { get; set; } public bool UsePassiveConnections { get; set; } public int SysMsgBufferSize { get; set; } + public int SysResendLimit { get; set; } public TimeSpan SysResendTimeout { get; set; } public TimeSpan InitialSysMsgDeliveryTimeout { get; set; } + public TimeSpan QuarantineSilentSystemTimeout { get; set; } public TimeSpan SysMsgAckTimeout { get; set; } public TimeSpan? QuarantineDuration { get; set; } public TimeSpan StartupTimeout { get; set; } diff --git a/src/core/Akka.Remote/Remoting.cs b/src/core/Akka.Remote/Remoting.cs index e7e01f1457f..0d6b7f3a1c9 100644 --- a/src/core/Akka.Remote/Remoting.cs +++ b/src/core/Akka.Remote/Remoting.cs @@ -9,6 +9,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using System.Web; using Akka.Actor; @@ -80,7 +81,6 @@ public static RARP For(ActorSystem system) #endregion } - //TODO: needs to be implemented in Endpoint /// /// INTERNAL API /// Messages marked with this interface will be sent before other messages when buffering is active. @@ -94,7 +94,7 @@ internal interface IPriorityMessage { } /// internal class Remoting : RemoteTransport { - private readonly ILoggingAdapter log; + private readonly ILoggingAdapter _log; private volatile IDictionary> _transportMapping; private volatile IActorRef _endpointManager; @@ -107,13 +107,13 @@ internal class Remoting : RemoteTransport private volatile Address _defaultAddress; private IActorRef _transportSupervisor; - private EventPublisher _eventPublisher; + private readonly EventPublisher _eventPublisher; public Remoting(ExtendedActorSystem system, RemoteActorRefProvider provider) : base(system, provider) { - log = Logging.GetLogger(system, "remoting"); - _eventPublisher = new EventPublisher(system, log, Logging.LogLevelFor(provider.RemoteSettings.RemoteLifecycleEventsLogLevel)); + _log = Logging.GetLogger(system, "remoting"); + _eventPublisher = new EventPublisher(system, _log, Logging.LogLevelFor(provider.RemoteSettings.RemoteLifecycleEventsLogLevel)); _transportSupervisor = system.SystemActorOf(Props.Create(), "transports"); } @@ -129,15 +129,17 @@ public override Address DefaultAddress get { return _defaultAddress; } } + /// + /// Start assumes that it cannot be followed by another Start() without having a Shutdown() first + /// public override void Start() { - log.Info("Starting remoting"); - if (_endpointManager == null) { + _log.Info("Starting remoting"); _endpointManager = System.SystemActorOf(RARP.For(System).ConfigureDispatcher( - Props.Create(() => new EndpointManager(System.Settings.Config, log)).WithDeploy(Deploy.Local)), + Props.Create(() => new EndpointManager(System.Settings.Config, _log)).WithDeploy(Deploy.Local)), EndpointManagerName); try @@ -153,8 +155,7 @@ public override void Start() if(akkaProtocolTransports.Count==0) throw new ConfigurationException(@"No transports enabled under ""akka.remote.enabled-transports"""); _addresses = new HashSet
(akkaProtocolTransports.Select(a => a.Address)); - // this.transportMapping = akkaProtocolTransports - // .ToDictionary(p => p.ProtocolTransport.Transport.SchemeIdentifier,); + IEnumerable> tmp = akkaProtocolTransports.GroupBy(t => t.ProtocolTransport.SchemeIdentifier); _transportMapping = new Dictionary>(); @@ -167,7 +168,7 @@ public override void Start() _defaultAddress = akkaProtocolTransports.Head().Address; _addresses = new HashSet
(akkaProtocolTransports.Select(x => x.Address)); - log.Info("Remoting started; listening on addresses : [{0}]", string.Join(",", _addresses.Select(x => x.ToString()))); + _log.Info("Remoting started; listening on addresses : [{0}]", string.Join(",", _addresses.Select(x => x.ToString()))); _endpointManager.Tell(new EndpointManager.StartupFinished()); _eventPublisher.NotifyListeners(new RemotingListenEvent(_addresses.ToList())); @@ -191,7 +192,7 @@ public override void Start() } else { - log.Warning("Remoting was already started. Ignoring start attempt."); + _log.Warning("Remoting was already started. Ignoring start attempt."); } } @@ -199,8 +200,8 @@ public override Task Shutdown() { if (_endpointManager == null) { - log.Warning("Remoting is not running. Ignoring shutdown attempt"); - return Task.Run(() => { }); + _log.Warning("Remoting is not running. Ignoring shutdown attempt"); + return Task.FromResult(true); } else { @@ -222,12 +223,12 @@ public override Task Shutdown() { if (!result.Result) { - log.Warning("Shutdown finished, but flushing might not have been successful and some messages might have been dropped. " + + _log.Warning("Shutdown finished, but flushing might not have been successful and some messages might have been dropped. " + "Increase akka.remote.flush-wait-on-shutdown to a larger value to avoid this."); } finalize(); } - }, TaskContinuationOptions.ExecuteSynchronously); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } } @@ -237,10 +238,8 @@ public override void Send(object message, IActorRef sender, RemoteActorRef recip { throw new RemoteTransportException("Attempted to send remote message but Remoting is not running.", null); } - if (sender == null) - sender = ActorRefs.NoSender; - _endpointManager.Tell(new EndpointManager.Send(message, recipient, sender), sender); + _endpointManager.Tell(new EndpointManager.Send(message, recipient, sender), sender ?? ActorRefs.NoSender); } public override Task ManagementCommand(object cmd) @@ -258,12 +257,12 @@ public override Task ManagementCommand(object cmd) if (result.IsCanceled || result.IsFaulted) return false; return result.Result.Status; - }, TaskContinuationOptions.ExecuteSynchronously); + }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } public override Address LocalAddressForRemote(Address remote) { - return LocalAddressForRemote(_transportMapping, remote); + return Remoting.LocalAddressForRemote(_transportMapping, remote); } public override void Quarantine(Address address, int? uid) @@ -291,7 +290,7 @@ private void NotifyError(string msg, Exception cause) public const string EndpointManagerName = "endpointManager"; - internal Address LocalAddressForRemote( + internal static Address LocalAddressForRemote( IDictionary> transportMapping, Address remote) { HashSet transports; @@ -347,21 +346,21 @@ public RegisterTransportActor(Props props, string name) /// /// Actor responsible for supervising the creation of all transport actors /// - internal class TransportSupervisor : UntypedActor + internal class TransportSupervisor : ReceiveActor { - private readonly SupervisorStrategy _strategy = new OneForOneStrategy(3, TimeSpan.FromMinutes(1), exception => Directive.Restart); + private readonly SupervisorStrategy _strategy = new OneForOneStrategy(exception => Directive.Restart); protected override SupervisorStrategy SupervisorStrategy() { return _strategy; } - protected override void OnReceive(object message) + public TransportSupervisor() { - message.Match() - .With(r => - { - Sender.Tell(Context.ActorOf(RARP.For(Context.System).ConfigureDispatcher(r.Props.WithDeploy(Deploy.Local)), r.Name)); - }); + Receive( + r => + Sender.Tell( + Context.ActorOf(RARP.For(Context.System).ConfigureDispatcher(r.Props.WithDeploy(Deploy.Local)), + r.Name))); } } } diff --git a/src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs b/src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs index 8b5916189d9..987d697d5b9 100644 --- a/src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs +++ b/src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs @@ -233,7 +233,7 @@ public AssociateUnderlyingRefuseUid(Address remoteAddress, TaskCompletionSource< internal sealed class HandshakeInfo { - public HandshakeInfo(Address origin, long uid) + public HandshakeInfo(Address origin, int uid) { Origin = origin; Uid = uid; @@ -241,7 +241,7 @@ public HandshakeInfo(Address origin, long uid) public Address Origin { get; private set; } - public long Uid { get; private set; } + public int Uid { get; private set; } public override bool Equals(object obj) { diff --git a/src/core/Akka.TestKit/Internal/TimeSpanExtensions.cs b/src/core/Akka.TestKit/Internal/TimeSpanExtensions.cs index 3df6d12ed24..614c689e91b 100644 --- a/src/core/Akka.TestKit/Internal/TimeSpanExtensions.cs +++ b/src/core/Akka.TestKit/Internal/TimeSpanExtensions.cs @@ -119,17 +119,6 @@ public static void EnsureIsPositiveFinite(this TimeSpan timeSpan, string paramet throw new ArgumentException("The timespan must be >0. Actual value: " + timeSpan, parameterName); } - /// - /// Returns the smallest value. - /// Note! Part of internal API. Breaking changes may occur without notice. Use at own risk. - /// - public static TimeSpan Min(this TimeSpan a, TimeSpan b) - { - if(b.IsInfinite()) return a; - if(a.IsInfinite()) return b; - return a < b ? a : b; - } - /// /// Returns the smallest value. if is null it's treated as /// undefined, and is returned. diff --git a/src/core/Akka/Actor/ActorSelection.cs b/src/core/Akka/Actor/ActorSelection.cs index d8949763fbf..1f5bcb191f6 100644 --- a/src/core/Akka/Actor/ActorSelection.cs +++ b/src/core/Akka/Actor/ActorSelection.cs @@ -111,13 +111,13 @@ private async Task InnerResolveOne(TimeSpan timeout) { var identity = await this.Ask(new Identify(null), timeout); if(identity.Subject == null) - throw new ActorNotFoundException(); + throw new ActorNotFoundException("subject was null"); return identity.Subject; } - catch + catch(Exception ex) { - throw new ActorNotFoundException(); + throw new ActorNotFoundException("Exception ocurred while resolving ActorSelection", ex); } } diff --git a/src/core/Akka/Actor/Exceptions.cs b/src/core/Akka/Actor/Exceptions.cs index 07ed1323d8d..eb1279fc109 100644 --- a/src/core/Akka/Actor/Exceptions.cs +++ b/src/core/Akka/Actor/Exceptions.cs @@ -438,6 +438,13 @@ protected ActorNotFoundException(SerializationInfo info, StreamingContext contex : base(info, context) { } + + /// + /// that takes a descriptive and optional . + /// + /// A user-defined error message. + /// An inner . + public ActorNotFoundException(string message, Exception innerException = null) : base(message, innerException) { } } /// diff --git a/src/core/Akka/Pattern/IllegalStateException.cs b/src/core/Akka/Pattern/IllegalStateException.cs index 0ffe86179d8..869460f254c 100644 --- a/src/core/Akka/Pattern/IllegalStateException.cs +++ b/src/core/Akka/Pattern/IllegalStateException.cs @@ -5,6 +5,7 @@ // //----------------------------------------------------------------------- +using System; using System.Runtime.Serialization; using Akka.Actor; @@ -23,6 +24,15 @@ public IllegalStateException(string message) : base(message) { } + /// + /// Initializes a new instance of the class. + /// + /// The message that describes the error. + /// The inner that was thrown. + public IllegalStateException(string message, Exception innerEx) : base(message, innerEx) + { + } + /// /// Initializes a new instance of the class. /// diff --git a/src/core/Akka/Util/Internal/Extensions.cs b/src/core/Akka/Util/Internal/Extensions.cs index 491b49bc852..44ac45c1bd8 100644 --- a/src/core/Akka/Util/Internal/Extensions.cs +++ b/src/core/Akka/Util/Internal/Extensions.cs @@ -82,6 +82,11 @@ public static TimeSpan Max(this TimeSpan @this, TimeSpan other) return @this > other ? @this : other; } + public static TimeSpan Min(this TimeSpan @this, TimeSpan other) + { + return @this < other ? @this : other; + } + public static IEnumerable Concat(this IEnumerable enumerable, T item) { var itemInArray = new[] {item};