diff --git a/src/Akka.Persistence.Redis.Tests/Akka.Persistence.Redis.Tests.csproj b/src/Akka.Persistence.Redis.Tests/Akka.Persistence.Redis.Tests.csproj index 60d5eed..6f5dd8f 100644 --- a/src/Akka.Persistence.Redis.Tests/Akka.Persistence.Redis.Tests.csproj +++ b/src/Akka.Persistence.Redis.Tests/Akka.Persistence.Redis.Tests.csproj @@ -34,6 +34,7 @@ + diff --git a/src/Akka.Persistence.Redis.Tests/RedisMultiConfigurationSpec.cs b/src/Akka.Persistence.Redis.Tests/RedisMultiConfigurationSpec.cs new file mode 100644 index 0000000..f54586c --- /dev/null +++ b/src/Akka.Persistence.Redis.Tests/RedisMultiConfigurationSpec.cs @@ -0,0 +1,85 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Cluster.Sharding; +using Akka.Configuration; +using Akka.Persistence.Redis.Journal; +using Akka.Persistence.TCK; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Persistence.Redis.Tests +{ + public class RedisMultiConfigurationSpec : PluginSpec + { + private static Config _config = ConfigurationFactory.ParseString(@" +akka.actor.provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"" +akka.cluster.sharding { + journal-plugin-id = ""akka.persistence.journal.sharding"" + snapshot-plugin-id = ""akka.persistence.snapshot-store.sharding"" +} +akka.persistence { + journal { + plugin = ""akka.persistence.journal.redis"" + redis { + class = ""Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis"" + configuration-string = ""example"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + key-prefix = ""akka:persistence:journal"" + } + sharding { + class = ""Akka.Persistence.Redis.Journal.RedisJournal, Akka.Persistence.Redis"" + configuration-string = ""example"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + key-prefix = ""akka:persistence:sharding:journal"" + } + } + snapshot-store { + plugin = ""akka.persistence.snapshot-store.redis"" + redis { + class = ""Akka.Persistence.Redis.Snapshot.RedisSnapshotStore, Akka.Persistence.Redis"" + configuration-string = ""example"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + key-prefix = ""akka:persistence:snapshot"" + } + sharding { + class = ""Akka.Persistence.Redis.Snapshot.RedisSnapshotStore, Akka.Persistence.Redis"" + configuration-string = ""example"" + plugin-dispatcher = ""akka.actor.default-dispatcher"" + key-prefix = ""akka:persistence:sharding:snapshot"" + } + } +}"); + + public RedisMultiConfigurationSpec(ITestOutputHelper output) + : base(_config, nameof(RedisMultiConfigurationSpec), output) + { } + + [Fact] + public void PluginMustBeAbleToUseDifferentConfigurationForPersistence() + { + var cluster = ClusterSharding.Get(Sys); + cluster.Settings.JournalPluginId.Should().Be("akka.persistence.journal.sharding"); + cluster.Settings.SnapshotPluginId.Should().Be("akka.persistence.snapshot-store.sharding"); + + var persistence = Persistence.Instance.Get(Sys); + + var @ref = persistence.JournalFor("akka.persistence.journal.redis"); + @ref.Should().NotBeNull(); + + @ref = persistence.SnapshotStoreFor("akka.persistence.snapshot-store.redis"); + @ref.Should().NotBeNull(); + + @ref = persistence.JournalFor("akka.persistence.journal.sharding"); + @ref.Should().NotBeNull(); + + @ref = persistence.SnapshotStoreFor("akka.persistence.snapshot-store.sharding"); + @ref.Should().NotBeNull(); + } + } +} diff --git a/src/Akka.Persistence.Redis.Tests/RedisSettingsSpec.cs b/src/Akka.Persistence.Redis.Tests/RedisSettingsSpec.cs index 6aa4093..678ea91 100644 --- a/src/Akka.Persistence.Redis.Tests/RedisSettingsSpec.cs +++ b/src/Akka.Persistence.Redis.Tests/RedisSettingsSpec.cs @@ -15,20 +15,22 @@ public class RedisSettingsSpec : Akka.TestKit.Xunit2.TestKit public void Redis_JournalSettings_must_have_default_values() { var redisPersistence = RedisPersistence.Get(Sys); + var settings = RedisSettings.Create(redisPersistence.DefaultJournalConfig); - redisPersistence.JournalSettings.ConfigurationString.Should().Be(string.Empty); - redisPersistence.JournalSettings.Database.Should().Be(0); - redisPersistence.JournalSettings.KeyPrefix.Should().Be(string.Empty); + settings.ConfigurationString.Should().Be(string.Empty); + settings.Database.Should().Be(0); + settings.KeyPrefix.Should().Be(string.Empty); } [Fact] public void Redis_SnapshotStoreSettingsSettings_must_have_default_values() { var redisPersistence = RedisPersistence.Get(Sys); + var settings = RedisSettings.Create(redisPersistence.DefaultSnapshotConfig); - redisPersistence.SnapshotStoreSettings.ConfigurationString.Should().Be(string.Empty); - redisPersistence.SnapshotStoreSettings.Database.Should().Be(0); - redisPersistence.SnapshotStoreSettings.KeyPrefix.Should().Be(string.Empty); + settings.ConfigurationString.Should().Be(string.Empty); + settings.Database.Should().Be(0); + settings.KeyPrefix.Should().Be(string.Empty); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Redis/Journal/RedisJournal.cs b/src/Akka.Persistence.Redis/Journal/RedisJournal.cs index 65ca8ff..8530c5c 100644 --- a/src/Akka.Persistence.Redis/Journal/RedisJournal.cs +++ b/src/Akka.Persistence.Redis/Journal/RedisJournal.cs @@ -10,6 +10,7 @@ using System.Linq; using System.Threading.Tasks; using Akka.Actor; +using Akka.Configuration; using Akka.Persistence.Journal; using Akka.Persistence.Redis.Query; using Akka.Util.Internal; @@ -19,6 +20,7 @@ namespace Akka.Persistence.Redis.Journal { public class RedisJournal : AsyncWriteJournal { + protected static readonly RedisPersistence Extension = RedisPersistence.Get(Context.System); private readonly HashSet _newEventsSubscriber = new HashSet(); private readonly RedisSettings _settings; @@ -31,9 +33,9 @@ public class RedisJournal : AsyncWriteJournal protected bool HasNewEventSubscribers => _newEventsSubscriber.Count != 0; - public RedisJournal() + public RedisJournal(Config journalConfig) { - _settings = RedisPersistence.Get(Context.System).JournalSettings; + _settings = RedisSettings.Create(journalConfig.WithFallback(Extension.DefaultJournalConfig)); _journalHelper = new JournalHelper(Context.System, _settings.KeyPrefix); _system = Context.System; _database = new Lazy(() => diff --git a/src/Akka.Persistence.Redis/RedisPersistence.cs b/src/Akka.Persistence.Redis/RedisPersistence.cs index 7538926..d276ea9 100644 --- a/src/Akka.Persistence.Redis/RedisPersistence.cs +++ b/src/Akka.Persistence.Redis/RedisPersistence.cs @@ -39,6 +39,9 @@ public static RedisSettings Create(Config config) public class RedisPersistence : IExtension { + public const string JournalConfigPath = "akka.persistence.journal.redis"; + public const string SnapshotConfigPath = "akka.persistence.snapshot-store.redis"; + public static RedisPersistence Get(ActorSystem system) { return system.WithExtension(); @@ -49,12 +52,22 @@ public static Config DefaultConfig() return ConfigurationFactory.FromResource("Akka.Persistence.Redis.reference.conf"); } + [Obsolete("This returns the default journal settings, not the current journal settings.")] public RedisSettings JournalSettings { get; } + + [Obsolete("This returns the default snapshot settings, not the current snapshot settings.")] public RedisSettings SnapshotStoreSettings { get; } + public Config DefaultJournalConfig { get; } + public Config DefaultSnapshotConfig { get; } + public RedisPersistence(ExtendedActorSystem system) { - system.Settings.InjectTopLevelFallback(DefaultConfig()); + var defaultConfig = DefaultConfig(); + system.Settings.InjectTopLevelFallback(defaultConfig); + + DefaultJournalConfig = defaultConfig.GetConfig(JournalConfigPath); + DefaultSnapshotConfig = defaultConfig.GetConfig(SnapshotConfigPath); JournalSettings = RedisSettings.Create(system.Settings.Config.GetConfig("akka.persistence.journal.redis")); SnapshotStoreSettings = diff --git a/src/Akka.Persistence.Redis/Snapshot/RedisSnapshotStore.cs b/src/Akka.Persistence.Redis/Snapshot/RedisSnapshotStore.cs index ed3ae0a..b51ab84 100644 --- a/src/Akka.Persistence.Redis/Snapshot/RedisSnapshotStore.cs +++ b/src/Akka.Persistence.Redis/Snapshot/RedisSnapshotStore.cs @@ -8,6 +8,7 @@ using System.Linq; using System.Threading.Tasks; using Akka.Actor; +using Akka.Configuration; using Akka.Persistence.Snapshot; using StackExchange.Redis; @@ -15,21 +16,18 @@ namespace Akka.Persistence.Redis.Snapshot { public class RedisSnapshotStore : SnapshotStore { + protected static readonly RedisPersistence Extension = RedisPersistence.Get(Context.System); + private readonly RedisSettings _settings; - private Lazy _database; - private ActorSystem _system; + private readonly Lazy _database; + private readonly ActorSystem _system; public IDatabase Database => _database.Value; public bool IsClustered { get; private set; } - public RedisSnapshotStore() - { - _settings = RedisPersistence.Get(Context.System).SnapshotStoreSettings; - } - - protected override void PreStart() + public RedisSnapshotStore(Config snapshotConfig) { - base.PreStart(); + _settings = RedisSettings.Create(snapshotConfig.WithFallback(Extension.DefaultSnapshotConfig)); _system = Context.System; _database = new Lazy(() =>