Skip to content

Commit

Permalink
Fix persistence allocation. (#6487)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
F0b0s and Aaronontheweb authored Mar 6, 2023
1 parent bfe6636 commit 149cc05
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 78 deletions.
6 changes: 2 additions & 4 deletions src/core/Akka.Persistence/Eventsourced.Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using Akka.Actor;
using Akka.Persistence.Internal;

Expand Down Expand Up @@ -334,11 +332,11 @@ private void FlushBatch()
{
if (_eventBatch.Count > 0)
{
foreach (var p in _eventBatch.Reverse())
foreach (var p in _eventBatch)
{
_journalBatch.Add(p);
}
_eventBatch = new LinkedList<IPersistentEnvelope>();
_eventBatch.Clear();
}

FlushJournalBatch();
Expand Down
10 changes: 5 additions & 5 deletions src/core/Akka.Persistence/Eventsourced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public void Persist<TEvent>(TEvent @event, Action<TEvent> handler)

_pendingStashingPersistInvocations++;
_pendingInvocations.AddLast(new StashingHandlerInvocation(@event, o => handler((TEvent)o)));
_eventBatch.AddFirst(new AtomicWrite(new Persistent(@event, persistenceId: PersistenceId,
_eventBatch.AddLast(new AtomicWrite(new Persistent(@event, persistenceId: PersistenceId,
sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender)));
}

Expand Down Expand Up @@ -335,7 +335,7 @@ public void PersistAll<TEvent>(IEnumerable<TEvent> events, Action<TEvent> handle
}

if (persistents.Count > 0)
_eventBatch.AddFirst(new AtomicWrite(persistents.ToImmutable()));
_eventBatch.AddLast(new AtomicWrite(persistents.ToImmutable()));
}

/// <summary>
Expand Down Expand Up @@ -374,7 +374,7 @@ public void PersistAsync<TEvent>(TEvent @event, Action<TEvent> handler)
}

_pendingInvocations.AddLast(new AsyncHandlerInvocation(@event, o => handler((TEvent)o)));
_eventBatch.AddFirst(new AtomicWrite(new Persistent(@event, persistenceId: PersistenceId,
_eventBatch.AddLast(new AtomicWrite(new Persistent(@event, persistenceId: PersistenceId,
sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender)));
}

Expand All @@ -400,7 +400,7 @@ public void PersistAllAsync<TEvent>(IEnumerable<TEvent> events, Action<TEvent> h
_pendingInvocations.AddLast(new AsyncHandlerInvocation(@event, Inv));
}

_eventBatch.AddFirst(new AtomicWrite(enumerable.Select(e => new Persistent(e, persistenceId: PersistenceId,
_eventBatch.AddLast(new AtomicWrite(enumerable.Select(e => new Persistent(e, persistenceId: PersistenceId,
sequenceNr: NextSequenceNr(), writerGuid: _writerGuid, sender: Sender))
.ToImmutableList<IPersistentRepresentation>()));
}
Expand Down Expand Up @@ -440,7 +440,7 @@ public void DeferAsync<TEvent>(TEvent evt, Action<TEvent> handler)
else
{
_pendingInvocations.AddLast(new AsyncHandlerInvocation(evt, o => handler((TEvent)o)));
_eventBatch.AddFirst(new NonPersistentMessage(evt, Sender));
_eventBatch.AddLast(new NonPersistentMessage(evt, Sender));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private void HandleWriteMessages(WriteMessages message)
*/
var self = Self;
_resequencerCounter += message.Messages.Aggregate(1, (acc, m) => acc + m.Size);
var atomicWriteCount = message.Messages.OfType<AtomicWrite>().Count();
var atomicWriteCount = message.Messages.Count(x => x is AtomicWrite);

// Using an async local function instead of ContinueWith
#pragma warning disable CS4014
Expand Down
Loading

0 comments on commit 149cc05

Please sign in to comment.