Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: minor cleanup #581

Merged
merged 3 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,25 +113,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
row.payload.map(payload => serialization.deserialize(payload, row.serId, row.serManifest).get.asInstanceOf[Event])

private val _bySlice: BySliceQuery[SerializedJournalRow, EventEnvelope[Any]] = {
val createEnvelope: (TimestampOffset, SerializedJournalRow) => EventEnvelope[Any] = (offset, row) => {
val event = deserializePayload(row)
val metadata = row.metadata.map(meta => serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get)
val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking
val filtered = row.serId == filteredPayloadSerId

new EventEnvelope(
offset,
row.persistenceId,
row.seqNr,
if (filtered) None else event,
row.dbTimestamp.toEpochMilli,
metadata,
row.entityType,
row.slice,
filtered,
source,
tags = row.tags)
}
val createEnvelope: (TimestampOffset, SerializedJournalRow) => EventEnvelope[Any] = createEventEnvelope

val extractOffset: EventEnvelope[Any] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset]

Expand All @@ -141,6 +123,43 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
private def bySlice[Event]: BySliceQuery[SerializedJournalRow, EventEnvelope[Event]] =
_bySlice.asInstanceOf[BySliceQuery[SerializedJournalRow, EventEnvelope[Event]]]

/**
 * Deserializes a journal row into an [[EventEnvelope]], deriving the
 * [[TimestampOffset]] from the row itself (db timestamp, read timestamp and
 * the persistenceId -> seqNr entry).
 */
private def deserializeBySliceRow[Event](row: SerializedJournalRow): EventEnvelope[Event] =
  createEventEnvelope(
    TimestampOffset(row.dbTimestamp, row.readDbTimestamp, Map(row.persistenceId -> row.seqNr)),
    row)

/**
 * Builds an [[EventEnvelope]] from a serialized journal row at the given offset.
 *
 * The event payload and the optional metadata are deserialized via the
 * `serialization` extension. A row whose serializer id equals
 * `filteredPayloadSerId` is marked as filtered: the envelope carries no event
 * and `filtered = true`.
 */
private def createEventEnvelope[Event](offset: TimestampOffset, row: SerializedJournalRow): EventEnvelope[Event] = {
  val event = deserializePayload(row)
  // metadata is deserialized with its own serializer id and manifest
  val metadata = row.metadata.map(meta => serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get)
  // a row without a deserialized payload is attributed to backtracking, otherwise to the regular query
  val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking
  val filtered = row.serId == filteredPayloadSerId

  new EventEnvelope(
    offset,
    row.persistenceId,
    row.seqNr,
    if (filtered) None else event,
    row.dbTimestamp.toEpochMilli,
    metadata,
    row.entityType,
    row.slice,
    filtered,
    source,
    tags = row.tags)
}

/**
 * Deserializes a journal row into a classic [[ClassicEventEnvelope]],
 * attaching deserialized metadata when the row carries any.
 */
private def deserializeRow(row: SerializedJournalRow): ClassicEventEnvelope = {
  val payload = deserializePayload(row)
  // note that it's not possible to filter out FilteredPayload here
  val timestampOffset =
    TimestampOffset(row.dbTimestamp, row.readDbTimestamp, Map(row.persistenceId -> row.seqNr))
  val base =
    ClassicEventEnvelope(timestampOffset, row.persistenceId, row.seqNr, payload.get, row.dbTimestamp.toEpochMilli)
  row.metadata.fold(base) { meta =>
    base.withMetadata(serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get)
  }
}

private def snapshotsBySlice[Snapshot, Event](
transformSnapshot: Snapshot => Event): BySliceQuery[SerializedSnapshotRow, EventEnvelope[Event]] = {
val createEnvelope: (TimestampOffset, SerializedSnapshotRow) => EventEnvelope[Event] =
Expand Down Expand Up @@ -753,39 +772,6 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
.mapMaterializedValue(_ => NotUsed)
}

/**
 * Deserializes a journal row into an [[EventEnvelope]], deriving the
 * [[TimestampOffset]] from the row itself.
 *
 * A row whose serializer id equals `filteredPayloadSerId` is marked as
 * filtered: the envelope carries no event and `filtered = true`.
 */
private def deserializeBySliceRow[Event](row: SerializedJournalRow): EventEnvelope[Event] = {
  val event = deserializePayload[Event](row)
  val offset = TimestampOffset(row.dbTimestamp, row.readDbTimestamp, Map(row.persistenceId -> row.seqNr))
  // metadata is deserialized with its own serializer id and manifest
  val metadata = row.metadata.map(meta => serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get)
  // a row without a deserialized payload is attributed to backtracking, otherwise to the regular query
  val source = if (event.isDefined) EnvelopeOrigin.SourceQuery else EnvelopeOrigin.SourceBacktracking
  val filtered = row.serId == filteredPayloadSerId

  new EventEnvelope(
    offset,
    row.persistenceId,
    row.seqNr,
    if (filtered) None else event,
    row.dbTimestamp.toEpochMilli,
    metadata,
    row.entityType,
    row.slice,
    filtered,
    source,
    tags = row.tags)
}

/**
 * Deserializes a journal row into a classic [[ClassicEventEnvelope]],
 * attaching deserialized metadata when the row carries any.
 */
private def deserializeRow(row: SerializedJournalRow): ClassicEventEnvelope = {
  val event = deserializePayload(row)
  // note that it's not possible to filter out FilteredPayload here
  val offset = TimestampOffset(row.dbTimestamp, row.readDbTimestamp, Map(row.persistenceId -> row.seqNr))
  // event.get: the classic envelope always carries a payload
  val envelope = ClassicEventEnvelope(offset, row.persistenceId, row.seqNr, event.get, row.dbTimestamp.toEpochMilli)
  row.metadata match {
    case None => envelope
    case Some(meta) =>
      envelope.withMetadata(serialization.deserialize(meta.payload, meta.serId, meta.serManifest).get)
  }
}

override def currentPersistenceIds(afterId: Option[String], limit: Long): Source[String, NotUsed] =
queryDao.persistenceIds(afterId, limit)

Expand Down
13 changes: 7 additions & 6 deletions core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ package akka.persistence.r2dbc

import java.nio.charset.StandardCharsets.UTF_8

import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike

import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.r2dbc.TestActors.DurableStatePersister
import akka.persistence.r2dbc.TestActors.Persister
import akka.persistence.r2dbc.internal.codec.PayloadCodec
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike

/**
* The purpose of this test is to verify JSONB payloads, but it can also be run with ordinary BYTEA payloads. To test
Expand Down Expand Up @@ -59,7 +61,6 @@ class PayloadSpec
import PayloadSpec._

override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))

private def testJournalPersister(persistenceId: String, msg: Any): Unit = {
val probe = createTestProbe[Any]()
Expand Down Expand Up @@ -88,7 +89,7 @@ class PayloadSpec
}

private def selectJournalRow(persistenceId: String): TestRow = {
import settings.codecSettings.JournalImplicits.journalPayloadCodec
implicit val codec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala 3 doesn't accept the import from `settings`, which is now a lazy val in a trait.

[error] -- Error: /Users/patrik/dev/akka-persistence-r2dbc/core/src/test/scala/akka/persistence/r2dbc/PayloadSpec.scala:90:34
[error] 90 |    import settings.codecSettings.JournalImplicits.journalPayloadCodec

val slice = persistenceExt.sliceForPersistenceId(persistenceId)

r2dbcExecutor(slice)
Expand All @@ -108,7 +109,7 @@ class PayloadSpec
}

private def selectSnapshotRow(persistenceId: String): TestRow = {
import settings.codecSettings.SnapshotImplicits.snapshotPayloadCodec
implicit val codec: PayloadCodec = settings.codecSettings.SnapshotImplicits.snapshotPayloadCodec
val slice = persistenceExt.sliceForPersistenceId(persistenceId)

r2dbcExecutor(slice)
Expand All @@ -128,7 +129,7 @@ class PayloadSpec
}

private def selectDurableStateRow(persistenceId: String): TestRow = {
import settings.codecSettings.DurableStateImplicits.durableStatePayloadCodec
implicit val codec: PayloadCodec = settings.codecSettings.DurableStateImplicits.durableStatePayloadCodec
val slice = persistenceExt.sliceForPersistenceId(persistenceId)

r2dbcExecutor(slice)
Expand Down
14 changes: 7 additions & 7 deletions core/src/test/scala/akka/persistence/r2dbc/TestDbLifecycle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>

def testConfigPath: String = "akka.persistence.r2dbc"

lazy val r2dbcSettings: R2dbcSettings =
lazy val settings: R2dbcSettings =
R2dbcSettings(typedSystem.settings.config.getConfig(testConfigPath))

lazy val r2dbcExecutorProvider: R2dbcExecutorProvider =
new R2dbcExecutorProvider(
typedSystem,
r2dbcSettings.connectionFactorySettings.dialect.daoExecutionContext(r2dbcSettings, typedSystem),
r2dbcSettings,
settings.connectionFactorySettings.dialect.daoExecutionContext(settings, typedSystem),
settings,
testConfigPath + ".connection-factory",
LoggerFactory.getLogger(getClass))

Expand All @@ -48,22 +48,22 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
lazy val persistenceExt: Persistence = Persistence(typedSystem)

def pendingIfMoreThanOneDataPartition(): Unit =
if (r2dbcSettings.numberOfDataPartitions > 1)
if (settings.numberOfDataPartitions > 1)
pending

override protected def beforeAll(): Unit = {
try {
r2dbcSettings.allJournalTablesWithSchema.foreach { case (table, minSlice) =>
settings.allJournalTablesWithSchema.foreach { case (table, minSlice) =>
Await.result(
r2dbcExecutor(minSlice).updateOne("beforeAll delete")(_.createStatement(s"delete from $table")),
10.seconds)
}
r2dbcSettings.allSnapshotTablesWithSchema.foreach { case (table, minSlice) =>
settings.allSnapshotTablesWithSchema.foreach { case (table, minSlice) =>
Await.result(
r2dbcExecutor(minSlice).updateOne("beforeAll delete")(_.createStatement(s"delete from $table")),
10.seconds)
}
r2dbcSettings.allDurableStateTablesWithSchema.foreach { case (table, minSlice) =>
settings.allDurableStateTablesWithSchema.foreach { case (table, minSlice) =>
Await.result(
r2dbcExecutor(minSlice).updateOne("beforeAll delete")(_.createStatement(s"delete from $table")),
10.seconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ class H2AdditionalInitForSchemaSpec
private def exists(slice: Int, whereCondition: String): Boolean =
r2dbcExecutor(slice)
.selectOne("count")(
_.createStatement(
s"select count(*) from ${r2dbcSettings.durableStateTableWithSchema(slice)} where $whereCondition"),
_.createStatement(s"select count(*) from ${settings.durableStateTableWithSchema(slice)} where $whereCondition"),
row => row.get(0, classOf[java.lang.Long]).longValue())
.futureValue
.contains(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ class R2dbcExecutorSpec

// need pg_sleep or similar
private def canBeTestedWithDialect: Boolean =
r2dbcSettings.connectionFactorySettings.dialect == PostgresDialect ||
r2dbcSettings.connectionFactorySettings.dialect == YugabyteDialect
settings.connectionFactorySettings.dialect == PostgresDialect ||
settings.connectionFactorySettings.dialect == YugabyteDialect

private def pendingIfCannotBeTestedWithDialect(): Unit = {
if (!canBeTestedWithDialect) {
info(s"Can't be tested with dialect [${r2dbcSettings.dialectName}]")
info(s"Can't be tested with dialect [${settings.dialectName}]")
pending
}
}
Expand Down Expand Up @@ -89,7 +89,7 @@ class R2dbcExecutorSpec
connection.createStatement(s"insert into $table (col) values ('b')"))
}

Thread.sleep(r2dbcSettings.connectionFactorySettings.poolSettings.closeCallsExceeding.get.toMillis)
Thread.sleep(settings.connectionFactorySettings.poolSettings.closeCallsExceeding.get.toMillis)

// The request will fail with PostgresConnectionClosedException
result.failed.futureValue shouldBe a[R2dbcNonTransientResourceException]
Expand Down Expand Up @@ -118,7 +118,7 @@ class R2dbcExecutorSpec

val result = r2dbcExecutor.updateOne("test")(_.createStatement("select pg_sleep(4)"))

Thread.sleep(r2dbcSettings.connectionFactorySettings.poolSettings.closeCallsExceeding.get.toMillis)
Thread.sleep(settings.connectionFactorySettings.poolSettings.closeCallsExceeding.get.toMillis)

// The request will fail with PostgresConnectionClosedException
result.failed.futureValue shouldBe a[R2dbcNonTransientResourceException]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.SerializedEvent
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.TestActors.Persister
import akka.persistence.r2dbc.TestConfig
import akka.persistence.r2dbc.TestData
Expand All @@ -27,7 +26,6 @@ class PersistSerializedEventSpec
with LogCapturing {

override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))

case class Row(pid: String, seqNr: Long, serializerId: Int, manifest: String)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ package akka.persistence.r2dbc.journal

import scala.concurrent.duration._

import org.scalatest.wordspec.AnyWordSpecLike

import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.TestActors.Persister
import akka.persistence.r2dbc.TestConfig
import akka.persistence.r2dbc.TestData
import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.r2dbc.internal.codec.TagsCodec
import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow
import akka.persistence.typed.PersistenceId
import org.scalatest.wordspec.AnyWordSpecLike

class PersistTagsSpec
extends ScalaTestWithActorTestKit(TestConfig.config)
Expand All @@ -27,8 +28,7 @@ class PersistTagsSpec
with LogCapturing {

override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))
import settings.codecSettings.JournalImplicits.tagsCodec
implicit val codec: TagsCodec = settings.codecSettings.JournalImplicits.tagsCodec
case class Row(pid: String, seqNr: Long, tags: Set[String])

private def selectRows(table: String, minSlice: Int): IndexedSeq[Row] = {
Expand All @@ -44,7 +44,7 @@ class PersistTagsSpec
}

private def selectAllRows(): IndexedSeq[Row] =
r2dbcSettings.allJournalTablesWithSchema.toVector.sortBy(_._1).flatMap { case (table, minSlice) =>
settings.allJournalTablesWithSchema.toVector.sortBy(_._1).flatMap { case (table, minSlice) =>
selectRows(table, minSlice)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,27 @@
package akka.persistence.r2dbc.journal

import java.time.Instant

import scala.concurrent.duration._

import org.scalatest.wordspec.AnyWordSpecLike

import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.TestActors.Persister
import akka.persistence.r2dbc.TestConfig
import akka.persistence.r2dbc.TestData
import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.r2dbc.internal.codec.PayloadCodec
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow
import akka.persistence.r2dbc.internal.codec.TimestampCodec
import akka.persistence.r2dbc.internal.codec.TimestampCodec.{ PostgresTimestampCodec, SqlServerTimestampCodec }
import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow
import akka.persistence.r2dbc.internal.codec.TimestampCodec.PostgresTimestampCodec
import akka.persistence.r2dbc.internal.codec.TimestampCodec.SqlServerTimestampCodec
import akka.persistence.typed.PersistenceId
import akka.serialization.SerializationExtension
import org.scalatest.wordspec.AnyWordSpecLike

class PersistTimestampSpec
extends ScalaTestWithActorTestKit(TestConfig.config)
Expand All @@ -30,9 +34,8 @@ class PersistTimestampSpec
with TestData
with LogCapturing {

import settings.codecSettings.JournalImplicits.journalPayloadCodec
implicit val payloadCodec: PayloadCodec = settings.codecSettings.JournalImplicits.journalPayloadCodec
override def typedSystem: ActorSystem[_] = system
private val settings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))
private val serialization = SerializationExtension(system)
case class Row(pid: String, seqNr: Long, dbTimestamp: Instant, event: String)

Expand Down Expand Up @@ -64,7 +67,7 @@ class PersistTimestampSpec
}

private def selectAllRows(): IndexedSeq[Row] =
r2dbcSettings.allJournalTablesWithSchema.toVector.sortBy(_._1).flatMap { case (table, minSlice) =>
settings.allJournalTablesWithSchema.toVector.sortBy(_._1).flatMap { case (table, minSlice) =>
selectRows(table, minSlice)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class R2dbcJournalPerfManyActorsSpec extends JournalPerfSpec(R2dbcJournalPerfSpe

"A PersistentActor's performance" must {

if (r2dbcSettings.dialectName == "sqlserver") {
if (settings.dialectName == "sqlserver") {
pending
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class R2dbcJournalPerfSpec extends JournalPerfSpec(R2dbcJournalPerfSpec.config)
override def typedSystem: ActorSystem[_] = system.toTyped

override def benchActor(replyAfter: Int): ActorRef = {
if (r2dbcSettings.dialectName == "sqlserver")
if (settings.dialectName == "sqlserver")
throw new TestPendingException
else
super.benchActor(replyAfter)
Expand Down
Loading