Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing "write-plugin" HOCON settings #427

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions src/Akka.Persistence.Sql.Hosting.Tests/Bugfix344Spec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// -----------------------------------------------------------------------
// <copyright file="Bugfix344Spec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Hosting;
using Akka.Persistence.Hosting;
using Akka.Persistence.Query;
using Akka.Persistence.Sql.Config;
using Akka.Persistence.Sql.Tests.Common.Containers;
using Akka.Persistence.TCK.Query;
using Akka.Streams;
using Akka.Streams.TestKit;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;
using Akka.Persistence.Sql.Query;
using FluentAssertions.Extensions;

namespace Akka.Persistence.Sql.Hosting.Tests
{
public class Bugfix344Spec : Akka.Hosting.TestKit.TestKit, IClassFixture<SqliteContainer>
{
private ActorMaterializer? Materializer;
private IReadJournal? ReadJournal;
private readonly SqliteContainer _fixture;

public Bugfix344Spec(ITestOutputHelper output, SqliteContainer fixture) : base(nameof(Bugfix344Spec), output)
{
_fixture = fixture;

if (!_fixture.InitializeDbAsync().Wait(10.Seconds()))
throw new Exception("Failed to clean up database in 10 seconds");
}

protected override async Task BeforeTestStart()
{
await base.BeforeTestStart();
ReadJournal = Sys.ReadJournalFor<SqlReadJournal>("akka.persistence.query.journal.custom");
Materializer = Sys.Materializer();
}

protected override void ConfigureAkka(AkkaConfigurationBuilder builder, IServiceProvider provider)
{
var journalOptions = new SqlJournalOptions(true, "custom")
{
ProviderName = _fixture.ProviderName,
ConnectionString = _fixture.ConnectionString,
TagStorageMode = TagMode.TagTable,
Adapters = new AkkaPersistenceJournalBuilder("custom", builder),
QueryRefreshInterval = 1.Seconds(),
AutoInitialize = true,
};
journalOptions.Adapters.AddWriteEventAdapter<ColorFruitTagger>("color-tagger", [typeof(string)]);

var snapshotOptions = new SqlSnapshotOptions(true, "custom")
{
ProviderName = _fixture.ProviderName,
ConnectionString = _fixture.ConnectionString,
AutoInitialize = true,
};

builder.WithSqlPersistence(journalOptions, snapshotOptions);
}

[Fact]
public async Task ReadJournal_should_initialize_tables_when_started_before_WriteJournal()
{
if (ReadJournal is not ICurrentEventsByTagQuery queries)
throw IsTypeException.ForMismatchedType(nameof(IEventsByTagQuery), ReadJournal?.GetType().Name ?? "null");

// should just not return
await EventFilter.Error().ExpectAsync(
0,
async () =>
{
var blackSrc = queries.CurrentEventsByTag("random-unused-tag", offset: NoOffset.Instance);
var probe = blackSrc.RunWith(this.SinkProbe<EventEnvelope>(), Materializer);
probe.Request(2);

// query should just gracefully exit
await probe.ExpectCompleteAsync();
});
}

}
}
19 changes: 6 additions & 13 deletions src/Akka.Persistence.Sql.Hosting.Tests/ConfigExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,36 @@ public static void AssertType(this Configuration.Config actual, Configuration.Co
actual.HasPath(key).Should().BeTrue();
if (value is not null)
Type.GetType(actual.GetString(key)).Should().Be(value);
actual.GetString(key).Should().Be(expected.GetString(key));
else
actual.GetString(key).Should().Be(expected.GetString(key));
}

public static void AssertString(this Configuration.Config actual, Configuration.Config expected, string key, string? value = null)
{
expected.HasPath(key).Should().BeTrue();
actual.HasPath(key).Should().BeTrue();
if (value is not null)
actual.GetString(key).Should().Be(value);
actual.GetString(key).Should().Be(expected.GetString(key));
actual.GetString(key).Should().Be(value ?? expected.GetString(key));
}

public static void AssertInt(this Configuration.Config actual, Configuration.Config expected, string key, int? value = null)
{
expected.HasPath(key).Should().BeTrue();
actual.HasPath(key).Should().BeTrue();
if (value is not null)
actual.GetInt(key).Should().Be(value.Value);
actual.GetInt(key).Should().Be(expected.GetInt(key));
actual.GetInt(key).Should().Be(value ?? expected.GetInt(key));
}

public static void AssertBool(this Configuration.Config actual, Configuration.Config expected, string key, bool? value = null)
{
expected.HasPath(key).Should().BeTrue();
actual.HasPath(key).Should().BeTrue();
if (value is not null)
actual.GetBoolean(key).Should().Be(value.Value);
actual.GetBoolean(key).Should().Be(expected.GetBoolean(key));
actual.GetBoolean(key).Should().Be(value ?? expected.GetBoolean(key));
}

public static void AssertTimeSpan(this Configuration.Config actual, Configuration.Config expected, string key, TimeSpan? value = null)
{
expected.HasPath(key).Should().BeTrue();
actual.HasPath(key).Should().BeTrue();
if (value is not null)
actual.GetTimeSpan(key).Should().Be(value.Value);
actual.GetTimeSpan(key).Should().Be(expected.GetTimeSpan(key));
actual.GetTimeSpan(key).Should().Be(value ?? expected.GetTimeSpan(key));
}

public static void AssertIsolationLevel(this Configuration.Config actual, Configuration.Config expected, string key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void DefaultOptionsTest()
var actualQueryConfig = config.GetConfig(SqlPersistence.QueryConfigPath);

actualQueryConfig.AssertType(defaultQueryConfig, "class", typeof(SqlReadJournalProvider));
actualQueryConfig.AssertString(defaultQueryConfig, "write-plugin");
actualQueryConfig.AssertString(defaultQueryConfig, "write-plugin", "akka.persistence.journal.sql");
actualQueryConfig.AssertInt(defaultQueryConfig, "max-buffer-size", 500);
actualQueryConfig.AssertTimeSpan(defaultQueryConfig, "refresh-interval", 1.Seconds());
actualQueryConfig.AssertString(defaultQueryConfig, "connection-string", "a");
Expand Down Expand Up @@ -224,6 +224,7 @@ public void ModifiedOptionsTest()

queryConfig.ConnectionString.Should().Be("a");
queryConfig.ProviderName.Should().Be("b");
queryConfig.WritePluginId.Should().Be("akka.persistence.journal.custom");
queryConfig.DefaultSerializer.Should().Be("hyperion");
queryConfig.UseCloneConnection.Should().BeTrue(); // non-overridable
queryConfig.RefreshInterval.Should().Be(5.Seconds());
Expand Down
9 changes: 5 additions & 4 deletions src/Akka.Persistence.Sql.Hosting/SqlJournalOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,20 @@ protected override StringBuilder Build(StringBuilder sb)

base.Build(sb);

BuildQueryConfig(sb, QueryPluginId);
BuildQueryConfig(sb, QueryPluginId, PluginId);

if (IsDefaultPlugin && Identifier is not "sql")
BuildQueryConfig(sb, "akka.persistence.query.journal.sql");

BuildQueryConfig(sb, "akka.persistence.query.journal.sql", "akka.persistence.journal.sql");
return sb;
}

private StringBuilder BuildQueryConfig(StringBuilder sb, string queryPluginId)
private StringBuilder BuildQueryConfig(StringBuilder sb, string queryPluginId, string pluginId)
{
sb.Append(queryPluginId).AppendLine("{");
sb.AppendLine($"connection-string = {ConnectionString.ToHocon()}");
sb.AppendLine($"provider-name = {ProviderName.ToHocon()}");
sb.AppendLine($"write-plugin = {pluginId}");
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved

if (DatabaseOptions is not null)
sb.AppendLine($"table-mapping = {DatabaseOptions.Mapping.Name().ToHocon()}");
Expand Down
27 changes: 27 additions & 0 deletions src/Akka.Persistence.Sql.Tests/Query/Sqlite/SqliteBugfix344Spec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// -----------------------------------------------------------------------
// <copyright file="SqliteBugfix344Spec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Persistence.Sql.Config;
using Akka.Persistence.Sql.Tests.Common.Containers;
using Akka.Persistence.Sql.Tests.Common.Query;
using Akka.Persistence.Sql.Tests.Sqlite;
using Xunit;
using Xunit.Abstractions;
#if !DEBUG
using Akka.Persistence.Sql.Tests.Common.Internal.Xunit;
#endif

namespace Akka.Persistence.Sql.Tests.Query.Sqlite
{
[Collection(nameof(SqlitePersistenceSpec))]
public class SqliteBugfix344Spec : Bugfix344Spec<SqliteContainer>
{
public SqliteBugfix344Spec(ITestOutputHelper output, SqliteContainer fixture)
: base(output, fixture)
{
}
}
}
3 changes: 3 additions & 0 deletions src/Akka.Persistence.Sql/Config/ReadJournalConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public ReadJournalConfig(Configuration.Config config)
{
ConnectionString = config.GetString("connection-string");
ProviderName = config.GetString("provider-name");
WritePluginId = config.GetString("write-plugin");
TableConfig = new JournalTableConfig(config);
DaoConfig = new BaseByteArrayJournalDaoConfig(config);
UseCloneConnection = config.GetBoolean("use-clone-connection");
Expand All @@ -31,6 +32,8 @@ public ReadJournalConfig(Configuration.Config config)
WriteIsolationLevel = IsolationLevel.Unspecified;
}

public string WritePluginId { get; }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the missing setting as a property in ReadJournalConfig as its supposed to


public BaseByteArrayJournalDaoConfig DaoConfig { get; }

public int MaxBufferSize { get; }
Expand Down
7 changes: 3 additions & 4 deletions src/Akka.Persistence.Sql/Query/SqlReadJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,14 @@ public SqlReadJournal(
ExtendedActorSystem system,
Configuration.Config config)
{
var writePluginId = config.GetString("write-plugin");
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
_eventAdapters = Persistence.Instance.Apply(system).AdaptersFor(writePluginId);
_readJournalConfig = new ReadJournalConfig(config);
_eventAdapters = Persistence.Instance.Apply(system).AdaptersFor(_readJournalConfig.WritePluginId);

// Fix for https://github.com/akkadotnet/Akka.Persistence.Sql/issues/344
var writeJournal = Persistence.Instance.Apply(system).JournalFor(writePluginId);
var writeJournal = Persistence.Instance.Apply(system).JournalFor(_readJournalConfig.WritePluginId);
// we want to block, we want to crash if the journal is not available
var started = writeJournal.Ask<Initialized>(IsInitialized.Instance, TimeSpan.FromSeconds(5)).Result;

_readJournalConfig = new ReadJournalConfig(config);
_system = system;

var connFact = new AkkaPersistenceDataConnectionFactory(_readJournalConfig);
Expand Down