Skip to content

Commit

Permalink
Fix for race conditions in presistence plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Dec 13, 2015
1 parent 449e0f8 commit ee31816
Showing 1 changed file with 15 additions and 32 deletions.
47 changes: 15 additions & 32 deletions src/core/Akka.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public class PersistenceExtension : IExtension
private readonly Lazy<string> _defaultJournalPluginId;
private readonly Lazy<string> _defaultSnapshotPluginId;

private readonly ConcurrentDictionary<string, Lazy<PluginHolder>> _journalPluginExtensionIds = new ConcurrentDictionary<string, Lazy<PluginHolder>>();
private readonly ConcurrentDictionary<string, Lazy<PluginHolder>> _snapshotPluginExtensionIds = new ConcurrentDictionary<string, Lazy<PluginHolder>>();
private readonly ConcurrentDictionary<string, PluginHolder> _journalPluginExtensionIds = new ConcurrentDictionary<string, PluginHolder>();
private readonly ConcurrentDictionary<string, PluginHolder> _snapshotPluginExtensionIds = new ConcurrentDictionary<string, PluginHolder>();

public PersistenceExtension(ExtendedActorSystem system)
{
Expand Down Expand Up @@ -79,14 +79,9 @@ public string PersistenceId(IActorRef actor)
public IActorRef SnapshotStoreFor(string snapshotPluginId)
{
var configPath = string.IsNullOrEmpty(snapshotPluginId) ? _defaultSnapshotPluginId.Value : snapshotPluginId;
Lazy<PluginHolder> pluginContainer;
if (!_snapshotPluginExtensionIds.TryGetValue(configPath, out pluginContainer))
{
var plugin = new Lazy<PluginHolder>(() => CreatePlugin(configPath, _ => DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication);
pluginContainer = _snapshotPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin);
}
var plugin = _snapshotPluginExtensionIds.GetOrAdd(configPath, key => CreatePlugin(key, _ => DefaultPluginDispatcherId));

return pluginContainer.Value.Ref;
return plugin.Ref;
}

/// <summary>
Expand All @@ -96,18 +91,12 @@ public IActorRef SnapshotStoreFor(string snapshotPluginId)
public IActorRef JournalFor(string journalPluginId)
{
var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId;
Lazy<PluginHolder> pluginContainer;
if (!_journalPluginExtensionIds.TryGetValue(configPath, out pluginContainer))
{
var plugin = new Lazy<PluginHolder>(() => CreatePlugin(configPath, type =>
typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId),
LazyThreadSafetyMode.ExecutionAndPublication);
pluginContainer = _journalPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin);
}
var plugin = _journalPluginExtensionIds.GetOrAdd(configPath,
key => CreatePlugin(key, type => typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId));

return pluginContainer.Value.Ref;
return plugin.Ref;
}

/// <summary>
Expand All @@ -120,18 +109,12 @@ public IActorRef JournalFor(string journalPluginId)
public EventAdapters AdaptersFor(string journalPluginId)
{
var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId;
Lazy<PluginHolder> pluginContainer;
if (!_journalPluginExtensionIds.TryGetValue(configPath, out pluginContainer))
{
var plugin = new Lazy<PluginHolder>(() =>
CreatePlugin(configPath, type => typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId),
LazyThreadSafetyMode.ExecutionAndPublication);
pluginContainer = _journalPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin);
}
var plugin = _journalPluginExtensionIds.GetOrAdd(configPath,
key => CreatePlugin(key, type => typeof(AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId));

return pluginContainer.Value.Adapters;
return plugin.Adapters;
}

/// <summary>
Expand All @@ -142,7 +125,7 @@ public EventAdapters AdaptersFor(string journalPluginId)
internal EventAdapters AdaptersFor(IActorRef journalRef)
{
return _journalPluginExtensionIds.Values
.Select(ext => Equals(ext.Value.Ref, journalRef) ? ext.Value.Adapters : null)
.Select(ext => Equals(ext.Ref, journalRef) ? ext.Adapters : null)
.FirstOrDefault(r => r != null)
?? IdentityEventAdapters.Instance;
}
Expand Down

0 comments on commit ee31816

Please sign in to comment.