Skip to content

Commit

Permalink
Force write journal to initialize when read journal is started (#423)
Browse files Browse the repository at this point in the history
* Force write journal to initialize when read journal is started

close #344

* fix skip-on-windows

* re-enable startup check

* expect completion only

* added fix back

---------

Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
  • Loading branch information
Aaronontheweb and Arkatufus authored Jul 30, 2024
1 parent e0b919b commit 8468332
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 1 deletion.
92 changes: 92 additions & 0 deletions src/Akka.Persistence.Sql.Tests.Common/Query/Bugfix344Spec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// -----------------------------------------------------------------------
// <copyright file="Bugfix344Spec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Persistence.Query;
using Akka.Persistence.Sql.Config;
using Akka.Persistence.Sql.Query;
using Akka.Persistence.Sql.Tests.Common.Containers;
using Akka.Streams;
using Akka.Streams.TestKit;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace Akka.Persistence.Sql.Tests.Common.Query
{
public abstract class Bugfix344Spec<T> : Akka.TestKit.Xunit2.TestKit, IAsyncLifetime where T : ITestContainer
{
protected Bugfix344Spec(ITestOutputHelper output, T fixture) : base(config:Config(TagMode.TagTable, fixture), output:output)
{
ReadJournal = Sys.ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
Materializer = Sys.Materializer();
}

protected ActorMaterializer Materializer { get; }
protected IReadJournal? ReadJournal { get; set; }

public Task InitializeAsync()
=> Task.CompletedTask;

public Task DisposeAsync()
=> Task.CompletedTask;

[Fact]
public async Task ReadJournal_should_initialize_tables_when_started_before_WriteJournal()
{
if (ReadJournal is not ICurrentEventsByTagQuery queries)
throw IsTypeException.ForMismatchedType(nameof(IEventsByTagQuery), ReadJournal?.GetType().Name ?? "null");

// should just not return
await EventFilter.Error().ExpectAsync(
0,
async () =>
{
var blackSrc = queries.CurrentEventsByTag("random-unused-tag", offset: NoOffset.Instance);
var probe = blackSrc.RunWith(this.SinkProbe<EventEnvelope>(), Materializer);
probe.Request(2);

// query should just gracefully exit
await probe.ExpectCompleteAsync();
});
}

private static Configuration.Config Config(TagMode tagMode, T fixture)
{
if (!fixture.InitializeDbAsync().Wait(10.Seconds()))
throw new Exception("Failed to clean up database in 10 seconds");

return ConfigurationFactory.ParseString(
$$"""
akka.loglevel = INFO
akka.persistence.journal.plugin = "akka.persistence.journal.sql"
akka.persistence.journal.sql {
event-adapters {
color-tagger = "Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK"
}
event-adapter-bindings = {
"System.String" = color-tagger
}
provider-name = "{{fixture.ProviderName}}"
tag-write-mode = "{{tagMode}}"
connection-string = "{{fixture.ConnectionString}}"
}
akka.persistence.query.journal.sql {
provider-name = "{{fixture.ProviderName}}"
connection-string = "{{fixture.ConnectionString}}"
tag-read-mode = "{{tagMode}}"
refresh-interval = 1s
}
akka.test.single-expect-default = 10s
""")
.WithFallback(SqlPersistence.DefaultConfiguration);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// -----------------------------------------------------------------------
// <copyright file="PostgreSqlBugfix344Spec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Persistence.Sql.Tests.Common.Containers;
using Akka.Persistence.Sql.Tests.Common.Query;
using Xunit;
using Xunit.Abstractions;
#if !DEBUG
using Akka.Persistence.Sql.Tests.Common.Internal.Xunit;
#endif

namespace Akka.Persistence.Sql.Tests.Query.PostgreSql
{
/// <summary>
/// Need our own collection, to ensure that the database tables haven't been initialized yet
/// </summary>
[CollectionDefinition(nameof(PostgreSqlBugfix344Fixture), DisableParallelization = true)]
public sealed class PostgreSqlBugfix344Fixture : ICollectionFixture<PostgreSqlContainer> { }

#if !DEBUG
[SkipWindows]
#endif
[Collection(nameof(PostgreSqlBugfix344Fixture))]
public class PostgreSqlBugfix344Spec : Bugfix344Spec<PostgreSqlContainer>
{
public PostgreSqlBugfix344Spec(ITestOutputHelper output, PostgreSqlContainer fixture) : base(output, fixture)
{

}
}
}
7 changes: 6 additions & 1 deletion src/Akka.Persistence.Sql/Query/SqlReadJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ public SqlReadJournal(
{
var writePluginId = config.GetString("write-plugin");
_eventAdapters = Persistence.Instance.Apply(system).AdaptersFor(writePluginId);


// Fix for https://github.com/akkadotnet/Akka.Persistence.Sql/issues/344
var writeJournal = Persistence.Instance.Apply(system).JournalFor(writePluginId);
// we want to block, we want to crash if the journal is not available
var started = writeJournal.Ask<Initialized>(IsInitialized.Instance, TimeSpan.FromSeconds(5)).Result;

_readJournalConfig = new ReadJournalConfig(config);
_system = system;

Expand Down

0 comments on commit 8468332

Please sign in to comment.