Skip to content

Commit

Permalink
Store offset for failed skipped envelope (#204)
Browse files Browse the repository at this point in the history
* Store offset for failed skipped envelope
* fixes pending tests
* update to Akka Projections 1.2.4
  • Loading branch information
patriknw authored May 23, 2022
1 parent 146ec2a commit 0ccb325
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 21 deletions.
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def common: Seq[Setting[_]] =
crossVersion := CrossVersion.binary,
scalafmtOnCompile := true,
sonatypeProfileName := "com.lightbend",
resolvers += Resolver.sonatypeRepo("snapshots"), // FIXME remove when using stable akka-projection
// Setting javac options in common allows IntelliJ IDEA to import them automatically
Compile / javacOptions ++= Seq("-encoding", "UTF-8", "-source", "1.8", "-target", "1.8"),
headerLicense := Some(HeaderLicense.Custom("""Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>""")),
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ object Dependencies {
val Scala213 = "2.13.8"
val AkkaVersion = System.getProperty("override.akka.version", "2.6.19")
val AkkaVersionInDocs = AkkaVersion.take(3)
val AkkaProjectionVersion = "1.2.3"
val AkkaProjectionVersion = "1.2.4"
val AkkaProjectionVersionInDocs = "current"

object Compile {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import akka.persistence.state.scaladsl.DurableStateStore
import akka.persistence.state.scaladsl.GetObjectResult
import akka.projection.BySlicesSourceProvider
import akka.projection.HandlerRecoveryStrategy
import akka.projection.HandlerRecoveryStrategy.Internal.RetryAndSkip
import akka.projection.HandlerRecoveryStrategy.Internal.Skip
import akka.projection.ProjectionContext
import akka.projection.ProjectionId
import akka.projection.RunningProjection
Expand Down Expand Up @@ -452,6 +454,12 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope](
implicit val executionContext: ExecutionContext = system.executionContext
override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass)

private val isExactlyOnceWithSkip: Boolean =
offsetStrategy match {
case ExactlyOnce(Some(Skip)) | ExactlyOnce(Some(_: RetryAndSkip)) => true
case _ => false
}

override def readPaused(): Future[Boolean] =
offsetStore.readManagementState().map(_.exists(_.paused))

Expand All @@ -469,7 +477,7 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope](
import R2dbcProjectionImpl.FutureDone
val envelope = projectionContext.envelope

if (offsetStore.isInflight(envelope)) {
if (offsetStore.isInflight(envelope) || isExactlyOnceWithSkip) {
super.saveOffsetAndReport(projectionId, projectionContext, batchSize)
} else {
FutureDone
Expand All @@ -481,10 +489,15 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope](
batch: Seq[ProjectionContextImpl[Offset, Envelope]]): Future[Done] = {
import R2dbcProjectionImpl.FutureDone

val acceptedContexts = batch.iterator.filter { ctx =>
val env = ctx.envelope
offsetStore.isInflight(env)
}.toVector
val acceptedContexts =
if (isExactlyOnceWithSkip)
batch.toVector
else {
batch.iterator.filter { ctx =>
val env = ctx.envelope
offsetStore.isInflight(env)
}.toVector
}

if (acceptedContexts.isEmpty) {
FutureDone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,29 @@ class R2dbcProjectionSpec
offsetShouldBe(6L)
}

"store offset for failing events when using RecoveryStrategy.skip" in {
implicit val entityId = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
implicit val offsetStore = createOffsetStore(projectionId)

val bogusEventHandler = new ConcatHandler(_ == 6)

val projectionFailing =
R2dbcProjection
.exactlyOnce(
projectionId,
Some(settings),
sourceProvider = sourceProvider(entityId),
handler = () => bogusEventHandler)
.withRecoveryStrategy(HandlerRecoveryStrategy.skip)

offsetShouldBeEmpty()
projectionTestKit.run(projectionFailing) {
projectedValueShouldBe("e1|e2|e3|e4|e5")
}
offsetShouldBe(6L)
}

"skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in {
implicit val entityId = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
Expand Down Expand Up @@ -338,8 +361,10 @@ class R2dbcProjectionSpec
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 1, "e1")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 2, "e2")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 3, "e3")))
// Offset 4 is not stored so it is not reported.
// Offset 4 is stored even though it failed and was skipped
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 4, "e4")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 5, "e5")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 6, "e6")))

offsetShouldBe(6L)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,6 @@ class R2dbcTimestampOffsetProjectionSpec
}

"skip failing events when using RecoveryStrategy.skip" in {
// FIXME for exactlyOnce it is not added to OffsetStore inflight and that is why e5 is not accepted
// but the solution should not be to just add it to inflight because that can cause a leak and growing
// inflight Map that is not cleared up correctly.
// Would be better if the offset was saved for skipped envelopes.
pending

implicit val pid1 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
Expand All @@ -381,13 +375,28 @@ class R2dbcTimestampOffsetProjectionSpec
offsetShouldBe(envelopes.last.offset)
}

"skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in {
// FIXME for exactlyOnce it is not added to OffsetStore inflight and that is why e5 is not accepted
// but the solution should not be to just add it to inflight because that can cause a leak and growing
// inflight Map that is not cleared up correctly.
// Would be better if the offset was saved for skipped envelopes.
pending
"store offset for failing events when using RecoveryStrategy.skip" in {
implicit val pid1 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
val sourceProvider = createSourceProvider(envelopes)
implicit val offsetStore = createOffsetStore(projectionId, sourceProvider)

val bogusEventHandler = new ConcatHandler(_.sequenceNr == 6)

val projectionFailing =
R2dbcProjection
.exactlyOnce(projectionId, Some(settings), sourceProvider, handler = () => bogusEventHandler)
.withRecoveryStrategy(HandlerRecoveryStrategy.skip)

offsetShouldBeEmpty()
projectionTestKit.run(projectionFailing) {
projectedValueShouldBe("e1|e2|e3|e4|e5")
}
offsetShouldBe(envelopes.last.offset)
}

"skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in {
implicit val pid1 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
Expand Down Expand Up @@ -422,8 +431,10 @@ class R2dbcTimestampOffsetProjectionSpec
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 1, "e1")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 2, "e2")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 3, "e3")))
// Offset 4 is not stored so it is not reported.
// Offset 4 is stored even though it failed and was skipped
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 4, "e4")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 5, "e5")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 6, "e6")))

offsetShouldBe(envelopes.last.offset)
}
Expand Down

0 comments on commit 0ccb325

Please sign in to comment.