diff --git a/akka-javasdk-maven/akka-javasdk-parent/pom.xml b/akka-javasdk-maven/akka-javasdk-parent/pom.xml
index 8587f1039..950e52cfc 100644
--- a/akka-javasdk-maven/akka-javasdk-parent/pom.xml
+++ b/akka-javasdk-maven/akka-javasdk-parent/pom.xml
@@ -38,7 +38,7 @@
21
- 1.3.0-46b2781
+ 1.3.0-940b627
UTF-8
false
diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala
index 0b5da20fb..725fef4ad 100644
--- a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala
+++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala
@@ -20,17 +20,11 @@ import akka.javasdk.consumer.Consumer
private[impl] object ConsumerEffectImpl {
sealed abstract class PrimaryEffect extends Consumer.Effect {}
- final case class ProduceEffect[T](msg: T, metadata: Option[Metadata]) extends PrimaryEffect {
- def isEmpty: Boolean = false
- }
+ final case class ProduceEffect[T](msg: T, metadata: Option[Metadata]) extends PrimaryEffect {}
- final case class AsyncEffect(effect: Future[Consumer.Effect]) extends PrimaryEffect {
- def isEmpty: Boolean = false
- }
+ case object ConsumedEffect extends PrimaryEffect {}
- case object IgnoreEffect extends PrimaryEffect {
- def isEmpty: Boolean = true
- }
+ final case class AsyncEffect(effect: Future[Consumer.Effect]) extends PrimaryEffect {}
object Builder extends Consumer.Effect.Builder {
def produce[S](message: S): Consumer.Effect = ProduceEffect(message, None)
@@ -40,18 +34,21 @@ private[impl] object ConsumerEffectImpl {
def asyncProduce[S](futureMessage: CompletionStage[S]): Consumer.Effect =
asyncProduce(futureMessage, Metadata.EMPTY)
+
def asyncProduce[S](futureMessage: CompletionStage[S], metadata: Metadata): Consumer.Effect =
AsyncEffect(futureMessage.asScala.map(s => Builder.produce[S](s, metadata))(ExecutionContext.parasitic))
+
def asyncEffect(futureEffect: CompletionStage[Consumer.Effect]): Consumer.Effect =
AsyncEffect(futureEffect.asScala)
+
def ignore(): Consumer.Effect =
- IgnoreEffect
+ ConsumedEffect
override def done(): Consumer.Effect =
- ProduceEffect(Done, None)
+ ConsumedEffect
override def asyncDone(futureMessage: CompletionStage[Done]): Consumer.Effect =
- AsyncEffect(futureMessage.asScala.map(done => Builder.produce(done))(ExecutionContext.parasitic))
+ AsyncEffect(futureMessage.asScala.map(_ => this.done())(ExecutionContext.parasitic))
}
def builder(): Consumer.Effect.Builder = Builder
diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala
index f03048ca7..409d36ddc 100644
--- a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala
+++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala
@@ -10,7 +10,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.control.NonFatal
-import akka.Done
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.javasdk.Metadata
@@ -24,7 +23,7 @@ import akka.javasdk.impl.ComponentType
import akka.javasdk.impl.ErrorHandling
import akka.javasdk.impl.MetadataImpl
import akka.javasdk.impl.consumer.ConsumerEffectImpl.AsyncEffect
-import akka.javasdk.impl.consumer.ConsumerEffectImpl.IgnoreEffect
+import akka.javasdk.impl.consumer.ConsumerEffectImpl.ConsumedEffect
import akka.javasdk.impl.consumer.ConsumerEffectImpl.ProduceEffect
import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.impl.telemetry.ConsumerCategory
@@ -101,11 +100,7 @@ private[impl] final class ConsumerImpl[C <: Consumer](
private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = {
effect match {
- case ProduceEffect(msg: Done, metadata) =>
- Future.successful(
- new SpiConsumer.ProduceEffect(
- payload = Some(serializer.toBytes(msg)),
- metadata = MetadataImpl.toSpi(metadata)))
+ case ConsumedEffect => Future.successful(SpiConsumer.ConsumedEffect)
case ProduceEffect(msg, metadata) =>
if (consumerDestination.isEmpty) {
val baseMsg = s"Consumer [$componentId] produced a message but no destination is defined."
@@ -123,8 +118,6 @@ private[impl] final class ConsumerImpl[C <: Consumer](
.recover { case NonFatal(ex) =>
handleUnexpectedException(message, ex)
}
- case IgnoreEffect =>
- Future.successful(SpiConsumer.IgnoreEffect)
case unknown =>
throw new IllegalArgumentException(s"Unknown TimedAction.Effect type ${unknown.getClass}")
}
diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala
index 6de408ba5..a246e1729 100644
--- a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala
+++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala
@@ -105,16 +105,12 @@ class WorkflowImpl[S, W <: Workflow[S]](
val failoverRecoverStrategy = definition.getStepRecoverStrategy.toScala.map(toRecovery)
val stepTimeout = definition.getStepTimeout.toScala.map(_.toScala)
- val defaultStepConfig = Option.when(failoverRecoverStrategy.isDefined) {
- new SpiWorkflow.StepConfig("", stepTimeout, failoverRecoverStrategy)
- }
-
new SpiWorkflow.WorkflowConfig(
workflowTimeout = definition.getWorkflowTimeout.toScala.map(_.toScala),
failoverTo = failoverTo,
failoverRecoverStrategy = failoverRecoverStrategy,
defaultStepTimeout = stepTimeout,
- defaultStepConfig = defaultStepConfig,
+ defaultStepRecoverStrategy = failoverRecoverStrategy,
stepConfigs = stepConfigs)
}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index ccf18e709..115dc0012 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -8,7 +8,7 @@ object Dependencies {
val ProtocolVersionMinor = 1
val RuntimeImage = "gcr.io/kalix-public/kalix-runtime"
// Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this
- val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-46b2781")
+ val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-940b627")
}
// NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check
// if AkkaVersion and AkkaHttpVersion are aligned