Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store offset for failed skipped envelope #204

Merged
merged 2 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def common: Seq[Setting[_]] =
crossVersion := CrossVersion.binary,
scalafmtOnCompile := true,
sonatypeProfileName := "com.lightbend",
resolvers += Resolver.sonatypeRepo("snapshots"), // FIXME remove when using stable akka-projection
// Setting javac options in common allows IntelliJ IDEA to import them automatically
Compile / javacOptions ++= Seq("-encoding", "UTF-8", "-source", "1.8", "-target", "1.8"),
headerLicense := Some(HeaderLicense.Custom("""Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>""")),
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ object Dependencies {
val Scala213 = "2.13.8"
val AkkaVersion = System.getProperty("override.akka.version", "2.6.19")
val AkkaVersionInDocs = AkkaVersion.take(3)
val AkkaProjectionVersion = "1.2.3"
val AkkaProjectionVersion = "1.2.4"
val AkkaProjectionVersionInDocs = "current"

object Compile {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import akka.persistence.state.scaladsl.DurableStateStore
import akka.persistence.state.scaladsl.GetObjectResult
import akka.projection.BySlicesSourceProvider
import akka.projection.HandlerRecoveryStrategy
import akka.projection.HandlerRecoveryStrategy.Internal.RetryAndSkip
import akka.projection.HandlerRecoveryStrategy.Internal.Skip
import akka.projection.ProjectionContext
import akka.projection.ProjectionId
import akka.projection.RunningProjection
Expand Down Expand Up @@ -452,6 +454,12 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope](
implicit val executionContext: ExecutionContext = system.executionContext
override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass)

private val isExactlyOnceWithSkip: Boolean =
offsetStrategy match {
case ExactlyOnce(Some(Skip)) | ExactlyOnce(Some(_: RetryAndSkip)) => true
case _ => false
}

override def readPaused(): Future[Boolean] =
offsetStore.readManagementState().map(_.exists(_.paused))

Expand All @@ -469,7 +477,7 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope](
import R2dbcProjectionImpl.FutureDone
val envelope = projectionContext.envelope

if (offsetStore.isInflight(envelope)) {
if (offsetStore.isInflight(envelope) || isExactlyOnceWithSkip) {
super.saveOffsetAndReport(projectionId, projectionContext, batchSize)
} else {
FutureDone
Expand All @@ -481,10 +489,15 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope](
batch: Seq[ProjectionContextImpl[Offset, Envelope]]): Future[Done] = {
import R2dbcProjectionImpl.FutureDone

val acceptedContexts = batch.iterator.filter { ctx =>
val env = ctx.envelope
offsetStore.isInflight(env)
}.toVector
val acceptedContexts =
if (isExactlyOnceWithSkip)
batch.toVector
else {
batch.iterator.filter { ctx =>
val env = ctx.envelope
offsetStore.isInflight(env)
}.toVector
}

if (acceptedContexts.isEmpty) {
FutureDone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,29 @@ class R2dbcProjectionSpec
offsetShouldBe(6L)
}

"store offset for failing events when using RecoveryStrategy.skip" in {
implicit val entityId = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
implicit val offsetStore = createOffsetStore(projectionId)

val bogusEventHandler = new ConcatHandler(_ == 6)

val projectionFailing =
R2dbcProjection
.exactlyOnce(
projectionId,
Some(settings),
sourceProvider = sourceProvider(entityId),
handler = () => bogusEventHandler)
.withRecoveryStrategy(HandlerRecoveryStrategy.skip)

offsetShouldBeEmpty()
projectionTestKit.run(projectionFailing) {
projectedValueShouldBe("e1|e2|e3|e4|e5")
}
offsetShouldBe(6L)
}

"skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in {
implicit val entityId = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
Expand Down Expand Up @@ -338,8 +361,10 @@ class R2dbcProjectionSpec
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 1, "e1")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 2, "e2")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 3, "e3")))
// Offset 4 is not stored so it is not reported.
// Offset 4 is stored even though it failed and was skipped
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 4, "e4")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 5, "e5")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 6, "e6")))

offsetShouldBe(6L)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,6 @@ class R2dbcTimestampOffsetProjectionSpec
}

"skip failing events when using RecoveryStrategy.skip" in {
// FIXME for exactlyOnce it is not added to OffsetStore inflight and that is why e5 is not accepted
// but the solution should not be to just add it to inflight because that can cause a leak and growing
// inflight Map that is not cleared up correctly.
// Would be better if the offset was saved for skipped envelopes.
pending

implicit val pid1 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
Expand All @@ -381,13 +375,28 @@ class R2dbcTimestampOffsetProjectionSpec
offsetShouldBe(envelopes.last.offset)
}

"skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in {
// FIXME for exactlyOnce it is not added to OffsetStore inflight and that is why e5 is not accepted
// but the solution should not be to just add it to inflight because that can cause a leak and growing
// inflight Map that is not cleared up correctly.
// Would be better if the offset was saved for skipped envelopes.
pending
"store offset for failing events when using RecoveryStrategy.skip" in {
implicit val pid1 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
val sourceProvider = createSourceProvider(envelopes)
implicit val offsetStore = createOffsetStore(projectionId, sourceProvider)

val bogusEventHandler = new ConcatHandler(_.sequenceNr == 6)

val projectionFailing =
R2dbcProjection
.exactlyOnce(projectionId, Some(settings), sourceProvider, handler = () => bogusEventHandler)
.withRecoveryStrategy(HandlerRecoveryStrategy.skip)

offsetShouldBeEmpty()
projectionTestKit.run(projectionFailing) {
projectedValueShouldBe("e1|e2|e3|e4|e5")
}
offsetShouldBe(envelopes.last.offset)
}

"skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in {
implicit val pid1 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()
val envelopes = createEnvelopes(pid1, 6)
Expand Down Expand Up @@ -422,8 +431,10 @@ class R2dbcTimestampOffsetProjectionSpec
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 1, "e1")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 2, "e2")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 3, "e3")))
// Offset 4 is not stored so it is not reported.
// Offset 4 is stored even though it failed and was skipped
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 4, "e4")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 5, "e5")))
progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 6, "e6")))

offsetShouldBe(envelopes.last.offset)
}
Expand Down