Re-implement PersistenceIds persistence query to match Scala implementation behavior #4531
Couldn't finish the entire review, but wanted to submit what I have so far
@@ -141,7 +141,7 @@ internal sealed class LiveAllEventsPublisher : AbstractAllEventsPublisher
 public LiveAllEventsPublisher(long fromOffset, TimeSpan refreshInterval, int maxBufferSize, string writeJournalPluginId)
     : base(fromOffset, maxBufferSize, writeJournalPluginId)
 {
-    _tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, EventsByTagPublisher.Continue.Instance, Self);
+    _tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, AllEventsPublisher.Continue.Instance, Self);
Good fix
src/contrib/persistence/Akka.Persistence.Query.Sql/AllPersistenceIdsPublisher.cs
return Source.FromPublisher(_persistenceIdsPublisher)
    .MapMaterializedValue(_ => NotUsed.Instance)
    .Named("AllPersistenceIds") as Source<string, NotUsed>;
Just do a direct cast here rather than an `as` - that way we get a hard failure instead of passing `null` back to the consumer.
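To illustrate the difference the reviewer is pointing at, here is a minimal sketch. `CastDemo`, `SoftCast`, and `HardCast` are hypothetical names used only for this example and are not part of the PR.

```csharp
using System;

// Hypothetical demo type, not taken from the PR.
public static class CastDemo
{
    // `as` yields null on a type mismatch, so the failure surfaces later,
    // wherever the caller first dereferences the result.
    public static string SoftCast(object value) => value as string;

    // A direct cast throws InvalidCastException at the point of the mismatch.
    public static string HardCast(object value) => (string)value;
}
```

Calling `CastDemo.SoftCast(42)` returns `null`, while `CastDemo.HardCast(42)` throws `InvalidCastException` immediately, which is the "hard failure" behavior requested above.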
The cast was not necessary anyway, I think it's there as a readability helper. I'll remove it.
    .Named("AllPersistenceIds") as Source<string, NotUsed>;
public Source<string, NotUsed> PersistenceIds()
{
    if (_persistenceIdsPublisher is null)
This call might need to be synchronized - we want all of the consumers of the `PersistenceIds` query to receive the same instance, right?
Good idea.
Can you check and see if my implementation looks good?
private void PersistenceIdsShutdownCallback()
{
    _persistenceIdsPublisher = null;
Same comment as above, RE: synchronization
@@ -326,7 +331,8 @@ protected AbstractQueryExecutor(QueryConfiguration configuration, Akka.Serializa
 AllPersistenceIdsSql = $@"
     SELECT DISTINCT e.{Configuration.PersistenceIdColumnName} as PersistenceId
-    FROM {Configuration.FullJournalTableName} e;";
+    FROM {Configuration.FullJournalTableName} e
+    WHERE e.{Configuration.OrderingColumnName} > @Ordering";
So to get all persistent Ids we query from the journal? What happens in the event that a persistent Id exists but only has a snapshot currently? This happens with `AtLeastOnceDelivery` actors frequently.
The `Metadata` table will always store the full set of persistent Ids - that's where we retrieve the maximum sequence number from for each entity, and I don't believe we allow any persistent entities to have their information deleted from the metadata table either.
Ah, I didn't know that. So I need to do a join of the same query against the `Journal` and `Metadata` table?
Just query `Metadata` directly - no need to touch the journal.
This implementation will be different for each plugin - we should add a TCK method to check that the PersistenceIds query can return a PersistenceId for an entity that saves only snapshots. Could you add that?
Will do.
Done, reference removed.
Done, added test that checks for PersistenceIds from both snapshotted and journaled actors.
@@ -3016,6 +3024,17 @@ namespace Akka.Streams.Implementation
     void WaitForUpstream(int waitForUpstream);
 }
 public interface ISpecViolation { }
+public interface IStreamBuffer<T>
Did you come up with this or is this a port from the JVM?
I came up with it on my own; I need an interface to base my custom buffer on.
Want to make this `internal`? Any reason it needs to be `public`?
Yeah, I'll make it internal; users should never touch that.
Done, moved to internal.
@@ -10,6 +10,7 @@
 </PropertyGroup>
 <ItemGroup>
+  <ProjectReference Include="..\..\contrib\persistence\Akka.Persistence.Query.Sql\Akka.Persistence.Query.Sql.csproj" />
Why does the Query.Sql library need to be included inside the TCK for all persistence plugins generally, including non-SQL ones?
I need to access some of its internal classes in the test so I can check that the buffer is actually released when it runs out of subscribers.
The TCK should work broadly, without any connection to a specific plugin, since the TCK is what's used to validate all plugins. We'll need to clean that up.
Got it, I'll move it to Query.Sql.Testkit instead.
Done, moved to PersistenceIdsSpec instead
…t for snapshot and journal case.
…to Fix_PersistenceIds
Some more changes needed to pagination for PersistenceIds
FROM (
    SELECT DISTINCT e.{Configuration.PersistenceIdColumnName} as Id
    FROM {Configuration.FullJournalTableName} e
    WHERE e.{Configuration.SequenceNrColumnName} > @SequenceNr
I don't think we should try to add pagination support for this query - the sequence number is unique to each entity and thus can't be used for ordering. I'd just do a full scan of the index for the time being since that's a covered query.
If we start getting complaints about poor performance, we can look at adding a "date added" timestamp that records when this persistent Id was first used and use that as the `Ordering` for a query.
ROWID won't work for SQL Server and other RDBMS because it's not consistent across multiple queries on the same table: https://stackoverflow.com/a/58916093/377476
 /// <returns>TBD</returns>
-Task<ImmutableArray<string>> SelectAllPersistenceIdsAsync(DbConnection connection, CancellationToken cancellationToken);
+Task<ImmutableArray<string>> SelectAllPersistenceIdsAsync(DbConnection connection, CancellationToken cancellationToken, long offset);
I'd skip pagination
{
    get
    {
        _lock.EnterReadLock();
Stick with just a simple `lock` keyword here - no need for a `ReaderWriterLock`. This query likely won't be subject to large amounts of contention - better to go with something simple.
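As a rough sketch of what the suggestion could look like, the class below guards lazy creation and teardown of a shared instance with a plain `lock`. `SharedPublisherHolder<T>`, `GetOrCreate`, and `Release` are hypothetical names invented for this illustration; the PR's actual read journal code is not shown here.

```csharp
using System;

// Hypothetical holder, not the PR's actual class: every caller of GetOrCreate
// receives the same instance until Release is called.
public sealed class SharedPublisherHolder<T> where T : class
{
    private readonly object _gate = new object();
    private readonly Func<T> _factory;
    private T _instance;

    public SharedPublisherHolder(Func<T> factory) => _factory = factory;

    public T GetOrCreate()
    {
        // The null check and the assignment happen under one lock, so two
        // concurrent callers cannot both create an instance.
        lock (_gate)
        {
            if (_instance is null)
                _instance = _factory();
            return _instance;
        }
    }

    public void Release()
    {
        // Mirrors a shutdown callback that clears the shared instance.
        lock (_gate)
        {
            _instance = null;
        }
    }
}
```

Under low contention a single `lock` like this costs little and is easier to reason about than paired read and write lock calls, which seems to be the trade-off the reviewer has in mind.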
@@ -77,6 +88,36 @@ public virtual void ReadJournal_AllPersistenceIds_should_find_new_events_after_d
     });
 }

+[Fact]
+public virtual void ReadJournal_AllPersistenceIds_should_find_events_on_both_journal_and_snapshot_store()
This is excellent
@@ -72,27 +199,27 @@ public interface ICursor
 /// </summary>
 /// <typeparam name="T">TBD</typeparam>
 [InternalApi]
-public class ResizableMultiReaderRingBuffer<T>
+public class ResizableMultiReaderRingBuffer<T> : IStreamBuffer<T>
LGTM
 if (!_buffer.Value.Write(value))
     throw new IllegalStateException("Output buffer overflow");
-if (Dispatch(_subscriptions))
+if (_buffer.Value.Length > oldBufferLength && Dispatch(_subscriptions))
How does this work exactly? Wouldn't we just want to pull so long as there's demand?
Good catch, that's not supposed to be there. I'm fixing it right now.
// http://graphics.stanford.edu/~seander/bithacks.html
namespace Akka.Streams.Util
{
    // TODO: replace this with the official System.Numerics.BitOperations when we move on to .NET Core 3.0
Good call
And
Both failed in an earlier run in this PR - any idea why?
No idea, I'd have to look more into that
Let's check it out so we don't have any more racy tests in the suite -
@@ -23,13 +21,13 @@ class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistenc
 table-name = event_journal
 metadata-table-name = journal_metadata
 auto-initialize = on
-connection-string = ""Filename=file:memdb-journal-eventsbytag-{id}.db;Mode=Memory;Cache=Shared""
+connection-string = ""Filename=file:memdb-journal-eventsbytag-{Guid.NewGuid()}.db;Mode=Memory;Cache=Shared""
Good idea
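The change above gives each spec its own in-memory SQLite database name so that tests sharing a process do not see each other's data. A minimal sketch of the same idea follows; `TestDb` and `UniqueConnectionString` are hypothetical helpers made up for this example, and only the connection-string format is taken from the diff.

```csharp
using System;

// Hypothetical helper, not part of the PR.
public static class TestDb
{
    // Builds a shared-cache in-memory SQLite connection string whose file
    // name embeds a fresh GUID, so every call names a distinct database.
    public static string UniqueConnectionString(string prefix) =>
        $"Filename=file:{prefix}-{Guid.NewGuid()}.db;Mode=Memory;Cache=Shared";
}
```

For example, two calls to `TestDb.UniqueConnectionString("memdb-journal-eventsbytag")` produce two different database names, unlike a counter-style `{id}` that may repeat across test classes.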
@akkadotnet/core - should we move the PostgreSql and MySQL plugins out of AkkaNetContrib and into the main organization?
Would be nice to see them as first class providers :)
Related to #4524
The current implementation of the PersistenceIds query does not pull its updates from the database; instead, it relies on an internal cache that only captures messages that pass through the local `ActorSystem`. This becomes a problem in a multi-`ActorSystem` system, as the cache does not capture the state of the whole system.
Changes:
- `FanoutPublisher`.
- `FanoutPublisher` buffer/cache is expanded to handle long records size.
- `FanoutPublisher` buffer/cache that can handle distinct and unbounded records cache.
- `FanoutPublisher` is dynamically created and destroyed based on subscriber count.