diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index b3f57625..6f9164d1 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -31,7 +31,19 @@ import org.slf4j.Logger object QueryState { val empty: QueryState = - QueryState(TimestampOffset.Zero, 0, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, 0, 0, Buckets.empty) + QueryState( + latest = TimestampOffset.Zero, + rowCount = 0, + rowCountSinceBacktracking = 0, + queryCount = 0, + idleCount = 0, + backtrackingCount = 0, + latestBacktracking = TimestampOffset.Zero, + latestBacktrackingSeenCount = 0, + backtrackingExpectFiltered = 0, + buckets = Buckets.empty, + previous = TimestampOffset.Zero, + previousBacktracking = TimestampOffset.Zero) } final case class QueryState( @@ -44,7 +56,9 @@ import org.slf4j.Logger latestBacktracking: TimestampOffset, latestBacktrackingSeenCount: Int, backtrackingExpectFiltered: Int, - buckets: Buckets) { + buckets: Buckets, + previous: TimestampOffset, + previousBacktracking: TimestampOffset) { def backtracking: Boolean = backtrackingCount > 0 @@ -57,8 +71,8 @@ import org.slf4j.Logger else latest.timestamp def nextQueryFromSeqNr: Option[Long] = - if (backtracking) highestSeenSeqNr(latestBacktracking) - else highestSeenSeqNr(latest) + if (backtracking) highestSeenSeqNr(previousBacktracking, latestBacktracking) + else highestSeenSeqNr(previous, latest) def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = { buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match { @@ -74,8 +88,11 @@ import org.slf4j.Logger } } - private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] = - Option.when(offset.seen.nonEmpty)(offset.seen.values.max) + // only filter by highest seen seq nr when the next query is the same timestamp (or when unknown for initial queries) + private def highestSeenSeqNr(previous: TimestampOffset, latest: TimestampOffset): Option[Long] = + Option.when((previous == TimestampOffset.Zero || previous.timestamp == latest.timestamp) && latest.seen.nonEmpty) { + latest.seen.values.max + } object Buckets { type EpochSeconds = Long @@ -220,10 +237,10 @@ import org.slf4j.Logger // so continue until rowCount is 0. That means an extra query at the end to make sure there are no // more to fetch. if (state.queryCount == 0L || state.rowCount > 0) { - val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1) + val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1, previous = state.latest) val fromTimestamp = state.latest.timestamp - val fromSeqNr = highestSeenSeqNr(state.latest) + val fromSeqNr = highestSeenSeqNr(state.previous, state.latest) val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match { case Some(t) => @@ -325,7 +342,8 @@ import org.slf4j.Logger val newSeenCount = if (offset.timestamp == state.latestBacktracking.timestamp && - highestSeenSeqNr(offset) == highestSeenSeqNr(state.latestBacktracking)) + highestSeenSeqNr(state.previousBacktracking, offset) == + highestSeenSeqNr(state.previousBacktracking, state.latestBacktracking)) state.latestBacktrackingSeenCount + 1 else 1 @@ -462,7 +480,11 @@ import org.slf4j.Logger else s"Found [${state.rowCount}] rows in previous query.") } - newState -> + val newStateWithPrevious = + if (newState.backtracking) newState.copy(previousBacktracking = newState.latestBacktracking) + else newState.copy(previous = newState.latest) + + newStateWithPrevious -> Some( dao .rowsBySlices(