Skip to content

Commit

Permalink
track previous timestamp to only filter by seq nr when same timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Aug 5, 2024
1 parent f2ace61 commit 09e05fc
Showing 1 changed file with 32 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,19 @@ import org.slf4j.Logger

object QueryState {
val empty: QueryState =
QueryState(TimestampOffset.Zero, 0, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, 0, 0, Buckets.empty)
QueryState(
latest = TimestampOffset.Zero,
rowCount = 0,
rowCountSinceBacktracking = 0,
queryCount = 0,
idleCount = 0,
backtrackingCount = 0,
latestBacktracking = TimestampOffset.Zero,
latestBacktrackingSeenCount = 0,
backtrackingExpectFiltered = 0,
buckets = Buckets.empty,
previous = TimestampOffset.Zero,
previousBacktracking = TimestampOffset.Zero)
}

final case class QueryState(
Expand All @@ -44,7 +56,9 @@ import org.slf4j.Logger
latestBacktracking: TimestampOffset,
latestBacktrackingSeenCount: Int,
backtrackingExpectFiltered: Int,
buckets: Buckets) {
buckets: Buckets,
previous: TimestampOffset,
previousBacktracking: TimestampOffset) {

def backtracking: Boolean = backtrackingCount > 0

Expand All @@ -57,8 +71,8 @@ import org.slf4j.Logger
else latest.timestamp

def nextQueryFromSeqNr: Option[Long] =
if (backtracking) highestSeenSeqNr(latestBacktracking)
else highestSeenSeqNr(latest)
if (backtracking) highestSeenSeqNr(previousBacktracking, latestBacktracking)
else highestSeenSeqNr(previous, latest)

def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match {
Expand All @@ -74,8 +88,11 @@ import org.slf4j.Logger
}
}

private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] =
Option.when(offset.seen.nonEmpty)(offset.seen.values.max)
// Only filter by the highest seen seq nr when the latest offset has the same timestamp as the previous offset (or when there is no previous offset yet, i.e. it is TimestampOffset.Zero, as for initial queries).
private def highestSeenSeqNr(previous: TimestampOffset, latest: TimestampOffset): Option[Long] =
Option.when((previous == TimestampOffset.Zero || previous.timestamp == latest.timestamp) && latest.seen.nonEmpty) {
latest.seen.values.max
}

object Buckets {
type EpochSeconds = Long
Expand Down Expand Up @@ -220,10 +237,10 @@ import org.slf4j.Logger
// so continue until rowCount is 0. That means an extra query at the end to make sure there are no
// more to fetch.
if (state.queryCount == 0L || state.rowCount > 0) {
val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)
val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1, previous = state.latest)

val fromTimestamp = state.latest.timestamp
val fromSeqNr = highestSeenSeqNr(state.latest)
val fromSeqNr = highestSeenSeqNr(state.previous, state.latest)

val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
case Some(t) =>
Expand Down Expand Up @@ -325,7 +342,8 @@ import org.slf4j.Logger

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp &&
highestSeenSeqNr(offset) == highestSeenSeqNr(state.latestBacktracking))
highestSeenSeqNr(state.previousBacktracking, offset) ==
highestSeenSeqNr(state.previousBacktracking, state.latestBacktracking))
state.latestBacktrackingSeenCount + 1
else 1

Expand Down Expand Up @@ -462,7 +480,11 @@ import org.slf4j.Logger
else s"Found [${state.rowCount}] rows in previous query.")
}

newState ->
val newStateWithPrevious =
if (newState.backtracking) newState.copy(previousBacktracking = newState.latestBacktracking)
else newState.copy(previous = newState.latest)

newStateWithPrevious ->
Some(
dao
.rowsBySlices(
Expand Down

0 comments on commit 09e05fc

Please sign in to comment.