Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: stuck queries when too many events with same timestamp #586

Merged
merged 7 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# internals: fromSeqNr added to rowsBySlices
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.rowsBySlices")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.rowsBySlices")
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ import org.slf4j.Logger
if (backtracking) latestBacktracking.timestamp
else latest.timestamp

def nextQueryFromSeqNr: Option[Long] =
if (backtracking) highestSeenSeqNr(latestBacktracking)
else highestSeenSeqNr(latest)

def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = {
buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match {
case Some(t) =>
Expand All @@ -70,6 +74,10 @@ import org.slf4j.Logger
}
}

private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] = {
if (offset.seen.isEmpty) None else Some(offset.seen.values.max)
}
pvlugter marked this conversation as resolved.
Show resolved Hide resolved

object Buckets {
type EpochSeconds = Long
type Count = Long
Expand Down Expand Up @@ -157,6 +165,7 @@ import org.slf4j.Logger
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
fromSeqNr: Option[Long], // for events with same timestamp as `fromTimestamp`
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedRow, NotUsed]
Expand Down Expand Up @@ -214,6 +223,9 @@ import org.slf4j.Logger
if (state.queryCount == 0L || state.rowCount > 0) {
val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)

val fromTimestamp = state.latest.timestamp
val fromSeqNr = highestSeenSeqNr(state.latest)

val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
case Some(t) =>
if (t.isBefore(endTimestamp)) t else endTimestamp
Expand All @@ -228,7 +240,7 @@ import org.slf4j.Logger
state.queryCount,
minSlice,
maxSlice,
state.latest.timestamp,
fromTimestamp,
toTimestamp,
state.rowCount)

Expand All @@ -238,7 +250,8 @@ import org.slf4j.Logger
entityType,
minSlice,
maxSlice,
state.latest.timestamp,
fromTimestamp,
fromSeqNr,
toTimestamp = Some(toTimestamp),
behindCurrentTime = Duration.Zero,
backtracking = false)
Expand Down Expand Up @@ -312,7 +325,10 @@ import org.slf4j.Logger
s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp) state.latestBacktrackingSeenCount + 1 else 1
if (offset.timestamp == state.latestBacktracking.timestamp &&
highestSeenSeqNr(offset) == highestSeenSeqNr(state.latestBacktracking))
state.latestBacktrackingSeenCount + 1
else 1

state.copy(
latestBacktracking = offset,
Expand Down Expand Up @@ -420,6 +436,7 @@ import org.slf4j.Logger
else settings.querySettings.behindCurrentTime

val fromTimestamp = newState.nextQueryFromTimestamp
val fromSeqNr = newState.nextQueryFromSeqNr
val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize)

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -454,6 +471,7 @@ import org.slf4j.Logger
minSlice,
maxSlice,
fromTimestamp,
fromSeqNr,
toTimestamp,
behindCurrentTime,
backtracking = newState.backtracking)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to adjust BySliceQuery.deserializeAndAddOffset? It has a check on the buffer size and throws IllegalStateException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That check is whether the seen map exceeds the buffer size (more persistence ids on the same timestamp than the buffer size). We could adjust it to only throw if all the sequence numbers are the same, which would not be handled by this fix (would be stuck on both timestamp and seq number). But that many persistence ids with events on the exact same timestamp already feels exceptional, so thought that it's useful to leave as is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree 👍

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends
override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao])

override protected def eventsBySlicesRangeSql(
fromSeqNrParam: Boolean,
toDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
minSlice: Int,
maxSlice: Int): String = {
// not caching, too many combinations

def fromSeqNrParamCondition =
if (fromSeqNrParam) "AND (db_timestamp != ? OR seq_nr >= ?)" else ""
pvlugter marked this conversation as resolved.
Show resolved Hide resolved

def toDbTimestampParamCondition =
if (toDbTimestampParam) "AND db_timestamp <= ?" else ""

Expand All @@ -51,7 +55,7 @@ private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends
FROM ${journalTable(minSlice)}
WHERE entity_type = ?
AND ${sliceCondition(minSlice, maxSlice)}
AND db_timestamp >= ? $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND db_timestamp >= ? $fromSeqNrParamCondition $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND deleted = false
ORDER BY db_timestamp, seq_nr
LIMIT ?"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
fromSeqNr: Option[Long],
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedStateRow, NotUsed] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,17 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
"SELECT CURRENT_TIMESTAMP AS db_timestamp"

protected def eventsBySlicesRangeSql(
fromSeqNrParam: Boolean,
toDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
minSlice: Int,
maxSlice: Int): String = {
// not caching, too many combinations

def fromSeqNrParamCondition =
if (fromSeqNrParam) "AND (db_timestamp != ? OR seq_nr >= ?)" else ""

def toDbTimestampParamCondition =
if (toDbTimestampParam) "AND db_timestamp <= ?" else ""

Expand All @@ -96,7 +100,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
FROM ${journalTable(minSlice)}
WHERE entity_type = ?
AND ${sliceCondition(minSlice, maxSlice)}
AND db_timestamp >= ? $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND db_timestamp >= ? $fromSeqNrParamCondition $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND deleted = false
ORDER BY db_timestamp, seq_nr
LIMIT ?"""
Expand Down Expand Up @@ -209,16 +213,25 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
stmt: Statement,
entityType: String,
fromTimestamp: Instant,
fromSeqNr: Option[Long],
toTimestamp: Option[Instant]): Statement = {
stmt
.bind(0, entityType)
.bindTimestamp(1, fromTimestamp)
val index1 = 2
val index2 = fromSeqNr match {
case Some(seqNr) =>
stmt.bindTimestamp(index1, fromTimestamp)
stmt.bind(index1 + 1, seqNr)
index1 + 2
case None => index1
}
toTimestamp match {
case Some(until) =>
stmt.bindTimestamp(2, until)
stmt.bind(3, settings.querySettings.bufferSize)
stmt.bindTimestamp(index2, until)
stmt.bind(index2 + 1, settings.querySettings.bufferSize)
case None =>
stmt.bind(2, settings.querySettings.bufferSize)
stmt.bind(index2, settings.querySettings.bufferSize)
}
}

Expand All @@ -227,6 +240,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
fromSeqNr: Option[Long],
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedJournalRow, NotUsed] = {
Expand All @@ -241,12 +255,13 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e
val stmt = connection
.createStatement(
eventsBySlicesRangeSql(
fromSeqNrParam = fromSeqNr.isDefined,
toDbTimestampParam = toTimestamp.isDefined,
behindCurrentTime,
backtracking,
minSlice,
maxSlice))
bindEventsBySlicesRangeSql(stmt, entityType, fromTimestamp, toTimestamp)
bindEventsBySlicesRangeSql(stmt, entityType, fromTimestamp, fromSeqNr, toTimestamp)
},
row =>
if (backtracking)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider
minSlice: Int,
maxSlice: Int,
fromTimestamp: Instant,
fromSeqNr: Option[Long],
toTimestamp: Option[Instant],
behindCurrentTime: FiniteDuration,
backtracking: Boolean): Source[SerializedSnapshotRow, NotUsed] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,17 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider)
}

override protected def eventsBySlicesRangeSql(
fromSeqNrParam: Boolean,
toDbTimestampParam: Boolean,
behindCurrentTime: FiniteDuration,
backtracking: Boolean,
minSlice: Int,
maxSlice: Int): String = {
// not caching, too many combinations

def fromSeqNrParamCondition =
if (fromSeqNrParam) "AND (db_timestamp != @from OR seq_nr >= @fromSeqNr)" else ""

def toDbTimestampParamCondition =
if (toDbTimestampParam) "AND db_timestamp <= @until" else ""

Expand All @@ -139,7 +143,7 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider)
FROM ${journalTable(minSlice)}
WHERE entity_type = @entityType
AND ${sliceCondition(minSlice, maxSlice)}
AND db_timestamp >= @from $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND db_timestamp >= @from $fromSeqNrParamCondition $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition
AND deleted = $sqlFalse
ORDER BY db_timestamp, seq_nr"""
}
Expand All @@ -148,11 +152,13 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider)
stmt: Statement,
entityType: String,
fromTimestamp: Instant,
fromSeqNr: Option[Long],
toTimestamp: Option[Instant]): Statement = {
stmt
.bind("@limit", settings.querySettings.bufferSize)
.bind("@entityType", entityType)
.bindTimestamp("@from", fromTimestamp)
fromSeqNr.foreach(seqNr => stmt.bind("@fromSeqNr", seqNr))
toTimestamp.foreach(timestamp => stmt.bindTimestamp("@until", timestamp))
stmt
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ class EventsBySliceSpec
assertFinished(result)
}

// Regression test for queries getting stuck when a single query "page" cannot
// advance the timestamp offset: more events share one timestamp than fit in the
// buffer. With the fromSeqNr addition, the next query continues from the highest
// seen seq_nr at that timestamp instead of re-reading the same rows forever.
"handle more events with same timestamp than buffer size" in new Setup {
// Read journal configured with a deliberately tiny buffer (4) so that the
// 10 same-timestamp events below overflow a single query window.
val queryWithSmallBuffer = PersistenceQuery(testKit.system) // buffer size = 4
.readJournalFor[R2dbcReadJournal]("akka.persistence.r2dbc-small-buffer.query")
// NOTE(review): PersistAll presumably stores all 10 events in one atomic write,
// giving them the same db_timestamp — confirm against the persister implementation.
persister ! PersistAll((1 to 10).map(i => s"e-$i").toList)
// Ping/expectMessage acts as a barrier: all events are persisted before querying.
persister ! Ping(probe.ref)
probe.expectMessage(Done)
val result: TestSubscriber.Probe[EventEnvelope[String]] =
doQuery(entityType, slice, slice, NoOffset, queryWithSmallBuffer)
.runWith(sinkProbe)
.request(11)
// All 10 events must arrive, in seq_nr order, despite the buffer holding only 4.
for (i <- 1 to 10) {
result.expectNext().event shouldBe s"e-$i"
}
assertFinished(result)
}

"include metadata" in {
val probe = testKit.createTestProbe[Done]()
val entityType = nextEntityType()
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-sqlserver.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '2.2'
services:
sqlserver:
image: mcr.microsoft.com/mssql/server:2022-latest
image: mcr.microsoft.com/mssql/server:2022-CU13-ubuntu-22.04
container_name: sqlserver-db
environment:
- MSSQL_SA_PASSWORD=<YourStrong@Passw0rd>
Expand Down