Skip to content

Commit

Permalink
Akka.Streams: memory-optimize ActorMaterializer HOCON injection (#6440
Browse files Browse the repository at this point in the history
)

* optimize `ActorMaterializer` to only inject HOCON configuration once

creating lots of materializer instances (i.e. in actors that own their own streams) results in a tremendous amount of unreclaimable HOCON-related memory - on the order of 11Gb in https://github.com/Aaronontheweb/AkkaSqlQueryCrushTest

This tries to avoid injecting the top level Akka.Streams fallback every time a materializer is used. In addition to that, we try to cache the default materializer settings parsed from the `ActorSystem`s HOCON if none are provided.

* modernized syntax

* improve cache to detect multiple `ActorSystem` instances
  • Loading branch information
Aaronontheweb authored Feb 24, 2023
1 parent 05cfaca commit da1fceb
Showing 1 changed file with 43 additions and 10 deletions.
53 changes: 43 additions & 10 deletions src/core/Akka.Streams/ActorMaterializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Dispatch;
Expand Down Expand Up @@ -43,6 +45,18 @@ public static Config DefaultConfig()

#region static

/// <summary>
/// Injecting the top-level Materializer HOCON configuration over and over again is expensive, so we want to avoid
/// doing it each time a materializer is instantiated. This flag will be set to true once the configuration has been
/// injected the first time.
/// </summary>
private static readonly ConcurrentDictionary<ActorSystem, bool> InjectedConfig = new();

/// <summary>
/// Cache the default materializer settings so we don't constantly parse them
/// </summary>
private static readonly ConcurrentDictionary<ActorSystem, ActorMaterializerSettings> DefaultSettings = new();

/// <summary>
/// <para>
/// Creates a ActorMaterializer which will execute every step of a transformation
Expand Down Expand Up @@ -75,9 +89,28 @@ public static ActorMaterializer Create(IActorRefFactory context, ActorMaterializ
var haveShutDown = new AtomicBoolean();
var system = ActorSystemOf(context);

system.Settings.InjectTopLevelFallback(DefaultConfig());
if(!InjectedConfig.TryGetValue(system, out _) && InjectedConfig.TryAdd(system, true))
{
// Inject the top-level fallback config for the Materializer once, and only once.
// This is a performance optimization to avoid having to do this on every materialization.
system.Settings.InjectTopLevelFallback(DefaultConfig());

settings = settings ?? ActorMaterializerSettings.Create(system);
static async Task CleanUp(ActorSystem sys)
{
// remove ActorSystem from cache when it terminates so we don't leak memory
await sys.WhenTerminated.ConfigureAwait(false);
InjectedConfig.TryRemove(sys, out _);
DefaultSettings.TryRemove(sys, out _);
}

#pragma warning disable CS4014
CleanUp(system);
#pragma warning restore CS4014

}

// use the default settings if none have been passed in
settings ??= DefaultSettings.GetOrAdd(system, ActorMaterializerSettings.Create);

return new ActorMaterializerImpl(
system: system,
Expand All @@ -90,14 +123,14 @@ public static ActorMaterializer Create(IActorRefFactory context, ActorMaterializ

private static ActorSystem ActorSystemOf(IActorRefFactory context)
{
if (context is ExtendedActorSystem)
return (ActorSystem)context;
if (context is IActorContext)
return ((IActorContext)context).System;
if (context == null)
throw new ArgumentNullException(nameof(context), "IActorRefFactory must be defined");

throw new ArgumentException($"ActorRefFactory context must be a ActorSystem or ActorContext, got [{context.GetType()}]");
return context switch
{
ExtendedActorSystem system => system,
IActorContext actorContext => actorContext.System,
null => throw new ArgumentNullException(nameof(context), "IActorRefFactory must be defined"),
_ => throw new ArgumentException(
$"ActorRefFactory context must be a ActorSystem or ActorContext, got [{context.GetType()}]")
};
}

#endregion
Expand Down

0 comments on commit da1fceb

Please sign in to comment.