Skip to content

Commit

Permalink
notify in andThen
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Sep 20, 2023
1 parent b0ca096 commit 5f54fc5
Showing 1 changed file with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Success

import akka.Done
import akka.actor.typed.ActorSystem
Expand Down Expand Up @@ -535,11 +536,11 @@ private[projection] class R2dbcOffsetStore(
}

if (validationObservers.nonEmpty)
result.foreach(_.foreach {
case (env, validation) => notifyValidationObserver(env, validation)
})

result
result.andThen {
case Success(xs) => xs.foreach { case (env, v) => notifyValidationObserver(env, v) }
}
else
result
}

/**
Expand All @@ -552,8 +553,12 @@ private[projection] class R2dbcOffsetStore(
case Some(recordWithOffset) => validate(recordWithOffset, getInflight())
case None => Validation.FutureAccepted
}
notifyValidationObserver(envelope, result)
result
if (validationObservers.nonEmpty)
result.andThen {
case Success(v) => notifyValidationObserver(envelope, v)
}
else
result
}

private def validate(recordWithOffset: RecordWithOffset, currentInflight: Map[Pid, SeqNr]): Future[Validation] = {
Expand Down Expand Up @@ -682,11 +687,6 @@ private[projection] class R2dbcOffsetStore(
}
}

private def notifyValidationObserver[Envelope](env: Envelope, validation: Future[Validation]): Unit = {
if (validationObservers.nonEmpty)
validation.foreach(notifyValidationObserver(env, _))
}

private def notifyValidationObserver[Envelope](env: Envelope, validation: Validation): Unit = {
if (validationObservers.nonEmpty) {
validationObservers.foreach(_.onOffsetValidated(env, validation))
Expand Down

0 comments on commit 5f54fc5

Please sign in to comment.