Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-implement PersistenceIds persitence query to match scala implementation behavior #4531

Merged
merged 30 commits into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3a6af76
Rebase to new dev
Arkatufus Jul 29, 2020
ef43856
Re-implement PersistenceIds Query
Arkatufus Jul 29, 2020
a7e2e5f
Fix copy-paste error
Arkatufus Jul 21, 2020
7796f2f
Make sure APIs are as backward compatible as possible, update API app…
Arkatufus Jul 29, 2020
ea97555
Merge branch 'dev' into Fix_PersistenceIds
Arkatufus Jul 30, 2020
f98a9d7
Add unit test for publisher (cache) deallocation.
Arkatufus Jul 31, 2020
edb8e52
Move onTerminate callback from AllPersistenceIdsPublisher to FanoutPr…
Arkatufus Jul 31, 2020
238fca8
Update API approver list
Arkatufus Jul 31, 2020
f8c8460
Merge branch 'dev' into Fix_PersistenceIds
Arkatufus Aug 5, 2020
1d8f659
Merge branch 'dev' into Fix_PersistenceIds
Arkatufus Aug 7, 2020
738f855
Remove redundant `as` cast
Arkatufus Aug 17, 2020
1b79bf5
Remove reference to Akka.Persistence.Query.Sql from Akka.Persistence.TCK
Arkatufus Aug 17, 2020
1352b0e
Add concurrency locking on the internal persistence ids publisher.
Arkatufus Aug 17, 2020
1ec20e4
Make PersistenceIds to query both Journal and Metadata table. Add tes…
Arkatufus Aug 18, 2020
e983d47
Change IStreamBuffer access to internal.
Arkatufus Aug 18, 2020
a7c5f81
Merge branch 'dev' into Fix_PersistenceIds
Arkatufus Aug 18, 2020
f6f1b60
Update API approver list
Arkatufus Aug 18, 2020
9e70477
Merge branch 'Fix_PersistenceIds' of github.com:Arkatufus/akka.net in…
Arkatufus Aug 18, 2020
fabeaa1
Go back to using ordeting column to filter the journal table for perf…
Arkatufus Aug 18, 2020
386d6fc
Change synchronization to a simpler lock
Arkatufus Aug 18, 2020
e7d174a
Change test to reflect changes in synchonization
Arkatufus Aug 18, 2020
6982944
Update persistence ids SQL in BatchingSqlJournal
Arkatufus Aug 18, 2020
5a08ec5
Change how SubscribeManager checks for available data.
Arkatufus Aug 19, 2020
8df7e67
Merge branch 'dev' into Fix_PersistenceIds
Arkatufus Aug 19, 2020
4eb24e2
Merge branch 'dev' into Fix_PersistenceIds
Aaronontheweb Aug 19, 2020
e5f7d98
Change confusing `Count()` to `AvailableData` property
Arkatufus Aug 19, 2020
8954464
Update API approver list
Arkatufus Aug 19, 2020
c20ddb3
Make sure that tests are never run in parallel
Arkatufus Aug 20, 2020
56f4f47
Prune excessive test
Arkatufus Aug 20, 2020
d75c717
Append a GUID on the db filename to make doubly sure that db never ge…
Arkatufus Aug 20, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ internal sealed class LiveAllEventsPublisher : AbstractAllEventsPublisher
public LiveAllEventsPublisher(long fromOffset, TimeSpan refreshInterval, int maxBufferSize, string writeJournalPluginId)
: base(fromOffset, maxBufferSize, writeJournalPluginId)
{
_tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, EventsByTagPublisher.Continue.Instance, Self);
_tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, AllEventsPublisher.Continue.Instance, Self);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good fix

}

protected override long ToOffset => long.MaxValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,193 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using Akka.Actor;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Streams.Actors;

namespace Akka.Persistence.Query.Sql
{
internal sealed class AllPersistenceIdsPublisher : ActorPublisher<string>
internal sealed class CurrentPersistenceIdsPublisher : ActorPublisher<string>, IWithUnboundedStash
{
public static Props Props(bool liveQuery, string writeJournalPluginId)
public static Props Props(string writeJournalPluginId)
{
return Actor.Props.Create(() => new AllPersistenceIdsPublisher(liveQuery, writeJournalPluginId));
return Actor.Props.Create(() => new CurrentPersistenceIdsPublisher(writeJournalPluginId));
}

private readonly bool _liveQuery;
private readonly IActorRef _journalRef;

private readonly DeliveryBuffer<string> _buffer;

public AllPersistenceIdsPublisher(bool liveQuery, string writeJournalPluginId)
public IStash Stash { get; set; }

public CurrentPersistenceIdsPublisher(string writeJournalPluginId)
{
_liveQuery = liveQuery;
_buffer = new DeliveryBuffer<string>(OnNext);
_journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
}

protected override bool Receive(object message) => message.Match()
.With<Request>(_ =>
protected override bool Receive(object message)
{
switch (message)
{
_journalRef.Tell(SubscribeAllPersistenceIds.Instance);
Become(Active);
})
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;

private bool Active(object message) => message.Match()
.With<CurrentPersistenceIds>(current =>
case Request _:
_journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
Become(Initializing);
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}

private bool Initializing(object message)
{
switch (message)
{
_buffer.AddRange(current.AllPersistenceIds);
_buffer.DeliverBuffer(TotalDemand);
case CurrentPersistenceIds current:
_buffer.AddRange(current.AllPersistenceIds);
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved
_buffer.DeliverBuffer(TotalDemand);

if (!_liveQuery && _buffer.IsEmpty)
OnCompleteThenStop();
})
.With<PersistenceIdAdded>(added =>
if (_buffer.IsEmpty)
{
OnCompleteThenStop();
return true;
}

Become(Active);
Stash.UnstashAll();
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
Stash.Stash();
return true;
}
}

private bool Active(object message)
{
switch (message)
{
if (_liveQuery)
{
_buffer.Add(added.PersistenceId);
case Request _:
_buffer.DeliverBuffer(TotalDemand);
}
})
.With<Request>(_ =>
if (_buffer.IsEmpty)
OnCompleteThenStop();
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}
}

internal sealed class LivePersistenceIdsPublisher : ActorPublisher<string>, IWithUnboundedStash
{
private class Continue
{
public static readonly Continue Instance = new Continue();

private Continue() { }
}

public static Props Props(TimeSpan refreshInterval, string writeJournalPluginId)
{
return Actor.Props.Create(() => new LivePersistenceIdsPublisher(refreshInterval, writeJournalPluginId));
}

private long _lastOrderingOffset;
private readonly ICancelable _tickCancelable;
private readonly IActorRef _journalRef;
private readonly DeliveryBuffer<string> _buffer;

public IStash Stash { get; set; }

public LivePersistenceIdsPublisher(TimeSpan refreshInterval, string writeJournalPluginId)
{
_tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
refreshInterval,
refreshInterval,
Self,
Continue.Instance,
Self);
_buffer = new DeliveryBuffer<string>(OnNext);
_journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
}

protected override void PostStop()
{
_tickCancelable.Cancel();
base.PostStop();
}

protected override bool Receive(object message)
{
switch (message)
{
_buffer.DeliverBuffer(TotalDemand);
if (!_liveQuery && _buffer.IsEmpty)
OnCompleteThenStop();
})
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;
case Request _:
_journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
Become(Initializing);
return true;
case Continue _:
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}

private bool Initializing(object message)
{
switch (message)
{
case CurrentPersistenceIds current:
_lastOrderingOffset = current.HighestOrderingNumber;
_buffer.AddRange(current.AllPersistenceIds);
_buffer.DeliverBuffer(TotalDemand);

Become(Active);
Stash.UnstashAll();
return true;
case Continue _:
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
Stash.Stash();
return true;
}
}

private bool Active(object message)
{
switch (message)
{
case CurrentPersistenceIds added:
_lastOrderingOffset = added.HighestOrderingNumber;
_buffer.AddRange(added.AllPersistenceIds);
_buffer.DeliverBuffer(TotalDemand);
return true;
case Request _:
_buffer.DeliverBuffer(TotalDemand);
return true;
case Continue _:
_journalRef.Tell(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self));
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.Streams.Dsl;
using Akka.Streams;
using Akka.Util.Internal;

namespace Akka.Persistence.Query.Sql
Expand Down Expand Up @@ -40,12 +41,16 @@ public static Config DefaultConfiguration()
private readonly TimeSpan _refreshInterval;
private readonly string _writeJournalPluginId;
private readonly int _maxBufferSize;
private readonly ExtendedActorSystem _system;

private IPublisher<string> _persistenceIdsPublisher = null;

public SqlReadJournal(ExtendedActorSystem system, Config config)
{
_refreshInterval = config.GetTimeSpan("refresh-interval", null);
_writeJournalPluginId = config.GetString("write-plugin", null);
_maxBufferSize = config.GetInt("max-buffer-size", 0);
_system = system;
}

/// <summary>
Expand All @@ -68,18 +73,36 @@ public SqlReadJournal(ExtendedActorSystem system, Config config)
/// backend journal.
/// </para>
/// </summary>
public Source<string, NotUsed> PersistenceIds() =>
Source.ActorPublisher<string>(AllPersistenceIdsPublisher.Props(true, _writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("AllPersistenceIds") as Source<string, NotUsed>;
public Source<string, NotUsed> PersistenceIds()
{
if (_persistenceIdsPublisher is null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call might need to be synchronized - we want all of the consumer of the PersistentIds query to receive the same instance, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check and see if my implementation looks good?

{
var graph =
Source.ActorPublisher<string>(
LivePersistenceIdsPublisher.Props(
_refreshInterval,
_writeJournalPluginId))
.ToMaterialized(Sink.DistinctRetainingFanOutPublisher<string>(PersistenceIdsShutdownCallback), Keep.Right);
_persistenceIdsPublisher = graph.Run(_system.Materializer());
}

return Source.FromPublisher(_persistenceIdsPublisher)
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("AllPersistenceIds") as Source<string, NotUsed>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just do a direct cast here rather than an as - that way we get a hard failure instead of passing null back to the consumer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cast was not necessary anyway, I think it's there as a readability helper. I'll remove it.

}

private void PersistenceIdsShutdownCallback()
{
_persistenceIdsPublisher = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above, RE: synchronization

}

/// <summary>
/// Same type of query as <see cref="PersistenceIds"/> but the stream
/// is completed immediately when it reaches the end of the "result set". Persistent
/// actors that are created after the query is completed are not included in the stream.
/// </summary>
public Source<string, NotUsed> CurrentPersistenceIds() =>
Source.ActorPublisher<string>(AllPersistenceIdsPublisher.Props(false, _writeJournalPluginId))
public Source<string, NotUsed> CurrentPersistenceIds()
=> Source.ActorPublisher<string>(CurrentPersistenceIdsPublisher.Props(_writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("CurrentPersistenceIds") as Source<string, NotUsed>;

Expand Down
Loading