From afeb47e1512a13d33aec3738c00feafc02968a24 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 13 Feb 2022 19:29:56 +0100 Subject: [PATCH 1/2] Store offset for failed skipped envelope * fixes pending tests * update to Akka Projections 1.2.4 --- build.sbt | 1 - project/Dependencies.scala | 2 +- .../r2dbc/internal/R2dbcProjectionImpl.scala | 23 +++++++++--- .../R2dbcTimestampOffsetProjectionSpec.scala | 37 ++++++++++++------- 4 files changed, 43 insertions(+), 20 deletions(-) diff --git a/build.sbt b/build.sbt index c1cf4f25..886a7d5a 100644 --- a/build.sbt +++ b/build.sbt @@ -34,7 +34,6 @@ def common: Seq[Setting[_]] = crossVersion := CrossVersion.binary, scalafmtOnCompile := true, sonatypeProfileName := "com.lightbend", - resolvers += Resolver.sonatypeRepo("snapshots"), // FIXME remove when using stable akka-projection // Setting javac options in common allows IntelliJ IDEA to import them automatically Compile / javacOptions ++= Seq("-encoding", "UTF-8", "-source", "1.8", "-target", "1.8"), headerLicense := Some(HeaderLicense.Custom("""Copyright (C) 2021 Lightbend Inc. """)), diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0a32af04..d1beacac 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,7 +9,7 @@ object Dependencies { val Scala213 = "2.13.8" val AkkaVersion = System.getProperty("override.akka.version", "2.6.19") val AkkaVersionInDocs = AkkaVersion.take(3) - val AkkaProjectionVersion = "1.2.3" + val AkkaProjectionVersion = "1.2.4" val AkkaProjectionVersionInDocs = "current" object Compile { diff --git a/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala index f37177b0..f68ebfad 100644 --- a/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala +++ b/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala @@ -25,6 +25,8 @@ import akka.persistence.state.scaladsl.DurableStateStore import akka.persistence.state.scaladsl.GetObjectResult import akka.projection.BySlicesSourceProvider import akka.projection.HandlerRecoveryStrategy +import akka.projection.HandlerRecoveryStrategy.Internal.RetryAndSkip +import akka.projection.HandlerRecoveryStrategy.Internal.Skip import akka.projection.ProjectionContext import akka.projection.ProjectionId import akka.projection.RunningProjection @@ -452,6 +454,12 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope]( implicit val executionContext: ExecutionContext = system.executionContext override val logger: LoggingAdapter = Logging(system.classicSystem, this.getClass) + private val isExactlyOnceWithSkip: Boolean = + offsetStrategy match { + case ExactlyOnce(Some(Skip)) | ExactlyOnce(Some(_: RetryAndSkip)) => true + case _ => false + } + override def readPaused(): Future[Boolean] = offsetStore.readManagementState().map(_.exists(_.paused)) @@ -469,7 +477,7 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope]( import R2dbcProjectionImpl.FutureDone val envelope = projectionContext.envelope - if (offsetStore.isInflight(envelope)) { + if (offsetStore.isInflight(envelope) || isExactlyOnceWithSkip) { super.saveOffsetAndReport(projectionId, projectionContext, batchSize) } else { FutureDone @@ -481,10 +489,15 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope]( batch: Seq[ProjectionContextImpl[Offset, Envelope]]): Future[Done] = { import R2dbcProjectionImpl.FutureDone - val acceptedContexts = batch.iterator.filter { ctx => - val env = ctx.envelope - offsetStore.isInflight(env) - }.toVector + val acceptedContexts = + if (isExactlyOnceWithSkip) + batch.toVector + else { + batch.iterator.filter { ctx => + val env = ctx.envelope + offsetStore.isInflight(env) + }.toVector + } if (acceptedContexts.isEmpty) { FutureDone diff --git a/projection/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala b/projection/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala index 1ff9068e..aeae2e99 100644 --- a/projection/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala +++ b/projection/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala @@ -355,12 +355,6 @@ class R2dbcTimestampOffsetProjectionSpec } "skip failing events when using RecoveryStrategy.skip" in { - // FIXME for exactlyOnce it is not added to OffsetStore inflight and that is why e5 is not accepted - // but the solution should not be to just add it to inflight because that can cause a leak and growing - // inflight Map that is not cleared up correctly. - // Would be better if the offset was saved for skipped envelopes. - pending - implicit val pid1 = UUID.randomUUID().toString val projectionId = genRandomProjectionId() val envelopes = createEnvelopes(pid1, 6) @@ -381,13 +375,28 @@ class R2dbcTimestampOffsetProjectionSpec offsetShouldBe(envelopes.last.offset) } - "skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in { - // FIXME for exactlyOnce it is not added to OffsetStore inflight and that is why e5 is not accepted - // but the solution should not be to just add it to inflight because that can cause a leak and growing - // inflight Map that is not cleared up correctly. - // Would be better if the offset was saved for skipped envelopes. - pending + "store offset for failing events when using RecoveryStrategy.skip" in { + implicit val pid1 = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + val envelopes = createEnvelopes(pid1, 6) + val sourceProvider = createSourceProvider(envelopes) + implicit val offsetStore = createOffsetStore(projectionId, sourceProvider) + + val bogusEventHandler = new ConcatHandler(_.sequenceNr == 6) + + val projectionFailing = + R2dbcProjection + .exactlyOnce(projectionId, Some(settings), sourceProvider, handler = () => bogusEventHandler) + .withRecoveryStrategy(HandlerRecoveryStrategy.skip) + + offsetShouldBeEmpty() + projectionTestKit.run(projectionFailing) { + projectedValueShouldBe("e1|e2|e3|e4|e5") + } + offsetShouldBe(envelopes.last.offset) + } + "skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in { implicit val pid1 = UUID.randomUUID().toString val projectionId = genRandomProjectionId() val envelopes = createEnvelopes(pid1, 6) @@ -422,8 +431,10 @@ class R2dbcTimestampOffsetProjectionSpec progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 1, "e1"))) progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 2, "e2"))) progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 3, "e3"))) - // Offset 4 is not stored so it is not reported. + // Offset 4 is stored even though it failed and was skipped + progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 4, "e4"))) progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 5, "e5"))) + progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(pid1, 6, "e6"))) offsetShouldBe(envelopes.last.offset) } From cf9ab22bcafedd6f0342b9620099f5a1b5cc50aa Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 20 May 2022 16:20:53 +0200 Subject: [PATCH 2/2] fix test --- .../r2dbc/R2dbcProjectionSpec.scala | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/projection/src/test/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala b/projection/src/test/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala index 2139370e..4e46bdac 100644 --- a/projection/src/test/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala +++ b/projection/src/test/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala @@ -299,6 +299,29 @@ class R2dbcProjectionSpec offsetShouldBe(6L) } + "store offset for failing events when using RecoveryStrategy.skip" in { + implicit val entityId = UUID.randomUUID().toString + val projectionId = genRandomProjectionId() + implicit val offsetStore = createOffsetStore(projectionId) + + val bogusEventHandler = new ConcatHandler(_ == 6) + + val projectionFailing = + R2dbcProjection + .exactlyOnce( + projectionId, + Some(settings), + sourceProvider = sourceProvider(entityId), + handler = () => bogusEventHandler) + .withRecoveryStrategy(HandlerRecoveryStrategy.skip) + + offsetShouldBeEmpty() + projectionTestKit.run(projectionFailing) { + projectedValueShouldBe("e1|e2|e3|e4|e5") + } + offsetShouldBe(6L) + } + "skip failing events after retrying when using RecoveryStrategy.retryAndSkip" in { implicit val entityId = UUID.randomUUID().toString val projectionId = genRandomProjectionId() @@ -338,8 +361,10 @@ class R2dbcProjectionSpec progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 1, "e1"))) progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 2, "e2"))) progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 3, "e3"))) - // Offset 4 is not stored so it is not reported. + // Offset 4 is stored even though it failed and was skipped + progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 4, "e4"))) progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 5, "e5"))) + progressProbe.expectMessage(TestStatusObserver.OffsetProgress(Envelope(entityId, 6, "e6"))) offsetShouldBe(6L) }