Skip to content

Commit

Permalink
Store offset for failed skipped envelope
Browse files Browse the repository at this point in the history
* fixes pending tests
* update to Akka Projections 1.2.4
  • Loading branch information
patriknw committed May 12, 2022
1 parent 9cb97de commit afeb47e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 20 deletions.
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def common: Seq[Setting[_]] =
crossVersion := CrossVersion.binary,
scalafmtOnCompile := true,
sonatypeProfileName := "com.lightbend",
resolvers += Resolver.sonatypeRepo("snapshots"), // FIXME remove when using stable akka-projection
// Setting javac options in common allows IntelliJ IDEA to import them automatically
Compile / javacOptions ++= Seq("-encoding", "UTF-8", "-source", "1.8", "-target", "1.8"),
headerLicense := Some(HeaderLicense.Custom("""Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>""")),
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ object Dependencies {
val Scala213 = "2.13.8"
val AkkaVersion = System.getProperty("override.akka.version", "2.6.19")
val AkkaVersionInDocs = AkkaVersion.take(3)
val AkkaProjectionVersion = "1.2.3"
val AkkaProjectionVersion = "1.2.4"
val AkkaProjectionVersionInDocs = "current"

object Compile {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import akka.persistence.state.scaladsl.DurableStateStore
import akka.persistence.state.scaladsl.GetObjectResult
import akka.projection.BySlicesSourceProvider
import akka.projection.HandlerRecoveryStrategy
import akka.projection.HandlerRecoveryStrategy.Internal.RetryAndSkip
import akka.projection.HandlerRecoveryStrategy.Internal.Skip
import akka.projection.ProjectionContext
import akka.projection.ProjectionId
import akka.projection.RunningProjection
Expand Down Expand Up @@ -452,6 +454,12 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope](
implicit val executionContext: ExecutionContext = system.executionContext
override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass)

private val isExactlyOnceWithSkip: Boolean =
offsetStrategy match {
case ExactlyOnce(Some(Skip)) | ExactlyOnce(Some(_: RetryAndSkip)) => true
case _ => false
}

override def readPaused(): Future[Boolean] =
offsetStore.readManagementState().map(_.exists(_.paused))

Expand All @@ -469,7 +477,7 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope](
import R2dbcProjectionImpl.FutureDone
val envelope = projectionContext.envelope

if (offsetStore.isInflight(envelope)) {
if (offsetStore.isInflight(envelope) || isExactlyOnceWithSkip) {
super.saveOffsetAndReport(projectionId, projectionContext, batchSize)
} else {
FutureDone
Expand All @@ -481,10 +489,15 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope](
batch: Seq[ProjectionContextImpl[Offset, Envelope]]): Future[Done] = {
import R2dbcProjectionImpl.FutureDone

val acceptedContexts = batch.iterator.filter { ctx =>
val env = ctx.envelope
offsetStore.isInflight(env)
}.toVector
val acceptedContexts =
if (isExactlyOnceWithSkip)
batch.toVector
else {
batch.iterator.filter { ctx =>
val env = ctx.envelope
offsetStore.isInflight(env)
}.toVector
}

if (acceptedContexts.isEmpty) {
FutureDone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,6 @@ class R2dbcTimestampOffsetProjectionSpec
}

"skip failing events when using RecoveryStrategy.skip" in {
// FIXME for exactlyOnce it is not added to OffsetStore inflight and that is why e5 is not accepted
// but the solution should not be to just add it to inflight because that can cause a leak and growing
// inflight Map that is not cleared up correctly.
// Would be better if the offset was saved for skipped envelopes.
pending

implicit val pid1 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
Expand All @@ -381,13 +375,28 @@ class R2dbcTimestampOffsetProjectionSpec
offsetShouldBe(envelopes.last.offset)
}

"skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in {
// FIXME for exactlyOnce it is not added to OffsetStore inflight and that is why e5 is not accepted
// but the solution should not be to just add it to inflight because that can cause a leak and growing
// inflight Map that is not cleared up correctly.
// Would be better if the offset was saved for skipped envelopes.
pending
"store offset for failing events when using RecoveryStrategy.skip" in {
implicit val pid1 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
val sourceProvider = createSourceProvider(envelopes)
implicit val offsetStore = createOffsetStore(projectionId, sourceProvider)

val bogusEventHandler = new ConcatHandler(_.sequenceNr == 6)

val projectionFailing =
R2dbcProjection
.exactlyOnce(projectionId, Some(settings), sourceProvider, handler = () => bogusEventHandler)
.withRecoveryStrategy(HandlerRecoveryStrategy.skip)

offsetShouldBeEmpty()
projectionTestKit.run(projectionFailing) {
projectedValueShouldBe("e1|e2|e3|e4|e5")
}
offsetShouldBe(envelopes.last.offset)
}

"skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in {
implicit val pid1 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
Expand Down Expand Up @@ -422,8 +431,10 @@ class R2dbcTimestampOffsetProjectionSpec
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 1, "e1")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 2, "e2")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 3, "e3")))
// Offset 4 is not stored so it is not reported.
// Offset 4 is stored even though it failed and was skipped
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 4, "e4")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 5, "e5")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 6, "e6")))

offsetShouldBe(envelopes.last.offset)
}
Expand Down

0 comments on commit afeb47e

Please sign in to comment.