From 67bbd465310aa7d0d23063d54ade0399ab3af0ca Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 26 Aug 2022 01:54:31 +0700 Subject: [PATCH] Add/update Setup classes --- .../AzurePersistenceConfigSpec.cs | 207 ++++++++++++++---- .../AzurePersistence.cs | 2 +- .../Journal/AzureTableStorageJournal.cs | 20 +- .../AzureTableStorageJournalSettings.cs | 138 +++++++++++- .../Journal/AzureTableStorageJournalSetup.cs | 92 ++++++++ .../Query/AzureTableStorageReadJournal.cs | 23 +- .../AzureTableStorageReadJournalSetup.cs | 35 +++ .../Snapshot/AzureBlobSnapshotSetup.cs | 113 ++++++++-- .../Snapshot/AzureBlobSnapshotStore.cs | 24 +- .../AzureBlobSnapshotStoreSettings.cs | 130 +++++++++++ 10 files changed, 694 insertions(+), 90 deletions(-) create mode 100644 src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSetup.cs create mode 100644 src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalSetup.cs diff --git a/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs b/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs index 2bdc59b..3ff209c 100644 --- a/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs @@ -9,84 +9,199 @@ using Akka.Persistence.Azure.Journal; using Akka.Persistence.Azure.Query; using Akka.Persistence.Azure.Snapshot; +using Azure.Data.Tables; +using Azure.Identity; +using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; using FluentAssertions; +using FluentAssertions.Extensions; using Xunit; namespace Akka.Persistence.Azure.Tests { public class AzurePersistenceConfigSpec { + private const string SnapshotStorePath = "akka.persistence.snapshot-store.azure-blob-store"; + private const string JournalPath = "akka.persistence.journal.azure-table"; + + private static readonly AzureBlobSnapshotStoreSettings DefaultSnapshotSettings = + AzureBlobSnapshotStoreSettings.Create(AzurePersistence.DefaultConfig.GetConfig(SnapshotStorePath)); + + private static readonly AzureTableStorageJournalSettings DefaultJournalSettings = + AzureTableStorageJournalSettings.Create(AzurePersistence.DefaultConfig.GetConfig(JournalPath)); + [Fact] public void ShouldLoadDefaultConfig() { - var defaultConfig = AzurePersistence.DefaultConfig; - defaultConfig.HasPath("akka.persistence.journal.azure-table").Should().BeTrue(); - defaultConfig.HasPath("akka.persistence.snapshot-store.azure-blob-store").Should().BeTrue(); + AzurePersistence.DefaultConfig.HasPath(SnapshotStorePath).Should().BeTrue(); + AzurePersistence.DefaultConfig.HasPath(JournalPath).Should().BeTrue(); + AzurePersistence.DefaultConfig.HasPath(AzureTableStorageReadJournal.Identifier).Should().BeTrue(); } [Fact] public void ShouldParseDefaultSnapshotConfig() { - var blobSettings = - AzureBlobSnapshotStoreSettings.Create( - ConfigurationFactory.ParseString(@"akka.persistence.snapshot-store.azure-blob-store{ - connection-string = foo - container-name = bar - }").WithFallback(AzurePersistence.DefaultConfig) - .GetConfig("akka.persistence.snapshot-store.azure-blob-store")); + var settings = + AzureBlobSnapshotStoreSettings.Create(AzurePersistence.DefaultConfig.GetConfig(SnapshotStorePath)); - blobSettings.ContainerName.Should().Be("bar"); - blobSettings.ConnectionString.Should().Be("foo"); - blobSettings.ConnectTimeout.Should().Be(TimeSpan.FromSeconds(3)); - blobSettings.RequestTimeout.Should().Be(TimeSpan.FromSeconds(3)); - blobSettings.VerboseLogging.Should().BeFalse(); - blobSettings.ContainerPublicAccessType.Should().Be(PublicAccessType.None); + settings.ConnectionString.Should().BeEmpty(); + settings.ContainerName.Should().Be("akka-persistence-default-container"); + settings.ConnectTimeout.Should().Be(3.Seconds()); + settings.RequestTimeout.Should().Be(3.Seconds()); + settings.VerboseLogging.Should().BeFalse(); + settings.Development.Should().BeFalse(); + settings.AutoInitialize.Should().BeTrue(); + settings.ContainerPublicAccessType.Should().Be(PublicAccessType.None); + settings.ServiceUri.Should().BeNull(); + settings.DefaultAzureCredential.Should().BeNull(); + settings.BlobClientOptions.Should().BeNull(); } - [Fact] - public void ShouldProvideDefaultContainerNameValue() + [Fact(DisplayName = "AzureBlobSnapshotStoreSettings With overrides should override default values")] + public void SnapshotSettingsWithMethodsTest() { - var blobSettings = - AzureBlobSnapshotStoreSettings.Create( - ConfigurationFactory.ParseString(@"akka.persistence.snapshot-store.azure-blob-store{ - connection-string = foo - }").WithFallback(AzurePersistence.DefaultConfig) - .GetConfig("akka.persistence.snapshot-store.azure-blob-store")); + var uri = new Uri("https://whatever.com"); + var credentials = new DefaultAzureCredential(); + var options = new BlobClientOptions(); + var settings = DefaultSnapshotSettings + .WithConnectionString("abc") + .WithContainerName("bcd") + .WithConnectTimeout(1.Seconds()) + .WithRequestTimeout(2.Seconds()) + .WithVerboseLogging(true) + .WithDevelopment(true) + .WithAutoInitialize(false) + .WithContainerPublicAccessType(PublicAccessType.Blob) + .WithAzureCredential(uri, credentials, options); + + settings.ConnectionString.Should().Be("abc"); + settings.ContainerName.Should().Be("bcd"); + settings.ConnectTimeout.Should().Be(1.Seconds()); + settings.RequestTimeout.Should().Be(2.Seconds()); + settings.VerboseLogging.Should().BeTrue(); + settings.Development.Should().BeTrue(); + settings.AutoInitialize.Should().BeFalse(); + settings.ContainerPublicAccessType.Should().Be(PublicAccessType.Blob); + settings.ServiceUri.Should().Be(uri); + settings.DefaultAzureCredential.Should().Be(credentials); + settings.BlobClientOptions.Should().Be(options); + } - blobSettings.ContainerName.Should().Be("akka-persistence-default-container"); + [Fact(DisplayName = "AzureBlobSnapshotStoreSetup should override settings values")] + public void SnapshotSetupTest() + { + var uri = new Uri("https://whatever.com"); + var credentials = new DefaultAzureCredential(); + var options = new BlobClientOptions(); + var setup = new AzureBlobSnapshotSetup + { + ConnectionString = "abc", + ContainerName = "bcd", + ConnectTimeout = 1.Seconds(), + RequestTimeout = 2.Seconds(), + VerboseLogging = true, + Development = true, + AutoInitialize = false, + ContainerPublicAccessType = PublicAccessType.Blob, + ServiceUri = uri, + DefaultAzureCredential = credentials, + BlobClientOptions = options + }; + + var settings = setup.Apply(DefaultSnapshotSettings); + + settings.ConnectionString.Should().Be("abc"); + settings.ContainerName.Should().Be("bcd"); + settings.ConnectTimeout.Should().Be(1.Seconds()); + settings.RequestTimeout.Should().Be(2.Seconds()); + settings.VerboseLogging.Should().BeTrue(); + settings.Development.Should().BeTrue(); + settings.AutoInitialize.Should().BeFalse(); + settings.ContainerPublicAccessType.Should().Be(PublicAccessType.Blob); + settings.ServiceUri.Should().Be(uri); + settings.DefaultAzureCredential.Should().Be(credentials); + settings.BlobClientOptions.Should().Be(options); } [Fact] public void ShouldParseTableConfig() { - var tableSettings = - AzureTableStorageJournalSettings.Create( - ConfigurationFactory.ParseString(@"akka.persistence.journal.azure-table{ - connection-string = foo - table-name = bar - }").WithFallback(AzurePersistence.DefaultConfig) - .GetConfig("akka.persistence.journal.azure-table")); + var settings = DefaultJournalSettings; - tableSettings.TableName.Should().Be("bar"); - tableSettings.ConnectionString.Should().Be("foo"); - tableSettings.ConnectTimeout.Should().Be(TimeSpan.FromSeconds(3)); - tableSettings.RequestTimeout.Should().Be(TimeSpan.FromSeconds(3)); - tableSettings.VerboseLogging.Should().BeFalse(); + settings.ConnectionString.Should().BeEmpty(); + settings.TableName.Should().Be("AkkaPersistenceDefaultTable"); + settings.ConnectTimeout.Should().Be(3.Seconds()); + settings.RequestTimeout.Should().Be(3.Seconds()); + settings.VerboseLogging.Should().BeFalse(); + settings.Development.Should().BeFalse(); + settings.AutoInitialize.Should().BeTrue(); + settings.ServiceUri.Should().BeNull(); + settings.DefaultAzureCredential.Should().BeNull(); + settings.TableClientOptions.Should().BeNull(); } - [Fact] - public void ShouldProvideDefaultTableNameValue() + [Fact(DisplayName = "AzureTableStorageJournalSettings With overrides should override default values")] + public void JournalSettingsWithMethodsTest() { - var tableSettings = - AzureTableStorageJournalSettings.Create( - ConfigurationFactory.ParseString(@"akka.persistence.journal.azure-table{ - connection-string = foo - }").WithFallback(AzurePersistence.DefaultConfig) - .GetConfig("akka.persistence.journal.azure-table")); - tableSettings.TableName.Should().Be("AkkaPersistenceDefaultTable"); + var uri = new Uri("https://whatever.com"); + var credentials = new DefaultAzureCredential(); + var options = new TableClientOptions(); + var settings = DefaultJournalSettings + .WithConnectionString("abc") + .WithTableName("bcd") + .WithConnectTimeout(1.Seconds()) + .WithRequestTimeout(2.Seconds()) + .WithVerboseLogging(true) + .WithDevelopment(true) + .WithAutoInitialize(false) + .WithAzureCredential(uri, credentials, options); + + settings.ConnectionString.Should().Be("abc"); + settings.TableName.Should().Be("bcd"); + settings.ConnectTimeout.Should().Be(1.Seconds()); + settings.RequestTimeout.Should().Be(2.Seconds()); + settings.VerboseLogging.Should().BeTrue(); + settings.Development.Should().BeTrue(); + settings.AutoInitialize.Should().BeFalse(); + settings.ServiceUri.Should().Be(uri); + settings.DefaultAzureCredential.Should().Be(credentials); + settings.TableClientOptions.Should().Be(options); } + [Fact(DisplayName = "AzureTableStorageJournalSetup should override settings values")] + public void JournalSetupTest() + { + var uri = new Uri("https://whatever.com"); + var credentials = new DefaultAzureCredential(); + var options = new TableClientOptions(); + var setup = new AzureTableStorageJournalSetup + { + ConnectionString = "abc", + TableName = "bcd", + ConnectTimeout = 1.Seconds(), + RequestTimeout = 2.Seconds(), + VerboseLogging = true, + Development = true, + AutoInitialize = false, + ServiceUri = uri, + DefaultAzureCredential = credentials, + TableClientOptions = options + }; + + var settings = setup.Apply(DefaultJournalSettings); + + settings.ConnectionString.Should().Be("abc"); + settings.TableName.Should().Be("bcd"); + settings.ConnectTimeout.Should().Be(1.Seconds()); + settings.RequestTimeout.Should().Be(2.Seconds()); + settings.VerboseLogging.Should().BeTrue(); + settings.Development.Should().BeTrue(); + settings.AutoInitialize.Should().BeFalse(); + settings.ServiceUri.Should().Be(uri); + settings.DefaultAzureCredential.Should().Be(credentials); + settings.TableClientOptions.Should().Be(options); + } + [Theory] [InlineData("fo", "Invalid table name length")] [InlineData("1foo", "Invalid table name")] diff --git a/src/Akka.Persistence.Azure/AzurePersistence.cs b/src/Akka.Persistence.Azure/AzurePersistence.cs index 478f8a5..7abeb4b 100644 --- a/src/Akka.Persistence.Azure/AzurePersistence.cs +++ b/src/Akka.Persistence.Azure/AzurePersistence.cs @@ -34,7 +34,7 @@ public AzurePersistence(ActorSystem system, AzureTableStorageJournalSettings tab /// /// The default HOCON configuration for . /// - public static Config DefaultConfig => + public static readonly Config DefaultConfig = ConfigurationFactory.FromResource("Akka.Persistence.Azure.reference.conf"); /// diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index 1891569..3b92549 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -61,8 +61,26 @@ public AzureTableStorageJournal(Config config = null) AzurePersistence.Get(Context.System).TableSettings : AzureTableStorageJournalSettings.Create(config); + var setup = Context.System.Settings.Setup.Get(); + if (setup.HasValue) + _settings = setup.Value.Apply(_settings); + _serialization = new SerializationHelper(Context.System); - _tableServiceClient = new TableServiceClient(_settings.ConnectionString); + + if (_settings.Development) + { + _tableServiceClient = new TableServiceClient(connectionString: "UseDevelopmentStorage=true"); + } + else + { + // Use DefaultAzureCredential if both ServiceUri and DefaultAzureCredential are populated in the settings + _tableServiceClient = _settings.ServiceUri != null && _settings.DefaultAzureCredential != null + ? new TableServiceClient( + endpoint: _settings.ServiceUri, + tokenCredential: _settings.DefaultAzureCredential, + options: _settings.TableClientOptions) + : new TableServiceClient(connectionString: _settings.ConnectionString); + } } public TableClient Table diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs index 8ac6f0b..95ed994 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs @@ -6,8 +6,11 @@ using System; using System.Linq; +using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Azure.Util; +using Azure.Data.Tables; +using Azure.Identity; namespace Akka.Persistence.Azure.Journal { @@ -18,6 +21,7 @@ public sealed class AzureTableStorageJournalSettings { private static readonly string[] ReservedTableNames = {"tables"}; + [Obsolete] public AzureTableStorageJournalSettings( string connectionString, string tableName, @@ -26,6 +30,30 @@ public AzureTableStorageJournalSettings( bool verboseLogging, bool development, bool autoInitialize) + : this( + connectionString: connectionString, + tableName: tableName, + connectTimeout: connectTimeout, + requestTimeout: requestTimeout, + verboseLogging: verboseLogging, + development: development, + autoInitialize: autoInitialize, + serviceUri: null, + defaultAzureCredential: null, + tableClientOptions: null) + { } + + public AzureTableStorageJournalSettings( + string connectionString, + string tableName, + TimeSpan connectTimeout, + TimeSpan requestTimeout, + bool verboseLogging, + bool development, + bool autoInitialize, + Uri serviceUri, + DefaultAzureCredential defaultAzureCredential, + TableClientOptions tableClientOptions) { if(string.IsNullOrWhiteSpace(tableName)) throw new ConfigurationException("[AzureTableStorageJournal] Table name is null or empty."); @@ -45,6 +73,9 @@ public AzureTableStorageJournalSettings( VerboseLogging = verboseLogging; Development = development; AutoInitialize = autoInitialize; + ServiceUri = serviceUri; + DefaultAzureCredential = defaultAzureCredential; + TableClientOptions = tableClientOptions; } /// @@ -72,10 +103,97 @@ public AzureTableStorageJournalSettings( /// public bool VerboseLogging { get; } + /// + /// Flag that we're running in development mode. When this is set, and + /// will be ignored, replaced with "UseDevelopmentStorage=true" for local + /// connection to Azurite. + /// public bool Development { get; } + /// + /// Automatically create the Table Storage table if no existing table is found + /// public bool AutoInitialize { get; } + /// + /// A referencing the blob service. + /// This is likely to be similar to "https://{account_name}.table.core.windows.net". + /// + public Uri ServiceUri { get; } + + /// + /// The used to sign API requests. + /// + public DefaultAzureCredential DefaultAzureCredential { get; } + + /// + /// Optional client options that define the transport pipeline policies for authentication, + /// retries, etc., that are applied to every request. + /// + public TableClientOptions TableClientOptions { get; } + + public AzureTableStorageJournalSettings WithConnectionString(string connectionString) + => Copy(connectionString: connectionString); + public AzureTableStorageJournalSettings WithTableName(string tableName) + => Copy(tableName: tableName); + public AzureTableStorageJournalSettings WithConnectTimeout(TimeSpan connectTimeout) + => Copy(connectTimeout: connectTimeout); + public AzureTableStorageJournalSettings WithRequestTimeout(TimeSpan requestTimeout) + => Copy(requestTimeout: requestTimeout); + public AzureTableStorageJournalSettings WithVerboseLogging(bool verboseLogging) + => Copy(verboseLogging: verboseLogging); + public AzureTableStorageJournalSettings WithDevelopment(bool development) + => Copy(development: development); + public AzureTableStorageJournalSettings WithAutoInitialize(bool autoInitialize) + => Copy(autoInitialize: autoInitialize); + public AzureTableStorageJournalSettings WithAzureCredential( + Uri serviceUri, + DefaultAzureCredential defaultAzureCredential, + TableClientOptions tableClientOptions = null) + => Copy( + serviceUri: serviceUri, + defaultAzureCredential: defaultAzureCredential, + tableClientOptions: tableClientOptions); + + private AzureTableStorageJournalSettings Copy( + string connectionString = null, + string tableName = null, + TimeSpan? connectTimeout = null, + TimeSpan? requestTimeout = null, + bool? verboseLogging = null, + bool? development = null, + bool? autoInitialize = null, + Uri serviceUri = null, + DefaultAzureCredential defaultAzureCredential = null, + TableClientOptions tableClientOptions = null) + => new AzureTableStorageJournalSettings( + connectionString: connectionString ?? ConnectionString, + tableName: tableName ?? TableName, + connectTimeout: connectTimeout ?? ConnectTimeout, + requestTimeout: requestTimeout ?? RequestTimeout, + verboseLogging: verboseLogging ?? VerboseLogging, + development: development ?? Development, + autoInitialize: autoInitialize ?? AutoInitialize, + serviceUri: serviceUri ?? ServiceUri, + defaultAzureCredential: defaultAzureCredential ?? DefaultAzureCredential, + tableClientOptions: tableClientOptions ?? TableClientOptions); + + /// + /// Creates an instance using the + /// `akka.persistence.journal.azure-table` HOCON configuration section inside + /// the settings. + /// + /// The to extract the configuration from + /// A new settings instance. + public static AzureTableStorageJournalSettings Create(ActorSystem system) + { + var config = system.Settings.Config.GetConfig("akka.persistence.journal.azure-table"); + if (config is null) + throw new ConfigurationException( + "Could not find HOCON config at path 'akka.persistence.journal.azure-table'"); + return Create(config); + } + /// /// Creates an instance using the /// `akka.persistence.journal.azure-table` HOCON configuration section. @@ -84,6 +202,9 @@ public AzureTableStorageJournalSettings( /// A new settings instance. public static AzureTableStorageJournalSettings Create(Config config) { + if (config is null) + throw new ArgumentNullException(nameof(config)); + var connectionString = config.GetString("connection-string"); var tableName = config.GetString("table-name"); var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3)); @@ -93,13 +214,16 @@ public static AzureTableStorageJournalSettings Create(Config config) var autoInitialize = config.GetBoolean("auto-initialize", true); return new AzureTableStorageJournalSettings( - connectionString, - tableName, - connectTimeout, - requestTimeout, - verbose, - development, - autoInitialize); + connectionString: connectionString, + tableName: tableName, + connectTimeout: connectTimeout, + requestTimeout: requestTimeout, + verboseLogging: verbose, + development: development, + autoInitialize: autoInitialize, + serviceUri: null, + defaultAzureCredential: null, + tableClientOptions: null); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSetup.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSetup.cs new file mode 100644 index 0000000..3e42ad3 --- /dev/null +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSetup.cs @@ -0,0 +1,92 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015 - 2018 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System; +using Akka.Actor.Setup; +using Azure.Data.Tables; +using Azure.Identity; + +namespace Akka.Persistence.Azure.Journal +{ + public class AzureTableStorageJournalSetup : Setup + { + /// + /// The connection string for connecting to Windows Azure table storage. + /// + public string ConnectionString { get; set; } + + /// + /// The table of the table we'll be connecting to. + /// + public string TableName { get; set; } + + /// + /// Initial timeout to use when connecting to Azure Table Storage for the first time. + /// + public TimeSpan? ConnectTimeout { get; set; } + + /// + /// Timeouts for individual read, write, and delete requests to Azure Table Storage. + /// + public TimeSpan? RequestTimeout { get; set; } + + /// + /// For debugging purposes only. Logs every individual operation to Azure table storage. + /// + public bool? VerboseLogging { get; set; } + + /// + /// Flag that we're running in development mode. When this is set, and + /// will be ignored, replaced with "UseDevelopmentStorage=true" for local + /// connection to Azurite. + /// + public bool? Development { get; set; } + + /// + /// Automatically create the Table Storage table if no existing table is found + /// + public bool? AutoInitialize { get; set; } + + /// + /// A referencing the blob service. + /// This is likely to be similar to "https://{account_name}.table.core.windows.net". + /// + public Uri ServiceUri { get; set; } + + /// + /// The used to sign API requests. + /// + public DefaultAzureCredential DefaultAzureCredential { get; set; } + + /// + /// Optional client options that define the transport pipeline policies for authentication, + /// retries, etc., that are applied to every request. + /// + public TableClientOptions TableClientOptions { get; set; } + + internal AzureTableStorageJournalSettings Apply(AzureTableStorageJournalSettings settings) + { + if (ConnectionString != null) + settings = settings.WithConnectionString(ConnectionString); + if (TableName != null) + settings = settings.WithTableName(TableName); + if (ConnectTimeout != null) + settings = settings.WithConnectTimeout(ConnectTimeout.Value); + if (RequestTimeout != null) + settings = settings.WithRequestTimeout(RequestTimeout.Value); + if (VerboseLogging != null) + settings = settings.WithVerboseLogging(VerboseLogging.Value); + if (Development != null) + settings = settings.WithDevelopment(Development.Value); + if (AutoInitialize != null) + settings = settings.WithAutoInitialize(AutoInitialize.Value); + if (ServiceUri != null && DefaultAzureCredential != null) + settings = settings.WithAzureCredential(ServiceUri, DefaultAzureCredential, TableClientOptions); + + return settings; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs index ac61994..aaa35d2 100644 --- a/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs +++ b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournal.cs @@ -17,26 +17,37 @@ public class AzureTableStorageReadJournal : IReadJournal, IEventsByTagQuery, ICurrentEventsByTagQuery { - public static string Identifier = "akka.persistence.query.journal.azure-table"; + public const string Identifier = "akka.persistence.query.journal.azure-table"; private readonly int _maxBufferSize; private readonly TimeSpan _refreshInterval; private readonly string _writeJournalPluginId; /// - /// Returns a default query configuration for akka persistence SQLite-based journals and snapshot stores. + /// Returns a default query configuration for akka persistence Azure-based journals and snapshot stores. /// /// - public static Config DefaultConfiguration() - { - return ConfigurationFactory.FromResource("Akka.Persistence.Azure.reference.conf"); - } + // NOTE: Do NOT remove this method, this is being called through reflection magic code + // in Akka.Persistence.Query.PersistenceQuery.GetDefaultConfig() + public static Config DefaultConfiguration() => AzurePersistence.DefaultConfig; public AzureTableStorageReadJournal(ExtendedActorSystem system, Config config) { _maxBufferSize = config.GetInt("max-buffer-size"); _refreshInterval = config.GetTimeSpan("refresh-interval"); _writeJournalPluginId = config.GetString("write-plugin"); + + var setupOption = system.Settings.Setup.Get(); + if (setupOption.HasValue) + { + var setup = setupOption.Value; + if (setup.MaxBufferSize != null) + _maxBufferSize = setup.MaxBufferSize.Value; + if (setup.RefreshInterval != null) + _refreshInterval = setup.RefreshInterval.Value; + if (!string.IsNullOrWhiteSpace(setup.WritePluginId)) + _writeJournalPluginId = setup.WritePluginId; + } } /// diff --git a/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalSetup.cs b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalSetup.cs new file mode 100644 index 0000000..733a07f --- /dev/null +++ b/src/Akka.Persistence.Azure/Query/AzureTableStorageReadJournalSetup.cs @@ -0,0 +1,35 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015 - 2018 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System; +using Akka.Actor.Setup; + +namespace Akka.Persistence.Azure.Query +{ + public class AzureTableStorageReadJournalSetup: Setup + { + /// + /// How many events to fetch in one query (replay) and keep buffered until they + /// are delivered downstream. + /// + public int? MaxBufferSize { get; set; } + + /// + /// The Azure Table write journal is notifying the query side as soon as things + /// are persisted, but for efficiency reasons the query side retrieves the events + /// in batches that sometimes can be delayed up to the configured . + /// + public TimeSpan? RefreshInterval { get; set; } + + /// + /// Absolute path to the write journal plugin configuration entry that this + /// query journal will connect to. + /// If undefined (or "") it will connect to the default journal as specified by the + /// akka.persistence.journal.plugin property. + /// + public string WritePluginId { get; set; } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs index c7c26a9..f6d1005 100644 --- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs +++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs @@ -1,12 +1,21 @@ -using System; -using System.Collections.Generic; -using System.Text; +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015 - 2018 Petabridge, LLC +// +// ----------------------------------------------------------------------- + +using System; using Akka.Actor.Setup; using Azure.Identity; using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; namespace Akka.Persistence.Azure.Snapshot { + /// + /// Setup class for . + /// Any populated properties will override its respective HOCON setting. + /// public class AzureBlobSnapshotSetup : Setup { /// @@ -28,22 +37,94 @@ public static AzureBlobSnapshotSetup Create( Uri serviceUri, DefaultAzureCredential defaultAzureCredential, BlobClientOptions blobClientOptions = default) - => new AzureBlobSnapshotSetup(serviceUri, defaultAzureCredential, blobClientOptions); + => new AzureBlobSnapshotSetup + { + ServiceUri = serviceUri, + DefaultAzureCredential = defaultAzureCredential, + BlobClientOptions = blobClientOptions + }; - private AzureBlobSnapshotSetup( - Uri serviceUri, - DefaultAzureCredential azureCredential, - BlobClientOptions blobClientOptions) - { - ServiceUri = serviceUri; - DefaultAzureCredential = azureCredential; - BlobClientOptions = blobClientOptions; - } + /// + /// The connection string for connecting to Windows Azure blob storage account. + /// + public string ConnectionString { get; set; } + + /// + /// The table of the container we'll be using to serialize these blobs. + /// + public string ContainerName { get; set; } + + /// + /// Initial timeout to use when connecting to Azure Container Storage for the first time. + /// + public TimeSpan? ConnectTimeout { get; set; } + + /// + /// Timeouts for individual read, write, and delete requests to Azure Container Storage. + /// + public TimeSpan? RequestTimeout { get; set; } + + /// + /// For debugging purposes only. Logs every individual operation to Azure table storage. + /// + public bool? VerboseLogging { get; set; } + + /// + /// Flag that we're running in development mode. When this is set, and + /// will be ignored, replaced with "UseDevelopmentStorage=true" for local + /// connection to Azurite. + /// + public bool? Development { get; set; } + + /// + /// Automatically create the Blog Storage container if no existing Blob container is found + /// + public bool? AutoInitialize { get; set; } + + /// + /// The public access type of the auto-initialized Blob Storage container + /// + public PublicAccessType? ContainerPublicAccessType { get; set; } + + /// + /// A referencing the blob service. + /// This is likely to be similar to "https://{account_name}.blob.core.windows.net". + /// + public Uri ServiceUri { get; set; } - public Uri ServiceUri { get; } + /// + /// The used to sign API requests. + /// + public DefaultAzureCredential DefaultAzureCredential { get; set; } + + /// + /// Optional client options that define the transport pipeline policies for authentication, + /// retries, etc., that are applied to every request. + /// + public BlobClientOptions BlobClientOptions { get; set; } - public DefaultAzureCredential DefaultAzureCredential { get; } + internal AzureBlobSnapshotStoreSettings Apply(AzureBlobSnapshotStoreSettings settings) + { + if (ConnectionString != null) + settings = settings.WithConnectionString(ConnectionString); + if (ContainerName != null) + settings = settings.WithContainerName(ContainerName); + if (ConnectTimeout != null) + settings = settings.WithConnectTimeout(ConnectTimeout.Value); + if (RequestTimeout != null) + settings = settings.WithRequestTimeout(RequestTimeout.Value); + if (VerboseLogging != null) + settings = settings.WithVerboseLogging(VerboseLogging.Value); + if (Development != null) + settings = settings.WithDevelopment(Development.Value); + if (AutoInitialize != null) + settings = settings.WithAutoInitialize(AutoInitialize.Value); + if (ContainerPublicAccessType != null) + settings = settings.WithContainerPublicAccessType(ContainerPublicAccessType.Value); + if (ServiceUri != null && DefaultAzureCredential != null) + settings = settings.WithAzureCredential(ServiceUri, DefaultAzureCredential, BlobClientOptions); - public BlobClientOptions BlobClientOptions { get; } + return settings; + } } } diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs index 2afe505..488b04f 100644 --- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs +++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs @@ -54,24 +54,22 @@ public AzureBlobSnapshotStore(Config config = null) ? AzurePersistence.Get(Context.System).BlobSettings : AzureBlobSnapshotStoreSettings.Create(config); + var setup = Context.System.Settings.Setup.Get(); + if (setup.HasValue) + _settings = setup.Value.Apply(_settings); + if (_settings.Development) { - _serviceClient = new BlobServiceClient("UseDevelopmentStorage=true"); + _serviceClient = new BlobServiceClient(connectionString: "UseDevelopmentStorage=true"); } else { - var credentialSetup = Context.System.Settings.Setup.Get(); - if (credentialSetup.HasValue) - { - _serviceClient = new BlobServiceClient( - credentialSetup.Value.ServiceUri, - credentialSetup.Value.DefaultAzureCredential, - credentialSetup.Value.BlobClientOptions); - } - else - { - _serviceClient = new BlobServiceClient(_settings.ConnectionString); - } + _serviceClient = _settings.ServiceUri != null && _settings.DefaultAzureCredential != null + ? _serviceClient = new BlobServiceClient( + serviceUri: _settings.ServiceUri, + credential: _settings.DefaultAzureCredential, + options: _settings.BlobClientOptions) + : _serviceClient = new BlobServiceClient(connectionString: _settings.ConnectionString); } _containerClient = new Lazy(() => InitCloudStorage(5).Result); diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs index f7c5aa2..25ca31e 100644 --- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs +++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs @@ -5,8 +5,11 @@ // ----------------------------------------------------------------------- using System; +using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Azure.Util; +using Azure.Identity; +using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; namespace Akka.Persistence.Azure.Snapshot @@ -29,6 +32,7 @@ public AzureBlobSnapshotStoreSettings( : this(connectionString, containerName, connectTimeout, requestTimeout, verboseLogging, development, autoInitialize, PublicAccessType.BlobContainer) { } + [Obsolete] public AzureBlobSnapshotStoreSettings( string connectionString, string containerName, @@ -38,6 +42,32 @@ public AzureBlobSnapshotStoreSettings( bool development, bool autoInitialize, PublicAccessType containerPublicAccessType) + : this( + connectionString: connectionString, + containerName: containerName, + connectTimeout: connectTimeout, + requestTimeout: requestTimeout, + verboseLogging: verboseLogging, + development: development, + autoInitialize: autoInitialize, + containerPublicAccessType: containerPublicAccessType, + serviceUri: null, + defaultAzureCredential: null, + blobClientOption: null) + { } + + public AzureBlobSnapshotStoreSettings( + string connectionString, + string containerName, + TimeSpan connectTimeout, + TimeSpan requestTimeout, + bool verboseLogging, + bool development, + bool autoInitialize, + PublicAccessType containerPublicAccessType, + Uri serviceUri, + DefaultAzureCredential defaultAzureCredential, + BlobClientOptions blobClientOption) { if (string.IsNullOrWhiteSpace(containerName)) throw new ConfigurationException("[AzureBlobSnapshotStore] Container name is null or empty."); @@ -51,6 +81,9 @@ public AzureBlobSnapshotStoreSettings( Development = development; AutoInitialize = autoInitialize; ContainerPublicAccessType = containerPublicAccessType; + ServiceUri = serviceUri; + DefaultAzureCredential = defaultAzureCredential; + BlobClientOptions = blobClientOption; } /// @@ -78,12 +111,106 @@ public AzureBlobSnapshotStoreSettings( /// public bool VerboseLogging { get; } + /// + /// Flag that we're running in development mode. When this is set, and + /// will be ignored, replaced with "UseDevelopmentStorage=true" for local + /// connection to Azurite. + /// public bool Development { get; } + /// + /// Automatically create the Blog Storage container if no existing Blob container is found + /// public bool AutoInitialize { get; } + /// + /// The public access type of the auto-initialized Blob Storage container + /// public PublicAccessType ContainerPublicAccessType { get; } + /// + /// A referencing the blob service. + /// This is likely to be similar to "https://{account_name}.blob.core.windows.net". + /// + public Uri ServiceUri { get; } + + /// + /// The used to sign API requests. + /// + public DefaultAzureCredential DefaultAzureCredential { get; } + + /// + /// Optional client options that define the transport pipeline policies for authentication, + /// retries, etc., that are applied to every request. + /// + public BlobClientOptions BlobClientOptions { get; } + + public AzureBlobSnapshotStoreSettings WithConnectionString(string connectionString) + => Copy(connectionString: connectionString); + public AzureBlobSnapshotStoreSettings WithContainerName(string containerName) + => Copy(containerName: containerName); + public AzureBlobSnapshotStoreSettings WithConnectTimeout(TimeSpan connectTimeout) + => Copy(connectTimeout: connectTimeout); + public AzureBlobSnapshotStoreSettings WithRequestTimeout(TimeSpan requestTimeout) + => Copy(requestTimeout: requestTimeout); + public AzureBlobSnapshotStoreSettings WithVerboseLogging(bool verboseLogging) + => Copy(verboseLogging: verboseLogging); + public AzureBlobSnapshotStoreSettings WithDevelopment(bool development) + => Copy(development: development); + public AzureBlobSnapshotStoreSettings WithAutoInitialize(bool autoInitialize) + => Copy(autoInitialize: autoInitialize); + public AzureBlobSnapshotStoreSettings WithContainerPublicAccessType(PublicAccessType containerPublicAccessType) + => Copy(containerPublicAccessType: containerPublicAccessType); + public AzureBlobSnapshotStoreSettings WithAzureCredential( + Uri serviceUri, + DefaultAzureCredential defaultAzureCredential, + BlobClientOptions blobClientOption = null) + => Copy( + serviceUri: serviceUri, + defaultAzureCredential: defaultAzureCredential, + blobClientOption: blobClientOption); + + private AzureBlobSnapshotStoreSettings Copy( + string connectionString = null, + string containerName = null, + TimeSpan? connectTimeout = null, + TimeSpan? requestTimeout = null, + bool? verboseLogging = null, + bool? development = null, + bool? autoInitialize = null, + PublicAccessType? containerPublicAccessType = null, + Uri serviceUri = null, + DefaultAzureCredential defaultAzureCredential = null, + BlobClientOptions blobClientOption = null) + => new AzureBlobSnapshotStoreSettings( + connectionString ?? ConnectionString, + containerName ?? ContainerName, + connectTimeout ?? ConnectTimeout, + requestTimeout ?? RequestTimeout, + verboseLogging ?? VerboseLogging, + development ?? Development, + autoInitialize ?? AutoInitialize, + containerPublicAccessType ?? ContainerPublicAccessType, + serviceUri ?? ServiceUri, + defaultAzureCredential ?? DefaultAzureCredential, + blobClientOption ?? BlobClientOptions); + + /// + /// Creates an instance using the + /// `akka.persistence.snapshot-store.azure-blob-store` HOCON configuration section inside + /// the settings. + /// + /// The to extract the configuration from + /// A new settings instance. + public static AzureBlobSnapshotStoreSettings Create(ActorSystem system) + { + var config = system.Settings.Config.GetConfig("akka.persistence.snapshot-store.azure-blob-store"); + if (config is null) + throw new ConfigurationException( + "Could not find HOCON config at path 'akka.persistence.snapshot-store.azure-blob-store'"); + return Create(config); + } + /// /// Creates an instance using the /// `akka.persistence.snapshot-store.azure-blob-store` HOCON configuration section. @@ -92,6 +219,9 @@ public AzureBlobSnapshotStoreSettings( /// A new settings instance. public static AzureBlobSnapshotStoreSettings Create(Config config) { + if (config is null) + throw new ArgumentNullException(nameof(config)); + var connectionString = config.GetString("connection-string"); var containerName = config.GetString("container-name"); var connectTimeout = config.GetTimeSpan("connect-timeout", TimeSpan.FromSeconds(3));