Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Streams: memory-optimize ActorMaterializer HOCON injection #6440

Merged
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions src/core/Akka.Streams/ActorMaterializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Dispatch;
Expand Down Expand Up @@ -43,6 +45,18 @@ public static Config DefaultConfig()

#region static

/// <summary>
/// Injecting the top-level Materializer HOCON configuration over and over again is expensive, so we want to avoid
/// doing it each time a materializer is instantiated. This flag will be set to true once the configuration has been
/// injected the first time.
/// </summary>
private static readonly ConcurrentDictionary<ActorSystem, bool> InjectedConfig = new();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be unnecessary if the materializer were an ActorSystemExtension, but since it's not - have to do it this way.


/// <summary>
/// Cache the default materializer settings so we don't constantly parse them
/// </summary>
private static readonly ConcurrentDictionary<ActorSystem, ActorMaterializerSettings> DefaultSettings = new();

/// <summary>
/// <para>
/// Creates a ActorMaterializer which will execute every step of a transformation
Expand Down Expand Up @@ -75,9 +89,28 @@ public static ActorMaterializer Create(IActorRefFactory context, ActorMaterializ
var haveShutDown = new AtomicBoolean();
var system = ActorSystemOf(context);

system.Settings.InjectTopLevelFallback(DefaultConfig());
if(!InjectedConfig.TryGetValue(system, out _) && InjectedConfig.TryAdd(system, true))
{
// Inject the top-level fallback config for the Materializer once, and only once.
// This is a performance optimization to avoid having to do this on every materialization.
system.Settings.InjectTopLevelFallback(DefaultConfig());

settings = settings ?? ActorMaterializerSettings.Create(system);
static async Task CleanUp(ActorSystem sys)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a static delegate to cleanup ActorSystem references after shut down

{
// remove ActorSystem from cache when it terminates so we don't leak memory
await sys.WhenTerminated.ConfigureAwait(false);
InjectedConfig.TryRemove(sys, out _);
DefaultSettings.TryRemove(sys, out _);
}

#pragma warning disable CS4014
CleanUp(system);
#pragma warning restore CS4014

}

// use the default settings if none have been passed in
settings ??= DefaultSettings.GetOrAdd(system, ActorMaterializerSettings.Create);

return new ActorMaterializerImpl(
system: system,
Expand All @@ -90,14 +123,14 @@ public static ActorMaterializer Create(IActorRefFactory context, ActorMaterializ

private static ActorSystem ActorSystemOf(IActorRefFactory context)
{
if (context is ExtendedActorSystem)
return (ActorSystem)context;
if (context is IActorContext)
return ((IActorContext)context).System;
if (context == null)
throw new ArgumentNullException(nameof(context), "IActorRefFactory must be defined");

throw new ArgumentException($"ActorRefFactory context must be a ActorSystem or ActorContext, got [{context.GetType()}]");
return context switch
{
ExtendedActorSystem system => system,
IActorContext actorContext => actorContext.System,
null => throw new ArgumentNullException(nameof(context), "IActorRefFactory must be defined"),
_ => throw new ArgumentException(
$"ActorRefFactory context must be a ActorSystem or ActorContext, got [{context.GetType()}]")
};
}

#endregion
Expand Down