From f0c075952c255f7826d702fc587c2dc15c52803c Mon Sep 17 00:00:00 2001 From: Andrzej Ludwikowski Date: Wed, 15 Jan 2025 10:15:44 +0100 Subject: [PATCH] chore: updating consumer SPI effect (#148) * chore: updating consumer SPI effect * improvements * bumping runtime and fixing compilation --- .../akka-javasdk-parent/pom.xml | 2 +- .../impl/consumer/ConsumerEffectImpl.scala | 21 ++++++++----------- .../javasdk/impl/consumer/ConsumerImpl.scala | 11 ++-------- .../javasdk/impl/workflow/WorkflowImpl.scala | 6 +----- project/Dependencies.scala | 2 +- 5 files changed, 14 insertions(+), 28 deletions(-) diff --git a/akka-javasdk-maven/akka-javasdk-parent/pom.xml b/akka-javasdk-maven/akka-javasdk-parent/pom.xml index 8587f1039..950e52cfc 100644 --- a/akka-javasdk-maven/akka-javasdk-parent/pom.xml +++ b/akka-javasdk-maven/akka-javasdk-parent/pom.xml @@ -38,7 +38,7 @@ 21 - 1.3.0-46b2781 + 1.3.0-940b627 UTF-8 false diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala index 0b5da20fb..725fef4ad 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala @@ -20,17 +20,11 @@ import akka.javasdk.consumer.Consumer private[impl] object ConsumerEffectImpl { sealed abstract class PrimaryEffect extends Consumer.Effect {} - final case class ProduceEffect[T](msg: T, metadata: Option[Metadata]) extends PrimaryEffect { - def isEmpty: Boolean = false - } + final case class ProduceEffect[T](msg: T, metadata: Option[Metadata]) extends PrimaryEffect {} - final case class AsyncEffect(effect: Future[Consumer.Effect]) extends PrimaryEffect { - def isEmpty: Boolean = false - } + case object ConsumedEffect extends PrimaryEffect {} - case object IgnoreEffect extends PrimaryEffect { - def isEmpty: Boolean = true - } + final case class AsyncEffect(effect: Future[Consumer.Effect]) extends PrimaryEffect {} object Builder extends Consumer.Effect.Builder { def produce[S](message: S): Consumer.Effect = ProduceEffect(message, None) @@ -40,18 +34,21 @@ private[impl] object ConsumerEffectImpl { def asyncProduce[S](futureMessage: CompletionStage[S]): Consumer.Effect = asyncProduce(futureMessage, Metadata.EMPTY) + def asyncProduce[S](futureMessage: CompletionStage[S], metadata: Metadata): Consumer.Effect = AsyncEffect(futureMessage.asScala.map(s => Builder.produce[S](s, metadata))(ExecutionContext.parasitic)) + def asyncEffect(futureEffect: CompletionStage[Consumer.Effect]): Consumer.Effect = AsyncEffect(futureEffect.asScala) + def ignore(): Consumer.Effect = - IgnoreEffect + ConsumedEffect override def done(): Consumer.Effect = - ProduceEffect(Done, None) + ConsumedEffect override def asyncDone(futureMessage: CompletionStage[Done]): Consumer.Effect = - AsyncEffect(futureMessage.asScala.map(done => Builder.produce(done))(ExecutionContext.parasitic)) + AsyncEffect(futureMessage.asScala.map(_ => this.done())(ExecutionContext.parasitic)) } def builder(): Consumer.Effect.Builder = Builder diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala index f03048ca7..409d36ddc 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala @@ -10,7 +10,6 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.control.NonFatal -import akka.Done import akka.actor.ActorSystem import akka.annotation.InternalApi import akka.javasdk.Metadata @@ -24,7 +23,7 @@ import akka.javasdk.impl.ComponentType import akka.javasdk.impl.ErrorHandling import akka.javasdk.impl.MetadataImpl import akka.javasdk.impl.consumer.ConsumerEffectImpl.AsyncEffect -import akka.javasdk.impl.consumer.ConsumerEffectImpl.IgnoreEffect +import akka.javasdk.impl.consumer.ConsumerEffectImpl.ConsumedEffect import akka.javasdk.impl.consumer.ConsumerEffectImpl.ProduceEffect import akka.javasdk.impl.serialization.JsonSerializer import akka.javasdk.impl.telemetry.ConsumerCategory @@ -101,11 +100,7 @@ private[impl] final class ConsumerImpl[C <: Consumer]( private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = { effect match { - case ProduceEffect(msg: Done, metadata) => - Future.successful( - new SpiConsumer.ProduceEffect( - payload = Some(serializer.toBytes(msg)), - metadata = MetadataImpl.toSpi(metadata))) + case ConsumedEffect => Future.successful(SpiConsumer.ConsumedEffect) case ProduceEffect(msg, metadata) => if (consumerDestination.isEmpty) { val baseMsg = s"Consumer [$componentId] produced a message but no destination is defined." @@ -123,8 +118,6 @@ private[impl] final class ConsumerImpl[C <: Consumer]( .recover { case NonFatal(ex) => handleUnexpectedException(message, ex) } - case IgnoreEffect => - Future.successful(SpiConsumer.IgnoreEffect) case unknown => throw new IllegalArgumentException(s"Unknown TimedAction.Effect type ${unknown.getClass}") } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala index 6de408ba5..a246e1729 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala @@ -105,16 +105,12 @@ class WorkflowImpl[S, W <: Workflow[S]]( val failoverRecoverStrategy = definition.getStepRecoverStrategy.toScala.map(toRecovery) val stepTimeout = definition.getStepTimeout.toScala.map(_.toScala) - val defaultStepConfig = Option.when(failoverRecoverStrategy.isDefined) { - new SpiWorkflow.StepConfig("", stepTimeout, failoverRecoverStrategy) - } - new SpiWorkflow.WorkflowConfig( workflowTimeout = definition.getWorkflowTimeout.toScala.map(_.toScala), failoverTo = failoverTo, failoverRecoverStrategy = failoverRecoverStrategy, defaultStepTimeout = stepTimeout, - defaultStepConfig = defaultStepConfig, + defaultStepRecoverStrategy = failoverRecoverStrategy, stepConfigs = stepConfigs) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ccf18e709..115dc0012 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -8,7 +8,7 @@ object Dependencies { val ProtocolVersionMinor = 1 val RuntimeImage = "gcr.io/kalix-public/kalix-runtime" // Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this - val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-46b2781") + val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-940b627") } // NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check // if AkkaVersion and AkkaHttpVersion are aligned