Skip to content

Commit

Permalink
Fix PostgreSql does not like 2 deletes in a single command.
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Mar 12, 2021
1 parent 3f00ae4 commit ce37f79
Showing 1 changed file with 58 additions and 0 deletions.
58 changes: 58 additions & 0 deletions src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Persistence.PostgreSql.Journal
{
Expand Down Expand Up @@ -61,6 +63,16 @@ SELECT MAX(e.{Configuration.SequenceNrColumnName}) as SeqNr FROM {Configuration.
UNION
SELECT MAX(m.{Configuration.SequenceNrColumnName}) as SeqNr FROM {Configuration.FullMetaTableName} m WHERE m.{Configuration.PersistenceIdColumnName} = @PersistenceId) as u";

// As per https://github.com/akkadotnet/Akka.Persistence.PostgreSql/pull/72, apparently PostgreSQL does not like
// it when you chain two deletes in a single command, so we have to split it into two.
// The performance penalty should be minimal, depending on the network speed
DeleteBatchSql = $@"
DELETE FROM {Configuration.FullJournalTableName}
WHERE {Configuration.PersistenceIdColumnName} = @PersistenceId AND {Configuration.SequenceNrColumnName} <= @ToSequenceNr;";

DeleteBatchSqlMetadata = $@"DELETE FROM {Configuration.FullMetaTableName}
WHERE {Configuration.PersistenceIdColumnName} = @PersistenceId AND {Configuration.SequenceNrColumnName} <= @ToSequenceNr;";

switch (configuration.StoredAs)
{
case StoredAsType.ByteA:
Expand Down Expand Up @@ -108,6 +120,8 @@ SELECT MAX(e.{Configuration.SequenceNrColumnName}) as SeqNr FROM {Configuration.
protected override string CreateEventsJournalSql { get; }
protected override string CreateMetaTableSql { get; }
protected override string HighestSequenceNrSql { get; }
protected override string DeleteBatchSql { get; }
protected virtual string DeleteBatchSqlMetadata { get; }

protected override void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
{
Expand Down Expand Up @@ -190,6 +204,50 @@ protected override IPersistentRepresentation ReadEvent(DbDataReader reader)

return new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender, null, timestamp);
}

public override async Task DeleteBatchAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId, long toSequenceNr)
{
using (var deleteCommand = GetCommand(connection, DeleteBatchSql))
using (var deleteMetadataCommand = GetCommand(connection, DeleteBatchSqlMetadata))
using (var highestSeqNrCommand = GetCommand(connection, HighestSequenceNrSql))
{
AddParameter(highestSeqNrCommand, "@PersistenceId", DbType.String, persistenceId);

AddParameter(deleteCommand, "@PersistenceId", DbType.String, persistenceId);
AddParameter(deleteCommand, "@ToSequenceNr", DbType.Int64, toSequenceNr);

AddParameter(deleteMetadataCommand, "@PersistenceId", DbType.String, persistenceId);
AddParameter(deleteMetadataCommand, "@ToSequenceNr", DbType.Int64, toSequenceNr);

using (var tx = connection.BeginTransaction())
{
deleteCommand.Transaction = tx;
deleteMetadataCommand.Transaction = tx;
highestSeqNrCommand.Transaction = tx;

var res = await highestSeqNrCommand.ExecuteScalarAsync(cancellationToken);
var highestSeqNr = res is long ? Convert.ToInt64(res) : 0L;

await deleteCommand.ExecuteNonQueryAsync(cancellationToken);
await deleteMetadataCommand.ExecuteNonQueryAsync(cancellationToken);

if (highestSeqNr <= toSequenceNr)
{
using (var updateCommand = GetCommand(connection, UpdateSequenceNrSql))
{
updateCommand.Transaction = tx;

AddParameter(updateCommand, "@PersistenceId", DbType.String, persistenceId);
AddParameter(updateCommand, "@SequenceNr", DbType.Int64, highestSeqNr);

await updateCommand.ExecuteNonQueryAsync(cancellationToken);
tx.Commit();
}
}
else tx.Commit();
}
}
}
}

public class PostgreSqlQueryConfiguration : QueryConfiguration
Expand Down

0 comments on commit ce37f79

Please sign in to comment.