Remoting system upgrade #1596

Merged
@@ -32,18 +32,21 @@ private class BenchmarkActorRef : MinimalActorRef
{
private readonly Counter _counter;

public BenchmarkActorRef(Counter counter)
public BenchmarkActorRef(Counter counter, RemoteActorRefProvider provider)
Member Author

Fixing some bugs here that I introduced when I changed the way IInboundMessageDispatcher works. Specifically, I changed how two IActorRefs are compared for equality: instead of ==, they are now compared with .Equals, which is correct. Unfortunately, that caused this code to throw a NullReferenceException during the performance test, because the Path property of BenchmarkActorRef was null. These changes fix that.

Contributor

👍
(We should really go through all of our code to find == on actor refs; I do think we have a few of those. But that is for another issue/task.)
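
The difference between `==` and `.Equals` discussed in this thread can be sketched in isolation. This is a minimal illustration, not Akka.NET code: `SketchRef` and its string `Path` are hypothetical stand-ins for an actor ref whose `Equals` compares paths, which is an assumption about the general shape of the problem rather than the actual implementation.

```csharp
using System;

// Hypothetical stand-in for an actor ref; not an Akka.NET type.
public sealed class SketchRef
{
    public SketchRef(string path) { Path = path; }

    // The ref is identified by its path in this sketch.
    public string Path { get; }

    // Path-based equality: dereferences Path, so a null Path throws.
    public override bool Equals(object obj)
    {
        var other = obj as SketchRef;
        return other != null && Path.Equals(other.Path);
    }

    public override int GetHashCode() => Path.GetHashCode();
}

public static class EqualityDemo
{
    public static void Main()
    {
        var a = new SketchRef("akka://sys/user/tempRef");
        var b = new SketchRef("akka://sys/user/tempRef");

        // == on a class without an overloaded operator is reference equality.
        Console.WriteLine(a == b);      // False
        Console.WriteLine(a.Equals(b)); // True

        var broken = new SketchRef(null);
        try
        {
            broken.Equals(a);
        }
        catch (NullReferenceException)
        {
            Console.WriteLine("Equals threw because Path was null");
        }
    }
}
```

Under these assumptions, code that previously got away with a null `Path` because it only used `==` starts failing as soon as the comparison goes through `Equals`.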

{
_counter = counter;
Provider = provider;
Path = new RootActorPath(Provider.DefaultAddress) / "user" / "tempRef";
}

protected override void TellInternal(object message, IActorRef sender)
{
_counter.Increment();
}

public override ActorPath Path { get { return null; } }
public override IActorRefProvider Provider { get { return null; } }
public override ActorPath Path { get; }

public override IActorRefProvider Provider { get; }
}

private static readonly Config RemoteHocon = ConfigurationFactory.ParseString(@"
@@ -76,7 +79,7 @@ public void Setup(BenchmarkContext context)
_inboundMessageDispatcherCounter = context.GetCounter(MessageDispatcherThroughputCounterName);
_message = SerializedMessage.CreateBuilder().SetSerializerId(0).SetMessage(ByteString.CopyFromUtf8("foo")).Build();
_dispatcher = new DefaultMessageDispatcher(_actorSystem, RARP.For(_actorSystem).Provider, _actorSystem.Log);
_targetActorRef = new BenchmarkActorRef(_inboundMessageDispatcherCounter);
_targetActorRef = new BenchmarkActorRef(_inboundMessageDispatcherCounter, RARP.For(_actorSystem).Provider);
}

[PerfBenchmark(Description = "Tests the performance of the Default", RunMode = RunMode.Throughput, NumberOfIterations = 13, TestMode = TestMode.Measurement)]
@@ -86,7 +86,10 @@ public void Setup(BenchmarkContext context)
var system1EchoActorPath = new RootActorPath(system1Address) / "user" / "echo";
var system2RemoteActorPath = new RootActorPath(system2Address) / "user" / "benchmark";

_remoteReceiver = System1.ActorSelection(system2RemoteActorPath).ResolveOne(TimeSpan.FromSeconds(2)).Result;
// set the timeout high here to avoid timeouts
// TL;DR; - on slow machines it can take longer than 2 seconds to form the association, do the handshake, and reply back
// using the in-memory transport.
_remoteReceiver = System1.ActorSelection(system2RemoteActorPath).ResolveOne(TimeSpan.FromSeconds(30)).Result;
Member Author

Among the other issues I tackled in Akka.Remote, this change fixed the race that was causing the various remoting throughput benchmarks to fail. The code comment explains it for future contributors.

Contributor

Are there any potential quirks associated with a longer timeout?
And what happens if the timeout is reached?

Member Author

> Are there any potential quirks associated with a longer timeout?

Nah - it might make the benchmark run slightly longer on the build server, but it has zero impact on the benchmark itself (since this code runs before the benchmark can start).

As far as functional stuff goes, not as far as I know - I expect it to work just like the 2s timeout.

> And what happens if the timeout is reached?

Same as before - we get a failure logged on the build server: ActorNotFoundException.
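
For readers unfamiliar with the pattern, here is a rough sketch of why a longer timeout only costs time on the failure path. It uses plain `Task` APIs rather than Akka.NET: `ResolveWithin` and `SketchNotFoundException` are hypothetical names invented for this illustration, and the real `ResolveOne` may be implemented quite differently.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical exception, standing in for a "not found within timeout" failure.
public sealed class SketchNotFoundException : Exception
{
    public SketchNotFoundException(string message) : base(message) { }
}

public static class TimeoutDemo
{
    // Hypothetical helper: completes with the lookup result, or fails once the timeout elapses.
    public static async Task<string> ResolveWithin(Task<string> lookup, TimeSpan timeout)
    {
        var finished = await Task.WhenAny(lookup, Task.Delay(timeout));
        if (finished != lookup)
            throw new SketchNotFoundException("no reply within " + timeout);
        return await lookup;
    }

    public static void Main()
    {
        // A lookup that never completes, to force the timeout path.
        var never = new TaskCompletionSource<string>().Task;
        try
        {
            // Blocking on .Result wraps the failure in an AggregateException.
            var unused = ResolveWithin(never, TimeSpan.FromMilliseconds(50)).Result;
        }
        catch (AggregateException ex)
        {
            Console.WriteLine(ex.InnerException.GetType().Name); // SketchNotFoundException
        }

        // A lookup that completes quickly returns immediately, even with a generous timeout.
        var quick = Task.FromResult("found");
        Console.WriteLine(ResolveWithin(quick, TimeSpan.FromSeconds(30)).Result); // found
    }
}
```

In this sketch the 30-second budget is only spent when no reply arrives at all, which matches the point made above that the setup code runs before measurement starts.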

_remoteEcho =
System2.ActorSelection(system1EchoActorPath).ResolveOne(TimeSpan.FromSeconds(2)).Result;
}
@@ -14,23 +14,23 @@ public override string CreateRegistryKey()
public override Config CreateActorSystemConfig(string actorSystemName, string ipOrHostname, int port, string registryKey = null)
{
var baseConfig = ConfigurationFactory.ParseString(@"
akka {
actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote""

remote {
log-remote-lifecycle-events = off

enabled-transports = [
""akka.remote.test"",
]

test {
transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote""
applied-adapters = []
maximum-payload-bytes = 128000b
scheme-identifier = test
akka {
Member Author

Fixed some issues with the HOCON here. Had an extra brace at the end.

actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote""

remote {
log-remote-lifecycle-events = off
enabled-transports = [
""akka.remote.test"",
]

test {
transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote""
applied-adapters = []
maximum-payload-bytes = 128000b
scheme-identifier = test
}
}
}
}
");

port = 10; //BUG: setting the port to 0 causes the DefaultAddress to report the port as -1
@@ -12,22 +12,27 @@ public override Config CreateActorSystemConfig(string actorSystemName, string ip
{
var baseConfig = ConfigurationFactory.ParseString(@"
akka {
actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote""

remote {
log-remote-lifecycle-events = off

enabled-transports = [
""akka.remote.test"",
]

test {
transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote""
applied-adapters = []
maximum-payload-bytes = 128000b
scheme-identifier = test
loglevel = ""WARNING""
Member Author

Added a bunch of logging settings when I was debugging. I left them in, but set the log level to WARNING so that they don't impact the performance tests themselves.

stdout-loglevel = ""WARNING""
actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote""

remote {
log-received-messages = off
log-sent-messages = off
log-remote-lifecycle-events = off

enabled-transports = [
""akka.remote.test"",
]

test {
transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote""
applied-adapters = []
maximum-payload-bytes = 128000b
scheme-identifier = test
}
}
}
}
");

port = 10; //BUG: setting the port to 0 causes the DefaultAddress to report the port as -1
24 changes: 12 additions & 12 deletions src/core/Akka.Remote.Tests/EndpointRegistrySpec.cs
@@ -34,7 +34,7 @@ public void EndpointRegistry_must_be_able_to_register_a_writeable_endpoint_and_p
var reg = new EndpointRegistry();
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));

Assert.Equal(actorA, reg.RegisterWritableEndpoint(address1, actorA));
Assert.Equal(actorA, reg.RegisterWritableEndpoint(address1, actorA,null,null));
Member Author

The EndpointRegistry was modified to include some new data (UIDs for quarantining), so the signatures of these methods changed, and the specs had to be updated to match.

EndpointRegistry is internal so it won't have an impact on our users.
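
The updated assertions compare against `Tuple.Create(actorA, uid)`. A small sketch of why that works: `System.Tuple` implements structural equality, so a freshly constructed tuple compares equal to a stored one with the same components. The registry below is a hypothetical stand-in keyed by strings, not the real `EndpointRegistry`.

```csharp
using System;
using System.Collections.Generic;

public static class ReadOnlyRegistrySketch
{
    // Hypothetical registry: address -> (endpoint name, uid). Not the real EndpointRegistry.
    private static readonly Dictionary<string, Tuple<string, int>> ReadOnly =
        new Dictionary<string, Tuple<string, int>>();

    public static string Register(string address, string endpoint, int uid)
    {
        ReadOnly[address] = Tuple.Create(endpoint, uid);
        return endpoint;
    }

    // Returns null when nothing is registered, mirroring the Assert.Null checks in the spec.
    public static Tuple<string, int> EndpointFor(string address)
    {
        Tuple<string, int> found;
        return ReadOnly.TryGetValue(address, out found) ? found : null;
    }

    public static void Main()
    {
        Register("test://sys@host:1", "actorA", 1);

        // Structural equality: a new tuple with the same components is equal to the stored one.
        Console.WriteLine(Tuple.Create("actorA", 1).Equals(EndpointFor("test://sys@host:1"))); // True
        Console.WriteLine(EndpointFor("test://sys@host:2") == null); // True
    }
}
```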


Assert.IsType<EndpointManager.Pass>(reg.WritableEndpointWithPolicyFor(address1));
Assert.Equal(actorA, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Pass>().Endpoint);
@@ -52,8 +52,8 @@ public void EndpointRegistry_must_be_able_to_register_a_readonly_endpoint()
var reg = new EndpointRegistry();
Assert.Null(reg.ReadOnlyEndpointFor(address1));

Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA));
Assert.Equal(actorA, reg.ReadOnlyEndpointFor(address1));
Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA, 0));
Assert.Equal(Tuple.Create(actorA, 0), reg.ReadOnlyEndpointFor(address1));
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));
Assert.False(reg.IsWritable(actorA));
Assert.True(reg.IsReadOnly(actorA));
@@ -67,10 +67,10 @@ public void EndpointRegistry_must_be_able_to_register_writable_and_readonly_endp
Assert.Null(reg.ReadOnlyEndpointFor(address1));
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));

Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA));
Assert.Equal(actorB, reg.RegisterWritableEndpoint(address1, actorB));
Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA, 1));
Assert.Equal(actorB, reg.RegisterWritableEndpoint(address1, actorB, null,null));

Assert.Equal(actorA, reg.ReadOnlyEndpointFor(address1));
Assert.Equal(Tuple.Create(actorA,1), reg.ReadOnlyEndpointFor(address1));
Assert.Equal(actorB, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Pass>().Endpoint);

Assert.False(reg.IsWritable(actorA));
@@ -85,7 +85,7 @@ public void EndpointRegistry_must_be_able_to_register_Gated_policy_for_an_addres
{
var reg = new EndpointRegistry();
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));
reg.RegisterWritableEndpoint(address1, actorA);
reg.RegisterWritableEndpoint(address1, actorA, null, null);
var deadline = Deadline.Now;
reg.MarkAsFailed(actorA, deadline);
Assert.Equal(deadline, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Gated>().TimeOfRelease);
@@ -97,7 +97,7 @@ public void EndpointRegistry_must_be_able_to_register_Gated_policy_for_an_addres
public void EndpointRegistry_must_remove_readonly_endpoints_if_marked_as_failed()
{
var reg = new EndpointRegistry();
reg.RegisterReadOnlyEndpoint(address1, actorA);
reg.RegisterReadOnlyEndpoint(address1, actorA, 2);
reg.MarkAsFailed(actorA, Deadline.Now);
Assert.Null(reg.ReadOnlyEndpointFor(address1));
}
@@ -106,8 +106,8 @@ public void EndpointRegistry_must_remove_readonly_endpoints_if_marked_as_failed(
public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint()
{
var reg = new EndpointRegistry();
reg.RegisterWritableEndpoint(address1, actorA);
reg.RegisterWritableEndpoint(address2, actorB);
reg.RegisterWritableEndpoint(address1, actorA, null, null);
reg.RegisterWritableEndpoint(address2, actorB, null, null);
var deadline = Deadline.Now;
reg.MarkAsFailed(actorA, deadline);
reg.MarkAsQuarantined(address2, 42, deadline);
@@ -124,8 +124,8 @@ public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint()
public void EndpointRegistry_should_prune_outdated_Gated_directives_properly()
{
var reg = new EndpointRegistry();
reg.RegisterWritableEndpoint(address1, actorA);
reg.RegisterWritableEndpoint(address2, actorB);
reg.RegisterWritableEndpoint(address1, actorA, null, null);
reg.RegisterWritableEndpoint(address2, actorB, null, null);
reg.MarkAsFailed(actorA, Deadline.Now);
var farIntheFuture = Deadline.Now + TimeSpan.FromSeconds(60);
reg.MarkAsFailed(actorB, farIntheFuture);
9 changes: 7 additions & 2 deletions src/core/Akka.Remote.Tests/RemoteConfigSpec.cs
@@ -37,14 +37,15 @@ public void Remoting_should_contain_correct_configuration_values_in_ReferenceCon
Assert.Equal(TimeSpan.FromSeconds(2), remoteSettings.FlushWait);
Assert.Equal(TimeSpan.FromSeconds(10), remoteSettings.StartupTimeout);
Assert.Equal(TimeSpan.FromSeconds(5), remoteSettings.RetryGateClosedFor);
//Assert.Equal("akka.remote.default-remote-dispatcher", remoteSettings.Dispatcher); //TODO: add RemoteDispatcher support
Assert.Equal("akka.remote.default-remote-dispatcher", remoteSettings.Dispatcher);
Member Author

Uncommented this assertion, since we've actually supported this value as of 1.0.

Assert.True(remoteSettings.UsePassiveConnections);
Assert.Equal(TimeSpan.FromMilliseconds(50), remoteSettings.BackoffPeriod);
Assert.Equal(TimeSpan.FromSeconds(0.3d), remoteSettings.SysMsgAckTimeout);
Assert.Equal(TimeSpan.FromSeconds(2), remoteSettings.SysResendTimeout);
Assert.Equal(1000, remoteSettings.SysMsgBufferSize);
Assert.Equal(20000, remoteSettings.SysMsgBufferSize);
Member Author

This value was changed in the JVM Akka reference config.

Assert.Equal(TimeSpan.FromMinutes(3), remoteSettings.InitialSysMsgDeliveryTimeout);
Assert.Equal(TimeSpan.FromDays(5), remoteSettings.QuarantineDuration);
Assert.Equal(TimeSpan.FromDays(5), remoteSettings.QuarantineSilentSystemTimeout);
Member Author

New property added to RemoteSettings, which needed to be accounted for in this spec.

Assert.Equal(TimeSpan.FromSeconds(30), remoteSettings.CommandAckTimeout);
Assert.Equal(1, remoteSettings.Transports.Length);
Assert.Equal(typeof(HeliosTcpTransport), Type.GetType(remoteSettings.Transports.Head().TransportClass));
@@ -58,6 +59,10 @@ public void Remoting_should_contain_correct_configuration_values_in_ReferenceCon
Assert.Equal(TimeSpan.FromMilliseconds(100), remoteSettings.WatchFailureDetectorConfig.GetTimeSpan("min-std-deviation"));

//TODO add adapter support

// TODO add WatchFailureDetectorConfig assertions

remoteSettings.Config.GetString("akka.remote.log-frame-size-exceeding").ShouldBe("off");
Member Author

There was a TODO here to test log-frame-size-exceeding, so I added that assertion.

}

[Fact]
@@ -30,7 +30,9 @@ public static Config ThrottlerTransportAdapterSpecConfig
{
return ConfigurationFactory.ParseString(@"
akka {
akka.test.single-expect-default = 6s #to help overcome issues with gated connections
loglevel = ""DEBUG""
Member Author

Added some debugging here when this spec was failing as a result of some of my changes to EndpointWriter, which helped me find the bug. Should I turn these off?

Contributor

For a test, ¯\_(ツ)_/¯. I think it's fine to leave it on.

Member Author

Cool

stdout-loglevel = ""DEBUG""
test.single-expect-default = 6s #to help overcome issues with gated connections
actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
remote.helios.tcp.hostname = ""localhost""
remote.log-remote-lifecycle-events = off
@@ -90,7 +92,7 @@ public Lost(string msg)
Msg = msg;
}

public string Msg { get; private set; }
public string Msg { get; }

public bool Equals(Lost other)
{
@@ -108,7 +110,12 @@ public override bool Equals(object obj)

public override int GetHashCode()
{
return (Msg != null ? Msg.GetHashCode() : 0);
return Msg?.GetHashCode() ?? 0;
}

public override string ToString()
{
return GetType() + ": " + Msg;
Member Author

Print the message that was supposed to be lost for debugging purposes.

}
}
}
@@ -200,7 +207,7 @@ public void ThrottlerTransportAdapter_must_survive_blackholing()
here.Tell(new ThrottlingTester.Lost("BlackHole 2"));
ExpectNoMsg(TimeSpan.FromSeconds(1));
Disassociate().ShouldBeTrue();
ExpectNoMsg(TimeSpan.FromSeconds(3));
ExpectNoMsg(TimeSpan.FromSeconds(1));
Member Author

Value was supposed to be 1 second according to the JVM spec.


Throttle(ThrottleTransportAdapter.Direction.Both, Unthrottled.Instance).ShouldBeTrue();

25 changes: 24 additions & 1 deletion src/core/Akka.Remote/Configuration/Remote.conf
@@ -237,10 +237,25 @@ akka {
# the affected systems after lifting the quarantine is undefined.
prune-quarantine-marker-after = 5 d

# If system messages have been exchanged between two systems (i.e. remote death
# watch or remote deployment has been used) a remote system will be marked as
# quarantined after the two systems have no active association, and no
# communication happens during the time configured here.
# The only purpose of this setting is to avoid storing system message redelivery
# data (sequence number state, etc.) for an undefined amount of time leading to long
# term memory leak. Instead, if a system has been gone for this period,
# or more exactly
# - there is no association between the two systems (TCP connection, if TCP transport is used)
# - neither side has been attempting to communicate with the other
# - there are no pending system messages to deliver
# for the amount of time configured here, the remote system will be quarantined and all state
# associated with it will be dropped.
quarantine-after-silence = 5 d
Member Author

New setting that's used by the quarantining system
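
The three conditions listed in the config comment can be restated as a single predicate. This is a simplified sketch under stated assumptions: `ShouldQuarantine` and its parameters are hypothetical names, and a single `lastCommunication` timestamp is assumed to stand in for "neither side has been attempting to communicate". The actual bookkeeping in the remoting layer is not shown in this diff.

```csharp
using System;

public static class QuarantineSketch
{
    // Hypothetical predicate restating the three conditions from the config comment.
    public static bool ShouldQuarantine(
        bool hasAssociation,
        int pendingSystemMessages,
        DateTime lastCommunication,
        DateTime now,
        TimeSpan quarantineAfterSilence)
    {
        return !hasAssociation
            && pendingSystemMessages == 0
            && now - lastCommunication >= quarantineAfterSilence;
    }

    public static void Main()
    {
        var silence = TimeSpan.FromDays(5); // quarantine-after-silence = 5 d
        var now = new DateTime(2016, 1, 10, 0, 0, 0, DateTimeKind.Utc); // arbitrary fixed date

        Console.WriteLine(ShouldQuarantine(false, 0, now.AddDays(-6), now, silence)); // True
        Console.WriteLine(ShouldQuarantine(false, 3, now.AddDays(-6), now, silence)); // False
        Console.WriteLine(ShouldQuarantine(true, 0, now.AddDays(-6), now, silence));  // False
    }
}
```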


# This setting defines the maximum number of unacknowledged system messages
# allowed for a remote system. If this limit is reached the remote system is
# declared to be dead and its UID marked as tainted.
system-message-buffer-size = 1000
system-message-buffer-size = 20000
Member Author

JVM increased the buffer size 20x


# This setting defines the maximum idle time after an individual
# acknowledgement for system messages is sent. System message delivery
@@ -257,6 +272,14 @@ akka {
# resent.
resend-interval = 2 s

# Maximum number of unacknowledged system messages that will be resent
# each 'resend-interval'. If you watch many (> 1000) remote actors you can
# increase this value to for example 600, but a too large limit (e.g. 10000)
# may flood the connection and might cause false failure detection to trigger.
# Test such a configuration by watching all actors at the same time and stop
# all watched actors at the same time.
resend-limit = 200
Member Author

Another new setting
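
As an illustration of what a per-interval resend cap means, the sketch below picks at most `resendLimit` unacknowledged sequence numbers on each tick. `SelectForResend` is a hypothetical helper, and resending the lowest sequence numbers first is an assumption made for the example, not something this diff confirms.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ResendSketch
{
    // Hypothetical: choose which unacknowledged sequence numbers to resend on one tick.
    public static IReadOnlyList<long> SelectForResend(IEnumerable<long> unacked, int resendLimit)
    {
        // Assumption: oldest (lowest) sequence numbers go first, capped at resendLimit.
        return unacked.OrderBy(seq => seq).Take(resendLimit).ToList();
    }

    public static void Main()
    {
        var unacked = Enumerable.Range(1, 1000).Select(i => (long)i);
        var batch = SelectForResend(unacked, 200); // resend-limit = 200

        Console.WriteLine(batch.Count); // 200
        Console.WriteLine(batch[0]);    // 1
        Console.WriteLine(batch[199]);  // 200
    }
}
```

With 1000 unacknowledged messages and a limit of 200, it would take five resend intervals to cycle through the backlog in this sketch, which is the trade-off the config comment describes against flooding the connection.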


# WARNING: this setting should not be changed unless all of its consequences
# are properly understood which assumes experience with remoting internals
# or expert advice.