diff --git a/src/benchmark/Akka.Benchmarks/Actor/SpawnActorBenchmarks.cs b/src/benchmark/Akka.Benchmarks/Actor/SpawnActorBenchmarks.cs index 063892c72f4..f7ffd0623be 100644 --- a/src/benchmark/Akka.Benchmarks/Actor/SpawnActorBenchmarks.cs +++ b/src/benchmark/Akka.Benchmarks/Actor/SpawnActorBenchmarks.cs @@ -20,12 +20,19 @@ public class SpawnActorBenchmarks { [Params(100_000)] public int ActorCount { get;set; } + + [Params(true, false)] + public bool EnableTelemetry { get; set; } + private ActorSystem system; [IterationSetup] public void Setup() { - system = ActorSystem.Create("system"); + if(EnableTelemetry) // need to measure the impact of publishing actor start / stop events + system = ActorSystem.Create("system", "akka.actor.telemetry.enabled = true"); + else + system = ActorSystem.Create("system"); } [IterationCleanup] @@ -38,7 +45,12 @@ public void Cleanup() public async Task Actor_spawn() { var parent = system.ActorOf(Parent.Props); - await parent.Ask(new StartTest(ActorCount), TimeSpan.FromMinutes(2)); + + // spawn a bunch of actors + await parent.Ask(new StartTest(ActorCount), TimeSpan.FromMinutes(2)).ConfigureAwait(false); + + // terminate the hierarchy + await parent.GracefulStop(TimeSpan.FromMinutes(1)).ConfigureAwait(false); } #region actors diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt index ec3e7061dad..74f1fb4220f 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Core.verified.txt @@ -313,6 +313,12 @@ namespace Akka.Actor public static readonly Akka.Actor.IActorRef NoSender; public static readonly Akka.Actor.Nobody Nobody; } + public sealed class ActorRestarted : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public System.Exception Reason { get; } + public Akka.Actor.IActorRef Subject { get; } + } public class ActorSelection : Akka.Actor.ICanTell { public ActorSelection() { } @@ -339,6 +345,11 @@ namespace Akka.Actor public Akka.Actor.ActorSelectionMessage Copy(object message = null, Akka.Actor.SelectionPathElement[] elements = null, System.Nullable wildCardFanOut = null) { } public override string ToString() { } } + public sealed class ActorStarted : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public Akka.Actor.IActorRef Subject { get; } + } public class ActorStashPlugin : Akka.Actor.ActorProducerPluginBase { public ActorStashPlugin() { } @@ -346,6 +357,11 @@ namespace Akka.Actor public override void BeforeIncarnated(Akka.Actor.ActorBase actor, Akka.Actor.IActorContext context) { } public override bool CanBeAppliedTo(System.Type actorType) { } } + public sealed class ActorStopped : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public Akka.Actor.IActorRef Subject { get; } + } public abstract class ActorSystem : Akka.Actor.IActorRefFactory, System.IDisposable { protected ActorSystem() { } @@ -991,6 +1007,11 @@ namespace Akka.Actor { Akka.Actor.IStash Stash { get; set; } } + public interface IActorTelemetryEvent : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + System.Type ActorType { get; } + Akka.Actor.IActorRef Subject { get; } + } public interface IAdvancedScheduler : Akka.Actor.IActionScheduler, Akka.Actor.IRunnableScheduler { } public interface IAutoReceivedMessage { } public interface ICanTell @@ -1689,6 +1710,7 @@ namespace Akka.Actor public bool DebugRouterMisconfiguration { get; } public bool DebugUnhandledMessage { get; } public int DefaultVirtualNodesFactor { get; } + public bool EmitActorTelemetry { get; } public bool FsmDebugEvent { get; } public bool HasCluster { get; } public string Home { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index dc68c7dff95..312e7e27bf7 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -313,6 +313,12 @@ namespace Akka.Actor public static readonly Akka.Actor.IActorRef NoSender; public static readonly Akka.Actor.Nobody Nobody; } + public sealed class ActorRestarted : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public System.Exception Reason { get; } + public Akka.Actor.IActorRef Subject { get; } + } public class ActorSelection : Akka.Actor.ICanTell { public ActorSelection() { } @@ -339,6 +345,11 @@ namespace Akka.Actor public Akka.Actor.ActorSelectionMessage Copy(object message = null, Akka.Actor.SelectionPathElement[] elements = null, System.Nullable wildCardFanOut = null) { } public override string ToString() { } } + public sealed class ActorStarted : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public Akka.Actor.IActorRef Subject { get; } + } public class ActorStashPlugin : Akka.Actor.ActorProducerPluginBase { public ActorStashPlugin() { } @@ -346,6 +357,11 @@ namespace Akka.Actor public override void BeforeIncarnated(Akka.Actor.ActorBase actor, Akka.Actor.IActorContext context) { } public override bool CanBeAppliedTo(System.Type actorType) { } } + public sealed class ActorStopped : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public Akka.Actor.IActorRef Subject { get; } + } public abstract class ActorSystem : Akka.Actor.IActorRefFactory, System.IDisposable { protected ActorSystem() { } @@ -993,6 +1009,11 @@ namespace Akka.Actor { Akka.Actor.IStash Stash { get; set; } } + public interface IActorTelemetryEvent : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + System.Type ActorType { get; } + Akka.Actor.IActorRef Subject { get; } + } public interface IAdvancedScheduler : Akka.Actor.IActionScheduler, Akka.Actor.IRunnableScheduler { } public interface IAutoReceivedMessage { } public interface ICanTell @@ -1691,6 +1712,7 @@ namespace Akka.Actor public bool DebugRouterMisconfiguration { get; } public bool DebugUnhandledMessage { get; } public int DefaultVirtualNodesFactor { get; } + public bool EmitActorTelemetry { get; } public bool FsmDebugEvent { get; } public bool HasCluster { get; } public string Home { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index ec3e7061dad..74f1fb4220f 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -313,6 +313,12 @@ namespace Akka.Actor public static readonly Akka.Actor.IActorRef NoSender; public static readonly Akka.Actor.Nobody Nobody; } + public sealed class ActorRestarted : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public System.Exception Reason { get; } + public Akka.Actor.IActorRef Subject { get; } + } public class ActorSelection : Akka.Actor.ICanTell { public ActorSelection() { } @@ -339,6 +345,11 @@ namespace Akka.Actor public Akka.Actor.ActorSelectionMessage Copy(object message = null, Akka.Actor.SelectionPathElement[] elements = null, System.Nullable wildCardFanOut = null) { } public override string ToString() { } } + public sealed class ActorStarted : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public Akka.Actor.IActorRef Subject { get; } + } public class ActorStashPlugin : Akka.Actor.ActorProducerPluginBase { public ActorStashPlugin() { } @@ -346,6 +357,11 @@ namespace Akka.Actor public override void BeforeIncarnated(Akka.Actor.ActorBase actor, Akka.Actor.IActorContext context) { } public override bool CanBeAppliedTo(System.Type actorType) { } } + public sealed class ActorStopped : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public Akka.Actor.IActorRef Subject { get; } + } public abstract class ActorSystem : Akka.Actor.IActorRefFactory, System.IDisposable { protected ActorSystem() { } @@ -991,6 +1007,11 @@ namespace Akka.Actor { Akka.Actor.IStash Stash { get; set; } } + public interface IActorTelemetryEvent : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + System.Type ActorType { get; } + Akka.Actor.IActorRef Subject { get; } + } public interface IAdvancedScheduler : Akka.Actor.IActionScheduler, Akka.Actor.IRunnableScheduler { } public interface IAutoReceivedMessage { } public interface ICanTell @@ -1689,6 +1710,7 @@ namespace Akka.Actor public bool DebugRouterMisconfiguration { get; } public bool DebugUnhandledMessage { get; } public int DefaultVirtualNodesFactor { get; } + public bool EmitActorTelemetry { get; } public bool FsmDebugEvent { get; } public bool HasCluster { get; } public string Home { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.verified.txt index 102fa9edc78..0d9c780778b 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.verified.txt @@ -23,7 +23,7 @@ [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Tests.Performance")] [assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)] [assembly: System.Runtime.InteropServices.GuidAttribute("1a5cab08-b032-49ca-8db3-9428c5a9db14")] -[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName=".NET Standard 2.0")] +[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName="")] namespace Akka.Actor { public abstract class ActorBase : Akka.Actor.IInternalActor @@ -314,6 +314,12 @@ namespace Akka.Actor public static readonly Akka.Actor.IActorRef NoSender; public static readonly Akka.Actor.Nobody Nobody; } + public sealed class ActorRestarted : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public System.Exception Reason { get; } + public Akka.Actor.IActorRef Subject { get; } + } public class ActorSelection : Akka.Actor.ICanTell { public ActorSelection() { } @@ -340,6 +346,11 @@ namespace Akka.Actor public Akka.Actor.ActorSelectionMessage Copy(object message = null, Akka.Actor.SelectionPathElement[] elements = null, System.Nullable wildCardFanOut = null) { } public override string ToString() { } } + public sealed class ActorStarted : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public Akka.Actor.IActorRef Subject { get; } + } public class ActorStashPlugin : Akka.Actor.ActorProducerPluginBase { public ActorStashPlugin() { } @@ -347,6 +358,11 @@ namespace Akka.Actor public override void BeforeIncarnated(Akka.Actor.ActorBase actor, Akka.Actor.IActorContext context) { } public override bool CanBeAppliedTo(System.Type actorType) { } } + public sealed class ActorStopped : Akka.Actor.IActorTelemetryEvent, Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + public System.Type ActorType { get; } + public Akka.Actor.IActorRef Subject { get; } + } public abstract class ActorSystem : Akka.Actor.IActorRefFactory, System.IDisposable { protected ActorSystem() { } @@ -988,6 +1004,11 @@ namespace Akka.Actor { Akka.Actor.IStash Stash { get; set; } } + public interface IActorTelemetryEvent : Akka.Actor.INoSerializationVerificationNeeded, Akka.Actor.INotInfluenceReceiveTimeout + { + System.Type ActorType { get; } + Akka.Actor.IActorRef Subject { get; } + } public interface IAdvancedScheduler : Akka.Actor.IActionScheduler, Akka.Actor.IRunnableScheduler { } public interface IAutoReceivedMessage { } public interface ICanTell @@ -1682,6 +1703,7 @@ namespace Akka.Actor public bool DebugRouterMisconfiguration { get; } public bool DebugUnhandledMessage { get; } public int DefaultVirtualNodesFactor { get; } + public bool EmitActorTelemetry { get; } public bool FsmDebugEvent { get; } public bool HasCluster { get; } public string Home { get; } diff --git a/src/core/Akka.Remote.Tests/RemoteActorTelemetrySpecs.cs b/src/core/Akka.Remote.Tests/RemoteActorTelemetrySpecs.cs new file mode 100644 index 00000000000..b9dabbf576e --- /dev/null +++ b/src/core/Akka.Remote.Tests/RemoteActorTelemetrySpecs.cs @@ -0,0 +1,150 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.TestKit; +using Akka.TestKit.TestActors; +using Akka.Util.Internal; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Remote.Tests +{ + public class RemoteActorTelemetrySpecs : AkkaSpec + { + // create HOCON configuraiton that enables telemetry and Akka.Remote + private static readonly string Config = @" + akka { + actor { + provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote"" + telemetry.enabled = true + } + remote { + log-remote-lifecycle-events = on + dot-netty.tcp { + port = 0 + hostname = localhost + } + } + }"; + + public RemoteActorTelemetrySpecs(ITestOutputHelper outputHelper) : base(Config, outputHelper) + { + + } + + private class TelemetrySubscriber : ReceiveActor + { + // keep track of integer counters for each event type + private int _actorCreated; + private int _actorStopped; + private int _actorRestarted; + + // create a message type that will send the current values of all counters + public sealed class GetTelemetry + { + public int ActorCreated { get; } + public int ActorStopped { get; } + public int ActorRestarted { get; } + + public GetTelemetry(int actorCreated, int actorStopped, int actorRestarted) + { + ActorCreated = actorCreated; + ActorStopped = actorStopped; + ActorRestarted = actorRestarted; + } + } + + public class GetTelemetryRequest + { + // make singleton + public static readonly GetTelemetryRequest Instance = new GetTelemetryRequest(); + + private GetTelemetryRequest() + { + } + } + + public TelemetrySubscriber() + { + // Receive each type of IActorTelemetryEvent + Receive(e => { _actorCreated++; }); + Receive(e => { _actorStopped++; }); + Receive(e => { _actorRestarted++; }); + // receive a request for current counter values and return a GetTelemetry result + Receive(e => + Sender.Tell(new GetTelemetry(_actorCreated, _actorStopped, _actorRestarted))); + } + + protected override void PreStart() + { + Context.System.EventStream.Subscribe(Self, typeof(IActorTelemetryEvent)); + } + } + + // create a unit test where a second ActorSystem connects to Sys and receives an IActorRef from Sys and subscribes to Telemetry events + [Fact] + public async Task RemoteActorRefs_should_not_produce_telemetry() + { + // create a second ActorSystem that connects to Sys + var system2 = ActorSystem.Create(Sys.Name, Sys.Settings.Config); + try + { + // create a subscriber to receive telemetry events + var subscriber = system2.ActorOf(Props.Create()); + + // send a request for the current telemetry counters + var telemetry = await subscriber + .Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + + // verify that the counters are all correct + Assert.Equal(0, telemetry.ActorCreated); + Assert.Equal(0, telemetry.ActorStopped); + Assert.Equal(0, telemetry.ActorRestarted); + + // create an actor in Sys + var actor1 = Sys.ActorOf(BlackHoleActor.Props, "actor1"); + + // resolve the currently bound Akka.Remote address for Sys + var address = Sys.AsInstanceOf().Provider.DefaultAddress; + + // create a RootActorPath for actor1 that uses the previous address value + var actor1Path = new RootActorPath(address) / "user" / "actor1"; + + // have system2 send a request to actor1 via Akka.Remote + var actor2 = await system2.ActorSelection(actor1Path).ResolveOne(RemainingOrDefault); + + // send a request for the current telemetry counters + telemetry = await subscriber + .Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + + // verify that created actors is greater than 1 + var previouslyCreated = telemetry.ActorCreated; + Assert.True(previouslyCreated > 1); // should have had some /system actors started as well + Assert.Equal(0, telemetry.ActorStopped); + Assert.Equal(0, telemetry.ActorRestarted); + + // stop the actor in Sys + Sys.Stop(actor1); + + // send a request for the current telemetry counters + telemetry = await subscriber + .Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + // verify that the counters are all zero + Assert.Equal(previouslyCreated, telemetry.ActorCreated); // should not have changed + Assert.Equal(0, telemetry.ActorStopped); + Assert.Equal(0, telemetry.ActorRestarted); + } + finally + { + Shutdown(system2); + } + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Tests/Actor/ActorTelemetrySpecs.cs b/src/core/Akka.Tests/Actor/ActorTelemetrySpecs.cs new file mode 100644 index 00000000000..57f74a3e53f --- /dev/null +++ b/src/core/Akka.Tests/Actor/ActorTelemetrySpecs.cs @@ -0,0 +1,304 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Configuration; +using Akka.Routing; +using Akka.TestKit; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Tests.Actor +{ + public class ActorTelemetrySpecs : AkkaSpec + { + public static readonly Config WithTelemetry = @"akka.actor.telemetry.enabled = on"; + + public ActorTelemetrySpecs(ITestOutputHelper output) : base(WithTelemetry, output) + { + } + + // create an actor that will subscribe to all of the IActorTelemetryEvents in the EventStream + private class TelemetrySubscriber : ReceiveActor + { + // keep track of integer counters for each event type + private int _actorCreated; + private int _actorStopped; + private int _actorRestarted; + + // create a message type that will send the current values of all counters + public sealed class GetTelemetry + { + public int ActorCreated { get; } + public int ActorStopped { get; } + public int ActorRestarted { get; } + + public GetTelemetry(int actorCreated, int actorStopped, int actorRestarted) + { + ActorCreated = actorCreated; + ActorStopped = actorStopped; + ActorRestarted = actorRestarted; + } + } + + public class GetTelemetryRequest + { + // make singleton + public static readonly GetTelemetryRequest Instance = new GetTelemetryRequest(); + private GetTelemetryRequest() { } + } + + public TelemetrySubscriber() + { + // Receive each type of IActorTelemetryEvent + Receive(e => { _actorCreated++; }); + Receive(e => { _actorStopped++; }); + Receive(e => { _actorRestarted++; }); + // receive a request for current counter values and return a GetTelemetry result + Receive(e => Sender.Tell(new GetTelemetry(_actorCreated, _actorStopped, _actorRestarted))); + } + + protected override void PreStart() + { + Context.System.EventStream.Subscribe(Self, typeof(IActorTelemetryEvent)); + } + } + + // CreateChildren message type + public class CreateChildren + { + public CreateChildren(int count) + { + Count = count; + } + + public int Count { get; } + } + + // create a RestartChildren message type + public class RestartChildren + { + // make singleton + public static readonly RestartChildren Instance = new RestartChildren(); + + private RestartChildren() + { + } + } + + // an actor that will spawn a configurable number of child actors + private class ParentActor : ReceiveActor + { + public ParentActor() + { + // handle a command that will spawn N children + Receive(cmd => + { + for (var i = 0; i < cmd.Count; i++) + { + Context.ActorOf(Props.Create(), $"child-{i}"); + } + + // reply back to sender once complete + Sender.Tell("done"); + }); + + // forward a restart command to all children + Receive(cmd => + { + foreach (var child in Context.GetChildren()) + { + child.Forward(cmd); + } + + // reply back to sender once complete + Sender.Tell("done"); + }); + + // handle a command that causes the parent actor to restart + Receive(cmd => + { + if (cmd == "restart") + { + throw new Exception("Restarting"); + } + }); + } + } + + // create the ChildActor implementation + private class ChildActor : ReceiveActor + { + public ChildActor() + { + // handle a command that forces a restart + Receive(cmd => { throw new ApplicationException("Restarting"); }); + ReceiveAny(_ => { }); + } + } + + [Fact] + public async Task ActorTelemetry_must_be_accurate() + { + // create a TelemetrySubscriber actor + var subscriber = Sys.ActorOf(Props.Create(), "subscriber"); + + // request current telemetry values (ensure that the actor has started, so counter values will be accurate) + await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + + // create a parent actor + var parent = Sys.ActorOf(Props.Create(), "parent"); + + // send a message to the parent to create 100 children + parent.Tell(new CreateChildren(100)); + + // wait for the parent to reply back + ExpectMsg("done"); + + // awaitassert collecting data from the telemetry subscriber until we can see that 102 actors have been created + // 100 children + parent + subscriber itself + await AwaitAssertAsync(async () => + { + var telemetry = await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + Assert.Equal(102, telemetry.ActorCreated); + // assert no restarts or stops recorded + Assert.Equal(0, telemetry.ActorRestarted); + Assert.Equal(0, telemetry.ActorStopped); + }, RemainingOrDefault); + + // send a message to the parent to restart all children + parent.Tell(RestartChildren.Instance); + + // wait for the parent to reply back + ExpectMsg("done"); + + // await assert collecting data from the telemetry subscriber until we can see that 102 actors have been restarted + await AwaitAssertAsync(async () => + { + var telemetry = await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + // assert that actor start count is still 102 + Assert.Equal(102, telemetry.ActorCreated); + Assert.Equal(100, telemetry.ActorRestarted); + // assert no stops recorded + Assert.Equal(0, telemetry.ActorStopped); + }, RemainingOrDefault); + + // GracefulStop parent actor and assert that 101 actors have been stopped + await parent.GracefulStop(RemainingOrDefault); + await AwaitAssertAsync(async () => + { + var telemetry = await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + // assert that actor start count is still 102 + Assert.Equal(102, telemetry.ActorCreated); + Assert.Equal(100, telemetry.ActorRestarted); + Assert.Equal(101, telemetry.ActorStopped); + }, RemainingOrDefault); + } + + // create a unit test where a parent actor spawns 100 children and then restarts + [Fact] + public async Task ActorTelemetry_must_be_accurate_when_parent_restarts() + { + // create a TelemetrySubscriber actor + var subscriber = Sys.ActorOf(Props.Create(), "subscriber"); + + // request current telemetry values (ensure that the actor has started, so counter values will be accurate) + await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + + // create a parent actor + var parent = Sys.ActorOf(Props.Create(), "parent"); + + // send a message to the parent to create 100 children + parent.Tell(new CreateChildren(100)); + + // wait for the parent to reply back + ExpectMsg("done"); + + // awaitassert collecting data from the telemetry subscriber until we can see that 102 actors have been created + // 100 children + parent + subscriber itself + await AwaitAssertAsync(async () => + { + var telemetry = await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + Assert.Equal(102, telemetry.ActorCreated); + // assert no restarts or stops recorded + Assert.Equal(0, telemetry.ActorRestarted); + Assert.Equal(0, telemetry.ActorStopped); + }, RemainingOrDefault); + + // send a message to the parent to restart + parent.Tell("restart"); + + // await assert collecting data from the telemetry subscriber until we can see that 102 actors have been restarted + await AwaitAssertAsync(async () => + { + var telemetry = await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + // assert that actor start count is still 102 + Assert.Equal(102, telemetry.ActorCreated); + + // only 1 parent restart recorded + Assert.Equal(1, telemetry.ActorRestarted); + // assert 100 stops recorded (only the child actors) + Assert.Equal(100, telemetry.ActorStopped); + }, RemainingOrDefault); + } + + /// + /// Pool routers should have their start / stop / restarts counted too + /// + [Fact] + public async Task ActorTelemetry_must_be_accurate_for_pool_router() + { + // create a TelemetrySubscriber actor + var subscriber = Sys.ActorOf(Props.Create(), "subscriber"); + + // request current telemetry values (ensure that the actor has started, so counter values will be accurate) + await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + + // create a pool router + var router = Sys.ActorOf(Props.Create().WithRouter(new RoundRobinPool(10)), "router"); + + // awaitassert collecting data from the telemetry subscriber until we can see that 10 actors have been created + await AwaitAssertAsync(async () => + { + var telemetry = await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + Assert.Equal(12, telemetry.ActorCreated); + // assert no restarts or stops recorded + Assert.Equal(0, telemetry.ActorRestarted); + Assert.Equal(0, telemetry.ActorStopped); + }, RemainingOrDefault); + + // send a message to the router to restart all children + router.Tell(new Broadcast(RestartChildren.Instance)); + + // await assert collecting data from the telemetry subscriber until we can see that 10 actors have been restarted + await AwaitAssertAsync(async () => + { + var telemetry = await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + // assert that actor start count is still 10 + Assert.Equal(12, telemetry.ActorCreated); + + // bug due to https://github.com/akkadotnet/akka.net/issues/6295 - all routees and the router start each time + Assert.Equal(110, telemetry.ActorRestarted); + // assert no stops recorded + Assert.Equal(0, telemetry.ActorStopped); + }, RemainingOrDefault); + + // GracefulStop router actor and assert that 10 actors have been stopped + await router.GracefulStop(RemainingOrDefault); + await AwaitAssertAsync(async () => + { + var telemetry = await subscriber.Ask(TelemetrySubscriber.GetTelemetryRequest.Instance); + // assert that actor start count is still 10 + Assert.Equal(12, telemetry.ActorCreated); + Assert.Equal(110, telemetry.ActorRestarted); + Assert.Equal(11, telemetry.ActorStopped); + }, RemainingOrDefault); + } + } +} \ No newline at end of file diff --git a/src/core/Akka/Actor/ActorCell.DefaultMessages.cs b/src/core/Akka/Actor/ActorCell.DefaultMessages.cs index 9aaa6bc0ff7..791df01a592 100644 --- a/src/core/Akka/Actor/ActorCell.DefaultMessages.cs +++ b/src/core/Akka/Actor/ActorCell.DefaultMessages.cs @@ -449,6 +449,8 @@ private void Create(Exception failure) CheckReceiveTimeout(); if (System.Settings.DebugLifecycle) Publish(new Debug(Self.Path.ToString(), created.GetType(), "Started (" + created + ")")); + if(System.Settings.EmitActorTelemetry) + System.EventStream.Publish(new ActorStarted(Self, Props.Type)); } catch (Exception e) { diff --git a/src/core/Akka/Actor/ActorCell.FaultHandling.cs b/src/core/Akka/Actor/ActorCell.FaultHandling.cs index 6a47b1c58a2..a2cafea35d0 100644 --- a/src/core/Akka/Actor/ActorCell.FaultHandling.cs +++ b/src/core/Akka/Actor/ActorCell.FaultHandling.cs @@ -291,6 +291,9 @@ private void FinishTerminate() var pipeline = _systemImpl.ActorPipelineResolver.ResolvePipeline(a.GetType()); pipeline.BeforeActorIncarnated(a, this); } + + if(System.Settings.EmitActorTelemetry) + System.EventStream.Publish(new ActorStopped(Self, Props.Type)); } catch (Exception x) { @@ -348,7 +351,9 @@ private void FinishRecreate(Exception cause, ActorBase failedActor) if (System.Settings.DebugLifecycle) Publish(new Debug(_self.Path.ToString(), freshActor.GetType(), "Restarted (" + freshActor + ")")); - + if(System.Settings.EmitActorTelemetry) + System.EventStream.Publish(new ActorRestarted(Self, Props.Type, cause)); + // only after parent is up and running again do restart the children which were not stopped foreach (var survivingChild in survivors) { @@ -382,8 +387,7 @@ private void HandleFailed(Failed f) //Called handleFailure in Akka JVM //the UID protects against reception of a Failed from a child which was //killed in preRestart and re-created in postRestart - ChildRestartStats childStats; - if (TryGetChildStatsByRef(failedChild, out childStats)) + if (TryGetChildStatsByRef(failedChild, out var childStats)) { var statsUid = childStats.Child.Path.Uid; if (statsUid == f.Uid) @@ -429,8 +433,7 @@ private void HandleChildTerminated(IActorRef child) // if the removal changed the state of the (terminating) children container, // then we are continuing the previously suspended recreate/create/terminate action - var recreation = status as SuspendReason.Recreation; - if (recreation != null) + if (status is SuspendReason.Recreation recreation) { FinishRecreate(recreation.Cause, _actor); } diff --git a/src/core/Akka/Actor/ActorTelemetry.cs b/src/core/Akka/Actor/ActorTelemetry.cs new file mode 100644 index 00000000000..dc19c5efaf4 --- /dev/null +++ b/src/core/Akka/Actor/ActorTelemetry.cs @@ -0,0 +1,80 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2022 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Event; + +namespace Akka.Actor +{ + /// + /// A set of events designed to provide some basic telemetry functions for monitoring actor lifecycles. + /// + /// We want to track actor starts, stops, and restarts. More detailed metrics, such as mailbox size or message + /// processing rates will require something like Phobos [https://phobos.petabridge.com/]. + /// + /// + /// Not intended to be sent across network boundaries - should only be processed locally via the . + /// + public interface IActorTelemetryEvent : INoSerializationVerificationNeeded, INotInfluenceReceiveTimeout + { + /// + /// The actor who emitted this event. + /// + IActorRef Subject {get;} + + /// + /// The implementation type for this actor. + /// + Type ActorType { get; } + } + + // Create ActorTelemetryEvent messages for the following events: starting an actor, stopping an actor, restarting an actor + public sealed class ActorStarted : IActorTelemetryEvent + { + internal ActorStarted(IActorRef subject, Type actorType) + { + Subject = subject; + ActorType = actorType; + } + + public IActorRef Subject { get; } + public Type ActorType { get; } + } + + /// + /// Event emitted when actor shuts down. + /// + public sealed class ActorStopped : IActorTelemetryEvent + { + internal ActorStopped(IActorRef subject, Type actorType) + { + Subject = subject; + ActorType = actorType; + } + + public IActorRef Subject { get; } + public Type ActorType { get; } + } + + /// + /// Emitted when an actor restarts. + /// + public sealed class ActorRestarted : IActorTelemetryEvent + { + internal ActorRestarted(IActorRef subject, Type actorType, Exception reason) + { + Subject = subject; + ActorType = actorType; + Reason = reason; + } + + public IActorRef Subject { get; } + public Type ActorType { get; } + + public Exception Reason { get; } + } +} \ No newline at end of file diff --git a/src/core/Akka/Actor/Settings.cs b/src/core/Akka/Actor/Settings.cs index 1ec770a14a2..5f2ae5dccf2 100644 --- a/src/core/Akka/Actor/Settings.cs +++ b/src/core/Akka/Actor/Settings.cs @@ -107,6 +107,7 @@ public Settings(ActorSystem system, Config config, ActorSystemSetup setup) SerializeAllMessages = Config.GetBoolean("akka.actor.serialize-messages", false); SerializeAllCreators = Config.GetBoolean("akka.actor.serialize-creators", false); + EmitActorTelemetry = Config.GetBoolean("akka.actor.telemetry.enabled", false); LogLevel = Config.GetString("akka.loglevel", null); StdoutLogLevel = Config.GetString("akka.stdout-loglevel", null); @@ -242,6 +243,17 @@ public Settings(ActorSystem system, Config config, ActorSystemSetup setup) /// /// true if [serialize all creators]; otherwise, false. public bool SerializeAllCreators { get; private set; } + + /// + /// When set to true, all actors will emit s when they are created, stopped, or restarted. + /// + /// + /// Defaults to false. + /// + /// + /// akka.actor.telemetry.enabled = on + /// + public bool EmitActorTelemetry { get; } /// /// Gets the default timeout for Futures.Ask calls. diff --git a/src/core/Akka/Configuration/Pigeon.conf b/src/core/Akka/Configuration/Pigeon.conf index c51231e125d..c748b84f9da 100644 --- a/src/core/Akka/Configuration/Pigeon.conf +++ b/src/core/Akka/Configuration/Pigeon.conf @@ -118,6 +118,12 @@ akka { # Default timeout for IActorRef.Ask. ask-timeout = infinite + # Controls telemetry settings built into Akka.NET + telemetry{ + # Enables telemetry emission from actors - actors will emit events + # whenever they are started, stopped, or restarted. + enabled = false + } # THIS DOES NOT APPLY TO .NET #