diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs index 36aeb315709..97d1dfcdf7e 100644 --- a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/BatchingSqlJournal.cs @@ -13,6 +13,7 @@ using System.Diagnostics; using System.Linq; using System.Runtime.CompilerServices; +using System.Runtime.ExceptionServices; using System.Runtime.Serialization; using System.Text; using System.Threading.Tasks; @@ -800,8 +801,15 @@ private void FailChunkExecution(ChunkExecutionFailure message) replayAll.ReplyTo.Tell(new EventReplayFailure(cause)); break; + case SelectCurrentPersistenceIds select: + // SqlJournal handled this failure case by using the default PipeTo failure + // handler which sends a Status.Failure message back to the sender. + select.ReplyTo.Tell(new Status.Failure(cause)); + break; + default: - throw new Exception($"Unknown persistence journal request type [{request.GetType()}]"); + Log.Error(cause, $"Batching failure not reported to original sender. Unknown batched persistence journal request type [{request.GetType()}]."); + break; } } @@ -1096,10 +1104,12 @@ protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPers command.Parameters.Clear(); AddParameter(command, "@Ordering", DbType.Int64, message.Offset); - var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) + using (var reader = await command.ExecuteReaderAsync()) { - result.Add(reader.GetString(0)); + while (await reader.ReadAsync()) + { + result.Add(reader.GetString(0)); + } } message.ReplyTo.Tell(new CurrentPersistenceIds(result, highestOrderingNumber));