diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 3e9849b2..ee4469f2 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -113,25 +113,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat row.payload.map(payload => serialization.deserialize(payload, row.serId, row.serManifest).get.asInstanceOf[Event]) private val _bySlice: BySliceQuery[SerializedJournalRow, EventEnvelope[Any]] = { - val createEnvelope: (TimestampOffset, SerializedJournalRow) => EventEnvelope[Any] = (offset, row) => { - val event = deserializePayload(row) - val metadata = row.metadata.map(meta => serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get) - val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking - val filtered = row.serId == filteredPayloadSerId - - new EventEnvelope( - offset, - row.persistenceId, - row.seqNr, - if (filtered) None else event, - row.dbTimestamp.toEpochMilli, - metadata, - row.entityType, - row.slice, - filtered, - source, - tags = row.tags) - } + val createEnvelope: (TimestampOffset, SerializedJournalRow) => EventEnvelope[Any] = createEventEnvelope val extractOffset: EventEnvelope[Any] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset] @@ -141,6 +123,43 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat private def bySlice[Event]: BySliceQuery[SerializedJournalRow, EventEnvelope[Event]] = _bySlice.asInstanceOf[BySliceQuery[SerializedJournalRow, EventEnvelope[Event]]] + private def deserializeBySliceRow[Event](row: SerializedJournalRow): EventEnvelope[Event] = { + val offset = TimestampOffset(row.dbTimestamp, row.readDbTimestamp, Map(row.persistenceId -> row.seqNr)) + 
createEventEnvelope(offset, row) + } + + private def createEventEnvelope[Event](offset: TimestampOffset, row: SerializedJournalRow): EventEnvelope[Event] = { + val event = deserializePayload(row) + val metadata = row.metadata.map(meta => serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get) + val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking + val filtered = row.serId == filteredPayloadSerId + + new EventEnvelope( + offset, + row.persistenceId, + row.seqNr, + if (filtered) None else event, + row.dbTimestamp.toEpochMilli, + metadata, + row.entityType, + row.slice, + filtered, + source, + tags = row.tags) + } + + private def deserializeRow(row: SerializedJournalRow): ClassicEventEnvelope = { + val event = deserializePayload(row) + // note that it's not possible to filter out FilteredPayload here + val offset = TimestampOffset(row.dbTimestamp, row.readDbTimestamp, Map(row.persistenceId -> row.seqNr)) + val envelope = ClassicEventEnvelope(offset, row.persistenceId, row.seqNr, event.get, row.dbTimestamp.toEpochMilli) + row.metadata match { + case None => envelope + case Some(meta) => + envelope.withMetadata(serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get) + } + } + private def snapshotsBySlice[Snapshot, Event]( transformSnapshot: Snapshot => Event): BySliceQuery[SerializedSnapshotRow, EventEnvelope[Event]] = { val createEnvelope: (TimestampOffset, SerializedSnapshotRow) => EventEnvelope[Event] = @@ -753,39 +772,6 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat .mapMaterializedValue(_ => NotUsed) } - private def deserializeBySliceRow[Event](row: SerializedJournalRow): EventEnvelope[Event] = { - val event = deserializePayload[Event](row) - val offset = TimestampOffset(row.dbTimestamp, row.readDbTimestamp, Map(row.persistenceId -> row.seqNr)) - val metadata = row.metadata.map(meta => serialization.deserialize(meta.payload, meta.serId, 
meta.serManifest).get) - val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking - val filtered = row.serId == filteredPayloadSerId - - new EventEnvelope( - offset, - row.persistenceId, - row.seqNr, - if (filtered) None else event, - row.dbTimestamp.toEpochMilli, - metadata, - row.entityType, - row.slice, - filtered, - source, - tags = row.tags) - } - - private def deserializeRow(row: SerializedJournalRow): ClassicEventEnvelope = { - val event = deserializePayload(row) - // note that it's not possible to filter out FilteredPayload here - val offset = TimestampOffset(row.dbTimestamp, row.readDbTimestamp, Map(row.persistenceId -> row.seqNr)) - val envelope = ClassicEventEnvelope(offset, row.persistenceId, row.seqNr, event.get, row.dbTimestamp.toEpochMilli) - row.metadata match { - case None => envelope - case Some(meta) => - envelope.withMetadata(serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get) - } - } - override def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] = queryDao.persistenceIds(afterId, limit) diff --git a/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala index e0a87bf6..2a0e47a0 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala @@ -6,15 +6,17 @@ package akka.persistence.r2dbc import java.nio.charset.StandardCharsets.UTF_8 +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + import akka.Done import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem import akka.persistence.r2dbc.TestActors.DurableStatePersister import akka.persistence.r2dbc.TestActors.Persister +import akka.persistence.r2dbc.internal.codec.PayloadCodec import 
akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow -import com.typesafe.config.ConfigFactory -import org.scalatest.wordspec.AnyWordSpecLike /** * The purpose of this test is to verify JSONB payloads, but it can also be run with ordinary BYTEA payloads. To test @@ -59,7 +61,6 @@ class PayloadSpec import PayloadSpec._ override def typedSystem: ActorSystem[_] = system - private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) private def testJournalPersister(persistenceId: String, msg: Any): Unit = { val probe = createTestProbe[Any]() @@ -88,7 +89,7 @@ class PayloadSpec } private def selectJournalRow(persistenceId: String): TestRow = { - import settings.codecSettings.JournalImplicits.journalPayloadCodec + implicit val codec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec val slice = persistenceExt.sliceForPersistenceId(persistenceId) r2dbcExecutor(slice) @@ -108,7 +109,7 @@ class PayloadSpec } private def selectSnapshotRow(persistenceId: String): TestRow = { - import settings.codecSettings.SnapshotImplicits.snapshotPayloadCodec + implicit val codec: PayloadCodec = settings.codecSettings.SnapshotImplicits.snapshotPayloadCodec val slice = persistenceExt.sliceForPersistenceId(persistenceId) r2dbcExecutor(slice) @@ -128,7 +129,7 @@ class PayloadSpec } private def selectDurableStateRow(persistenceId: String): TestRow = { - import settings.codecSettings.DurableStateImplicits.durableStatePayloadCodec + implicit val codec: PayloadCodec = settings.codecSettings.DurableStateImplicits.durableStatePayloadCodec val slice = persistenceExt.sliceForPersistenceId(persistenceId) r2dbcExecutor(slice) diff --git a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala index e9fe8559..859b2995 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala +++ 
b/core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala @@ -28,14 +28,14 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite => def testConfigPath: String = "akka.persistence.r2dbc" - lazy val r2dbcSettings: R2dbcSettings = + lazy val settings: R2dbcSettings = R2dbcSettings(typedSystem.settings.config.getConfig(testConfigPath)) lazy val r2dbcExecutorProvider: R2dbcExecutorProvider = new R2dbcExecutorProvider( typedSystem, - r2dbcSettings.connectionFactorySettings.dialect.daoExecutionContext(r2dbcSettings, typedSystem), - r2dbcSettings, + settings.connectionFactorySettings.dialect.daoExecutionContext(settings, typedSystem), + settings, testConfigPath + ".connection-factory", LoggerFactory.getLogger(getClass)) @@ -48,22 +48,22 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite => lazy val persistenceExt: Persistence = Persistence(typedSystem) def pendingIfMoreThanOneDataPartition(): Unit = - if (r2dbcSettings.numberOfDataPartitions > 1) + if (settings.numberOfDataPartitions > 1) pending override protected def beforeAll(): Unit = { try { - r2dbcSettings.allJournalTablesWithSchema.foreach { case (table, minSlice) => + settings.allJournalTablesWithSchema.foreach { case (table, minSlice) => Await.result( r2dbcExecutor(minSlice).updateOne("beforeAll delete")(_.createStatement(s"delete from $table")), 10.seconds) } - r2dbcSettings.allSnapshotTablesWithSchema.foreach { case (table, minSlice) => + settings.allSnapshotTablesWithSchema.foreach { case (table, minSlice) => Await.result( r2dbcExecutor(minSlice).updateOne("beforeAll delete")(_.createStatement(s"delete from $table")), 10.seconds) } - r2dbcSettings.allDurableStateTablesWithSchema.foreach { case (table, minSlice) => + settings.allDurableStateTablesWithSchema.foreach { case (table, minSlice) => Await.result( r2dbcExecutor(minSlice).updateOne("beforeAll delete")(_.createStatement(s"delete from $table")), 10.seconds) diff --git 
a/core/src/test/scala/akka/persistence/r2dbc/internal/H2AdditionalInitForSchemaSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/internal/H2AdditionalInitForSchemaSpec.scala index 6e1d1457..988b92ae 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/internal/H2AdditionalInitForSchemaSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/internal/H2AdditionalInitForSchemaSpec.scala @@ -55,8 +55,7 @@ class H2AdditionalInitForSchemaSpec private def exists(slice: Int, whereCondition: String): Boolean = r2dbcExecutor(slice) .selectOne("count")( - _.createStatement( - s"select count(*) from ${r2dbcSettings.durableStateTableWithSchema(slice)} where $whereCondition"), + _.createStatement(s"select count(*) from ${settings.durableStateTableWithSchema(slice)} where $whereCondition"), row => row.get(0, classOf[java.lang.Long]).longValue()) .futureValue .contains(1) diff --git a/core/src/test/scala/akka/persistence/r2dbc/internal/R2dbcExecutorSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/internal/R2dbcExecutorSpec.scala index 28c3d22f..401ecd9a 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/internal/R2dbcExecutorSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/internal/R2dbcExecutorSpec.scala @@ -48,12 +48,12 @@ class R2dbcExecutorSpec // need pg_sleep or similar private def canBeTestedWithDialect: Boolean = - r2dbcSettings.connectionFactorySettings.dialect == PostgresDialect || - r2dbcSettings.connectionFactorySettings.dialect == YugabyteDialect + settings.connectionFactorySettings.dialect == PostgresDialect || + settings.connectionFactorySettings.dialect == YugabyteDialect private def pendingIfCannotBeTestedWithDialect(): Unit = { if (!canBeTestedWithDialect) { - info(s"Can't be tested with dialect [${r2dbcSettings.dialectName}]") + info(s"Can't be tested with dialect [${settings.dialectName}]") pending } } @@ -89,7 +89,7 @@ class R2dbcExecutorSpec connection.createStatement(s"insert into $table (col) values ('b')")) } - 
Thread.sleep(r2dbcSettings.connectionFactorySettings.poolSettings.closeCallsExceeding.get.toMillis) + Thread.sleep(settings.connectionFactorySettings.poolSettings.closeCallsExceeding.get.toMillis) // The request will fail with PostgresConnectionClosedException result.failed.futureValue shouldBe a[R2dbcNonTransientResourceException] @@ -118,7 +118,7 @@ class R2dbcExecutorSpec val result = r2dbcExecutor.updateOne("test")(_.createStatement("select pg_sleep(4)")) - Thread.sleep(r2dbcSettings.connectionFactorySettings.poolSettings.closeCallsExceeding.get.toMillis) + Thread.sleep(settings.connectionFactorySettings.poolSettings.closeCallsExceeding.get.toMillis) // The request will fail with PostgresConnectionClosedException result.failed.futureValue shouldBe a[R2dbcNonTransientResourceException] diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala index 08c10762..78ecf8a1 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistSerializedEventSpec.scala @@ -9,7 +9,6 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem import akka.persistence.SerializedEvent -import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.TestActors.Persister import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData @@ -27,7 +26,6 @@ class PersistSerializedEventSpec with LogCapturing { override def typedSystem: ActorSystem[_] = system - private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) case class Row(pid: String, seqNr: Long, serializerId: Int, manifest: String) diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala 
b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala index 5351a7d3..63191dce 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTagsSpec.scala @@ -6,18 +6,19 @@ package akka.persistence.r2dbc.journal import scala.concurrent.duration._ +import org.scalatest.wordspec.AnyWordSpecLike + import akka.Done import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem -import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.TestActors.Persister import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle +import akka.persistence.r2dbc.internal.codec.TagsCodec import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow import akka.persistence.typed.PersistenceId -import org.scalatest.wordspec.AnyWordSpecLike class PersistTagsSpec extends ScalaTestWithActorTestKit(TestConfig.config) @@ -27,8 +28,7 @@ class PersistTagsSpec with LogCapturing { override def typedSystem: ActorSystem[_] = system - private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) - import settings.codecSettings.JournalImplicits.tagsCodec + implicit val codec: TagsCodec = settings.codecSettings.JournalImplicits.tagsCodec case class Row(pid: String, seqNr: Long, tags: Set[String]) private def selectRows(table: String, minSlice: Int): IndexedSeq[Row] = { @@ -44,7 +44,7 @@ class PersistTagsSpec } private def selectAllRows(): IndexedSeq[Row] = - r2dbcSettings.allJournalTablesWithSchema.toVector.sortBy(_._1).flatMap { case (table, minSlice) => + settings.allJournalTablesWithSchema.toVector.sortBy(_._1).flatMap { case (table, minSlice) => selectRows(table, minSlice) } diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala 
b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala index bacf5cf5..fb5a28ba 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/PersistTimestampSpec.scala @@ -5,23 +5,27 @@ package akka.persistence.r2dbc.journal import java.time.Instant + import scala.concurrent.duration._ + +import org.scalatest.wordspec.AnyWordSpecLike + import akka.Done import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem -import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow -import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.TestActors.Persister import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle +import akka.persistence.r2dbc.internal.codec.PayloadCodec +import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec -import akka.persistence.r2dbc.internal.codec.TimestampCodec.{ PostgresTimestampCodec, SqlServerTimestampCodec } import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow +import akka.persistence.r2dbc.internal.codec.TimestampCodec.PostgresTimestampCodec +import akka.persistence.r2dbc.internal.codec.TimestampCodec.SqlServerTimestampCodec import akka.persistence.typed.PersistenceId import akka.serialization.SerializationExtension -import org.scalatest.wordspec.AnyWordSpecLike class PersistTimestampSpec extends ScalaTestWithActorTestKit(TestConfig.config) @@ -30,9 +34,8 @@ class PersistTimestampSpec with TestData with LogCapturing { - import settings.codecSettings.JournalImplicits.journalPayloadCodec + implicit val payloadCodec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec override def typedSystem: ActorSystem[_] = system - private val 
settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) private val serialization = SerializationExtension(system) case class Row(pid: String, seqNr: Long, dbTimestamp: Instant, event: String) @@ -64,7 +67,7 @@ class PersistTimestampSpec } private def selectAllRows(): IndexedSeq[Row] = - r2dbcSettings.allJournalTablesWithSchema.toVector.sortBy(_._1).flatMap { case (table, minSlice) => + settings.allJournalTablesWithSchema.toVector.sortBy(_._1).flatMap { case (table, minSlice) => selectRows(table, minSlice) } diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfManyActorsSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfManyActorsSpec.scala index 6b8b718c..e1acc81c 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfManyActorsSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfManyActorsSpec.scala @@ -34,7 +34,7 @@ class R2dbcJournalPerfManyActorsSpec extends JournalPerfSpec(R2dbcJournalPerfSpe "A PersistentActor's performance" must { - if (r2dbcSettings.dialectName == "sqlserver") { + if (settings.dialectName == "sqlserver") { pending } diff --git a/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfSpec.scala index c57bea6c..ac2f3c5c 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/journal/R2dbcJournalPerfSpec.scala @@ -31,7 +31,7 @@ class R2dbcJournalPerfSpec extends JournalPerfSpec(R2dbcJournalPerfSpec.config) override def typedSystem: ActorSystem[_] = system.toTyped override def benchActor(replyAfter: Int): ActorRef = { - if (r2dbcSettings.dialectName == "sqlserver") + if (settings.dialectName == "sqlserver") throw new TestPendingException else super.benchActor(replyAfter) diff --git 
a/core/src/test/scala/akka/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala index 0c52fab4..0957eb65 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala @@ -12,10 +12,8 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem import akka.persistence.query.PersistenceQuery -import akka.persistence.query.TimestampOffset import akka.persistence.query.TimestampOffset.toTimestampOffset import akka.persistence.query.{ EventEnvelope => ClassicEventEnvelope } -import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.TestActors import akka.persistence.r2dbc.TestActors.Persister import akka.persistence.r2dbc.TestActors.Persister.PersistWithAck @@ -45,7 +43,6 @@ class EventsByPersistenceIdSpec import EventsByPersistenceIdSpec._ override def typedSystem: ActorSystem[_] = system - private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) private val query = PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier) diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala index 3ccabfef..6e8dd420 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala @@ -6,7 +6,13 @@ package akka.persistence.r2dbc.query import java.time.Instant import java.time.temporal.ChronoUnit + import scala.concurrent.duration._ + +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike +import 
org.slf4j.LoggerFactory + import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem @@ -16,25 +22,22 @@ import akka.persistence.query.Offset import akka.persistence.query.PersistenceQuery import akka.persistence.query.TimestampOffset import akka.persistence.query.typed.EventEnvelope -import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.codec.TimestampCodec -import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement -import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement -import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle import akka.persistence.r2dbc.internal.EnvelopeOrigin import akka.persistence.r2dbc.internal.InstantFactory +import akka.persistence.r2dbc.internal.codec.PayloadCodec +import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement +import akka.persistence.r2dbc.internal.codec.TimestampCodec +import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement +import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter import akka.persistence.r2dbc.internal.codec.QueryAdapter import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.persistence.typed.PersistenceId import akka.serialization.SerializationExtension import akka.stream.testkit.TestSubscriber import akka.stream.testkit.scaladsl.TestSink -import com.typesafe.config.ConfigFactory -import org.scalatest.wordspec.AnyWordSpecLike -import org.slf4j.LoggerFactory object EventsBySliceBacktrackingSpec { private val BufferSize = 10 // small buffer for testing @@ -55,8 +58,9 @@ class EventsBySliceBacktrackingSpec with LogCapturing { override def typedSystem: ActorSystem[_] = system - private val settings = 
R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) - import settings.codecSettings.JournalImplicits._ + implicit val payloadCodec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec + implicit val timestampCodec: TimestampCodec = settings.codecSettings.JournalImplicits.timestampCodec + implicit val queryAdapter: QueryAdapter = settings.codecSettings.JournalImplicits.queryAdapter private val query = PersistenceQuery(testKit.system) .readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier) @@ -152,7 +156,7 @@ class EventsBySliceBacktrackingSpec // no backtracking yet result.expectNoMessage(settings.querySettings.refreshInterval + 100.millis) - // after 1/2 of the backtracking widow, to kick off a backtracking query + // after 1/2 of the backtracking window, to kick off a backtracking query writeEvent( slice1, pid1, diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePerfSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePerfSpec.scala index de083e1a..65f2cd63 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePerfSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePerfSpec.scala @@ -59,7 +59,7 @@ class EventsBySlicePerfSpec s"EventsBySlices performance" should { - if (r2dbcSettings.dialectName == "sqlserver") { + if (settings.dialectName == "sqlserver") { pending } @@ -175,9 +175,9 @@ class EventsBySlicePerfSpec val delayed = (EnvelopeOrigin.fromPubSub(env) && lagMillis > 50) || (EnvelopeOrigin.fromQuery( - env) && lagMillis > r2dbcSettings.querySettings.refreshInterval.toMillis + 300) || + env) && lagMillis > settings.querySettings.refreshInterval.toMillis + 300) || (EnvelopeOrigin.fromPubSub( - env) && lagMillis > r2dbcSettings.querySettings.backtrackingWindow.toMillis / 2 + 300) + env) && lagMillis > settings.querySettings.backtrackingWindow.toMillis / 2 + 300) if (delayed) println( s"# received ${newAcc.size}$duplicate 
from ${env.source}: ${env.persistenceId} seqNr ${env.sequenceNr}, lag $lagMillis ms") diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala index b1e0c9bf..4f13632d 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala @@ -224,7 +224,7 @@ class EventsBySlicePubSubSpec .via( query.skipPubSubTooFarAhead( enabled = true, - maxAheadOfBacktracking = JDuration.ofMillis(r2dbcSettings.querySettings.backtrackingWindow.toMillis))) + maxAheadOfBacktracking = JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis))) .toMat(TestSink[EventEnvelope[String]]())(Keep.both) .run() out.request(100) @@ -244,7 +244,7 @@ class EventsBySlicePubSubSpec val time2 = envA1.offset .asInstanceOf[TimestampOffset] .timestamp - .plusMillis(r2dbcSettings.querySettings.backtrackingWindow.toMillis) + .plusMillis(settings.querySettings.backtrackingWindow.toMillis) val envC1 = createEnvelope(pidC, 1L, "c1", time2.plusMillis(1)) val envC2 = createEnvelope(pidC, 2L, "c2", time2.plusMillis(2)) in.sendNext(envC1) diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala index 8e80b39f..1dc6a919 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala @@ -5,7 +5,6 @@ package akka.persistence.r2dbc.query import scala.concurrent.Await -import scala.concurrent.duration._ import akka.Done import akka.NotUsed import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -19,7 +18,6 @@ import akka.persistence.query.TimestampOffset import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.EventTimestampQuery import 
akka.persistence.query.typed.scaladsl.LoadEventQuery -import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.TestActors import akka.persistence.r2dbc.TestActors.Persister import akka.persistence.r2dbc.TestActors.Persister.Persist @@ -75,7 +73,6 @@ class EventsBySliceSpec import EventsBySliceSpec._ override def typedSystem: ActorSystem[_] = system - private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) private val query = PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier) diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceStartingFromSnapshotSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceStartingFromSnapshotSpec.scala index 5a4c405b..9cb3ea72 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceStartingFromSnapshotSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceStartingFromSnapshotSpec.scala @@ -4,8 +4,6 @@ package akka.persistence.r2dbc.query -import scala.concurrent.duration._ - import akka.Done import akka.NotUsed import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -17,12 +15,9 @@ import akka.persistence.query.Offset import akka.persistence.query.PersistenceQuery import akka.persistence.query.TimestampOffset.toTimestampOffset import akka.persistence.query.typed.EventEnvelope -import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.TestActors import akka.persistence.r2dbc.TestActors.Persister -import akka.persistence.r2dbc.TestActors.Persister.Persist import akka.persistence.r2dbc.TestActors.Persister.PersistWithAck -import akka.persistence.r2dbc.TestActors.Persister.Ping import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle @@ -66,7 +61,6 @@ class EventsBySliceStartingFromSnapshotSpec import EventsBySliceStartingFromSnapshotSpec._ override def typedSystem: 
ActorSystem[_] = system - private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc")) private val query = PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier) diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala index 7a2ee209..7d62818a 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/CurrentPersistenceIdsQuerySpec.scala @@ -62,18 +62,18 @@ class CurrentPersistenceIdsQuerySpec private val customPid1 = nextPid(customEntityType) private val customPid2 = nextPid(customEntityType) - private def customTable(slice: Int) = r2dbcSettings.getDurableStateTableWithSchema("CustomEntity", slice) + private def customTable(slice: Int) = settings.getDurableStateTableWithSchema("CustomEntity", slice) private def createTable(slice: Int) = { - if (r2dbcSettings.dialectName == "sqlserver") { - s"IF object_id('${customTable(slice)}') is null SELECT * into ${customTable(slice)} from ${r2dbcSettings.durableStateTableWithSchema(slice)} where persistence_id = ''" + if (settings.dialectName == "sqlserver") { + s"IF object_id('${customTable(slice)}') is null SELECT * into ${customTable(slice)} from ${settings.durableStateTableWithSchema(slice)} where persistence_id = ''" } else { - s"create table if not exists ${customTable(slice)} as select * from ${r2dbcSettings.durableStateTableWithSchema(slice)} where persistence_id = ''" + s"create table if not exists ${customTable(slice)} as select * from ${settings.durableStateTableWithSchema(slice)} where persistence_id = ''" } } override protected def beforeAll(): Unit = { - r2dbcSettings.dataPartitionSliceRanges.foreach { sliceRange => + settings.dataPartitionSliceRanges.foreach { sliceRange => val dataPartitionSlice = sliceRange.min 
Await.result( r2dbcExecutor(dataPartitionSlice).executeDdl("beforeAll create durable_state_test")( @@ -113,7 +113,7 @@ class CurrentPersistenceIdsQuerySpec "retrieve all ids" in { val result = store.currentPersistenceIds().runWith(Sink.seq).futureValue - if (r2dbcSettings.numberOfDataPartitions == 1) + if (settings.numberOfDataPartitions == 1) result shouldBe pids.map(_.id) else result.sorted shouldBe pids.map(_.id).sorted @@ -146,7 +146,7 @@ class CurrentPersistenceIdsQuerySpec createPidsInCustomTable() val result = store.currentPersistenceIds().runWith(Sink.seq).futureValue // note that custom tables always come afterwards, i.e. not strictly sorted on the pids (but that should be ok) - if (r2dbcSettings.numberOfDataPartitions == 1) + if (settings.numberOfDataPartitions == 1) result shouldBe (pids.map(_.id) :+ customPid1 :+ customPid2) else result.sorted shouldBe (pids.map(_.id) :+ customPid1 :+ customPid2).sorted diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala index c3d03dd7..7efc36ef 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreAdditionalColumnSpec.scala @@ -66,17 +66,17 @@ class DurableStateStoreAdditionalColumnSpec with TestData with LogCapturing { - private def customTable(slice: Int) = r2dbcSettings.getDurableStateTableWithSchema("CustomEntity", slice) + private def customTable(slice: Int) = settings.getDurableStateTableWithSchema("CustomEntity", slice) private def createCustomTable(slice: Int): String = { - if (r2dbcSettings.dialectName == "sqlserver") - s"IF object_id('${customTable(slice)}') is null SELECT * INTO ${customTable(slice)} FROM ${r2dbcSettings.durableStateTableWithSchema(slice)} where persistence_id = ''" + if (settings.dialectName == "sqlserver") + s"IF 
object_id('${customTable(slice)}') is null SELECT * INTO ${customTable(slice)} FROM ${settings.durableStateTableWithSchema(slice)} where persistence_id = ''" else - s"create table if not exists ${customTable(slice)} as select * from ${r2dbcSettings.durableStateTableWithSchema(slice)} where persistence_id = ''" + s"create table if not exists ${customTable(slice)} as select * from ${settings.durableStateTableWithSchema(slice)} where persistence_id = ''" } private def alterCustomTable(slice: Int, col: String, colType: String): String = { - if (r2dbcSettings.dialectName == "sqlserver") + if (settings.dialectName == "sqlserver") s"IF COL_LENGTH('${customTable(slice)}', '$col') IS NULL ALTER TABLE ${customTable(slice)} ADD $col $colType" else s"alter table ${customTable(slice)} add if not exists $col $colType" @@ -85,7 +85,7 @@ class DurableStateStoreAdditionalColumnSpec override def typedSystem: ActorSystem[_] = system override def beforeAll(): Unit = { - r2dbcSettings.dataPartitionSliceRanges.foreach { sliceRange => + settings.dataPartitionSliceRanges.foreach { sliceRange => val dataPartitionSlice = sliceRange.min Await.result( r2dbcExecutor(dataPartitionSlice).executeDdl("beforeAll create durable_state_test")( diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreSpec.scala index 04ba76f6..15e4aa84 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateStoreSpec.scala @@ -73,7 +73,7 @@ class DurableStateStoreSpec } "detect and reject concurrent updates" in { - if (!r2dbcSettings.durableStateAssertSingleWriter) + if (!settings.durableStateAssertSingleWriter) pending val entityType = nextEntityType() @@ -151,7 +151,7 @@ class DurableStateStoreSpec } "detect and reject concurrent deletes" in { - if (!r2dbcSettings.durableStateAssertSingleWriter) + if 
(!settings.durableStateAssertSingleWriter) pending val entityType = nextEntityType() diff --git a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala index 5f39a5e4..84bbd57f 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/state/DurableStateUpdateWithChangeEventStoreSpec.scala @@ -113,7 +113,7 @@ class DurableStateUpdateWithChangeEventStoreSpec } "detect and reject concurrent updates, and not store change event" in { - if (!r2dbcSettings.durableStateAssertSingleWriter) + if (!settings.durableStateAssertSingleWriter) pending val entityType = nextEntityType() @@ -154,7 +154,7 @@ class DurableStateUpdateWithChangeEventStoreSpec } "detect and reject concurrent deletes, and not store change event" in { - if (!r2dbcSettings.durableStateAssertSingleWriter) + if (!settings.durableStateAssertSingleWriter) pending val entityType = nextEntityType() diff --git a/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala b/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala index e0edcfb5..4d4f27f1 100644 --- a/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala +++ b/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala @@ -110,8 +110,8 @@ class MigrationToolSpec private val migration = new MigrationTool(system) - private val hasChangeHandler = r2dbcSettings.durableStateChangeHandlerClasses.nonEmpty - private val hasAdditionalColumn = r2dbcSettings.durableStateAdditionalColumnClasses.nonEmpty + private val hasChangeHandler = settings.durableStateChangeHandlerClasses.nonEmpty + private val hasAdditionalColumn = settings.durableStateAdditionalColumnClasses.nonEmpty // don't run this for 
Yugabyte since it is using akka-persistence-jdbc private val postgresTest = dialect == "postgres" @@ -250,10 +250,10 @@ class MigrationToolSpec r2dbcExecutor.updateOne("beforeAll migration_progress")(_.createStatement("delete from migration_progress")), 10.seconds) - r2dbcSettings.dataPartitionSliceRanges.foreach { sliceRange => + settings.dataPartitionSliceRanges.foreach { sliceRange => val dataPartitionSlice = sliceRange.min - val stateTable = r2dbcSettings.getDurableStateTableWithSchema("", dataPartitionSlice) + val stateTable = settings.getDurableStateTableWithSchema("", dataPartitionSlice) Await.result( r2dbcExecutor(dataPartitionSlice).executeDdl("add column 'test_column'")( _.createStatement(s"alter table $stateTable add column if not exists test_column VARCHAR(255)")), @@ -497,7 +497,7 @@ class MigrationToolSpec if (hasChangeHandler) { import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter // Scala 3 needs the extra step here - val durableStateImplicits = r2dbcSettings.codecSettings.DurableStateImplicits + val durableStateImplicits = settings.codecSettings.DurableStateImplicits import durableStateImplicits._ val pid = PersistenceId.ofUniqueId(nextPid()) val slice = persistenceExt.sliceForPersistenceId(pid.id) @@ -527,7 +527,7 @@ class MigrationToolSpec if (hasAdditionalColumn) { import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter // Scala 3 needs the extra step here - val durableStateImplicits = r2dbcSettings.codecSettings.DurableStateImplicits + val durableStateImplicits = settings.codecSettings.DurableStateImplicits import durableStateImplicits._ val pid = PersistenceId.ofUniqueId(nextPid()) persistDurableState(pid, "s-column") @@ -535,7 +535,7 @@ class MigrationToolSpec assertDurableState(pid, "s-column") val slice = persistenceExt.sliceForPersistenceId(pid.id) - val stateTable = r2dbcSettings.getDurableStateTableWithSchema("", slice) + val stateTable = settings.getDurableStateTableWithSchema("", slice) val query = 
r2dbcExecutor(slice).select("select value for additional column")( _.createStatement(sql"SELECT test_column from $stateTable where persistence_id=?")