From 6603fba1716790fa4caeff84d8e6e3933677962e Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Fri, 5 May 2023 00:49:41 +0700 Subject: [PATCH] Move Hosting extension project from Akka.Hosting repo to this repo (#206) * Move Hosting extension project from Akka.Hosting repo to here * Add Akka.Hosting.TestKit version --- ...Akka.Persistence.PostgreSql.Hosting.csproj | 16 + ...aPersistencePostgreSqlHostingExtensions.cs | 246 +++++++++++++ .../PostgreSqlJournalOptions.cs | 115 ++++++ .../PostgreSqlSnapshotOptions.cs | 97 +++++ .../Properties/FriendsOf.cs | 3 + .../README.md | 71 ++++ .../StoredAsExtensions.cs | 22 ++ .../Akka.Persistence.PostgreSql.Tests.csproj | 2 + .../Hosting/EventAdapters.cs | 72 ++++ .../Hosting/PostgreSqlOptionsSpec.cs | 341 ++++++++++++++++++ src/Akka.Persistence.PostgreSql.sln | 6 + src/Directory.Build.props | 1 + src/Directory.Packages.props | 3 + 13 files changed, 995 insertions(+) create mode 100644 src/Akka.Persistence.PostgreSql.Hosting/Akka.Persistence.PostgreSql.Hosting.csproj create mode 100644 src/Akka.Persistence.PostgreSql.Hosting/AkkaPersistencePostgreSqlHostingExtensions.cs create mode 100644 src/Akka.Persistence.PostgreSql.Hosting/PostgreSqlJournalOptions.cs create mode 100644 src/Akka.Persistence.PostgreSql.Hosting/PostgreSqlSnapshotOptions.cs create mode 100644 src/Akka.Persistence.PostgreSql.Hosting/Properties/FriendsOf.cs create mode 100644 src/Akka.Persistence.PostgreSql.Hosting/README.md create mode 100644 src/Akka.Persistence.PostgreSql.Hosting/StoredAsExtensions.cs create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Hosting/EventAdapters.cs create mode 100644 src/Akka.Persistence.PostgreSql.Tests/Hosting/PostgreSqlOptionsSpec.cs diff --git a/src/Akka.Persistence.PostgreSql.Hosting/Akka.Persistence.PostgreSql.Hosting.csproj b/src/Akka.Persistence.PostgreSql.Hosting/Akka.Persistence.PostgreSql.Hosting.csproj new file mode 100644 index 0000000..94c99a5 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Hosting/Akka.Persistence.PostgreSql.Hosting.csproj @@ -0,0 +1,16 @@ + + + $(NetStandardLibVersion) + README.md + Akka.Persistence.PostgreSql Microsoft.Extensions.Hosting support. + + + + + + + + + + + diff --git a/src/Akka.Persistence.PostgreSql.Hosting/AkkaPersistencePostgreSqlHostingExtensions.cs b/src/Akka.Persistence.PostgreSql.Hosting/AkkaPersistencePostgreSqlHostingExtensions.cs new file mode 100644 index 0000000..d26b14a --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Hosting/AkkaPersistencePostgreSqlHostingExtensions.cs @@ -0,0 +1,246 @@ +using System; +using Akka.Actor; +using Akka.Hosting; +using Akka.Persistence.Hosting; + +#nullable enable +namespace Akka.Persistence.PostgreSql.Hosting +{ + /// + /// Extension methods for Akka.Persistence.PostgreSql + /// + public static class AkkaPersistencePostgreSqlHostingExtensions + { + /// + /// Add Akka.Persistence.PostgreSql support to the + /// + /// + /// The builder instance being configured. + /// + /// + /// Connection string used for database access. + /// + /// + /// + /// Determines which settings should be added by this method call. + /// + /// Default: + /// + /// + /// + /// The schema name for the journal and snapshot store table. + /// + /// Default: "public" + /// + /// + /// + /// Should the SQL store table be initialized automatically. + /// + /// Default: false + /// + /// + /// + /// Determines how data are being de/serialized into the table. + /// + /// Default: + /// + /// + /// + /// Uses the `CommandBehavior.SequentialAccess` when creating SQL commands, providing a performance + /// improvement for reading large BLOBS. + /// + /// Default: false + /// + /// + /// + /// When set to true, persistence will use `BIGINT` and `GENERATED ALWAYS AS IDENTITY` for journal table + /// schema creation. + /// + /// Default: false + /// + /// + /// + /// An used to configure an instance. + /// + /// Default: null + /// + /// + /// + /// The configuration identifier for the plugins + /// + /// Default: "postgresql" + /// + /// + /// + /// A bool flag to set the plugin as the default persistence plugin for the + /// + /// Default: true + /// + /// + /// The same instance originally passed in. + /// + public static AkkaConfigurationBuilder WithPostgreSqlPersistence( + this AkkaConfigurationBuilder builder, + string connectionString, + PersistenceMode mode = PersistenceMode.Both, + string schemaName = "public", + bool autoInitialize = false, + StoredAsType storedAsType = StoredAsType.ByteA, + bool sequentialAccess = false, + bool useBigintIdentityForOrderingColumn = false, + Action? journalBuilder = null, + string pluginIdentifier = "postgresql", + bool isDefaultPlugin = true) + { + if (mode == PersistenceMode.SnapshotStore && journalBuilder is { }) + throw new Exception($"{nameof(journalBuilder)} can only be set when {nameof(mode)} is set to either {PersistenceMode.Both} or {PersistenceMode.Journal}"); + + var journalOpt = new PostgreSqlJournalOptions(isDefaultPlugin, pluginIdentifier) + { + ConnectionString = connectionString, + SchemaName = schemaName, + AutoInitialize = autoInitialize, + StoredAs = storedAsType, + SequentialAccess = sequentialAccess, + UseBigIntIdentityForOrderingColumn = useBigintIdentityForOrderingColumn + }; + + var adapters = new AkkaPersistenceJournalBuilder(journalOpt.Identifier, builder); + journalBuilder?.Invoke(adapters); + journalOpt.Adapters = adapters; + + var snapshotOpt = new PostgreSqlSnapshotOptions(isDefaultPlugin, pluginIdentifier) + { + ConnectionString = connectionString, + SchemaName = schemaName, + AutoInitialize = autoInitialize, + StoredAs = storedAsType, + SequentialAccess = sequentialAccess + }; + + return mode switch + { + PersistenceMode.Journal => builder.WithPostgreSqlPersistence(journalOpt, null), + PersistenceMode.SnapshotStore => builder.WithPostgreSqlPersistence(null, snapshotOpt), + PersistenceMode.Both => builder.WithPostgreSqlPersistence(journalOpt, snapshotOpt), + _ => throw new ArgumentOutOfRangeException(nameof(mode), mode, "Invalid PersistenceMode defined.") + }; + } + + /// + /// Add Akka.Persistence.PostgreSql support to the + /// + /// + /// The builder instance being configured. + /// + /// + /// + /// An that modifies an instance of , + /// used to configure the snapshot store plugin + /// + /// Default: null + /// + /// + /// + /// An that modifies an instance of , + /// used to configure the journal plugin + /// + /// Default: null + /// + /// + /// + /// A bool flag to set the plugin as the default persistence plugin for the + /// + /// Default: true + /// + /// + /// The same instance originally passed in. + /// + /// + /// Thrown when both and + /// are null. + /// + public static AkkaConfigurationBuilder WithPostgreSqlPersistence( + this AkkaConfigurationBuilder builder, + Action? journalOptionConfigurator = null, + Action? snapshotOptionConfigurator = null, + bool isDefaultPlugin = true) + { + if (journalOptionConfigurator is null && snapshotOptionConfigurator is null) + throw new ArgumentException($"{nameof(journalOptionConfigurator)} and {nameof(snapshotOptionConfigurator)} could not both be null"); + + PostgreSqlJournalOptions? journalOptions = null; + if(journalOptionConfigurator is { }) + { + journalOptions = new PostgreSqlJournalOptions(isDefaultPlugin); + journalOptionConfigurator(journalOptions); + } + + PostgreSqlSnapshotOptions? snapshotOptions = null; + if (snapshotOptionConfigurator is { }) + { + snapshotOptions = new PostgreSqlSnapshotOptions(isDefaultPlugin); + snapshotOptionConfigurator(snapshotOptions); + } + + return builder.WithPostgreSqlPersistence(journalOptions, snapshotOptions); + } + + /// + /// Add Akka.Persistence.PostgreSql support to the + /// + /// + /// The builder instance being configured. + /// + /// + /// + /// An instance of , used to configure the snapshot store plugin + /// + /// Default: null + /// + /// + /// + /// An instance of , used to configure the journal plugin + /// + /// Default: null + /// + /// + /// The same instance originally passed in. + /// + /// + /// Thrown when both and are null. + /// + public static AkkaConfigurationBuilder WithPostgreSqlPersistence( + this AkkaConfigurationBuilder builder, + PostgreSqlJournalOptions? journalOptions = null, + PostgreSqlSnapshotOptions? snapshotOptions = null) + { + if (journalOptions is null && snapshotOptions is null) + throw new ArgumentException($"{nameof(journalOptions)} and {nameof(snapshotOptions)} could not both be null"); + + return (journalOptions, snapshotOptions) switch + { + (null, null) => + throw new ArgumentException($"{nameof(journalOptions)} and {nameof(snapshotOptions)} could not both be null"), + + (_, null) => + builder + .AddHocon(journalOptions.ToConfig(), HoconAddMode.Prepend) + .AddHocon(journalOptions.DefaultConfig, HoconAddMode.Append), + + (null, _) => + builder + .AddHocon(snapshotOptions.ToConfig(), HoconAddMode.Prepend) + .AddHocon(snapshotOptions.DefaultConfig, HoconAddMode.Append), + + (_, _) => + builder + .AddHocon(journalOptions.ToConfig(), HoconAddMode.Prepend) + .AddHocon(snapshotOptions.ToConfig(), HoconAddMode.Prepend) + .AddHocon(journalOptions.DefaultConfig, HoconAddMode.Append) + .AddHocon(snapshotOptions.DefaultConfig, HoconAddMode.Append), + }; + } + + } +} diff --git a/src/Akka.Persistence.PostgreSql.Hosting/PostgreSqlJournalOptions.cs b/src/Akka.Persistence.PostgreSql.Hosting/PostgreSqlJournalOptions.cs new file mode 100644 index 0000000..1cf5045 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Hosting/PostgreSqlJournalOptions.cs @@ -0,0 +1,115 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Data; +using System.Text; +using Akka.Configuration; +using Akka.Persistence.Hosting; + +#nullable enable +namespace Akka.Persistence.PostgreSql.Hosting +{ + /// + /// Akka.Hosting options class to set up PostgreSql persistence journal. + /// + public sealed class PostgreSqlJournalOptions: SqlJournalOptions + { + private static readonly Config Default = PostgreSqlPersistence.DefaultConfiguration() + .GetConfig(PostgreSqlJournalSettings.JournalConfigPath); + + /// + /// Create a new instance of + /// + public PostgreSqlJournalOptions() : this(true) + { + } + + /// + /// Create a new instance of + /// + /// Indicates if this journal configuration should be the default configuration for all persistence + /// The journal configuration identifier. Default: "postgresql" + public PostgreSqlJournalOptions(bool isDefaultPlugin, string identifier = "postgresql") : base(isDefaultPlugin) + { + Identifier = identifier; + } + + /// + /// + /// The plugin identifier for this persistence plugin + /// + /// Default: "postgresql" + /// + public override string Identifier { get; set; } + + /// + /// + /// PostgreSQL schema name to table corresponding with persistent journal. + /// + /// Default: "public" + /// + public override string SchemaName { get; set; } = "public"; + + /// + /// + /// PostgreSQL table corresponding with persistent journal. + /// + /// Default: "event_journal" + /// + public override string TableName { get; set; } = "event_journal"; + + /// + /// + /// PostgreSQL table corresponding with persistent journal metadata. + /// + /// Default: "metadata" + /// + public override string MetadataTableName { get; set; } = "metadata"; + + /// + /// + /// Uses the CommandBehavior.SequentialAccess when creating DB commands, providing a performance + /// improvement for reading large BLOBS. + /// + /// Default: false + /// + public override bool SequentialAccess { get; set; } = false; + + /// + /// + /// Postgres data type for payload column + /// + /// Default: + /// + public StoredAsType StoredAs { get; set; } = StoredAsType.ByteA; + + /// + /// + /// When turned on, persistence will use `BIGINT` and `GENERATED ALWAYS AS IDENTITY` for the ordering + /// column in the journal table during schema creation. + /// + /// Default: false + /// + public bool UseBigIntIdentityForOrderingColumn { get; set; } = false; + + /// + public override IsolationLevel ReadIsolationLevel { get; set; } = IsolationLevel.Unspecified; + + /// + public override IsolationLevel WriteIsolationLevel { get; set; } = IsolationLevel.Unspecified; + + protected override Config InternalDefaultConfig => Default; + + protected override StringBuilder Build(StringBuilder sb) + { + sb.AppendLine($"use-bigint-identity-for-ordering-column = {(UseBigIntIdentityForOrderingColumn ? "on" : "off")}"); + sb.AppendLine($"stored-as = {StoredAs.ToHocon()}"); + + return base.Build(sb); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Hosting/PostgreSqlSnapshotOptions.cs b/src/Akka.Persistence.PostgreSql.Hosting/PostgreSqlSnapshotOptions.cs new file mode 100644 index 0000000..ab2929c --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Hosting/PostgreSqlSnapshotOptions.cs @@ -0,0 +1,97 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Data; +using System.Text; +using Akka.Configuration; +using Akka.Persistence.Hosting; + +#nullable enable +namespace Akka.Persistence.PostgreSql.Hosting +{ + /// + /// Akka.Hosting options class to set up PostgreSql persistence snapshot store. + /// + public sealed class PostgreSqlSnapshotOptions: SqlSnapshotOptions + { + private static readonly Config Default = PostgreSqlPersistence.DefaultConfiguration() + .GetConfig(PostgreSqlSnapshotStoreSettings.SnapshotStoreConfigPath); + + /// + /// Create a new instance of + /// + public PostgreSqlSnapshotOptions() : this(true) + { + } + + /// + /// Create a new instance of + /// + /// Indicates if this snapshot store configuration should be the default configuration for all persistence + /// The snapshot store configuration identifier, Default: "postgresql" + public PostgreSqlSnapshotOptions(bool isDefaultPlugin, string identifier = "postgresql") : base(isDefaultPlugin) + { + Identifier = identifier; + } + + /// + /// + /// The plugin identifier for this persistence plugin + /// + /// Default: "postgresql" + /// + public override string Identifier { get; set; } + + /// + /// + /// PostgreSQL schema name to table corresponding with persistent snapshot store. + /// + /// Default: "public" + /// + public override string SchemaName { get; set; } = "public"; + + /// + /// + /// PostgreSQL table corresponding with persistent snapshot store. + /// + /// Default: "snapshot_store" + /// + public override string TableName { get; set; } = "snapshot_store"; + + /// + /// + /// Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance + /// improvement for reading large BLOBS. + /// + /// Default: false + /// + public override bool SequentialAccess { get; set; } = false; + + /// + /// + /// Postgres data type for the payload column + /// + /// Default: + /// + public StoredAsType StoredAs { get; set; } = StoredAsType.ByteA; + + /// + public override IsolationLevel ReadIsolationLevel { get; set; } = IsolationLevel.Unspecified; + + /// + public override IsolationLevel WriteIsolationLevel { get; set; } = IsolationLevel.Unspecified; + + protected override Config InternalDefaultConfig => Default; + + protected override StringBuilder Build(StringBuilder sb) + { + sb.AppendLine($"stored-as = {StoredAs.ToHocon()}"); + + return base.Build(sb); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Hosting/Properties/FriendsOf.cs b/src/Akka.Persistence.PostgreSql.Hosting/Properties/FriendsOf.cs new file mode 100644 index 0000000..1fe185d --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Hosting/Properties/FriendsOf.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Akka.Persistence.PostgreSql.Tests")] \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Hosting/README.md b/src/Akka.Persistence.PostgreSql.Hosting/README.md new file mode 100644 index 0000000..06dc7e3 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Hosting/README.md @@ -0,0 +1,71 @@ +# Akka.Persistence.PostgreSql.Hosting + +Akka.Hosting extension methods to add Akka.Persistence.PostgreSql to an ActorSystem + +# Akka.Persistence.PostgreSql Extension Methods + +## WithPostgreSqlPersistence() Method + +```csharp +public static AkkaConfigurationBuilder WithPostgreSqlPersistence( + this AkkaConfigurationBuilder builder, + string connectionString, + PersistenceMode mode = PersistenceMode.Both, + string schemaName = "public", + bool autoInitialize = false, + StoredAsType storedAsType = StoredAsType.ByteA, + bool sequentialAccess = false, + bool useBigintIdentityForOrderingColumn = false, + Action configurator = null); +``` + +### Parameters + +* `connectionString` __string__ + + Connection string used for database access. + +* `mode` __PersistenceMode__ + + Determines which settings should be added by this method call. __Default__: `PersistenceMode.Both` + + * `PersistenceMode.Journal`: Only add the journal settings + * `PersistenceMode.SnapshotStore`: Only add the snapshot store settings + * `PersistenceMode.Both`: Add both journal and snapshot store settings + +* `schemaName` __string__ + + The schema name for the journal and snapshot store table. __Default__: `"public"` + +* `autoInitialize` __bool__ + + Should the SQL store table be initialized automatically. __Default__: `false` + +* `storedAsType` __StoredAsType__ + + Determines how data are being de/serialized into the table. __Default__: `StoredAsType.ByteA` + + * `StoredAsType.ByteA`: Byte array + * `StoredAsType.Json`: JSON + * `StoredAsType.JsonB`: Binary JSON + +* `sequentialAccess` __bool__ + + Uses the `CommandBehavior.SequentialAccess` when creating SQL commands, providing a performance improvement for reading large BLOBS. __Default__: `false` + +* `useBigintIdentityForOrderingColumn` __bool__ + + When set to true, persistence will use `BIGINT` and `GENERATED ALWAYS AS IDENTITY` for journal table schema creation. __Default__: false + + > __NOTE__ + > + > This only affects newly created tables, as such, it should not affect any existing database. + + > __WARNING__ + > + > To use this feature, you have to have PorsgreSql version 10 or above + +* `configurator` __Action\__ + + An Action delegate used to configure an `AkkaPersistenceJournalBuilder` instance. Used to configure [Event Adapters](https://getakka.net/articles/persistence/event-adapters.html) + diff --git a/src/Akka.Persistence.PostgreSql.Hosting/StoredAsExtensions.cs b/src/Akka.Persistence.PostgreSql.Hosting/StoredAsExtensions.cs new file mode 100644 index 0000000..8524c4c --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Hosting/StoredAsExtensions.cs @@ -0,0 +1,22 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; + +namespace Akka.Persistence.PostgreSql.Hosting +{ + public static class StoredAsExtensions + { + internal static string ToHocon(this StoredAsType storedAsType) + => storedAsType switch + { + StoredAsType.ByteA => "bytea", + StoredAsType.Json => "json", + StoredAsType.JsonB => "jsonb", + _ => throw new ArgumentOutOfRangeException(nameof(storedAsType), storedAsType, "Invalid StoredAsType defined.") + }; + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj b/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj index 3be9303..ee86bde 100644 --- a/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj +++ b/src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj @@ -5,6 +5,7 @@ + @@ -14,6 +15,7 @@ + diff --git a/src/Akka.Persistence.PostgreSql.Tests/Hosting/EventAdapters.cs b/src/Akka.Persistence.PostgreSql.Tests/Hosting/EventAdapters.cs new file mode 100644 index 0000000..005bf1b --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Hosting/EventAdapters.cs @@ -0,0 +1,72 @@ +using System; +using System.Threading.Tasks; +using Akka.Hosting; +using Akka.Persistence.Hosting; +using Akka.Persistence.Journal; +using Akka.Util; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Xunit; + +namespace Akka.Persistence.PostgreSql.Tests.Hosting; + +public class EventAdapters +{ + public sealed class Event1{ } + public sealed class Event2{ } + + public sealed class EventMapper1 : IWriteEventAdapter + { + public string Manifest(object evt) + { + return string.Empty; + } + + public object ToJournal(object evt) + { + return evt; + } + } + + public sealed class Tagger : IWriteEventAdapter + { + public string Manifest(object evt) + { + return string.Empty; + } + + public object ToJournal(object evt) + { + if (evt is Tagged t) + return t; + return new Tagged(evt, new[] { "foo" }); + } + } + + public sealed class ReadAdapter : IReadEventAdapter + { + public IEventSequence FromJournal(object evt, string manifest) + { + return new SingleEventSequence(evt); + } + } + + public sealed class ComboAdapter : IEventAdapter + { + public string Manifest(object evt) + { + return string.Empty; + } + + public object ToJournal(object evt) + { + return evt; + } + + public IEventSequence FromJournal(object evt, string manifest) + { + return new SingleEventSequence(evt); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.Tests/Hosting/PostgreSqlOptionsSpec.cs b/src/Akka.Persistence.PostgreSql.Tests/Hosting/PostgreSqlOptionsSpec.cs new file mode 100644 index 0000000..9987ef3 --- /dev/null +++ b/src/Akka.Persistence.PostgreSql.Tests/Hosting/PostgreSqlOptionsSpec.cs @@ -0,0 +1,341 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2022 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.IO; +using System.Text; +using Akka.Configuration; +using Akka.Persistence.PostgreSql; +using Akka.Persistence.PostgreSql.Hosting; +using Akka.Persistence.Query.Sql; +using Akka.Util; +using FluentAssertions; +using FluentAssertions.Extensions; +using Microsoft.Extensions.Configuration; +using Xunit; + +namespace Akka.Persistence.PostgreSql.Tests.Hosting; + +public class PostgreSqlOptionsSpec +{ + #region Journal unit tests + + [Fact(DisplayName = "PostgreSqlJournalOptions as default plugin should generate plugin setting")] + public void DefaultPluginJournalOptionsTest() + { + var options = new PostgreSqlJournalOptions(true); + var config = options.ToConfig(); + + config.GetString("akka.persistence.journal.plugin").Should().Be("akka.persistence.journal.postgresql"); + config.HasPath("akka.persistence.journal.postgresql").Should().BeTrue(); + } + + [Fact(DisplayName = "Empty PostgreSqlJournalOptions should equal empty config with default fallback")] + public void DefaultJournalOptionsTest() + { + var options = new PostgreSqlJournalOptions(false); + var emptyRootConfig = options.ToConfig().WithFallback(options.DefaultConfig); + var baseRootConfig = Config.Empty + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); + + AssertString(emptyRootConfig, baseRootConfig, "akka.persistence.journal.plugin"); + + var config = emptyRootConfig.GetConfig("akka.persistence.journal.postgresql"); + var baseConfig = baseRootConfig.GetConfig("akka.persistence.journal.postgresql"); + config.Should().NotBeNull(); + baseConfig.Should().NotBeNull(); + + AssertJournalConfig(config, baseConfig); + } + + [Fact(DisplayName = "Empty PostgreSqlJournalOptions with custom identifier should equal empty config with default fallback")] + public void CustomIdJournalOptionsTest() + { + var options = new PostgreSqlJournalOptions(false, "custom"); + var emptyRootConfig = options.ToConfig().WithFallback(options.DefaultConfig); + var baseRootConfig = Config.Empty + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); + + AssertString(emptyRootConfig, baseRootConfig, "akka.persistence.journal.plugin"); + + var config = emptyRootConfig.GetConfig("akka.persistence.journal.custom"); + var baseConfig = baseRootConfig.GetConfig("akka.persistence.journal.postgresql"); + config.Should().NotBeNull(); + baseConfig.Should().NotBeNull(); + + AssertJournalConfig(config, baseConfig); + } + + [Fact(DisplayName = "PostgreSqlJournalOptions should generate proper config")] + public void JournalOptionsTest() + { + var options = new PostgreSqlJournalOptions(true) + { + Identifier = "custom", + AutoInitialize = true, + ConnectionString = "testConnection", + ConnectionTimeout = 1.Seconds(), + MetadataTableName = "testMetadata", + SchemaName = "testSchema", + SequentialAccess = false, + TableName = "testTable", + StoredAs = StoredAsType.Json, + UseBigIntIdentityForOrderingColumn = true + }; + options.Adapters.AddWriteEventAdapter("mapper1", new [] { typeof(EventAdapters.Event1) }); + options.Adapters.AddReadEventAdapter("reader1", new [] { typeof(EventAdapters.Event1) }); + options.Adapters.AddEventAdapter("combo", boundTypes: new [] { typeof(EventAdapters.Event2) }); + options.Adapters.AddWriteEventAdapter("tagger", boundTypes: new [] { typeof(EventAdapters.Event1), typeof(EventAdapters.Event2) }); + + var baseConfig = options.ToConfig(); + + baseConfig.GetString("akka.persistence.journal.plugin").Should().Be("akka.persistence.journal.custom"); + + var config = baseConfig.GetConfig("akka.persistence.journal.custom"); + config.Should().NotBeNull(); + config.GetString("connection-string").Should().Be(options.ConnectionString); + config.GetTimeSpan("connection-timeout").Should().Be(options.ConnectionTimeout); + config.GetString("schema-name").Should().Be(options.SchemaName); + config.GetString("table-name").Should().Be(options.TableName); + config.GetBoolean("auto-initialize").Should().Be(options.AutoInitialize); + config.GetString("metadata-table-name").Should().Be(options.MetadataTableName); + config.GetBoolean("sequential-access").Should().Be(options.SequentialAccess); + config.GetString("stored-as").Should().Be(options.StoredAs.ToHocon()); + config.GetBoolean("use-bigint-identity-for-ordering-column").Should().Be(options.UseBigIntIdentityForOrderingColumn); + + config.GetStringList($"event-adapter-bindings.\"{typeof(EventAdapters.Event1).TypeQualifiedName()}\"").Should() + .BeEquivalentTo("mapper1", "reader1", "tagger"); + config.GetStringList($"event-adapter-bindings.\"{typeof(EventAdapters.Event2).TypeQualifiedName()}\"").Should() + .BeEquivalentTo("combo", "tagger"); + + config.GetString("event-adapters.mapper1").Should().Be(typeof(EventAdapters.EventMapper1).TypeQualifiedName()); + config.GetString("event-adapters.reader1").Should().Be(typeof(EventAdapters.ReadAdapter).TypeQualifiedName()); + config.GetString("event-adapters.combo").Should().Be(typeof(EventAdapters.ComboAdapter).TypeQualifiedName()); + config.GetString("event-adapters.tagger").Should().Be(typeof(EventAdapters.Tagger).TypeQualifiedName()); + + } + + const string Json = @" +{ + ""Logging"": { + ""LogLevel"": { + ""Default"": ""Information"", + ""Microsoft.AspNetCore"": ""Warning"" + } + }, + ""Akka"": { + ""JournalOptions"": { + ""StoredAs"": ""JsonB"", + ""UseBigIntIdentityForOrderingColumn"": true, + + ""ConnectionString"": ""Server=localhost,1533;Database=Akka;User Id=sa;"", + ""ConnectionTimeout"": ""00:00:55"", + ""SchemaName"": ""schema"", + ""TableName"" : ""journal"", + ""MetadataTableName"": ""meta"", + ""SequentialAccess"": false, + + ""IsDefaultPlugin"": false, + ""Identifier"": ""custom"", + ""AutoInitialize"": true, + ""Serializer"": ""hyperion"" + } + } +}"; + + [Fact(DisplayName = "PostgreSqlJournalOptions should be bindable to IConfiguration")] + public void JournalOptionsIConfigurationBindingTest() + { + using var stream = new MemoryStream(Encoding.UTF8.GetBytes(Json)); + var jsonConfig = new ConfigurationBuilder().AddJsonStream(stream).Build(); + + var options = jsonConfig.GetSection("Akka:JournalOptions").Get(); + options.IsDefaultPlugin.Should().BeFalse(); + options.Identifier.Should().Be("custom"); + options.AutoInitialize.Should().BeTrue(); + options.Serializer.Should().Be("hyperion"); + options.ConnectionString.Should().Be("Server=localhost,1533;Database=Akka;User Id=sa;"); + options.ConnectionTimeout.Should().Be(55.Seconds()); + options.SchemaName.Should().Be("schema"); + options.TableName.Should().Be("journal"); + options.MetadataTableName.Should().Be("meta"); + options.SequentialAccess.Should().BeFalse(); + + options.StoredAs.Should().Be(StoredAsType.JsonB); + options.UseBigIntIdentityForOrderingColumn.Should().BeTrue(); + } + + #endregion + + #region Snapshot unit tests + + [Fact(DisplayName = "PostgreSqlSnapshotOptions as default plugin should generate plugin setting")] + public void DefaultPluginSnapshotOptionsTest() + { + var options = new PostgreSqlSnapshotOptions(true); + var config = options.ToConfig(); + + config.GetString("akka.persistence.snapshot-store.plugin").Should().Be("akka.persistence.snapshot-store.postgresql"); + config.HasPath("akka.persistence.snapshot-store.postgresql").Should().BeTrue(); + } + + [Fact(DisplayName = "Empty PostgreSqlSnapshotOptions with default fallback should return default config")] + public void DefaultSnapshotOptionsTest() + { + var options = new PostgreSqlSnapshotOptions(false); + var emptyRootConfig = options.ToConfig().WithFallback(options.DefaultConfig); + var baseRootConfig = Config.Empty + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); + + AssertString(emptyRootConfig, baseRootConfig, "akka.persistence.snapshot-store.plugin"); + + var config = emptyRootConfig.GetConfig("akka.persistence.snapshot-store.postgresql"); + var baseConfig = baseRootConfig.GetConfig("akka.persistence.snapshot-store.postgresql"); + config.Should().NotBeNull(); + baseConfig.Should().NotBeNull(); + + AssertSnapshotConfig(config, baseConfig); + } + + [Fact(DisplayName = "Empty PostgreSqlSnapshotOptions with custom identifier should equal empty config with default fallback")] + public void CustomIdSnapshotOptionsTest() + { + var options = new PostgreSqlSnapshotOptions(false, "custom"); + var emptyRootConfig = options.ToConfig().WithFallback(options.DefaultConfig); + var baseRootConfig = Config.Empty + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); + + AssertString(emptyRootConfig, baseRootConfig, "akka.persistence.snapshot-store.plugin"); + + var config = emptyRootConfig.GetConfig("akka.persistence.snapshot-store.custom"); + var baseConfig = baseRootConfig.GetConfig("akka.persistence.snapshot-store.postgresql"); + config.Should().NotBeNull(); + baseConfig.Should().NotBeNull(); + + AssertSnapshotConfig(config, baseConfig); + } + + [Fact(DisplayName = "PostgreSqlSnapshotOptions should generate proper config")] + public void SnapshotOptionsTest() + { + var options = new PostgreSqlSnapshotOptions(true) + { + Identifier = "custom", + AutoInitialize = true, + ConnectionString = "testConnection", + ConnectionTimeout = 1.Seconds(), + SchemaName = "testSchema", + SequentialAccess = false, + TableName = "testTable", + StoredAs = StoredAsType.Json, + }; + var baseConfig = options.ToConfig() + .WithFallback(PostgreSqlPersistence.DefaultConfiguration()); + + baseConfig.GetString("akka.persistence.snapshot-store.plugin").Should().Be("akka.persistence.snapshot-store.custom"); + + var config = baseConfig.GetConfig("akka.persistence.snapshot-store.custom"); + config.Should().NotBeNull(); + config.GetString("connection-string").Should().Be(options.ConnectionString); + config.GetTimeSpan("connection-timeout").Should().Be(options.ConnectionTimeout); + config.GetString("schema-name").Should().Be(options.SchemaName); + config.GetString("table-name").Should().Be(options.TableName); + config.GetBoolean("auto-initialize").Should().Be(options.AutoInitialize); + config.GetBoolean("sequential-access").Should().Be(options.SequentialAccess); + config.GetString("stored-as").Should().Be(options.StoredAs.ToHocon()); + } + + [Fact(DisplayName = "PostgreSqlSnapshotOptions should be bindable to IConfiguration")] + public void SnapshotOptionsIConfigurationBindingTest() + { + const string json = @" +{ + ""Logging"": { + ""LogLevel"": { + ""Default"": ""Information"", + ""Microsoft.AspNetCore"": ""Warning"" + } + }, + ""Akka"": { + ""SnapshotOptions"": { + ""StoredAs"": ""JsonB"", + + ""ConnectionString"": ""Server=localhost,1533;Database=Akka;User Id=sa;"", + ""ConnectionTimeout"": ""00:00:55"", + ""SchemaName"": ""schema"", + ""TableName"" : ""snapshot"", + ""SequentialAccess"": false, + + ""IsDefaultPlugin"": false, + ""Identifier"": ""CustomSnapshot"", + ""AutoInitialize"": true, + ""Serializer"": ""hyperion"" + } + } +}"; + using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)); + var jsonConfig = new ConfigurationBuilder().AddJsonStream(stream).Build(); + + var options = jsonConfig.GetSection("Akka:SnapshotOptions").Get(); + options.IsDefaultPlugin.Should().BeFalse(); + options.Identifier.Should().Be("CustomSnapshot"); + options.AutoInitialize.Should().BeTrue(); + options.Serializer.Should().Be("hyperion"); + options.ConnectionString.Should().Be("Server=localhost,1533;Database=Akka;User Id=sa;"); + options.ConnectionTimeout.Should().Be(55.Seconds()); + options.SchemaName.Should().Be("schema"); + options.TableName.Should().Be("snapshot"); + options.SequentialAccess.Should().BeFalse(); + + options.StoredAs.Should().Be(StoredAsType.JsonB); + } + + #endregion + + private static void AssertJournalConfig(Config underTest, Config reference) + { + AssertString(underTest, reference, "class"); + AssertString(underTest, reference, "plugin-dispatcher"); + AssertString(underTest, reference, "connection-string"); + AssertTimespan(underTest, reference, "connection-timeout"); + AssertString(underTest, reference, "schema-name"); + AssertString(underTest, reference, "table-name"); + AssertBoolean(underTest, reference, "auto-initialize"); + AssertString(underTest, reference, "metadata-table-name"); + AssertBoolean(underTest, reference, "sequential-access"); + AssertString(underTest, reference, "stored-as"); + AssertBoolean(underTest, reference, "use-bigint-identity-for-ordering-column"); + AssertString(underTest, reference, "read-isolation-level"); + AssertString(underTest, reference, "write-isolation-level"); + } + + private static void AssertSnapshotConfig(Config underTest, Config reference) + { + AssertString(underTest, reference, "class"); + AssertString(underTest, reference, "plugin-dispatcher"); + AssertString(underTest, reference, "connection-string"); + AssertTimespan(underTest, reference, "connection-timeout"); + AssertString(underTest, reference, "schema-name"); + AssertString(underTest, reference, "table-name"); + AssertBoolean(underTest, reference, "auto-initialize"); + AssertBoolean(underTest, reference, "sequential-access"); + AssertBoolean(underTest, reference, "use-constant-parameter-size"); + AssertString(underTest, reference, "read-isolation-level"); + AssertString(underTest, reference, "write-isolation-level"); + } + + private static void AssertString(Config underTest, Config reference, string hoconPath) + { + underTest.GetString(hoconPath).Should().Be(reference.GetString(hoconPath)); + } + private static void AssertTimespan(Config underTest, Config reference, string hoconPath) + { + underTest.GetTimeSpan(hoconPath).Should().Be(reference.GetTimeSpan(hoconPath)); + } + private static void AssertBoolean(Config underTest, Config reference, string hoconPath) + { + underTest.GetBoolean(hoconPath).Should().Be(reference.GetBoolean(hoconPath)); + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.PostgreSql.sln b/src/Akka.Persistence.PostgreSql.sln index b970dff..d80469e 100644 --- a/src/Akka.Persistence.PostgreSql.sln +++ b/src/Akka.Persistence.PostgreSql.sln @@ -22,6 +22,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{CADDD4C2 EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.PostgreSql.Performance.Tests", "Akka.Persistence.PostgreSql.Performance.Tests\Akka.Persistence.PostgreSql.Performance.Tests.csproj", "{E5A31545-77AA-4BE1-A2A4-853256CC0F5E}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.PostgreSql.Hosting", "Akka.Persistence.PostgreSql.Hosting\Akka.Persistence.PostgreSql.Hosting.csproj", "{0725302B-2814-48C6-944B-7E6F5CC72D8E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -40,6 +42,10 @@ Global {E5A31545-77AA-4BE1-A2A4-853256CC0F5E}.Debug|Any CPU.Build.0 = Debug|Any CPU {E5A31545-77AA-4BE1-A2A4-853256CC0F5E}.Release|Any CPU.ActiveCfg = Release|Any CPU {E5A31545-77AA-4BE1-A2A4-853256CC0F5E}.Release|Any CPU.Build.0 = Release|Any CPU + {0725302B-2814-48C6-944B-7E6F5CC72D8E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0725302B-2814-48C6-944B-7E6F5CC72D8E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0725302B-2814-48C6-944B-7E6F5CC72D8E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0725302B-2814-48C6-944B-7E6F5CC72D8E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Directory.Build.props b/src/Directory.Build.props index cb14894..9d9df77 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -8,6 +8,7 @@ https://github.com/akkadotnet/Akka.Persistence.PostgreSql https://github.com/akkadotnet/Akka.Persistence.PostgreSql/blob/dev/LICENSE.md akka;actors;actor model;Akka;concurrency;Postgres;PostgreSql + 11 diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 7d0580d..4a5cc2f 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -6,11 +6,14 @@ + + +