Skip to content

Commit

Permalink
Re-implement PersistenceIds persitence query to match scala implement…
Browse files Browse the repository at this point in the history
…ation behavior (#4531)

* Rebase to new dev

* Re-implement PersistenceIds Query

* Fix copy-paste error

* Make sure APIs are as backward compatible as possible, update API approver list.

* Add unit test for publisher (cache) deallocation.

* Move onTerminate callback from AllPersistenceIdsPublisher to FanoutProcessorImpl

* Update API approver list

* Remove redundant `as` cast

* Remove reference to Akka.Persistence.Query.Sql from Akka.Persistence.TCK

* Add concurrency locking on the internal persistence ids publisher.

* Make PersistenceIds to query both Journal and Metadata table. Add test for snapshot and journal case.

* Change IStreamBuffer access to internal.

* Update API approver list

* Go back to using ordeting column to filter the journal table for performance reasons

* Change synchronization to a simpler lock

* Change test to reflect changes in synchonization

* Update persistence ids SQL in BatchingSqlJournal

* Change how SubscribeManager checks for available data.

* Change confusing `Count()` to `AvailableData` property

* Update API approver list

* Make sure that tests are never run in parallel

* Prune excessive test

* Append a GUID on the db filename to make doubly sure that db never get used twice

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Arkatufus and Aaronontheweb authored Aug 20, 2020
1 parent 6ebceff commit d612c89
Show file tree
Hide file tree
Showing 31 changed files with 1,571 additions and 496 deletions.
1 change: 1 addition & 0 deletions src/Akka.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002ECSharpPlaceAttributeOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateThisQualifierSettings/@EntryIndexedValue">True</s:Boolean>

</wpf:ResourceDictionary>
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ internal sealed class LiveAllEventsPublisher : AbstractAllEventsPublisher
public LiveAllEventsPublisher(long fromOffset, TimeSpan refreshInterval, int maxBufferSize, string writeJournalPluginId)
: base(fromOffset, maxBufferSize, writeJournalPluginId)
{
_tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, EventsByTagPublisher.Continue.Instance, Self);
_tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(refreshInterval, refreshInterval, Self, AllEventsPublisher.Continue.Instance, Self);
}

protected override long ToOffset => long.MaxValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,193 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using Akka.Actor;
using Akka.Persistence.Sql.Common.Journal;
using Akka.Streams.Actors;

namespace Akka.Persistence.Query.Sql
{
internal sealed class AllPersistenceIdsPublisher : ActorPublisher<string>
internal sealed class CurrentPersistenceIdsPublisher : ActorPublisher<string>, IWithUnboundedStash
{
public static Props Props(bool liveQuery, string writeJournalPluginId)
public static Props Props(string writeJournalPluginId)
{
return Actor.Props.Create(() => new AllPersistenceIdsPublisher(liveQuery, writeJournalPluginId));
return Actor.Props.Create(() => new CurrentPersistenceIdsPublisher(writeJournalPluginId));
}

private readonly bool _liveQuery;
private readonly IActorRef _journalRef;

private readonly DeliveryBuffer<string> _buffer;

public AllPersistenceIdsPublisher(bool liveQuery, string writeJournalPluginId)
public IStash Stash { get; set; }

public CurrentPersistenceIdsPublisher(string writeJournalPluginId)
{
_liveQuery = liveQuery;
_buffer = new DeliveryBuffer<string>(OnNext);
_journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
}

protected override bool Receive(object message) => message.Match()
.With<Request>(_ =>
protected override bool Receive(object message)
{
switch (message)
{
_journalRef.Tell(SubscribeAllPersistenceIds.Instance);
Become(Active);
})
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;

private bool Active(object message) => message.Match()
.With<CurrentPersistenceIds>(current =>
case Request _:
_journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
Become(Initializing);
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}

private bool Initializing(object message)
{
switch (message)
{
_buffer.AddRange(current.AllPersistenceIds);
_buffer.DeliverBuffer(TotalDemand);
case CurrentPersistenceIds current:
_buffer.AddRange(current.AllPersistenceIds);
_buffer.DeliverBuffer(TotalDemand);

if (!_liveQuery && _buffer.IsEmpty)
OnCompleteThenStop();
})
.With<PersistenceIdAdded>(added =>
if (_buffer.IsEmpty)
{
OnCompleteThenStop();
return true;
}

Become(Active);
Stash.UnstashAll();
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
Stash.Stash();
return true;
}
}

private bool Active(object message)
{
switch (message)
{
if (_liveQuery)
{
_buffer.Add(added.PersistenceId);
case Request _:
_buffer.DeliverBuffer(TotalDemand);
}
})
.With<Request>(_ =>
if (_buffer.IsEmpty)
OnCompleteThenStop();
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}
}

internal sealed class LivePersistenceIdsPublisher : ActorPublisher<string>, IWithUnboundedStash
{
private class Continue
{
public static readonly Continue Instance = new Continue();

private Continue() { }
}

public static Props Props(TimeSpan refreshInterval, string writeJournalPluginId)
{
return Actor.Props.Create(() => new LivePersistenceIdsPublisher(refreshInterval, writeJournalPluginId));
}

private long _lastOrderingOffset;
private readonly ICancelable _tickCancelable;
private readonly IActorRef _journalRef;
private readonly DeliveryBuffer<string> _buffer;

public IStash Stash { get; set; }

public LivePersistenceIdsPublisher(TimeSpan refreshInterval, string writeJournalPluginId)
{
_tickCancelable = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
refreshInterval,
refreshInterval,
Self,
Continue.Instance,
Self);
_buffer = new DeliveryBuffer<string>(OnNext);
_journalRef = Persistence.Instance.Apply(Context.System).JournalFor(writeJournalPluginId);
}

protected override void PostStop()
{
_tickCancelable.Cancel();
base.PostStop();
}

protected override bool Receive(object message)
{
switch (message)
{
_buffer.DeliverBuffer(TotalDemand);
if (!_liveQuery && _buffer.IsEmpty)
OnCompleteThenStop();
})
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;
case Request _:
_journalRef.Tell(new SelectCurrentPersistenceIds(0, Self));
Become(Initializing);
return true;
case Continue _:
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}

private bool Initializing(object message)
{
switch (message)
{
case CurrentPersistenceIds current:
_lastOrderingOffset = current.HighestOrderingNumber;
_buffer.AddRange(current.AllPersistenceIds);
_buffer.DeliverBuffer(TotalDemand);

Become(Active);
Stash.UnstashAll();
return true;
case Continue _:
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
Stash.Stash();
return true;
}
}

private bool Active(object message)
{
switch (message)
{
case CurrentPersistenceIds added:
_lastOrderingOffset = added.HighestOrderingNumber;
_buffer.AddRange(added.AllPersistenceIds);
_buffer.DeliverBuffer(TotalDemand);
return true;
case Request _:
_buffer.DeliverBuffer(TotalDemand);
return true;
case Continue _:
_journalRef.Tell(new SelectCurrentPersistenceIds(_lastOrderingOffset, Self));
return true;
case Cancel _:
Context.Stop(Self);
return true;
default:
return false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
//-----------------------------------------------------------------------

using System;
using System.Threading;
using Reactive.Streams;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.Streams.Dsl;
using Akka.Util.Internal;
using Akka.Streams;

namespace Akka.Persistence.Query.Sql
{
public class SqlReadJournal :
IReadJournal,
IPersistenceIdsQuery,
ICurrentPersistenceIdsQuery,
IEventsByPersistenceIdQuery,
Expand All @@ -40,12 +40,20 @@ public static Config DefaultConfiguration()
private readonly TimeSpan _refreshInterval;
private readonly string _writeJournalPluginId;
private readonly int _maxBufferSize;
private readonly ExtendedActorSystem _system;

private readonly object _lock = new object();
private IPublisher<string> _persistenceIdsPublisher;

public SqlReadJournal(ExtendedActorSystem system, Config config)
{
_refreshInterval = config.GetTimeSpan("refresh-interval", null);
_writeJournalPluginId = config.GetString("write-plugin", null);
_maxBufferSize = config.GetInt("max-buffer-size", 0);
_system = system;

_lock = new ReaderWriterLockSlim();
_persistenceIdsPublisher = null;
}

/// <summary>
Expand All @@ -68,20 +76,45 @@ public SqlReadJournal(ExtendedActorSystem system, Config config)
/// backend journal.
/// </para>
/// </summary>
public Source<string, NotUsed> PersistenceIds() =>
Source.ActorPublisher<string>(AllPersistenceIdsPublisher.Props(true, _writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("AllPersistenceIds") as Source<string, NotUsed>;
public Source<string, NotUsed> PersistenceIds()
{
lock (_lock)
{
if (_persistenceIdsPublisher is null)
{
var graph =
Source.ActorPublisher<string>(
LivePersistenceIdsPublisher.Props(
_refreshInterval,
_writeJournalPluginId))
.ToMaterialized(Sink.DistinctRetainingFanOutPublisher<string>(PersistenceIdsShutdownCallback), Keep.Right);

_persistenceIdsPublisher = graph.Run(_system.Materializer());
}
return Source.FromPublisher(_persistenceIdsPublisher)
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("AllPersistenceIds");
}

}

private void PersistenceIdsShutdownCallback()
{
lock (_lock)
{
_persistenceIdsPublisher = null;
}
}

/// <summary>
/// Same type of query as <see cref="PersistenceIds"/> but the stream
/// is completed immediately when it reaches the end of the "result set". Persistent
/// actors that are created after the query is completed are not included in the stream.
/// </summary>
public Source<string, NotUsed> CurrentPersistenceIds() =>
Source.ActorPublisher<string>(AllPersistenceIdsPublisher.Props(false, _writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("CurrentPersistenceIds") as Source<string, NotUsed>;
public Source<string, NotUsed> CurrentPersistenceIds()
=> Source.ActorPublisher<string>(CurrentPersistenceIdsPublisher.Props(_writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("CurrentPersistenceIds");

/// <summary>
/// <see cref="EventsByPersistenceId"/> is used for retrieving events for a specific
Expand Down Expand Up @@ -112,7 +145,7 @@ public Source<string, NotUsed> CurrentPersistenceIds() =>
public Source<EventEnvelope, NotUsed> EventsByPersistenceId(string persistenceId, long fromSequenceNr, long toSequenceNr) =>
Source.ActorPublisher<EventEnvelope>(EventsByPersistenceIdPublisher.Props(persistenceId, fromSequenceNr, toSequenceNr, _refreshInterval, _maxBufferSize, _writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("EventsByPersistenceId-" + persistenceId) as Source<EventEnvelope, NotUsed>;
.Named("EventsByPersistenceId-" + persistenceId);

/// <summary>
/// Same type of query as <see cref="EventsByPersistenceId"/> but the event stream
Expand All @@ -122,7 +155,7 @@ public Source<EventEnvelope, NotUsed> EventsByPersistenceId(string persistenceId
public Source<EventEnvelope, NotUsed> CurrentEventsByPersistenceId(string persistenceId, long fromSequenceNr, long toSequenceNr) =>
Source.ActorPublisher<EventEnvelope>(EventsByPersistenceIdPublisher.Props(persistenceId, fromSequenceNr, toSequenceNr, null, _maxBufferSize, _writeJournalPluginId))
.MapMaterializedValue(_ => NotUsed.Instance)
.Named("CurrentEventsByPersistenceId-" + persistenceId) as Source<EventEnvelope, NotUsed>;
.Named("CurrentEventsByPersistenceId-" + persistenceId);

/// <summary>
/// <see cref="EventsByTag"/> is used for retrieving events that were marked with
Expand Down
Loading

0 comments on commit d612c89

Please sign in to comment.