Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf optimize ActorSelection #4962

Merged
62 changes: 62 additions & 0 deletions src/benchmark/Akka.Benchmarks/Actor/ActorSelectionBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using System.Collections.Generic;
using System.Net.NetworkInformation;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Engines;

namespace Akka.Benchmarks.Actor
{
[Config(typeof(MicroBenchmarkConfig))] // need memory diagnosis
public class ActorSelectionBenchmark
{
[Params(10000)]
public int Iterations { get; set; }
private TimeSpan _timeout;
private ActorSystem _system;
private IActorRef _echo;

// cached selection for measuring .Tell / .Ask performance
private ActorSelection _actorSelection;

[GlobalSetup]
public void Setup()
{
_timeout = TimeSpan.FromMinutes(1);
_system = ActorSystem.Create("system");
_echo = _system.ActorOf(Props.Create(() => new EchoActor()), "echo");
_actorSelection = _system.ActorSelection("/user/echo");
}

[Benchmark]
public async Task RequestResponseActorSelection()
{
for(var i = 0; i < Iterations; i++)
await _actorSelection.Ask("foo", _timeout);
}

[Benchmark]
public void CreateActorSelection()
{
for (var i = 0; i < Iterations; i++)
_system.ActorSelection("/user/echo");
}

[GlobalCleanup]
public void Cleanup()
{
_system.Terminate().Wait();
}

public class EchoActor : UntypedActor
{
protected override void OnReceive(object message)
{
Sender.Tell(message);
}
}
}
}
2 changes: 2 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1621,13 +1621,15 @@ namespace Akka.Actor
}
public class SelectChildRecursive : Akka.Actor.SelectionPathElement
{
public static readonly Akka.Actor.SelectChildRecursive Instance;
public SelectChildRecursive() { }
public override bool Equals(object obj) { }
public override int GetHashCode() { }
public override string ToString() { }
}
public class SelectParent : Akka.Actor.SelectionPathElement
{
public static readonly Akka.Actor.SelectParent Instance;
public SelectParent() { }
public override bool Equals(object obj) { }
public override int GetHashCode() { }
Expand Down
5 changes: 5 additions & 0 deletions src/core/Akka.Tests.Performance/Actor/ActorSelectionSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public void Setup(BenchmarkContext context)
[PerfBenchmark(Description = "Tests the message delivery throughput of NEW ActorSelections to NEW actors",
NumberOfIterations = 13, RunMode = RunMode.Throughput, RunTimeMilliseconds = 1000, TestMode = TestMode.Measurement)]
[CounterMeasurement(ActorSelectionCounterName)]
[MemoryMeasurement(MemoryMetric.TotalBytesAllocated)]
public void New_ActorSelection_on_new_actor_throughput(BenchmarkContext context)
{
var actorRef = System.ActorOf(_oneMessageBenchmarkProps); // create a new actor every time
Expand All @@ -77,6 +78,7 @@ public void New_ActorSelection_on_new_actor_throughput(BenchmarkContext context)
[PerfBenchmark(Description = "Tests the message delivery throughput of REUSABLE ActorSelections to PRE-EXISTING actors",
NumberOfIterations = 13, RunMode = RunMode.Iterations, TestMode = TestMode.Measurement)]
[CounterMeasurement(ActorSelectionCounterName)]
[MemoryMeasurement(MemoryMetric.TotalBytesAllocated)]
public void Reused_ActorSelection_on_pre_existing_actor_throughput(BenchmarkContext context)
{
var actorSelection = System.ActorSelection(_receiverActorPath);
Expand All @@ -91,6 +93,7 @@ public void Reused_ActorSelection_on_pre_existing_actor_throughput(BenchmarkCont
[PerfBenchmark(Description = "Tests the message delivery throughput of NEW ActorSelections to PRE-EXISTING actors. This is really a stress test.",
NumberOfIterations = 13, RunMode = RunMode.Iterations, TestMode = TestMode.Measurement)]
[CounterMeasurement(ActorSelectionCounterName)]
[MemoryMeasurement(MemoryMetric.TotalBytesAllocated)]
public void New_ActorSelection_on_pre_existing_actor_throughput(BenchmarkContext context)
{
for (var i = 0; i < NumberOfMessages;)
Expand All @@ -104,6 +107,7 @@ public void New_ActorSelection_on_pre_existing_actor_throughput(BenchmarkContext
[PerfBenchmark(Description = "Tests the throughput of resolving an ActorSelection on a pre-existing actor via ResolveOne",
NumberOfIterations = 13, RunMode = RunMode.Throughput, RunTimeMilliseconds = 1000, TestMode = TestMode.Measurement)]
[CounterMeasurement(ActorSelectionCounterName)]
[MemoryMeasurement(MemoryMetric.TotalBytesAllocated)]
public void ActorSelection_ResolveOne_throughput(BenchmarkContext context)
{
var actorRef= System.ActorSelection(_receiverActorPath).ResolveOne(TimeSpan.FromSeconds(2)).Result; // send that actor a message via selection
Expand All @@ -113,6 +117,7 @@ public void ActorSelection_ResolveOne_throughput(BenchmarkContext context)
[PerfBenchmark(Description = "Continuously creates actors and attempts to resolve them immediately. Used to surface race conditions.",
NumberOfIterations = 13, RunMode = RunMode.Throughput, RunTimeMilliseconds = 1000, TestMode = TestMode.Measurement)]
[CounterMeasurement(ActorSelectionCounterName)]
[MemoryMeasurement(MemoryMetric.TotalBytesAllocated)]
public void ActorSelection_ResolveOne_stress_test(BenchmarkContext context)
{
var actorRef = System.ActorOf(_oneMessageBenchmarkProps); // create a new actor every time
Expand Down
3 changes: 3 additions & 0 deletions src/core/Akka.Tests/Actor/ActorSelectionSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,9 @@ public void An_ActorSelection_must_identify_actors_with_double_wildcard_selectio
// nothing under /user/a/b2/c1/d
Sys.ActorSelection("/user/a/b2/c1/d/**").Tell(new Identify(3), probe.Ref);
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));

Action illegalDoubleWildCard = () => Sys.ActorSelection("/user/a/**/d").Tell(new Identify(4), probe.Ref);
illegalDoubleWildCard.Should().Throw<IllegalActorNameException>();
}

[Fact]
Expand Down
43 changes: 29 additions & 14 deletions src/core/Akka/Actor/ActorSelection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,31 +77,34 @@ public ActorSelection(IActorRef anchor, IEnumerable<string> elements)
Anchor = anchor;

var list = new List<SelectionPathElement>();
var iter = elements.Iterator();
while (!iter.IsEmpty())
var count = elements.Count(); // shouldn't have a multiple enumeration issue\
var i = 0;
foreach (var s in elements)
{
var s = iter.Next();
switch (s)
{
case null:
case "":
break;
case "**":
if (!iter.IsEmpty())
if (i < count-1)
throw new IllegalActorNameException("Double wildcard can only appear at the last path entry");
list.Add(new SelectChildRecursive());
list.Add(SelectChildRecursive.Instance);
break;
case string e when e.Contains("?") || e.Contains("*"):
list.Add(new SelectChildPattern(e));
break;
case string e when e == "..":
list.Add(new SelectParent());
list.Add(SelectParent.Instance);
break;
default:
list.Add(new SelectChildName(s));
break;
}

i++;
}

Path = list.ToArray();
}

Expand Down Expand Up @@ -194,10 +197,12 @@ void Rec(IInternalActorRef actorRef)
{
if (actorRef is ActorRefWithCell refWithCell)
{
var emptyRef = new EmptyLocalActorRef(
provider: refWithCell.Provider,
path: anchor.Path / sel.Elements.Select(el => el.ToString()),
eventStream: refWithCell.Underlying.System.EventStream);
EmptyLocalActorRef EmptyRef(){
return new EmptyLocalActorRef(
provider: refWithCell.Provider,
path: anchor.Path / sel.Elements.Select(el => el.ToString()),
eventStream: refWithCell.Underlying.System.EventStream);
}

switch (iter.Next())
{
Expand All @@ -217,7 +222,7 @@ void Rec(IInternalActorRef actorRef)
{
// don't send to emptyRef after wildcard fan-out
if (!sel.WildCardFanOut)
emptyRef.Tell(sel, sender);
EmptyRef().Tell(sel, sender);
}
else if (iter.IsEmpty())
{
Expand All @@ -234,7 +239,7 @@ void Rec(IInternalActorRef actorRef)
if (allChildren.Count == 0)
return;

var msg = new ActorSelectionMessage(sel.Message, new[] { new SelectChildRecursive() }, true);
var msg = new ActorSelectionMessage(sel.Message, new SelectionPathElement[] { SelectChildRecursive.Instance }, true);
foreach (var c in allChildren)
{
c.Tell(sel.Message, sender);
Expand All @@ -250,7 +255,7 @@ void Rec(IInternalActorRef actorRef)
if (iter.IsEmpty())
{
if (matchingChildren.Count == 0 && !sel.WildCardFanOut)
emptyRef.Tell(sel, sender);
EmptyRef().Tell(sel, sender);
else
{
for (var i = 0; i < matchingChildren.Count; i++)
Expand All @@ -261,7 +266,7 @@ void Rec(IInternalActorRef actorRef)
{
// don't send to emptyRef after wildcard fan-out
if (matchingChildren.Count == 0 && !sel.WildCardFanOut)
emptyRef.Tell(sel, sender);
EmptyRef().Tell(sel, sender);
else
{
var message = new ActorSelectionMessage(
Expand Down Expand Up @@ -474,6 +479,11 @@ public override bool Equals(object obj)
return true;
}

/// <summary>
/// Use this instead of calling the default constructor
/// </summary>
public static readonly SelectChildRecursive Instance = new SelectChildRecursive();

/// <inheritdoc/>
public override int GetHashCode() => "**".GetHashCode();

Expand All @@ -487,6 +497,11 @@ public override bool Equals(object obj)
/// </summary>
public class SelectParent : SelectionPathElement
{
/// <summary>
/// Use this instead of calling the default constructor
/// </summary>
public static readonly SelectParent Instance = new SelectParent();

/// <inheritdoc/>
public override bool Equals(object obj) => !ReferenceEquals(obj, null) && obj is SelectParent;

Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka/Util/Internal/Collections/Iterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@

namespace Akka.Util.Internal.Collections
{
internal sealed class Iterator<T>
internal struct Iterator<T>
{
private readonly IList<T> _enumerator;
private int _index;

public Iterator(IEnumerable<T> enumerator)
{
_index = 0;
_enumerator = enumerator.ToList();
}

Expand Down