From adc960e0ea9224c4f892ee95fb3fd89febd95df2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 18 Jan 2024 13:35:37 +0100 Subject: [PATCH] fix: set default page-size (#1081) * max-buffer-size and max-result-size-query were not used * set the driver page size --- core/src/main/resources/reference.conf | 10 ++++++---- .../persistence/cassandra/query/QuerySettings.scala | 2 -- .../cassandra/query/AllPersistenceIdsSpec.scala | 7 +++---- .../cassandra/query/EventAdaptersReadSpec.scala | 3 +-- .../query/EventsByPersistenceIdFastForwardSpec.scala | 2 +- .../EventsByPersistenceIdMultiPartitionGapSpec.scala | 2 +- .../cassandra/query/EventsByPersistenceIdSpec.scala | 2 +- .../query/EventsByPersistenceIdWithControlSpec.scala | 2 +- .../persistence/cassandra/query/EventsByTagSpec.scala | 8 ++++---- .../cassandra/query/EventsByTagStageSpec.scala | 2 +- .../query/javadsl/CassandraReadJournalSpec.scala | 2 +- .../query/scaladsl/CassandraReadJournalSpec.scala | 2 +- 12 files changed, 21 insertions(+), 23 deletions(-) diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index e485f376..bea3ede9 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -223,10 +223,6 @@ akka.persistence.cassandra { # time to find the in-order sequence number before failing the stream events-by-persistence-id-gap-timeout = 10s - # How many events to fetch in one query (replay) and keep buffered until they - # are delivered downstreams. - max-buffer-size = 500 - # Deserialization of events is perfomed in an Akka streams mapAsync operator and this is the # parallelism for that. Increasing to means that deserialization is pipelined, which can # be an advantage for machines with many CPU cores but otherwise it might be slower because @@ -594,6 +590,12 @@ datastax-java-driver { consistency = QUORUM # the journal does not use any counters or collections default-idempotence = true + + # The page size. 
This controls how many rows will be retrieved simultaneously + # in a single network roundtrip (the goal being to avoid loading too many + # results in memory at the same time). If there are more results, additional + # requests will be used to retrieve them. + page-size = 500 } } akka-persistence-cassandra-snapshot-profile { diff --git a/core/src/main/scala/akka/persistence/cassandra/query/QuerySettings.scala b/core/src/main/scala/akka/persistence/cassandra/query/QuerySettings.scala index 6ac9b5e7..9a568890 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/QuerySettings.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/QuerySettings.scala @@ -32,8 +32,6 @@ import com.typesafe.config.Config val gapFreeSequenceNumbers: Boolean = queryConfig.getBoolean("gap-free-sequence-numbers") - val maxBufferSize: Int = queryConfig.getInt("max-buffer-size") - val deserializationParallelism: Int = queryConfig.getInt("deserialization-parallelism") val pluginDispatcher: String = queryConfig.getString("plugin-dispatcher") diff --git a/core/src/test/scala/akka/persistence/cassandra/query/AllPersistenceIdsSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/AllPersistenceIdsSpec.scala index d1257e03..9e8356df 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/AllPersistenceIdsSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/AllPersistenceIdsSpec.scala @@ -23,11 +23,10 @@ object AllPersistenceIdsSpec { akka.persistence.cassandra { journal.target-partition-size = 15 query { - max-buffer-size = 10 refresh-interval = 0.5s - max-result-size-query = 10 } - } + } + datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 10 """).withFallback(CassandraLifecycle.config) } @@ -77,7 +76,7 @@ class AllPersistenceIdsSpec extends CassandraSpec(AllPersistenceIdsSpec.config) src.runWith(TestSink.probe[Any]).request(10).expectNext("d").expectComplete() } - "find existing persistence ids in 
batches if there is more of them than max-result-size-query" in { + "find existing persistence ids in batches if there is more of them than page-size" in { for (_ <- 1 to 1000) { setup(UUID.randomUUID().toString, 1) } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala index 119b7fc6..5f6fc1bc 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala @@ -26,14 +26,13 @@ object EventAdaptersReadSpec { } } query { - max-buffer-size = 50 refresh-interval = 500ms - max-result-size-query = 2 } events-by-tag { flush-interval = 0ms } } + datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 2 """).withFallback(CassandraLifecycle.config) } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala index 846befd0..5c8a1ba2 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdFastForwardSpec.scala @@ -19,8 +19,8 @@ object EventsByPersistenceIdFastForwardSpec { val config = ConfigFactory.parseString(s""" akka.persistence.cassandra.journal.keyspace=EventsByPersistenceIdFastForwardSpec akka.persistence.cassandra.query.refresh-interval = 250ms - akka.persistence.cassandra.query.max-result-size-query = 2 akka.persistence.cassandra.journal.target-partition-size = 15 + datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 2 """).withFallback(CassandraLifecycle.config) } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala 
b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala index a389ebec..0ea6fe94 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdMultiPartitionGapSpec.scala @@ -16,9 +16,9 @@ object EventsByPersistenceIdMultiPartitionGapSpec { akka.loglevel = INFO akka.persistence.cassandra.journal.target-partition-size = 15 akka.persistence.cassandra.query.refresh-interval = 0.5s - akka.persistence.cassandra.query.max-result-size-query = 2 akka.persistence.cassandra.query.events-by-persistence-id-gap-timeout = 4 seconds akka.persistence.cassandra.query.gap-free-sequence-numbers = off + datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 2 akka.stream.materializer.max-input-buffer-size = 4 # there is an async boundary """).withFallback(CassandraLifecycle.config) } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala index 5414d80e..f5c5fe72 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdSpec.scala @@ -23,8 +23,8 @@ object EventsByPersistenceIdSpec { val config = ConfigFactory.parseString(s""" akka.persistence.cassandra.journal.target-partition-size = 15 akka.persistence.cassandra.query.refresh-interval = 0.5s - akka.persistence.cassandra.query.max-result-size-query = 2 akka.persistence.cassandra.query.events-by-persistence-id-gap-timeout = 4 seconds + datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 2 akka.stream.materializer.max-input-buffer-size = 4 # there is an async boundary """).withFallback(CassandraLifecycle.config) } diff --git 
a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala index 2cecffa7..7108ff57 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByPersistenceIdWithControlSpec.scala @@ -19,7 +19,7 @@ object EventsByPersistenceIdWithControlSpec { akka.persistence.cassandra.journal.keyspace=EventsByPersistenceIdWithControlSpec akka.persistence.cassandra.journal.target-partition-size = 15 akka.persistence.cassandra.query.refresh-interval = 120s # effectively disabled - akka.persistence.cassandra.query.max-result-size-query = 20 + datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 20 akka.stream.materializer.max-input-buffer-size = 4 # there is an async boundary """).withFallback(CassandraLifecycle.config) } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala index aafa29f4..c3196316 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala @@ -60,7 +60,6 @@ object EventsByTagSpec { query { refresh-interval = 500ms - max-buffer-size = 50 } events-by-tag { @@ -73,6 +72,7 @@ object EventsByTagSpec { # coordinated-shutdown-on-error = on } + datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 50 """).withFallback(CassandraLifecycle.config) val strictConfig = ConfigFactory.parseString(s""" @@ -922,7 +922,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends AbstractEventsByTagSpec(Even val w2 = UUID.randomUUID().toString val w3 = UUID.randomUUID().toString - // max-buffer-size = 50 + // page-size = 50 // create 120 events per 
day in total, 60 from each one of the two persistenceId var lastT = t1 for { @@ -986,7 +986,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends AbstractEventsByTagSpec(Even val w2 = UUID.randomUUID().toString val w3 = UUID.randomUUID().toString - // max-buffer-size = 50 + // page-size = 50 (1L to 100L).foreach { n => val eventA = PersistentRepr(s"A$n", n, "a", "", writerUuid = w1) val t = t1.plus(3 * n, ChronoUnit.MILLIS) @@ -1040,7 +1040,7 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends AbstractEventsByTagSpec(Even val t1 = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(5).minusDays(5) val w1 = UUID.randomUUID().toString - // max-buffer-size = 50 + // page-size = 50 // start at seqNr 1 here to trigger the backtracking mode (101L to 430L).foreach { n => val eventA = PersistentRepr(s"B$n", n, "b", "", writerUuid = w1) diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagStageSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagStageSpec.scala index 30c80a4d..341c65b6 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagStageSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagStageSpec.scala @@ -39,7 +39,6 @@ object EventsByTagStageSpec { log-queries = off query { - max-result-size-query = $fetchSize log-queries = on refresh-interval = 200ms } @@ -54,6 +53,7 @@ object EventsByTagStageSpec { new-persistence-id-scan-timeout = ${newPersistenceIdTimeout.toMillis}ms } } + datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = $fetchSize """).withFallback(CassandraLifecycle.config) } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala index 77e27845..e4090fdc 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala +++ 
b/core/src/test/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournalSpec.scala @@ -16,7 +16,6 @@ import scala.concurrent.duration._ object CassandraReadJournalSpec { val config = ConfigFactory.parseString(s""" akka.actor.serialize-messages=off - akka.persistence.cassandra.query.max-buffer-size = 10 akka.persistence.cassandra.query.refresh-interval = 0.5s akka.persistence.cassandra.journal.event-adapters { test-tagger = akka.persistence.cassandra.query.javadsl.TestTagger @@ -24,6 +23,7 @@ object CassandraReadJournalSpec { akka.persistence.cassandra.journal.event-adapter-bindings = { "java.lang.String" = test-tagger } + datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 10 """).withFallback(CassandraLifecycle.config) } diff --git a/core/src/test/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala index 54187dac..6d5fe994 100644 --- a/core/src/test/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournalSpec.scala @@ -17,7 +17,6 @@ import scala.concurrent.duration._ object CassandraReadJournalSpec { val config = ConfigFactory.parseString(s""" akka.actor.serialize-messages=off - akka.persistence.cassandra.query.max-buffer-size = 10 akka.persistence.cassandra.query.refresh-interval = 0.5s akka.persistence.cassandra.journal.event-adapters { test-tagger = akka.persistence.cassandra.query.scaladsl.TestTagger @@ -26,6 +25,7 @@ object CassandraReadJournalSpec { "java.lang.String" = test-tagger } akka.persistence.cassandra.log-queries = off + datastax-java-driver.profiles.akka-persistence-cassandra-profile.basic.request.page-size = 10 """).withFallback(CassandraLifecycle.config) }