Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RemoteActorRefProvider address paring, caching and resolving improvements #5273

Merged
merged 41 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f25de15
refactor remote-actorref-provider and add tests for cache entries
Zetanova Sep 7, 2021
3fc0168
replace address-cache with actorpath-cache
Zetanova Sep 7, 2021
9a67f1c
refactor resolve with local address
Zetanova Sep 7, 2021
746f76b
refactor and cleanup
Zetanova Sep 7, 2021
bff01bb
remove volatile from fields
Zetanova Sep 7, 2021
d0f232e
Merge branch 'dev' into perf-remote-actorref-provider
Zetanova Sep 7, 2021
3dc67a5
merge upstream
Zetanova Sep 8, 2021
b458ae4
Merge branch 'perf-remote-actorref-provider' of https://github.com/Ze…
Zetanova Sep 8, 2021
2a55502
remove double equals
Zetanova Sep 8, 2021
14e88e2
cleanup
Zetanova Sep 9, 2021
762ec30
refactor to base
Zetanova Sep 9, 2021
aea2166
optimize elements list
Zetanova Sep 9, 2021
a36170f
improve actor path join
Zetanova Sep 10, 2021
aa211c7
improve actor path equals and compare
Zetanova Sep 10, 2021
1842505
cleanup
Zetanova Sep 10, 2021
f388cef
protect stack and use moveto of arraybuilder
Zetanova Sep 11, 2021
baaeea2
update api spec
Zetanova Sep 11, 2021
f86dc4b
test for jumbo actor path name support
Zetanova Sep 11, 2021
4427087
small refactors
Zetanova Sep 12, 2021
93f5d9c
add ActorPath.ParentOf(depth)
Zetanova Sep 12, 2021
d74deaa
dont copy actorpath
Zetanova Sep 12, 2021
a7a525e
use actorpath-cache and remove cache entry test
Zetanova Sep 12, 2021
d3c33e8
refactor fill array
Zetanova Sep 12, 2021
4512aec
prepair actor path cache for better deduplication
Zetanova Sep 12, 2021
ad3ca76
update api
Zetanova Sep 12, 2021
026a6fc
cache root actor path
Zetanova Sep 15, 2021
b29e242
update api
Zetanova Sep 15, 2021
9d9a2d7
remove obsolete code
Zetanova Sep 15, 2021
29eaff2
cleanup code
Zetanova Sep 15, 2021
f33929e
Merge branch 'dev' into perf-remote-actorref-provider
Aaronontheweb Sep 15, 2021
1051ed3
Merge remote-tracking branch 'upstream/dev' into perf-remote-actorref…
Zetanova Sep 15, 2021
2564415
Merge branch 'perf-remote-actorref-provider' of https://github.com/Ze…
Zetanova Sep 15, 2021
d3a0ae0
removed commented cache tests
Zetanova Sep 15, 2021
217485c
refactor span to string bulder
Zetanova Sep 15, 2021
409485f
use internal fields and ref equals
Zetanova Sep 17, 2021
9eba407
add rebase path test
Zetanova Sep 17, 2021
cc30aa1
fix possible NRE
Zetanova Sep 18, 2021
73d26ff
extend and test address parsing
Zetanova Sep 18, 2021
c13ad50
update api
Zetanova Sep 18, 2021
793b8b5
Merge branch 'dev' into perf-remote-actorref-provider
Zetanova Sep 29, 2021
8dd6e0c
Merge branch 'dev' into perf-remote-actorref-provider
Aaronontheweb Oct 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 14 additions & 21 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,21 @@ namespace Akka.Actor
protected ActorPath(Akka.Actor.Address address, string name) { }
protected ActorPath(Akka.Actor.ActorPath parentPath, string name, long uid) { }
public Akka.Actor.Address Address { get; }
public abstract System.Collections.Generic.IReadOnlyList<string> Elements { get; }
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
public int Depth { get; }
public System.Collections.Generic.IReadOnlyList<string> Elements { get; }
public string Name { get; }
public abstract Akka.Actor.ActorPath Parent { get; }
public abstract Akka.Actor.ActorPath Root { get; }
public Akka.Actor.ActorPath Parent { get; }
[Newtonsoft.Json.JsonIgnoreAttribute()]
public Akka.Actor.ActorPath Root { get; }
public long Uid { get; }
public Akka.Actor.ActorPath Child(string childName) { }
public abstract int CompareTo(Akka.Actor.ActorPath other);
public int CompareTo(Akka.Actor.ActorPath other) { }
public bool Equals(Akka.Actor.ActorPath other) { }
public override bool Equals(object obj) { }
public static string FormatPathElements(System.Collections.Generic.IEnumerable<string> pathElements) { }
public override int GetHashCode() { }
public static bool IsValidPathElement(string s) { }
public Akka.Actor.ActorPath ParentOf(int depth) { }
public static Akka.Actor.ActorPath Parse(string path) { }
public string ToSerializationFormat() { }
public string ToSerializationFormatWithAddress(Akka.Actor.Address address) { }
Expand All @@ -199,13 +202,15 @@ namespace Akka.Actor
public string ToStringWithoutAddress() { }
public Akka.Util.ISurrogate ToSurrogate(Akka.Actor.ActorSystem system) { }
public static bool TryParse(string path, out Akka.Actor.ActorPath actorPath) { }
public static bool TryParse(Akka.Actor.ActorPath basePath, System.ReadOnlySpan<char> absoluteUri, out Akka.Actor.ActorPath actorPath) { }
public static bool TryParseAddress(string path, out Akka.Actor.Address address) { }
public abstract Akka.Actor.ActorPath WithUid(long uid);
public static bool TryParseAddress(string path, out Akka.Actor.Address address, out System.ReadOnlySpan<char> absoluteUri) { }
public Akka.Actor.ActorPath WithUid(long uid) { }
public static Akka.Actor.ActorPath /(Akka.Actor.ActorPath path, string name) { }
public static Akka.Actor.ActorPath /(Akka.Actor.ActorPath path, System.Collections.Generic.IEnumerable<string> name) { }
public static bool ==(Akka.Actor.ActorPath left, Akka.Actor.ActorPath right) { }
public static bool !=(Akka.Actor.ActorPath left, Akka.Actor.ActorPath right) { }
public class Surrogate : Akka.Util.ISurrogate, System.IEquatable<Akka.Actor.ActorPath.Surrogate>, System.IEquatable<Akka.Actor.ActorPath>
public sealed class Surrogate : Akka.Util.ISurrogate, System.IEquatable<Akka.Actor.ActorPath.Surrogate>, System.IEquatable<Akka.Actor.ActorPath>
{
public Surrogate(string path) { }
public string Path { get; }
Expand Down Expand Up @@ -414,7 +419,7 @@ namespace Akka.Actor
public Akka.Actor.Address WithSystem(string system) { }
public static bool ==(Akka.Actor.Address left, Akka.Actor.Address right) { }
public static bool !=(Akka.Actor.Address left, Akka.Actor.Address right) { }
public class AddressSurrogate : Akka.Util.ISurrogate
public sealed class AddressSurrogate : Akka.Util.ISurrogate
{
public AddressSurrogate() { }
public string Host { get; set; }
Expand Down Expand Up @@ -497,15 +502,9 @@ namespace Akka.Actor
{
public static void CancelIfNotNull(this Akka.Actor.ICancelable cancelable) { }
}
public class ChildActorPath : Akka.Actor.ActorPath
public sealed class ChildActorPath : Akka.Actor.ActorPath
{
public ChildActorPath(Akka.Actor.ActorPath parentPath, string name, long uid) { }
public override System.Collections.Generic.IReadOnlyList<string> Elements { get; }
public override Akka.Actor.ActorPath Parent { get; }
public override Akka.Actor.ActorPath Root { get; }
public override int CompareTo(Akka.Actor.ActorPath other) { }
public override int GetHashCode() { }
public override Akka.Actor.ActorPath WithUid(long uid) { }
}
public sealed class CoordinatedShutdown : Akka.Actor.IExtension
{
Expand Down Expand Up @@ -1544,15 +1543,9 @@ namespace Akka.Actor
public void SwapUnderlying(Akka.Actor.ICell cell) { }
protected override void TellInternal(object message, Akka.Actor.IActorRef sender) { }
}
public class RootActorPath : Akka.Actor.ActorPath
public sealed class RootActorPath : Akka.Actor.ActorPath
{
public RootActorPath(Akka.Actor.Address address, string name = "") { }
public override System.Collections.Generic.IReadOnlyList<string> Elements { get; }
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
public override Akka.Actor.ActorPath Parent { get; }
[Newtonsoft.Json.JsonIgnoreAttribute()]
public override Akka.Actor.ActorPath Root { get; }
public override int CompareTo(Akka.Actor.ActorPath other) { }
public override Akka.Actor.ActorPath WithUid(long uid) { }
}
[Akka.Annotations.InternalApiAttribute()]
public class RootGuardianActorRef : Akka.Actor.LocalActorRef
Expand Down
29 changes: 29 additions & 0 deletions src/core/Akka.Remote.Tests/RemotingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using Xunit.Abstractions;
using Nito.AsyncEx;
using ThreadLocalRandom = Akka.Util.ThreadLocalRandom;
using Akka.Remote.Serialization;

namespace Akka.Remote.Tests
{
Expand Down Expand Up @@ -177,6 +178,34 @@ public async Task Remoting_must_support_Ask()
Assert.IsType<FutureActorRef<(string, IActorRef)>>(actorRef);
}

//[Fact]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to uncomment these @Zetanova ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this can be let here or deleted.

The thing with ThreadLocal.Values does not really work in the current Cache implementation,
it would produce a memory leak in production.

I did not an easy way to test the cache contains from outside.
It should be tested in one point in the feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it, we would need to refactor the cache infrastructure.

Optimal place and a very easy thing to do,
would to move the ActorPathCache from TreadLocal to context based.

To create an instance of ActorPathCache in EndpointReader and passing it down to decode-messages would be an optimal place with the best cache-hit-rate.

Because ActorResolveCache is used by both writing/reader,
we would there something else.

//public async Task Remoting_should_not_cache_ref_of_local_ask()
//{
// var localActorRefResolveCache = ActorRefResolveThreadLocalCache.For(Sys);
// var localActorPathCache = ActorPathThreadLocalCache.For(Sys);

// var (msg, actorRef) = await _here.Ask<(string, IActorRef)>("ping", DefaultTimeout);
// Assert.Equal("pong", msg);
// Assert.IsType<FutureActorRef<(string, IActorRef)>>(actorRef);

// Assert.Equal(0, localActorRefResolveCache.All.Sum(n => n.Stats.Entries));
// Assert.Equal(2, localActorPathCache.All.Sum(n => n.Stats.Entries));
//}

//[Fact]
//public async Task Remoting_should_not_cache_ref_of_remote_ask()
//{
// var remoteActorRefResolveCache = ActorRefResolveThreadLocalCache.For(_remoteSystem);
// var remoteActorPathCache = ActorPathThreadLocalCache.For(_remoteSystem);

// var (msg, actorRef) = await _here.Ask<(string, IActorRef)>("ping", DefaultTimeout);
// Assert.Equal("pong", msg);
// Assert.IsType<FutureActorRef<(string, IActorRef)>>(actorRef);

// Assert.Equal(0, remoteActorRefResolveCache.All.Sum(n => n.Stats.Entries));
// Assert.Equal(2, remoteActorPathCache.All.Sum(n => n.Stats.Entries)); //should be 1
//}

[Fact(Skip = "Racy")]
public async Task Ask_does_not_deadlock()
{
Expand Down
97 changes: 48 additions & 49 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public interface IRemoteActorRefProvider : IActorRefProvider
/// <see cref="IActorRefProvider.ResolveActorRef(string)"/> method.
/// </summary>
/// <param name="path">The path of the actor we intend to resolve.</param>
/// <returns>An <see cref="IActorRef"/> if a match was found. Otherwise nobody.</returns>
/// <returns>An <see cref="IActorRef"/> if a match was found. Otherwise deadletters.</returns>
IActorRef InternalResolveActorRef(string path);

/// <summary>
Expand Down Expand Up @@ -128,7 +128,7 @@ public RemoteActorRefProvider(string systemName, Settings settings, EventStream
}

private readonly LocalActorRefProvider _local;
private volatile Internals _internals;
Zetanova marked this conversation as resolved.
Show resolved Hide resolved
private Internals _internals;
private ActorSystemImpl _system;

private Internals RemoteInternals
Expand Down Expand Up @@ -235,34 +235,34 @@ public void UnregisterTempActor(ActorPath path)
_local.UnregisterTempActor(path);
}

private volatile IActorRef _remotingTerminator;
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
private volatile IActorRef _remoteWatcher;
private IActorRef _remotingTerminator;
private IActorRef _remoteWatcher;

private volatile ActorRefResolveThreadLocalCache _actorRefResolveThreadLocalCache;
private volatile ActorPathThreadLocalCache _actorPathThreadLocalCache;
private ActorRefResolveThreadLocalCache _actorRefResolveThreadLocalCache;
private ActorPathThreadLocalCache _actorPathThreadLocalCache;

/// <summary>
/// The remote death watcher.
/// </summary>
public IActorRef RemoteWatcher => _remoteWatcher;
private volatile IActorRef _remoteDeploymentWatcher;
private IActorRef _remoteDeploymentWatcher;

/// <inheritdoc/>
public virtual void Init(ActorSystemImpl system)
{
_system = system;

_local.Init(system);

_actorRefResolveThreadLocalCache = ActorRefResolveThreadLocalCache.For(system);
_actorPathThreadLocalCache = ActorPathThreadLocalCache.For(system);

_local.Init(system);

_remotingTerminator =
_system.SystemActorOf(
RemoteSettings.ConfigureDispatcher(Props.Create(() => new RemotingTerminator(_local.SystemGuardian))),
"remoting-terminator");

_internals = CreateInternals();
_internals = CreateInternals();

_remotingTerminator.Tell(RemoteInternals);

Expand Down Expand Up @@ -433,9 +433,10 @@ public Deploy LookUpRemotes(IEnumerable<string> p)
return Deploy.None;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool HasAddress(Address address)
{
return address.Equals(_local.RootPath.Address) || address.Equals(RootPath.Address) || Transport.Addresses.Contains(address);
return address.Equals(RootPath.Address) || Transport.Addresses.Contains(address);
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand All @@ -458,21 +459,6 @@ private IInternalActorRef LocalActorOf(ActorSystemImpl system, Props props, IInt
return _local.ActorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool TryParseCachedPath(string actorPath, out ActorPath path)
{
if (_actorPathThreadLocalCache != null)
{
path = _actorPathThreadLocalCache.Cache.GetOrCompute(actorPath);
return path != null;
}
else // cache not initialized yet
{
return ActorPath.TryParse(actorPath, out path);
}
}


/// <summary>
/// INTERNAL API.
///
Expand All @@ -483,20 +469,31 @@ private bool TryParseCachedPath(string actorPath, out ActorPath path)
/// <returns>TBD</returns>
public IInternalActorRef ResolveActorRefWithLocalAddress(string path, Address localAddress)
{
if (TryParseCachedPath(path, out var actorPath))
if (path is null)
Zetanova marked this conversation as resolved.
Show resolved Hide resolved
{
//the actor's local address was already included in the ActorPath
if (HasAddress(actorPath.Address))
{
if (actorPath is RootActorPath)
return RootGuardian;
return (IInternalActorRef)ResolveActorRef(path); // so we can use caching
}
_log.Debug("resolve of unknown path [{0}] failed", path);
return InternalDeadLetters;
}

return CreateRemoteRef(new RootActorPath(actorPath.Address) / actorPath.ElementsWithUid, localAddress);
ActorPath actorPath;
if (_actorPathThreadLocalCache != null)
{
actorPath = _actorPathThreadLocalCache.Cache.GetOrCompute(path);
}
_log.Debug("resolve of unknown path [{0}] failed", path);
return InternalDeadLetters;
else // cache not initialized yet
{
ActorPath.TryParse(path, out actorPath);
}

if (!HasAddress(actorPath.Address))
return CreateRemoteRef(actorPath, localAddress);

//the actor's local address was already included in the ActorPath

if (actorPath is RootActorPath)
return RootGuardian;

return (IInternalActorRef)ResolveActorRef(path); // so we can use caching
}


Expand Down Expand Up @@ -539,7 +536,8 @@ public IActorRef ResolveActorRef(string path)
// if the value is not cached
if (_actorRefResolveThreadLocalCache == null)
{
return InternalResolveActorRef(path); // cache not initialized yet
// cache not initialized yet, should never happen
return InternalResolveActorRef(path);
}
return _actorRefResolveThreadLocalCache.Cache.GetOrCompute(path);
}
Expand Down Expand Up @@ -592,19 +590,20 @@ public IActorRef ResolveActorRef(ActorPath actorPath)
/// <returns>The remote Address, if applicable. If not applicable <c>null</c> may be returned.</returns>
public Address GetExternalAddressFor(Address address)
{
if (HasAddress(address)) { return _local.RootPath.Address; }
if (!string.IsNullOrEmpty(address.Host) && address.Port.HasValue)
if (HasAddress(address))
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
return _local.RootPath.Address;

if (string.IsNullOrEmpty(address.Host) || !address.Port.HasValue)
return null;

try
{
try
{
return Transport.LocalAddressForRemote(address);
}
catch
{
return null;
}
return Transport.LocalAddressForRemote(address);
}
catch
{
return null;
}
return null;
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote/RemoteSystemDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal interface IDaemonMsg { }
/// <summary>
/// INTERNAL API
/// </summary>
internal class DaemonMsgCreate : IDaemonMsg
internal sealed class DaemonMsgCreate : IDaemonMsg
{
/// <summary>
/// Initializes a new instance of the <see cref="DaemonMsgCreate" /> class.
Expand Down Expand Up @@ -77,7 +77,7 @@ public DaemonMsgCreate(Props props, Deploy deploy, string path, IActorRef superv
///
/// It acts as the brain of the remote that responds to system remote messages and executes actions accordingly.
/// </summary>
internal class RemoteSystemDaemon : VirtualPathContainer
internal sealed class RemoteSystemDaemon : VirtualPathContainer
{
private readonly ActorSystemImpl _system;
private readonly Switch _terminating = new Switch(false);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Remote/Remoting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ internal interface IPriorityMessage { }
/// <summary>
/// INTERNAL API
/// </summary>
internal class Remoting : RemoteTransport
internal sealed class Remoting : RemoteTransport
{
private readonly ILoggingAdapter _log;
private volatile IDictionary<string, HashSet<ProtocolTransportAddressPair>> _transportMapping;
Expand Down
17 changes: 13 additions & 4 deletions src/core/Akka.Remote/Serialization/ActorPathCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using Akka.Actor;
using System.Threading;
using System.Collections.Generic;

namespace Akka.Remote.Serialization
{
Expand All @@ -16,7 +17,7 @@ namespace Akka.Remote.Serialization
/// </summary>
internal sealed class ActorPathThreadLocalCache : ExtensionIdProvider<ActorPathThreadLocalCache>, IExtension
{
private readonly ThreadLocal<ActorPathCache> _current = new ThreadLocal<ActorPathCache>(() => new ActorPathCache());
private readonly ThreadLocal<ActorPathCache> _current = new ThreadLocal<ActorPathCache>(() => new ActorPathCache(), false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used true temporary to get ThreadLocal<>.Values for testing
This whole ActorPathThreadLocalCache will be removed in a feature PR
Better to put it into the EndpointReader


public ActorPathCache Cache => _current.Value;

Expand Down Expand Up @@ -47,9 +48,17 @@ protected override int Hash(string k)

protected override ActorPath Compute(string k)
{
if (ActorPath.TryParse(k, out var actorPath))
return actorPath;
return null;
//todo lookup in address cache

if (!ActorPath.TryParseAddress(k, out var address, out var absoluteUri))
return null;

//todo lookup in root in cache

if (!ActorPath.TryParse(new RootActorPath(address), absoluteUri, out var actorPath))
return null;

return actorPath;
Zetanova marked this conversation as resolved.
Show resolved Hide resolved
}

protected override bool IsCacheable(ActorPath v)
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Remote/Serialization/ActorRefResolveCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading;
using Akka.Actor;
using Akka.Util.Internal;
Expand All @@ -23,7 +24,7 @@ public ActorRefResolveThreadLocalCache() { }
public ActorRefResolveThreadLocalCache(IRemoteActorRefProvider provider)
{
_provider = provider;
_current = new ThreadLocal<ActorRefResolveCache>(() => new ActorRefResolveCache(_provider));
_current = new ThreadLocal<ActorRefResolveCache>(() => new ActorRefResolveCache(_provider), false);
}

public override ActorRefResolveThreadLocalCache CreateExtension(ExtendedActorSystem system)
Expand Down
Loading