Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add per-plugin recovery permitter actors #7448

Merged

Conversation

Arkatufus
Copy link
Contributor

@Arkatufus Arkatufus commented Jan 8, 2025

Fixes #7447

Changes

All changes are applied to internal APIs.

  • Move recovery permitter cache from global Persistence extension to each EventSourced actors.
  • RecoveryPermitter actor is now lazily created and stored inside persistence PluginHolder
  • Add option to declare "max-concurrent-recoveries" HOCON setting in each persistence plugin that, if declared, will override the global "akka.persistence.max-concurrent-recoveries"

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design looks good - but we should add some specs:

  1. Validate that we look for a plugin-specific max-concurrent-recoveries and that that can override the global default
  2. Validate, with a quick test against the Persistence extension itself, that multiple recovery permitter actors are created if multiple journals are defined.

No need for a big End2End integration spec, I think that should be fine.

@@ -166,6 +167,8 @@ public IStash Stash
/// </summary>
public IActorRef Journal => _journal ??= Extension.JournalFor(JournalPluginId);

internal IActorRef RecoveryPermitter => _recoveryPermitter ??= Extension.RecoveryPermitterFor(JournalPluginId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

}

public IActorRef Ref { get; }

public EventAdapters Adapters { get; }

public Config Config { get; }

public IActorRef RecoveryPermitter { get; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@@ -303,8 +310,9 @@ private static PluginHolder NewPluginHolder(ExtendedActorSystem system, string c
var config = system.Settings.Config.GetConfig(configPath).WithFallback(system.Settings.Config.GetConfig(fallbackPath));
var plugin = CreatePlugin(system, configPath, config);
var adapters = CreateAdapters(system, configPath);
var recoveryPermitter = CreateRecoveryPermitter(system, configPath, config);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor Author

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Self-review

@@ -81,6 +81,7 @@ public abstract partial class Eventsourced : ActorBase, IPersistentIdentity, IPe
private readonly IStash _internalStash;
private IActorRef _snapshotStore;
private IActorRef _journal;
private IActorRef _recoveryPermitter;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recovery permitter is being cached here instead of in the Persistence extension. This is lazily created when the plugin is initialized for the first time.

@@ -166,6 +167,8 @@ public IStash Stash
/// </summary>
public IActorRef Journal => _journal ??= Extension.JournalFor(JournalPluginId);

internal IActorRef RecoveryPermitter => _recoveryPermitter ??= Extension.RecoveryPermitterFor(JournalPluginId);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lazy instantiation

{
Ref = @ref;
Adapters = adapters;
Config = config;
RecoveryPermitter = recoveryPermitter;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recovery permitter is also cached inside the PluginHolder so that it can be looked up.

@@ -152,9 +147,10 @@ public string PersistenceId(IActorRef actor)
/// INTERNAL API: When starting many persistent actors at the same time the journal its data store is protected
/// from being overloaded by limiting number of recoveries that can be in progress at the same time.
/// </summary>
internal IActorRef RecoveryPermitter()
internal IActorRef RecoveryPermitterFor(string journalPluginId)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same design as AdaptersFor() to look up recovery permitter actor per plugin basis

@Arkatufus
Copy link
Contributor Author

Done adding spec

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb enabled auto-merge (squash) January 8, 2025 19:57
@Aaronontheweb Aaronontheweb disabled auto-merge January 8, 2025 20:29
@Aaronontheweb Aaronontheweb merged commit 848cd4c into akkadotnet:dev Jan 8, 2025
10 of 12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[PERF] Akka.Cluster.Sharding remember-entity recovery timeouts for large entity counts
2 participants