From 90aeaac7fcc72052fa1fd44f9c91efd77baf311b Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Fri, 2 Aug 2024 17:20:33 +1200 Subject: [PATCH] suggestions from code review --- .../r2dbc/internal/BySliceQuery.scala | 5 +-- .../internal/postgres/PostgresQueryDao.scala | 5 +++ .../r2dbc/query/EventsBySliceSpec.scala | 41 ++++++++++++------- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index 2a6c3f0b..b3f57625 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -74,9 +74,8 @@ import org.slf4j.Logger } } - private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] = { - if (offset.seen.isEmpty) None else Some(offset.seen.values.max) - } + private def highestSeenSeqNr(offset: TimestampOffset): Option[Long] = + Option.when(offset.seen.nonEmpty)(offset.seen.values.max) object Buckets { type EpochSeconds = Long diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index fab0eb10..84b97c03 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -77,6 +77,11 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e maxSlice: Int): String = { // not caching, too many combinations + // If more events than the buffer size are all on the same timestamp, then the query will get stuck on that same + // timestamp. Avoid this by also starting from the highest seen sequence number for that timestamp, using the fact + // that events are ordered by db_timestamp, seq_nr. Note that sequence numbers are per persistence id, so a later + // timestamp can have an earlier sequence number. Add a logical conditional only when db_timestamp = fromTimestamp + // to also filter for seq_nr >= fromSeqNr. Expressed in a logically equivalent form, where A -> B === ~A v B. def fromSeqNrParamCondition = if (fromSeqNrParam) "AND (db_timestamp != ? OR seq_nr >= ?)" else "" diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala index 5f0d6b46..66e44377 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala @@ -235,21 +235,27 @@ class EventsBySliceSpec val pid1 = nextPid(entityType) val pid2 = nextPid(entityType) val pid3 = nextPid(entityType) + val pid4 = nextPid(entityType) val slice1 = query.sliceForPersistenceId(pid1) val slice2 = query.sliceForPersistenceId(pid2) val slice3 = query.sliceForPersistenceId(pid3) - val slices = Seq(slice1, slice2, slice3) - - val timestamp = InstantFactory.now() - writeEvent(slice1, pid1, 1L, timestamp, "A1") - writeEvent(slice1, pid1, 2L, timestamp, "A2") - writeEvent(slice1, pid1, 3L, timestamp, "A3") - writeEvent(slice1, pid1, 4L, timestamp, "A4") - writeEvent(slice1, pid1, 5L, timestamp, "A5") - writeEvent(slice1, pid1, 6L, timestamp, "A6") - writeEvent(slice2, pid2, 3L, timestamp, "B3") - writeEvent(slice2, pid2, 4L, timestamp, "B4") - writeEvent(slice3, pid3, 3L, timestamp, "C3") + val slice4 = query.sliceForPersistenceId(pid4) + val slices = Seq(slice1, slice2, slice3, slice4) + val t1 = InstantFactory.now() + val t2 = t1.plusMillis(100) + + writeEvent(slice1, pid1, 1L, t1, "A1") + writeEvent(slice1, pid1, 2L, t1, "A2") + writeEvent(slice1, pid1, 3L, t1, "A3") + writeEvent(slice1, pid1, 4L, t1, "A4") + writeEvent(slice1, pid1, 5L, t1, "A5") + writeEvent(slice1, pid1, 6L, t1, "A6") + writeEvent(slice2, pid2, 3L, t1, "B3") + writeEvent(slice2, pid2, 4L, t1, "B4") + writeEvent(slice3, pid3, 3L, t1, "C3") + writeEvent(slice4, pid4, 1L, t2, "D1") + writeEvent(slice4, pid4, 2L, t2, "D2") + writeEvent(slice4, pid4, 3L, t2, "D3") val queryWithSmallBuffer = PersistenceQuery(testKit.system) // buffer size = 4 .readJournalFor[R2dbcReadJournal]("akka.persistence.r2dbc-small-buffer.query") @@ -259,15 +265,20 @@ class EventsBySliceSpec val result: TestSubscriber.Probe[EventEnvelope[String]] = doQuery(entityType, slices.min, slices.max, NoOffset, queryWithSmallBuffer) .runWith(sinkProbe) - .request(10) + .request(15) def take(n: Int): Set[String] = (1 to n).map(_ => result.expectNext().event).toSet - take(2) shouldBe Set("A1", "A2") + take(1) shouldBe Set("A1") + take(1) shouldBe Set("A2") take(3) shouldBe Set("A3", "B3", "C3") take(2) shouldBe Set("A4", "B4") - take(2) shouldBe Set("A5", "A6") + take(1) shouldBe Set("A5") + take(1) shouldBe Set("A6") + take(1) shouldBe Set("D1") + take(1) shouldBe Set("D2") + take(1) shouldBe Set("D3") assertFinished(result) }