From a5573284e06aaa76da4b403d2957fde8fba78ed9 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Thu, 1 Aug 2024 17:23:22 +1200 Subject: [PATCH 1/7] fix: stuck queries when too many events with same timestamp --- .../r2dbc/internal/BySliceQuery.scala | 24 +++++++++++++++--- .../r2dbc/internal/h2/H2QueryDao.scala | 6 ++++- .../postgres/PostgresDurableStateDao.scala | 1 + .../internal/postgres/PostgresQueryDao.scala | 25 +++++++++++++++---- .../postgres/PostgresSnapshotDao.scala | 1 + .../sqlserver/SqlServerQueryDao.scala | 8 +++++- .../r2dbc/query/EventsBySliceSpec.scala | 16 ++++++++++++ 7 files changed, 71 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index e5370e1c..2a6c3f0b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -56,6 +56,10 @@ import org.slf4j.Logger if (backtracking) latestBacktracking.timestamp else latest.timestamp + def nextQueryFromSeqNr: Option[Long] = + if (backtracking) highestSeenSeqNr(latestBacktracking) + else highestSeenSeqNr(latest) + def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = { buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match { case Some(t) => @@ -70,6 +74,10 @@ import org.slf4j.Logger } } + private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] = { + if (offset.seen.isEmpty) None else Some(offset.seen.values.max) + } + object Buckets { type EpochSeconds = Long type Count = Long @@ -157,6 +165,7 @@ import org.slf4j.Logger minSlice: Int, maxSlice: Int, fromTimestamp: Instant, + fromSeqNr: Option[Long], // for events with same timestamp as `fromTimestamp` toTimestamp: Option[Instant], behindCurrentTime: FiniteDuration, backtracking: Boolean): Source[SerializedRow, NotUsed] @@ -214,6 +223,9 @@ import org.slf4j.Logger if (state.queryCount == 0L || state.rowCount > 0) { val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1) + val fromTimestamp = state.latest.timestamp + val fromSeqNr = highestSeenSeqNr(state.latest) + val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match { case Some(t) => if (t.isBefore(endTimestamp)) t else endTimestamp @@ -228,7 +240,7 @@ import org.slf4j.Logger state.queryCount, minSlice, maxSlice, - state.latest.timestamp, + fromTimestamp, toTimestamp, state.rowCount) @@ -238,7 +250,8 @@ import org.slf4j.Logger entityType, minSlice, maxSlice, - state.latest.timestamp, + fromTimestamp, + fromSeqNr, toTimestamp = Some(toTimestamp), behindCurrentTime = Duration.Zero, backtracking = false) @@ -312,7 +325,10 @@ import org.slf4j.Logger s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].") val newSeenCount = - if (offset.timestamp == state.latestBacktracking.timestamp) state.latestBacktrackingSeenCount + 1 else 1 + if (offset.timestamp == state.latestBacktracking.timestamp && + highestSeenSeqNr(offset) == highestSeenSeqNr(state.latestBacktracking)) + state.latestBacktrackingSeenCount + 1 + else 1 state.copy( latestBacktracking = offset, @@ -420,6 +436,7 @@ import org.slf4j.Logger else settings.querySettings.behindCurrentTime val fromTimestamp = newState.nextQueryFromTimestamp + val fromSeqNr = newState.nextQueryFromSeqNr val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) 
if (log.isDebugEnabled()) { @@ -454,6 +471,7 @@ import org.slf4j.Logger minSlice, maxSlice, fromTimestamp, + fromSeqNr, toTimestamp, behindCurrentTime, backtracking = newState.backtracking) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala index ecfad567..920b3896 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2QueryDao.scala @@ -24,6 +24,7 @@ private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[H2QueryDao]) override protected def eventsBySlicesRangeSql( + fromSeqNrParam: Boolean, toDbTimestampParam: Boolean, behindCurrentTime: FiniteDuration, backtracking: Boolean, @@ -31,6 +32,9 @@ private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends maxSlice: Int): String = { // not caching, too many combinations + def fromSeqNrParamCondition = + if (fromSeqNrParam) "AND (db_timestamp != ? OR seq_nr >= ?)" else "" + def toDbTimestampParamCondition = if (toDbTimestampParam) "AND db_timestamp <= ?" else "" @@ -51,7 +55,7 @@ private[r2dbc] class H2QueryDao(executorProvider: R2dbcExecutorProvider) extends FROM ${journalTable(minSlice)} WHERE entity_type = ? AND ${sliceCondition(minSlice, maxSlice)} - AND db_timestamp >= ? $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition + AND db_timestamp >= ? $fromSeqNrParamCondition $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition AND deleted = false ORDER BY db_timestamp, seq_nr LIMIT ?""" diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 73bcf697..74faf857 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -653,6 +653,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv minSlice: Int, maxSlice: Int, fromTimestamp: Instant, + fromSeqNr: Option[Long], toTimestamp: Option[Instant], behindCurrentTime: FiniteDuration, backtracking: Boolean): Source[SerializedStateRow, NotUsed] = { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index dfca379a..fab0eb10 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -69,6 +69,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e "SELECT CURRENT_TIMESTAMP AS db_timestamp" protected def eventsBySlicesRangeSql( + fromSeqNrParam: Boolean, toDbTimestampParam: Boolean, behindCurrentTime: FiniteDuration, backtracking: Boolean, @@ -76,6 +77,9 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e maxSlice: Int): String = { // not caching, too many combinations + def fromSeqNrParamCondition = + if (fromSeqNrParam) "AND (db_timestamp != ? OR seq_nr >= ?)" else "" + def toDbTimestampParamCondition = if (toDbTimestampParam) "AND db_timestamp <= ?" 
else "" @@ -96,7 +100,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e FROM ${journalTable(minSlice)} WHERE entity_type = ? AND ${sliceCondition(minSlice, maxSlice)} - AND db_timestamp >= ? $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition + AND db_timestamp >= ? $fromSeqNrParamCondition $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition AND deleted = false ORDER BY db_timestamp, seq_nr LIMIT ?""" @@ -209,16 +213,25 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e stmt: Statement, entityType: String, fromTimestamp: Instant, + fromSeqNr: Option[Long], toTimestamp: Option[Instant]): Statement = { stmt .bind(0, entityType) .bindTimestamp(1, fromTimestamp) + val index1 = 2 + val index2 = fromSeqNr match { + case Some(seqNr) => + stmt.bindTimestamp(index1, fromTimestamp) + stmt.bind(index1 + 1, seqNr) + index1 + 2 + case None => index1 + } toTimestamp match { case Some(until) => - stmt.bindTimestamp(2, until) - stmt.bind(3, settings.querySettings.bufferSize) + stmt.bindTimestamp(index2, until) + stmt.bind(index2 + 1, settings.querySettings.bufferSize) case None => - stmt.bind(2, settings.querySettings.bufferSize) + stmt.bind(index2, settings.querySettings.bufferSize) } } @@ -227,6 +240,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e minSlice: Int, maxSlice: Int, fromTimestamp: Instant, + fromSeqNr: Option[Long], toTimestamp: Option[Instant], behindCurrentTime: FiniteDuration, backtracking: Boolean): Source[SerializedJournalRow, NotUsed] = { @@ -241,12 +255,13 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e val stmt = connection .createStatement( eventsBySlicesRangeSql( + fromSeqNrParam = fromSeqNr.isDefined, toDbTimestampParam = toTimestamp.isDefined, behindCurrentTime, backtracking, minSlice, maxSlice)) - bindEventsBySlicesRangeSql(stmt, entityType, fromTimestamp, toTimestamp) + bindEventsBySlicesRangeSql(stmt, entityType, fromTimestamp, fromSeqNr, toTimestamp) }, row => if (backtracking) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index 8324b943..f43386d2 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -388,6 +388,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider minSlice: Int, maxSlice: Int, fromTimestamp: Instant, + fromSeqNr: Option[Long], toTimestamp: Option[Instant], behindCurrentTime: FiniteDuration, backtracking: Boolean): Source[SerializedSnapshotRow, NotUsed] = { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala index afc60431..ca104c8b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerQueryDao.scala @@ -112,6 +112,7 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) } override protected def eventsBySlicesRangeSql( + fromSeqNrParam: Boolean, toDbTimestampParam: Boolean, behindCurrentTime: FiniteDuration, backtracking: Boolean, @@ -119,6 +120,9 @@ private[r2dbc] class 
SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) maxSlice: Int): String = { // not caching, too many combinations + def fromSeqNrParamCondition = + if (fromSeqNrParam) "AND (db_timestamp != @from OR seq_nr >= @fromSeqNr)" else "" + def toDbTimestampParamCondition = if (toDbTimestampParam) "AND db_timestamp <= @until" else "" @@ -139,7 +143,7 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) FROM ${journalTable(minSlice)} WHERE entity_type = @entityType AND ${sliceCondition(minSlice, maxSlice)} - AND db_timestamp >= @from $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition + AND db_timestamp >= @from $fromSeqNrParamCondition $toDbTimestampParamCondition $behindCurrentTimeIntervalCondition AND deleted = $sqlFalse ORDER BY db_timestamp, seq_nr""" } @@ -148,11 +152,13 @@ private[r2dbc] class SqlServerQueryDao(executorProvider: R2dbcExecutorProvider) stmt: Statement, entityType: String, fromTimestamp: Instant, + fromSeqNr: Option[Long], toTimestamp: Option[Instant]): Statement = { stmt .bind("@limit", settings.querySettings.bufferSize) .bind("@entityType", entityType) .bindTimestamp("@from", fromTimestamp) + fromSeqNr.foreach(seqNr => stmt.bind("@fromSeqNr", seqNr)) toTimestamp.foreach(timestamp => stmt.bindTimestamp("@until", timestamp)) stmt } diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala index 1dc6a919..d1b43c01 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala @@ -168,6 +168,22 @@ class EventsBySliceSpec assertFinished(result) } + "handle more events with same timestamp than buffer size" in new Setup { + val queryWithSmallBuffer = PersistenceQuery(testKit.system) // buffer size = 4 + .readJournalFor[R2dbcReadJournal]("akka.persistence.r2dbc-small-buffer.query") + persister ! PersistAll((1 to 10).map(i => s"e-$i").toList) + persister ! 
Ping(probe.ref) + probe.expectMessage(Done) + val result: TestSubscriber.Probe[EventEnvelope[String]] = + doQuery(entityType, slice, slice, NoOffset, queryWithSmallBuffer) + .runWith(sinkProbe) + .request(11) + for (i <- 1 to 10) { + result.expectNext().event shouldBe s"e-$i" + } + assertFinished(result) + } + "include metadata" in { val probe = testKit.createTestProbe[Done]() val entityType = nextEntityType() From a36b640f464215b9f9b303a32d300c5d23e5fd83 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Thu, 1 Aug 2024 17:23:24 +1200 Subject: [PATCH 2/7] ci: add mima filters --- .../1.2.4.backwards.excludes/rows-by-slices.excludes | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 core/src/main/mima-filters/1.2.4.backwards.excludes/rows-by-slices.excludes diff --git a/core/src/main/mima-filters/1.2.4.backwards.excludes/rows-by-slices.excludes b/core/src/main/mima-filters/1.2.4.backwards.excludes/rows-by-slices.excludes new file mode 100644 index 00000000..8c1d067e --- /dev/null +++ b/core/src/main/mima-filters/1.2.4.backwards.excludes/rows-by-slices.excludes @@ -0,0 +1,3 @@ +# internals: fromSeqNr added to rowsBySlices +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.rowsBySlices") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.r2dbc.internal.BySliceQuery#Dao.rowsBySlices") From 6505f547e0fb673a1c0b8d13c1dcf8a567ab5c2e Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Thu, 1 Aug 2024 17:24:00 +1200 Subject: [PATCH 3/7] ci: pin sqlserver to the previous version --- docker/docker-compose-sqlserver.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/docker-compose-sqlserver.yml b/docker/docker-compose-sqlserver.yml index 76f95fb9..5f134a9e 100644 --- a/docker/docker-compose-sqlserver.yml +++ b/docker/docker-compose-sqlserver.yml @@ -1,7 +1,7 @@ version: '2.2' services: sqlserver: - image: mcr.microsoft.com/mssql/server:2022-latest + image: mcr.microsoft.com/mssql/server:2022-CU13-ubuntu-22.04 container_name: sqlserver-db environment: - MSSQL_SA_PASSWORD= From 3203ac8877206c708486f3ddbd2cdbd70039d1e0 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Thu, 1 Aug 2024 18:39:24 +1200 Subject: [PATCH 4/7] test: same timestamp, overlapping seq numbers --- .../r2dbc/query/EventsBySliceSpec.scala | 90 ++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala index d1b43c01..5f0d6b46 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala @@ -4,11 +4,15 @@ package akka.persistence.r2dbc.query +import java.time.Instant + import scala.concurrent.Await + import akka.Done import akka.NotUsed import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.{ ActorRef, ActorSystem } import akka.persistence.FilteredPayload import akka.persistence.query.NoOffset @@ -20,16 +24,23 @@ import akka.persistence.query.typed.scaladsl.EventTimestampQuery import akka.persistence.query.typed.scaladsl.LoadEventQuery import akka.persistence.r2dbc.TestActors import 
akka.persistence.r2dbc.TestActors.Persister -import akka.persistence.r2dbc.TestActors.Persister.Persist import akka.persistence.r2dbc.TestActors.Persister.PersistAll import akka.persistence.r2dbc.TestActors.Persister.PersistWithAck import akka.persistence.r2dbc.TestActors.Persister.Ping import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle +import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter +import akka.persistence.r2dbc.internal.codec.PayloadCodec +import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement +import akka.persistence.r2dbc.internal.codec.QueryAdapter +import akka.persistence.r2dbc.internal.codec.TimestampCodec +import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.persistence.typed.PersistenceId import akka.persistence.typed.internal.ReplicatedEventMetadata +import akka.serialization.SerializationExtension import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import akka.stream.testkit.TestSubscriber @@ -37,6 +48,7 @@ import akka.stream.testkit.scaladsl.TestSink import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike +import org.slf4j.LoggerFactory object EventsBySliceSpec { sealed trait QueryType @@ -76,6 +88,40 @@ class EventsBySliceSpec private val query = PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier) + implicit val payloadCodec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec + implicit val timestampCodec: TimestampCodec = settings.codecSettings.JournalImplicits.timestampCodec + implicit val queryAdapter: QueryAdapter = settings.codecSettings.JournalImplicits.queryAdapter + + private val stringSerializer = SerializationExtension(system).serializerFor(classOf[String]) + + private val log = LoggerFactory.getLogger(getClass) + + // to be able to store events with specific timestamps + private def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = { + log.debugN("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp) + + val insertEventSql = sql""" + INSERT INTO ${settings.journalTableWithSchema(slice)} + (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload) + VALUES (?, ?, ?, ?, ?, '', '', ?, '', ?)""" + + val entityType = PersistenceId.extractEntityType(persistenceId) + + val result = r2dbcExecutor(slice).updateOne("test writeEvent") { connection => + connection + .createStatement(insertEventSql) + .bind(0, slice) + .bind(1, entityType) + .bind(2, persistenceId) + .bind(3, seqNr) + .bindTimestamp(4, timestamp) + .bind(5, stringSerializer.identifier) + .bindPayload(6, stringSerializer.toBinary(event)) + } + + result.futureValue shouldBe 1 + } + private class Setup { val entityType = nextEntityType() val persistenceId = nextPid(entityType) @@ -184,6 +230,48 @@ class EventsBySliceSpec assertFinished(result) } + "handle more events with same timestamp than buffer size, with overlapping seq numbers" in { + val entityType = nextEntityType() + val pid1 = nextPid(entityType) + val pid2 = nextPid(entityType) + val pid3 = nextPid(entityType) + val slice1 = query.sliceForPersistenceId(pid1) + val slice2 = 
query.sliceForPersistenceId(pid2) + val slice3 = query.sliceForPersistenceId(pid3) + val slices = Seq(slice1, slice2, slice3) + + val timestamp = InstantFactory.now() + writeEvent(slice1, pid1, 1L, timestamp, "A1") + writeEvent(slice1, pid1, 2L, timestamp, "A2") + writeEvent(slice1, pid1, 3L, timestamp, "A3") + writeEvent(slice1, pid1, 4L, timestamp, "A4") + writeEvent(slice1, pid1, 5L, timestamp, "A5") + writeEvent(slice1, pid1, 6L, timestamp, "A6") + writeEvent(slice2, pid2, 3L, timestamp, "B3") + writeEvent(slice2, pid2, 4L, timestamp, "B4") + writeEvent(slice3, pid3, 3L, timestamp, "C3") + + val queryWithSmallBuffer = PersistenceQuery(testKit.system) // buffer size = 4 + .readJournalFor[R2dbcReadJournal]("akka.persistence.r2dbc-small-buffer.query") + + val sinkProbe = TestSink[EventEnvelope[String]]() + + val result: TestSubscriber.Probe[EventEnvelope[String]] = + doQuery(entityType, slices.min, slices.max, NoOffset, queryWithSmallBuffer) + .runWith(sinkProbe) + .request(10) + + def take(n: Int): Set[String] = + (1 to n).map(_ => result.expectNext().event).toSet + + take(2) shouldBe Set("A1", "A2") + take(3) shouldBe Set("A3", "B3", "C3") + take(2) shouldBe Set("A4", "B4") + take(2) shouldBe Set("A5", "A6") + + assertFinished(result) + } + "include metadata" in { val probe = testKit.createTestProbe[Done]() val entityType = nextEntityType() From f2ace61989c3011e4c0d2c5952eeacf2195f15b3 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 2 Aug 2024 17:20:33 +1200 Subject: [PATCH 5/7] suggestions from code review --- .../r2dbc/internal/BySliceQuery.scala | 5 +-- .../internal/postgres/PostgresQueryDao.scala | 5 +++ .../r2dbc/query/EventsBySliceSpec.scala | 41 ++++++++++++------- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index 2a6c3f0b..b3f57625 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -74,9 +74,8 @@ import org.slf4j.Logger } } - private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] = { - if (offset.seen.isEmpty) None else Some(offset.seen.values.max) - } + private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] = + Option.when(offset.seen.nonEmpty)(offset.seen.values.max) object Buckets { type EpochSeconds = Long diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index fab0eb10..84b97c03 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -77,6 +77,11 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e maxSlice: Int): String = { // not caching, too many combinations + // If more events than the buffer size are all on the same timestamp, then the query will get stuck on that same + // timestamp. Avoid this by also starting from the highest seen sequence number for that timestamp, using the fact + // that events are ordered by db_timestamp, seq_nr. Note that sequence numbers are per persistence id, so a later + // timestamp can have an earlier sequence number. 
Add a logical conditional only when db_timestamp = fromTimestamp + // to also filter for seq_nr >= fromSeqNr. Expressed in a logically equivalent form, where A -> B === ~A v B. def fromSeqNrParamCondition = if (fromSeqNrParam) "AND (db_timestamp != ? OR seq_nr >= ?)" else "" diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala index 5f0d6b46..dd75ada3 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala @@ -235,21 +235,27 @@ class EventsBySliceSpec val pid1 = nextPid(entityType) val pid2 = nextPid(entityType) val pid3 = nextPid(entityType) + val pid4 = nextPid(entityType) val slice1 = query.sliceForPersistenceId(pid1) val slice2 = query.sliceForPersistenceId(pid2) val slice3 = query.sliceForPersistenceId(pid3) - val slices = Seq(slice1, slice2, slice3) - - val timestamp = InstantFactory.now() - writeEvent(slice1, pid1, 1L, timestamp, "A1") - writeEvent(slice1, pid1, 2L, timestamp, "A2") - writeEvent(slice1, pid1, 3L, timestamp, "A3") - writeEvent(slice1, pid1, 4L, timestamp, "A4") - writeEvent(slice1, pid1, 5L, timestamp, "A5") - writeEvent(slice1, pid1, 6L, timestamp, "A6") - writeEvent(slice2, pid2, 3L, timestamp, "B3") - writeEvent(slice2, pid2, 4L, timestamp, "B4") - writeEvent(slice3, pid3, 3L, timestamp, "C3") + val slice4 = query.sliceForPersistenceId(pid4) + val slices = Seq(slice1, slice2, slice3, slice4) + val t1 = InstantFactory.now() + val t2 = t1.plusMillis(1) + + writeEvent(slice1, pid1, 1L, t1, "A1") + writeEvent(slice1, pid1, 2L, t1, "A2") + writeEvent(slice1, pid1, 3L, t1, "A3") + writeEvent(slice1, pid1, 4L, t1, "A4") + writeEvent(slice1, pid1, 5L, t1, "A5") + writeEvent(slice1, pid1, 6L, t1, "A6") + writeEvent(slice2, pid2, 3L, t1, "B3") + writeEvent(slice2, pid2, 4L, t1, "B4") + writeEvent(slice3, pid3, 3L, t1, "C3") + writeEvent(slice4, pid4, 1L, t2, "D1") + writeEvent(slice4, pid4, 2L, t2, "D2") + writeEvent(slice4, pid4, 3L, t2, "D3") val queryWithSmallBuffer = PersistenceQuery(testKit.system) // buffer size = 4 .readJournalFor[R2dbcReadJournal]("akka.persistence.r2dbc-small-buffer.query") @@ -259,15 +265,20 @@ class EventsBySliceSpec val result: TestSubscriber.Probe[EventEnvelope[String]] = doQuery(entityType, slices.min, slices.max, NoOffset, queryWithSmallBuffer) .runWith(sinkProbe) - .request(10) + .request(15) def take(n: Int): Set[String] = (1 to n).map(_ => result.expectNext().event).toSet - take(2) shouldBe Set("A1", "A2") + take(1) shouldBe Set("A1") + take(1) shouldBe Set("A2") take(3) shouldBe Set("A3", "B3", "C3") take(2) shouldBe Set("A4", "B4") - take(2) shouldBe Set("A5", "A6") + take(1) shouldBe Set("A5") + take(1) shouldBe Set("A6") + take(1) shouldBe Set("D1") + take(1) shouldBe Set("D2") + take(1) shouldBe Set("D3") assertFinished(result) } From 09e05fcc8b3c1b90aa18525b1990c9c3882da9ab Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Mon, 5 Aug 2024 13:33:29 +1200 Subject: [PATCH 6/7] track previous timestamp to only filter by seq nr when same timestamp --- .../r2dbc/internal/BySliceQuery.scala | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index b3f57625..6f9164d1 100644 --- 
a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -31,7 +31,19 @@ import org.slf4j.Logger object QueryState { val empty: QueryState = - QueryState(TimestampOffset.Zero, 0, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, 0, 0, Buckets.empty) + QueryState( + latest = TimestampOffset.Zero, + rowCount = 0, + rowCountSinceBacktracking = 0, + queryCount = 0, + idleCount = 0, + backtrackingCount = 0, + latestBacktracking = TimestampOffset.Zero, + latestBacktrackingSeenCount = 0, + backtrackingExpectFiltered = 0, + buckets = Buckets.empty, + previous = TimestampOffset.Zero, + previousBacktracking = TimestampOffset.Zero) } final case class QueryState( @@ -44,7 +56,9 @@ import org.slf4j.Logger latestBacktracking: TimestampOffset, latestBacktrackingSeenCount: Int, backtrackingExpectFiltered: Int, - buckets: Buckets) { + buckets: Buckets, + previous: TimestampOffset, + previousBacktracking: TimestampOffset) { def backtracking: Boolean = backtrackingCount > 0 @@ -57,8 +71,8 @@ import org.slf4j.Logger else latest.timestamp def nextQueryFromSeqNr: Option[Long] = - if (backtracking) highestSeenSeqNr(latestBacktracking) - else highestSeenSeqNr(latest) + if (backtracking) highestSeenSeqNr(previousBacktracking, latestBacktracking) + else highestSeenSeqNr(previous, latest) def nextQueryToTimestamp(atLeastNumberOfEvents: Int): Option[Instant] = { buckets.findTimeForLimit(nextQueryFromTimestamp, atLeastNumberOfEvents) match { @@ -74,8 +88,11 @@ import org.slf4j.Logger } } - private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] = - Option.when(offset.seen.nonEmpty)(offset.seen.values.max) + // only filter by highest seen seq nr when the next query is the same timestamp (or when unknown for initial queries) + private def highestSeenSeqNr(previous: TimestampOffset, latest: TimestampOffset): Option[Long] = + Option.when((previous == TimestampOffset.Zero || previous.timestamp == latest.timestamp) && latest.seen.nonEmpty) { + latest.seen.values.max + } object Buckets { type EpochSeconds = Long @@ -220,10 +237,10 @@ import org.slf4j.Logger // so continue until rowCount is 0. That means an extra query at the end to make sure there are no // more to fetch. 
if (state.queryCount == 0L || state.rowCount > 0) { - val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1) + val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1, previous = state.latest) val fromTimestamp = state.latest.timestamp - val fromSeqNr = highestSeenSeqNr(state.latest) + val fromSeqNr = highestSeenSeqNr(state.previous, state.latest) val toTimestamp = newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match { case Some(t) => @@ -325,7 +342,8 @@ import org.slf4j.Logger val newSeenCount = if (offset.timestamp == state.latestBacktracking.timestamp && - highestSeenSeqNr(offset) == highestSeenSeqNr(state.latestBacktracking)) + highestSeenSeqNr(state.previousBacktracking, offset) == + highestSeenSeqNr(state.previousBacktracking, state.latestBacktracking)) state.latestBacktrackingSeenCount + 1 else 1 @@ -462,7 +480,11 @@ import org.slf4j.Logger else s"Found [${state.rowCount}] rows in previous query.") } - newState -> + val newStateWithPrevious = + if (newState.backtracking) newState.copy(previousBacktracking = newState.latestBacktracking) + else newState.copy(previous = newState.latest) + + newStateWithPrevious -> Some( dao .rowsBySlices( From faee7275a50d9cf7439bb8de53986e5d01b7255b Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Mon, 5 Aug 2024 13:57:42 +1200 Subject: [PATCH 7/7] ci: update docker compose command for yugabyte tests --- .github/workflows/build-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 730116da..45152999 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -203,7 +203,7 @@ jobs: - name: Start DB run: |- - docker-compose -f docker/docker-compose-yugabyte.yml up -d + docker compose -f docker/docker-compose-yugabyte.yml up -d # TODO: could we poll the port instead of sleep? sleep 10 docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h yb-tserver-n1 -t < ddl-scripts/create_tables_yugabyte.sql