Skip to content

Commit

Permalink
Fix PersistenceId Query and Sqlite unit tests (#5715)
Browse files Browse the repository at this point in the history
* Fix PersistenceId Query and Sqlite unit tests

* Fix unit test assert
  • Loading branch information
Arkatufus authored Mar 9, 2022
1 parent cef5031 commit 2e1d4ea
Show file tree
Hide file tree
Showing 19 changed files with 37 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ protected override bool Receive(object message)
{
case Request _:
_journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
Become(Initializing);
Become(Waiting);
return true;
case Continue _:
return true;
Expand All @@ -148,7 +148,7 @@ protected override bool Receive(object message)
}
}

private bool Initializing(object message)
private bool Waiting(object message)
{
switch (message)
{
Expand All @@ -175,16 +175,12 @@ private bool Active(object message)
{
switch (message)
{
case CurrentPersistenceIds added:
_lastOrderingOffset = added.HighestOrderingNumber;
_buffer.AddRange(added.AllPersistenceIds);
_buffer.DeliverBuffer(TotalDemand);
return true;
case Request _:
_buffer.DeliverBuffer(TotalDemand);
return true;
case Continue _:
_journalRef.Tell(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self));
Become(Waiting);
return true;
case Cancel _:
Context.Stop(Self);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected override bool ReceivePluginInternal(object message)
return true;
case SelectCurrentPersistenceIds request:
SelectAllPersistenceIdsAsync(request.Offset)
.PipeTo(request.ReplyTo, success: result => new CurrentPersistenceIds(result.Ids, request.Offset));
.PipeTo(request.ReplyTo, success: result => new CurrentPersistenceIds(result.Ids, result.LastOrdering));
return true;
case SubscribeTag subscribe:
AddTagSubscriber(Sender, subscribe.Tag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public class BatchingSqliteAllEventsSpec : AllEventsSpec
public static Config Config => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-eventsbytag-{Guid.NewGuid()}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ public class BatchingCurrentSqliteAllEventsSpec : CurrentAllEventsSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-eventsbytag-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public class BatchingSqliteCurrentEventsByPersistenceIdSpec : CurrentEventsByPer
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Datasource=memdb-journal-batch-currenteventsbypersistenceid-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class BatchingSqliteCurrentEventsByTagSpec : CurrentEventsByTagSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
event-adapters {{
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
Expand All @@ -34,7 +35,6 @@ class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistenc
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Datasource=memdb-journal-batch-currenteventsbytag-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ public class BatchingSqliteCurrentPersistenceIdsSpec : CurrentPersistenceIdsSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Datasource=memdb-journal-batch-currentpersistenceids-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public class BatchingSqliteEventsByPersistenceIdSpec : EventsByPersistenceIdSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Datasource=memdb-journal-batch-eventsbypersistenceid-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class BatchingSqliteEventsByTagSpec : EventsByTagSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
event-adapters {{
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
Expand All @@ -33,7 +34,6 @@ class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistenc
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Datasource=memdb-journal-batch-eventsbytag-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class BatchingSqlitePersistenceIdSpec : PersistenceIdsSpec
}}
akka.persistence {{
publish-plugin-commands = on
query.journal.sql.refresh-interval = 200ms
journal {{
plugin = ""akka.persistence.journal.sqlite""
sqlite = {{
Expand All @@ -40,7 +41,6 @@ class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""{ConnectionString("journal")}""
refresh-interval = 200ms
}}
}}
snapshot-store {{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public class SqliteAllEventsSpec:AllEventsSpec
public static Config Config => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-eventsbytag-{Guid.NewGuid()}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ public class SqliteCurrentAllEventsSpec:CurrentAllEventsSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-allevents-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public class SqliteCurrentEventsByPersistenceIdSpec : CurrentEventsByPersistence
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-currenteventsbypersistenceid-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class SqliteCurrentEventsByTagSpec : CurrentEventsByTagSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
event-adapters {{
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
Expand All @@ -35,7 +36,6 @@ class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-currenteventsbytag-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public class SqliteCurrentPersistenceIdsSpec : CurrentPersistenceIdsSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-currentpersistenceids-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public class SqliteEventsByPersistenceIdSpec : EventsByPersistenceIdSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-eventsbypersistenceid-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class SqliteEventsByTagSpec : EventsByTagSpec

public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.journal.sqlite {{
event-adapters {{
Expand All @@ -35,7 +36,6 @@ class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-eventsbytag-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
//-----------------------------------------------------------------------

using System;
using System.Threading;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Query;
using Akka.Persistence.Query.Sql;
using Akka.Persistence.TCK.Query;
using Akka.Streams.TestKit;
using Akka.Util.Internal;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Sqlite.Tests.Query
Expand All @@ -32,6 +35,7 @@ public class SqlitePersistenceIdsSpec : PersistenceIdsSpec
}}
akka.persistence {{
publish-plugin-commands = on
query.journal.sql.refresh-interval = 200ms
journal {{
plugin = ""akka.persistence.journal.sqlite""
sqlite = {{
Expand All @@ -41,7 +45,6 @@ class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""{ConnectionString("journal")}""
refresh-interval = 200ms
}}
}}
snapshot-store {{
Expand Down
21 changes: 14 additions & 7 deletions src/core/Akka.Persistence.TCK/Query/PersistenceIdsSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
Expand All @@ -16,6 +17,7 @@
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
using Reactive.Streams;
using Xunit;
using Xunit.Abstractions;
Expand Down Expand Up @@ -76,18 +78,23 @@ public virtual void ReadJournal_AllPersistenceIds_should_find_new_events_after_d
var source = queries.PersistenceIds();
var probe = source.RunWith(this.SinkProbe<string>(), Materializer);

var expected = new List<string> { "h", "i", "j" };
probe.Within(TimeSpan.FromSeconds(10), () =>
{
probe.Request(1).ExpectNext();
return probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
expected.Remove(probe.Request(1).ExpectNext()).Should().BeTrue();
return probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
});

Setup("j", 1);
probe.Within(TimeSpan.FromSeconds(10), () =>
{
probe.Request(5).ExpectNext();
return probe.ExpectNext();
});
probe.Within(TimeSpan.FromSeconds(10), () => probe.Request(5).ExpectNextUnordered(expected[0], expected[1]));

Setup("a1", 1);
Thread.Sleep(TimeSpan.FromSeconds(2));
probe.ExpectNext(TimeSpan.FromSeconds(10));

Thread.Sleep(TimeSpan.FromSeconds(2));
Setup("a2", 1);
probe.ExpectNext(TimeSpan.FromSeconds(10));
}

[Fact]
Expand Down

0 comments on commit 2e1d4ea

Please sign in to comment.