diff --git a/src/Akka.Persistence.Sql.Hosting.Tests/Bugfix344Spec.cs b/src/Akka.Persistence.Sql.Hosting.Tests/Bugfix344Spec.cs new file mode 100644 index 00000000..8aa03f60 --- /dev/null +++ b/src/Akka.Persistence.Sql.Hosting.Tests/Bugfix344Spec.cs @@ -0,0 +1,88 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Hosting; +using Akka.Persistence.Hosting; +using Akka.Persistence.Query; +using Akka.Persistence.Sql.Config; +using Akka.Persistence.Sql.Tests.Common.Containers; +using Akka.Persistence.TCK.Query; +using Akka.Streams; +using Akka.Streams.TestKit; +using Xunit; +using Xunit.Abstractions; +using Xunit.Sdk; +using Akka.Persistence.Sql.Query; +using FluentAssertions.Extensions; + +namespace Akka.Persistence.Sql.Hosting.Tests +{ + public class Bugfix344Spec : Akka.Hosting.TestKit.TestKit, IClassFixture + { + private ActorMaterializer? Materializer; + private IReadJournal? ReadJournal; + private readonly SqliteContainer _fixture; + + public Bugfix344Spec(ITestOutputHelper output, SqliteContainer fixture) : base(nameof(Bugfix344Spec), output) + { + _fixture = fixture; + + if (!_fixture.InitializeDbAsync().Wait(10.Seconds())) + throw new Exception("Failed to clean up database in 10 seconds"); + } + + protected override async Task BeforeTestStart() + { + await base.BeforeTestStart(); + ReadJournal = Sys.ReadJournalFor("akka.persistence.query.journal.custom"); + Materializer = Sys.Materializer(); + } + + protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider) + { + var journalOptions = new SqlJournalOptions(true, "custom") + { + ProviderName = _fixture.ProviderName, + ConnectionString = _fixture.ConnectionString, + TagStorageMode = TagMode.TagTable, + Adapters = new AkkaPersistenceJournalBuilder("custom", builder), + QueryRefreshInterval = 1.Seconds(), + AutoInitialize = true, + }; + journalOptions.Adapters.AddWriteEventAdapter("color-tagger", [typeof(string)]); + + var snapshotOptions = new SqlSnapshotOptions(true, "custom") + { + ProviderName = _fixture.ProviderName, + ConnectionString = _fixture.ConnectionString, + AutoInitialize = true, + }; + + builder.WithSqlPersistence(journalOptions, snapshotOptions); + } + + [Fact] + public async Task ReadJournal_should_initialize_tables_when_started_before_WriteJournal() + { + if (ReadJournal is not ICurrentEventsByTagQuery queries) + throw IsTypeException.ForMismatchedType(nameof(IEventsByTagQuery), ReadJournal?.GetType().Name ?? "null"); + + // should just not return + await EventFilter.Error().ExpectAsync( + 0, + async () => + { + var blackSrc = queries.CurrentEventsByTag("random-unused-tag", offset: NoOffset.Instance); + var probe = blackSrc.RunWith(this.SinkProbe(), Materializer); + probe.Request(2); + + // query should just gracefully exit + await probe.ExpectCompleteAsync(); + }); + } + + } +} diff --git a/src/Akka.Persistence.Sql.Hosting.Tests/ConfigExtensions.cs b/src/Akka.Persistence.Sql.Hosting.Tests/ConfigExtensions.cs index 4babfa78..49995e9d 100644 --- a/src/Akka.Persistence.Sql.Hosting.Tests/ConfigExtensions.cs +++ b/src/Akka.Persistence.Sql.Hosting.Tests/ConfigExtensions.cs @@ -17,43 +17,36 @@ public static void AssertType(this Configuration.Config actual, Configuration.Co actual.HasPath(key).Should().BeTrue(); if (value is not null) Type.GetType(actual.GetString(key)).Should().Be(value); - actual.GetString(key).Should().Be(expected.GetString(key)); + else + actual.GetString(key).Should().Be(expected.GetString(key)); } public static void AssertString(this Configuration.Config actual, Configuration.Config expected, string key, string? value = null) { expected.HasPath(key).Should().BeTrue(); actual.HasPath(key).Should().BeTrue(); - if (value is not null) - actual.GetString(key).Should().Be(value); - actual.GetString(key).Should().Be(expected.GetString(key)); + actual.GetString(key).Should().Be(value ?? expected.GetString(key)); } public static void AssertInt(this Configuration.Config actual, Configuration.Config expected, string key, int? value = null) { expected.HasPath(key).Should().BeTrue(); actual.HasPath(key).Should().BeTrue(); - if (value is not null) - actual.GetInt(key).Should().Be(value.Value); - actual.GetInt(key).Should().Be(expected.GetInt(key)); + actual.GetInt(key).Should().Be(value ?? expected.GetInt(key)); } public static void AssertBool(this Configuration.Config actual, Configuration.Config expected, string key, bool? value = null) { expected.HasPath(key).Should().BeTrue(); actual.HasPath(key).Should().BeTrue(); - if (value is not null) - actual.GetBoolean(key).Should().Be(value.Value); - actual.GetBoolean(key).Should().Be(expected.GetBoolean(key)); + actual.GetBoolean(key).Should().Be(value ?? expected.GetBoolean(key)); } public static void AssertTimeSpan(this Configuration.Config actual, Configuration.Config expected, string key, TimeSpan? value = null) { expected.HasPath(key).Should().BeTrue(); actual.HasPath(key).Should().BeTrue(); - if (value is not null) - actual.GetTimeSpan(key).Should().Be(value.Value); - actual.GetTimeSpan(key).Should().Be(expected.GetTimeSpan(key)); + actual.GetTimeSpan(key).Should().Be(value ?? expected.GetTimeSpan(key)); } public static void AssertIsolationLevel(this Configuration.Config actual, Configuration.Config expected, string key) diff --git a/src/Akka.Persistence.Sql.Hosting.Tests/JournalSettingsSpec.cs b/src/Akka.Persistence.Sql.Hosting.Tests/JournalSettingsSpec.cs index 9794f8d3..bcecda62 100644 --- a/src/Akka.Persistence.Sql.Hosting.Tests/JournalSettingsSpec.cs +++ b/src/Akka.Persistence.Sql.Hosting.Tests/JournalSettingsSpec.cs @@ -87,7 +87,7 @@ public void DefaultOptionsTest() var actualQueryConfig = config.GetConfig(SqlPersistence.QueryConfigPath); actualQueryConfig.AssertType(defaultQueryConfig, "class", typeof(SqlReadJournalProvider)); - actualQueryConfig.AssertString(defaultQueryConfig, "write-plugin"); + actualQueryConfig.AssertString(defaultQueryConfig, "write-plugin", "akka.persistence.journal.sql"); actualQueryConfig.AssertInt(defaultQueryConfig, "max-buffer-size", 500); actualQueryConfig.AssertTimeSpan(defaultQueryConfig, "refresh-interval", 1.Seconds()); actualQueryConfig.AssertString(defaultQueryConfig, "connection-string", "a"); @@ -224,6 +224,7 @@ public void ModifiedOptionsTest() queryConfig.ConnectionString.Should().Be("a"); queryConfig.ProviderName.Should().Be("b"); + queryConfig.WritePluginId.Should().Be("akka.persistence.journal.custom"); queryConfig.DefaultSerializer.Should().Be("hyperion"); queryConfig.UseCloneConnection.Should().BeTrue(); // non-overridable queryConfig.RefreshInterval.Should().Be(5.Seconds()); diff --git a/src/Akka.Persistence.Sql.Hosting/SqlJournalOptions.cs b/src/Akka.Persistence.Sql.Hosting/SqlJournalOptions.cs index 3c9169a7..31fc429a 100644 --- a/src/Akka.Persistence.Sql.Hosting/SqlJournalOptions.cs +++ b/src/Akka.Persistence.Sql.Hosting/SqlJournalOptions.cs @@ -154,19 +154,20 @@ protected override StringBuilder Build(StringBuilder sb) base.Build(sb); - BuildQueryConfig(sb, QueryPluginId); + BuildQueryConfig(sb, QueryPluginId, PluginId); if (IsDefaultPlugin && Identifier is not "sql") - BuildQueryConfig(sb, "akka.persistence.query.journal.sql"); - + BuildQueryConfig(sb, "akka.persistence.query.journal.sql", "akka.persistence.journal.sql"); + return sb; } - private StringBuilder BuildQueryConfig(StringBuilder sb, string queryPluginId) + private StringBuilder BuildQueryConfig(StringBuilder sb, string queryPluginId, string pluginId) { sb.Append(queryPluginId).AppendLine("{"); sb.AppendLine($"connection-string = {ConnectionString.ToHocon()}"); sb.AppendLine($"provider-name = {ProviderName.ToHocon()}"); + sb.AppendLine($"write-plugin = {pluginId}"); if (DatabaseOptions is not null) sb.AppendLine($"table-mapping = {DatabaseOptions.Mapping.Name().ToHocon()}"); diff --git a/src/Akka.Persistence.Sql.Tests/Query/Sqlite/SqliteBugfix344Spec.cs b/src/Akka.Persistence.Sql.Tests/Query/Sqlite/SqliteBugfix344Spec.cs new file mode 100644 index 00000000..d9f180cf --- /dev/null +++ b/src/Akka.Persistence.Sql.Tests/Query/Sqlite/SqliteBugfix344Spec.cs @@ -0,0 +1,27 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2013-2023 .NET Foundation +// +// ----------------------------------------------------------------------- + +using Akka.Persistence.Sql.Config; +using Akka.Persistence.Sql.Tests.Common.Containers; +using Akka.Persistence.Sql.Tests.Common.Query; +using Akka.Persistence.Sql.Tests.Sqlite; +using Xunit; +using Xunit.Abstractions; +#if !DEBUG +using Akka.Persistence.Sql.Tests.Common.Internal.Xunit; +#endif + +namespace Akka.Persistence.Sql.Tests.Query.Sqlite +{ + [Collection(nameof(SqlitePersistenceSpec))] + public class SqliteBugfix344Spec : Bugfix344Spec + { + public SqliteBugfix344Spec(ITestOutputHelper output, SqliteContainer fixture) + : base(output, fixture) + { + } + } +} diff --git a/src/Akka.Persistence.Sql/Config/ReadJournalConfig.cs b/src/Akka.Persistence.Sql/Config/ReadJournalConfig.cs index 789f7bf4..b3f94b6e 100644 --- a/src/Akka.Persistence.Sql/Config/ReadJournalConfig.cs +++ b/src/Akka.Persistence.Sql/Config/ReadJournalConfig.cs @@ -16,6 +16,7 @@ public ReadJournalConfig(Configuration.Config config) { ConnectionString = config.GetString("connection-string"); ProviderName = config.GetString("provider-name"); + WritePluginId = config.GetString("write-plugin"); TableConfig = new JournalTableConfig(config); DaoConfig = new BaseByteArrayJournalDaoConfig(config); UseCloneConnection = config.GetBoolean("use-clone-connection"); @@ -31,6 +32,8 @@ public ReadJournalConfig(Configuration.Config config) WriteIsolationLevel = IsolationLevel.Unspecified; } + public string WritePluginId { get; } + public BaseByteArrayJournalDaoConfig DaoConfig { get; } public int MaxBufferSize { get; } diff --git a/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs b/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs index 91ce2bae..0b5f135c 100644 --- a/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs +++ b/src/Akka.Persistence.Sql/Query/SqlReadJournal.cs @@ -52,15 +52,14 @@ public SqlReadJournal( ExtendedActorSystem system, Configuration.Config config) { - var writePluginId = config.GetString("write-plugin"); - _eventAdapters = Persistence.Instance.Apply(system).AdaptersFor(writePluginId); + _readJournalConfig = new ReadJournalConfig(config); + _eventAdapters = Persistence.Instance.Apply(system).AdaptersFor(_readJournalConfig.WritePluginId); // Fix for https://github.com/akkadotnet/Akka.Persistence.Sql/issues/344 - var writeJournal = Persistence.Instance.Apply(system).JournalFor(writePluginId); + var writeJournal = Persistence.Instance.Apply(system).JournalFor(_readJournalConfig.WritePluginId); // we want to block, we want to crash if the journal is not available var started = writeJournal.Ask(IsInitialized.Instance, TimeSpan.FromSeconds(5)).Result; - _readJournalConfig = new ReadJournalConfig(config); _system = system; var connFact = new AkkaPersistenceDataConnectionFactory(_readJournalConfig);