Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Journal and snapshot store should obtain their configuration from Persistence #147

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Akka.Cluster.Sharding" Version="$(AkkaVersion)" />
<PackageReference Include="Akka.TestKit" Version="$(AkkaVersion)" />
<PackageReference Include="Akka.Persistence.TCK" Version="$(AkkaVersion)" />
<PackageReference Include="Akka.TestKit.Xunit2" Version="$(AkkaVersion)" />
Expand Down
85 changes: 85 additions & 0 deletions src/Akka.Persistence.Redis.Tests/RedisMultiConfigurationSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.Configuration;
using Akka.Persistence.Redis.Journal;
using Akka.Persistence.TCK;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Redis.Tests
{
public class RedisMultiConfigurationSpec : PluginSpec
{
private static Config _config = ConfigurationFactory.ParseString(@"
akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
akka.cluster.sharding {
journal-plugin-id = ""akka.persistence.journal.sharding""
snapshot-plugin-id = ""akka.persistence.snapshot-store.sharding""
}
akka.persistence {
journal {
plugin = ""akka.persistence.journal.redis""
redis {
class = ""Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis""
configuration-string = ""example""
plugin-dispatcher = ""akka.actor.default-dispatcher""
key-prefix = ""akka:persistence:journal""
}
sharding {
class = ""Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis""
configuration-string = ""example""
plugin-dispatcher = ""akka.actor.default-dispatcher""
key-prefix = ""akka:persistence:sharding:journal""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.redis""
redis {
class = ""Akka.Persistence.Redis.Snapshot.RedisSnapshotStore, Akka.Persistence.Redis""
configuration-string = ""example""
plugin-dispatcher = ""akka.actor.default-dispatcher""
key-prefix = ""akka:persistence:snapshot""
}
sharding {
class = ""Akka.Persistence.Redis.Snapshot.RedisSnapshotStore, Akka.Persistence.Redis""
configuration-string = ""example""
plugin-dispatcher = ""akka.actor.default-dispatcher""
key-prefix = ""akka:persistence:sharding:snapshot""
}
}
}");

public RedisMultiConfigurationSpec(ITestOutputHelper output)
: base(_config, nameof(RedisMultiConfigurationSpec), output)
{ }

[Fact]
public void PluginMustBeAbleToUseDifferentConfigurationForPersistence()
{
var cluster = ClusterSharding.Get(Sys);
cluster.Settings.JournalPluginId.Should().Be("akka.persistence.journal.sharding");
cluster.Settings.SnapshotPluginId.Should().Be("akka.persistence.snapshot-store.sharding");

var persistence = Persistence.Instance.Get(Sys);

var @ref = persistence.JournalFor("akka.persistence.journal.redis");
@ref.Should().NotBeNull();

@ref = persistence.SnapshotStoreFor("akka.persistence.snapshot-store.redis");
@ref.Should().NotBeNull();

@ref = persistence.JournalFor("akka.persistence.journal.sharding");
@ref.Should().NotBeNull();

@ref = persistence.SnapshotStoreFor("akka.persistence.snapshot-store.sharding");
@ref.Should().NotBeNull();
}
}
}
14 changes: 8 additions & 6 deletions src/Akka.Persistence.Redis.Tests/RedisSettingsSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@ public class RedisSettingsSpec : Akka.TestKit.Xunit2.TestKit
public void Redis_JournalSettings_must_have_default_values()
{
var redisPersistence = RedisPersistence.Get(Sys);
var settings = RedisSettings.Create(redisPersistence.DefaultJournalConfig);

redisPersistence.JournalSettings.ConfigurationString.Should().Be(string.Empty);
redisPersistence.JournalSettings.Database.Should().Be(0);
redisPersistence.JournalSettings.KeyPrefix.Should().Be(string.Empty);
settings.ConfigurationString.Should().Be(string.Empty);
settings.Database.Should().Be(0);
settings.KeyPrefix.Should().Be(string.Empty);
}

[Fact]
public void Redis_SnapshotStoreSettingsSettings_must_have_default_values()
{
var redisPersistence = RedisPersistence.Get(Sys);
var settings = RedisSettings.Create(redisPersistence.DefaultSnapshotConfig);

redisPersistence.SnapshotStoreSettings.ConfigurationString.Should().Be(string.Empty);
redisPersistence.SnapshotStoreSettings.Database.Should().Be(0);
redisPersistence.SnapshotStoreSettings.KeyPrefix.Should().Be(string.Empty);
settings.ConfigurationString.Should().Be(string.Empty);
settings.Database.Should().Be(0);
settings.KeyPrefix.Should().Be(string.Empty);
}
}
}
6 changes: 4 additions & 2 deletions src/Akka.Persistence.Redis/Journal/RedisJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.Persistence.Redis.Query;
using Akka.Util.Internal;
Expand All @@ -19,6 +20,7 @@ namespace Akka.Persistence.Redis.Journal
{
public class RedisJournal : AsyncWriteJournal
{
protected static readonly RedisPersistence Extension = RedisPersistence.Get(Context.System);
private readonly HashSet<IActorRef> _newEventsSubscriber = new HashSet<IActorRef>();

private readonly RedisSettings _settings;
Expand All @@ -31,9 +33,9 @@ public class RedisJournal : AsyncWriteJournal

protected bool HasNewEventSubscribers => _newEventsSubscriber.Count != 0;

public RedisJournal()
public RedisJournal(Config journalConfig)
{
_settings = RedisPersistence.Get(Context.System).JournalSettings;
_settings = RedisSettings.Create(journalConfig.WithFallback(Extension.DefaultJournalConfig));
_journalHelper = new JournalHelper(Context.System, _settings.KeyPrefix);
_system = Context.System;
_database = new Lazy<IDatabase>(() =>
Expand Down
15 changes: 14 additions & 1 deletion src/Akka.Persistence.Redis/RedisPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public static RedisSettings Create(Config config)

public class RedisPersistence : IExtension
{
public const string JournalConfigPath = "akka.persistence.journal.redis";
public const string SnapshotConfigPath = "akka.persistence.snapshot-store.redis";

public static RedisPersistence Get(ActorSystem system)
{
return system.WithExtension<RedisPersistence, RedisPersistenceProvider>();
Expand All @@ -49,12 +52,22 @@ public static Config DefaultConfig()
return ConfigurationFactory.FromResource<RedisPersistence>("Akka.Persistence.Redis.reference.conf");
}

[Obsolete("This returns the default journal settings, not the current journal settings.")]
public RedisSettings JournalSettings { get; }

[Obsolete("This returns the default snapshot settings, not the current snapshot settings.")]
public RedisSettings SnapshotStoreSettings { get; }

public Config DefaultJournalConfig { get; }
public Config DefaultSnapshotConfig { get; }

public RedisPersistence(ExtendedActorSystem system)
{
system.Settings.InjectTopLevelFallback(DefaultConfig());
var defaultConfig = DefaultConfig();
system.Settings.InjectTopLevelFallback(defaultConfig);

DefaultJournalConfig = defaultConfig.GetConfig(JournalConfigPath);
DefaultSnapshotConfig = defaultConfig.GetConfig(SnapshotConfigPath);

JournalSettings = RedisSettings.Create(system.Settings.Config.GetConfig("akka.persistence.journal.redis"));
SnapshotStoreSettings =
Expand Down
16 changes: 7 additions & 9 deletions src/Akka.Persistence.Redis/Snapshot/RedisSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,26 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Snapshot;
using StackExchange.Redis;

namespace Akka.Persistence.Redis.Snapshot
{
public class RedisSnapshotStore : SnapshotStore
{
protected static readonly RedisPersistence Extension = RedisPersistence.Get(Context.System);

private readonly RedisSettings _settings;
private Lazy<IDatabase> _database;
private ActorSystem _system;
private readonly Lazy<IDatabase> _database;
private readonly ActorSystem _system;
public IDatabase Database => _database.Value;

public bool IsClustered { get; private set; }

public RedisSnapshotStore()
{
_settings = RedisPersistence.Get(Context.System).SnapshotStoreSettings;
}

protected override void PreStart()
public RedisSnapshotStore(Config snapshotConfig)
{
base.PreStart();
_settings = RedisSettings.Create(snapshotConfig.WithFallback(Extension.DefaultSnapshotConfig));

_system = Context.System;
_database = new Lazy<IDatabase>(() =>
Expand Down