Skip to content

Commit

Permalink
attemt to fix akka.persistence plugin race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Dec 30, 2015
1 parent e7b9d42 commit b8545a7
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions src/core/Akka.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Persistence.Journal;
using Akka.Util;
using Akka.Util.Internal.Collections;

namespace Akka.Persistence
{
Expand All @@ -39,8 +41,8 @@ public class PersistenceExtension : IExtension
private readonly Lazy<string> _defaultJournalPluginId;
private readonly Lazy<string> _defaultSnapshotPluginId;

private readonly ConcurrentDictionary<string, Lazy<PluginHolder>> _journalPluginExtensionIds = new ConcurrentDictionary<string, Lazy<PluginHolder>>();
private readonly ConcurrentDictionary<string, Lazy<PluginHolder>> _snapshotPluginExtensionIds = new ConcurrentDictionary<string, Lazy<PluginHolder>>();
private readonly AtomicReference<IImmutableMap<string, Lazy<PluginHolder>>> _journalPluginExtensionIds = new AtomicReference<IImmutableMap<string, Lazy<PluginHolder>>>(ImmutableTreeMap<string, Lazy<PluginHolder>>.Empty);
private readonly AtomicReference<IImmutableMap<string, Lazy<PluginHolder>>> _snapshotPluginExtensionIds = new AtomicReference<IImmutableMap<string, Lazy<PluginHolder>>>(ImmutableTreeMap<string, Lazy<PluginHolder>>.Empty);

public PersistenceExtension(ExtendedActorSystem system)
{
Expand Down Expand Up @@ -80,13 +82,14 @@ public IActorRef SnapshotStoreFor(string snapshotPluginId)
{
var configPath = string.IsNullOrEmpty(snapshotPluginId) ? _defaultSnapshotPluginId.Value : snapshotPluginId;
Lazy<PluginHolder> pluginContainer;
if (!_snapshotPluginExtensionIds.TryGetValue(configPath, out pluginContainer))
var extensionIdMap = _snapshotPluginExtensionIds.Value;
if (!extensionIdMap.TryGet(configPath, out pluginContainer))
{
var plugin = new Lazy<PluginHolder>(() => CreatePlugin(configPath, _ => DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication);
pluginContainer = _snapshotPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin);
pluginContainer = new Lazy<PluginHolder>(() => CreatePlugin(configPath, _ => DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication);
_snapshotPluginExtensionIds.CompareAndSet(extensionIdMap, extensionIdMap.AddOrUpdate(configPath, pluginContainer));
return SnapshotStoreFor(snapshotPluginId);
}

return pluginContainer.Value.Ref;
else return pluginContainer.Value.Ref;
}

/// <summary>
Expand All @@ -97,17 +100,18 @@ public IActorRef JournalFor(string journalPluginId)
{
var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId;
Lazy<PluginHolder> pluginContainer;
if (!_journalPluginExtensionIds.TryGetValue(configPath, out pluginContainer))
var extensionIdMap = _journalPluginExtensionIds.Value;
if (!extensionIdMap.TryGet(configPath, out pluginContainer))
{
var plugin = new Lazy<PluginHolder>(() => CreatePlugin(configPath, type =>
pluginContainer = new Lazy<PluginHolder>(() => CreatePlugin(configPath, type =>
typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId),
LazyThreadSafetyMode.ExecutionAndPublication);
pluginContainer = _journalPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin);
_journalPluginExtensionIds.CompareAndSet(extensionIdMap, extensionIdMap.AddOrUpdate(configPath, pluginContainer));
return JournalFor(journalPluginId);
}

return pluginContainer.Value.Ref;
else return pluginContainer.Value.Ref;
}

/// <summary>
Expand All @@ -121,17 +125,17 @@ public EventAdapters AdaptersFor(string journalPluginId)
{
var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId;
Lazy<PluginHolder> pluginContainer;
if (!_journalPluginExtensionIds.TryGetValue(configPath, out pluginContainer))
var extensionIdMap = _journalPluginExtensionIds.Value;
if (!extensionIdMap.TryGet(configPath, out pluginContainer))
{
var plugin = new Lazy<PluginHolder>(() =>
pluginContainer = new Lazy<PluginHolder>(() =>
CreatePlugin(configPath, type => typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId),
LazyThreadSafetyMode.ExecutionAndPublication);
pluginContainer = _journalPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin);
}

return pluginContainer.Value.Adapters;
_journalPluginExtensionIds.CompareAndSet(extensionIdMap, extensionIdMap.AddOrUpdate(configPath, pluginContainer));
return AdaptersFor(journalPluginId);
}else return pluginContainer.Value.Adapters;
}

/// <summary>
Expand All @@ -141,7 +145,7 @@ public EventAdapters AdaptersFor(string journalPluginId)
/// <returns></returns>
internal EventAdapters AdaptersFor(IActorRef journalRef)
{
return _journalPluginExtensionIds.Values
return _journalPluginExtensionIds.Value.AllValuesMinToMax
.Select(ext => Equals(ext.Value.Ref, journalRef) ? ext.Value.Adapters : null)
.FirstOrDefault(r => r != null)
?? IdentityEventAdapters.Instance;
Expand Down

0 comments on commit b8545a7

Please sign in to comment.