Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PersistenceId Query and Sqlite unit tests #5715

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ protected override bool Receive(object message)
{
case Request _:
_journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
Become(Initializing);
Become(Waiting);
return true;
case Continue _:
return true;
Expand All @@ -148,7 +148,7 @@ protected override bool Receive(object message)
}
}

private bool Initializing(object message)
private bool Waiting(object message)
{
switch (message)
{
Expand All @@ -175,16 +175,12 @@ private bool Active(object message)
{
switch (message)
{
case CurrentPersistenceIds added:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CurrentPersistenceIds message will only be received in the Waiting method (previously called Initializing)

_lastOrderingOffset = added.HighestOrderingNumber;
_buffer.AddRange(added.AllPersistenceIds);
_buffer.DeliverBuffer(TotalDemand);
return true;
case Request _:
_buffer.DeliverBuffer(TotalDemand);
return true;
case Continue _:
_journalRef.Tell(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self));
Become(Waiting);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With short refresh-interval, we will be spamming the back-end with sql queries, this will throttle the refresh frequency to the speed of the back-end

return true;
case Cancel _:
Context.Stop(Self);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected override bool ReceivePluginInternal(object message)
return true;
case SelectCurrentPersistenceIds request:
SelectAllPersistenceIdsAsync(request.Offset)
.PipeTo(request.ReplyTo, success: result => new CurrentPersistenceIds(result.Ids, request.Offset));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a major bug, if we kept sending request.Offset, then we will always retrieve ALL of persistence ids from offset 0 on all SelectCurrentPersistenceIds command. result.LastOrdering is the proper HighestOrderingNumber.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch

.PipeTo(request.ReplyTo, success: result => new CurrentPersistenceIds(result.Ids, result.LastOrdering));
return true;
case SubscribeTag subscribe:
AddTagSubscriber(Sender, subscribe.Tag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public class BatchingSqliteAllEventsSpec : AllEventsSpec
public static Config Config => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have been using refresh-interval in Query unit tests wrong, refresh-interval is not part of the journal settings, but the query settings

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to need to update documentation anywhere for this? @eaba

akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-eventsbytag-{Guid.NewGuid()}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ public class BatchingCurrentSqliteAllEventsSpec : CurrentAllEventsSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-eventsbytag-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public class BatchingSqliteCurrentEventsByPersistenceIdSpec : CurrentEventsByPer
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Datasource=memdb-journal-batch-currenteventsbypersistenceid-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class BatchingSqliteCurrentEventsByTagSpec : CurrentEventsByTagSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
event-adapters {{
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
Expand All @@ -34,7 +35,6 @@ class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistenc
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Datasource=memdb-journal-batch-currenteventsbytag-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ public class BatchingSqliteCurrentPersistenceIdsSpec : CurrentPersistenceIdsSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Datasource=memdb-journal-batch-currentpersistenceids-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public class BatchingSqliteEventsByPersistenceIdSpec : EventsByPersistenceIdSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Datasource=memdb-journal-batch-eventsbypersistenceid-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class BatchingSqliteEventsByTagSpec : EventsByTagSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
event-adapters {{
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
Expand All @@ -33,7 +34,6 @@ class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistenc
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Datasource=memdb-journal-batch-eventsbytag-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class BatchingSqlitePersistenceIdSpec : PersistenceIdsSpec
}}
akka.persistence {{
publish-plugin-commands = on
query.journal.sql.refresh-interval = 200ms
journal {{
plugin = ""akka.persistence.journal.sqlite""
sqlite = {{
Expand All @@ -40,7 +41,6 @@ class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""{ConnectionString("journal")}""
refresh-interval = 200ms
}}
}}
snapshot-store {{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public class SqliteAllEventsSpec:AllEventsSpec
public static Config Config => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-eventsbytag-{Guid.NewGuid()}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ public class SqliteCurrentAllEventsSpec:CurrentAllEventsSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-allevents-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public class SqliteCurrentEventsByPersistenceIdSpec : CurrentEventsByPersistence
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-currenteventsbypersistenceid-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class SqliteCurrentEventsByTagSpec : CurrentEventsByTagSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
event-adapters {{
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
Expand All @@ -35,7 +36,6 @@ class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-currenteventsbytag-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public class SqliteCurrentPersistenceIdsSpec : CurrentPersistenceIdsSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-currentpersistenceids-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ public class SqliteEventsByPersistenceIdSpec : EventsByPersistenceIdSpec
public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.sqlite {{
class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-eventsbypersistenceid-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class SqliteEventsByTagSpec : EventsByTagSpec

public static Config Config(int id) => ConfigurationFactory.ParseString($@"
akka.loglevel = INFO
akka.persistence.query.journal.sql.refresh-interval = 1s
akka.persistence.journal.plugin = ""akka.persistence.journal.sqlite""
akka.persistence.journal.sqlite {{
event-adapters {{
Expand All @@ -35,7 +36,6 @@ class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""Filename=file:memdb-journal-eventsbytag-{id}.db;Mode=Memory;Cache=Shared""
refresh-interval = 1s
}}
akka.test.single-expect-default = 10s")
.WithFallback(SqlReadJournal.DefaultConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
//-----------------------------------------------------------------------

using System;
using System.Threading;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Query;
using Akka.Persistence.Query.Sql;
using Akka.Persistence.TCK.Query;
using Akka.Streams.TestKit;
using Akka.Util.Internal;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Sqlite.Tests.Query
Expand All @@ -32,6 +35,7 @@ public class SqlitePersistenceIdsSpec : PersistenceIdsSpec
}}
akka.persistence {{
publish-plugin-commands = on
query.journal.sql.refresh-interval = 200ms
journal {{
plugin = ""akka.persistence.journal.sqlite""
sqlite = {{
Expand All @@ -41,7 +45,6 @@ class = ""Akka.Persistence.Sqlite.Journal.SqliteJournal, Akka.Persistence.Sqlite
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""{ConnectionString("journal")}""
refresh-interval = 200ms
}}
}}
snapshot-store {{
Expand Down
21 changes: 14 additions & 7 deletions src/core/Akka.Persistence.TCK/Query/PersistenceIdsSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
Expand All @@ -16,6 +17,7 @@
using Akka.Streams.TestKit;
using Akka.TestKit;
using Akka.Util.Internal;
using FluentAssertions;
using Reactive.Streams;
using Xunit;
using Xunit.Abstractions;
Expand Down Expand Up @@ -76,18 +78,23 @@ public virtual void ReadJournal_AllPersistenceIds_should_find_new_events_after_d
var source = queries.PersistenceIds();
var probe = source.RunWith(this.SinkProbe<string>(), Materializer);

var expected = new List<string> { "h", "i", "j" };
probe.Within(TimeSpan.FromSeconds(10), () =>
{
probe.Request(1).ExpectNext();
return probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
expected.Remove(probe.Request(1).ExpectNext()).Should().BeTrue();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is enough to test that the returned value is part of the expected values

return probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
});

Setup("j", 1);
probe.Within(TimeSpan.FromSeconds(10), () =>
{
probe.Request(5).ExpectNext();
return probe.ExpectNext();
});
probe.Within(TimeSpan.FromSeconds(10), () => probe.Request(5).ExpectNextUnordered(expected[0], expected[1]));

Setup("a1", 1);
Thread.Sleep(TimeSpan.FromSeconds(2));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is enough to trigger the bug

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer Task.Delay over Thread.Sleep but this is fine for now.

probe.ExpectNext(TimeSpan.FromSeconds(10));

Thread.Sleep(TimeSpan.FromSeconds(2));
Setup("a2", 1);
probe.ExpectNext(TimeSpan.FromSeconds(10));
}

[Fact]
Expand Down