Skip to content

Commit

Permalink
chore: updating consumer SPI effect (#148)
Browse files Browse the repository at this point in the history
* chore: updating consumer SPI effect

* improvements

* bumping runtime and fixing compilation
  • Loading branch information
aludwiko authored Jan 15, 2025
1 parent 25bf480 commit f0c0759
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 28 deletions.
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.3.0-46b2781</kalix-runtime.version>
<kalix-runtime.version>1.3.0-940b627</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,11 @@ import akka.javasdk.consumer.Consumer
private[impl] object ConsumerEffectImpl {
sealed abstract class PrimaryEffect extends Consumer.Effect {}

final case class ProduceEffect[T](msg: T, metadata: Option[Metadata]) extends PrimaryEffect {
def isEmpty: Boolean = false
}
final case class ProduceEffect[T](msg: T, metadata: Option[Metadata]) extends PrimaryEffect {}

final case class AsyncEffect(effect: Future[Consumer.Effect]) extends PrimaryEffect {
def isEmpty: Boolean = false
}
case object ConsumedEffect extends PrimaryEffect {}

case object IgnoreEffect extends PrimaryEffect {
def isEmpty: Boolean = true
}
final case class AsyncEffect(effect: Future[Consumer.Effect]) extends PrimaryEffect {}

object Builder extends Consumer.Effect.Builder {
def produce[S](message: S): Consumer.Effect = ProduceEffect(message, None)
Expand All @@ -40,18 +34,21 @@ private[impl] object ConsumerEffectImpl {

def asyncProduce[S](futureMessage: CompletionStage[S]): Consumer.Effect =
asyncProduce(futureMessage, Metadata.EMPTY)

def asyncProduce[S](futureMessage: CompletionStage[S], metadata: Metadata): Consumer.Effect =
AsyncEffect(futureMessage.asScala.map(s => Builder.produce[S](s, metadata))(ExecutionContext.parasitic))

def asyncEffect(futureEffect: CompletionStage[Consumer.Effect]): Consumer.Effect =
AsyncEffect(futureEffect.asScala)

def ignore(): Consumer.Effect =
IgnoreEffect
ConsumedEffect

override def done(): Consumer.Effect =
ProduceEffect(Done, None)
ConsumedEffect

override def asyncDone(futureMessage: CompletionStage[Done]): Consumer.Effect =
AsyncEffect(futureMessage.asScala.map(done => Builder.produce(done))(ExecutionContext.parasitic))
AsyncEffect(futureMessage.asScala.map(_ => this.done())(ExecutionContext.parasitic))
}

def builder(): Consumer.Effect.Builder = Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.control.NonFatal

import akka.Done
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.javasdk.Metadata
Expand All @@ -24,7 +23,7 @@ import akka.javasdk.impl.ComponentType
import akka.javasdk.impl.ErrorHandling
import akka.javasdk.impl.MetadataImpl
import akka.javasdk.impl.consumer.ConsumerEffectImpl.AsyncEffect
import akka.javasdk.impl.consumer.ConsumerEffectImpl.IgnoreEffect
import akka.javasdk.impl.consumer.ConsumerEffectImpl.ConsumedEffect
import akka.javasdk.impl.consumer.ConsumerEffectImpl.ProduceEffect
import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.impl.telemetry.ConsumerCategory
Expand Down Expand Up @@ -101,11 +100,7 @@ private[impl] final class ConsumerImpl[C <: Consumer](

private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = {
effect match {
case ProduceEffect(msg: Done, metadata) =>
Future.successful(
new SpiConsumer.ProduceEffect(
payload = Some(serializer.toBytes(msg)),
metadata = MetadataImpl.toSpi(metadata)))
case ConsumedEffect => Future.successful(SpiConsumer.ConsumedEffect)
case ProduceEffect(msg, metadata) =>
if (consumerDestination.isEmpty) {
val baseMsg = s"Consumer [$componentId] produced a message but no destination is defined."
Expand All @@ -123,8 +118,6 @@ private[impl] final class ConsumerImpl[C <: Consumer](
.recover { case NonFatal(ex) =>
handleUnexpectedException(message, ex)
}
case IgnoreEffect =>
Future.successful(SpiConsumer.IgnoreEffect)
case unknown =>
throw new IllegalArgumentException(s"Unknown TimedAction.Effect type ${unknown.getClass}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,12 @@ class WorkflowImpl[S, W <: Workflow[S]](
val failoverRecoverStrategy = definition.getStepRecoverStrategy.toScala.map(toRecovery)
val stepTimeout = definition.getStepTimeout.toScala.map(_.toScala)

val defaultStepConfig = Option.when(failoverRecoverStrategy.isDefined) {
new SpiWorkflow.StepConfig("", stepTimeout, failoverRecoverStrategy)
}

new SpiWorkflow.WorkflowConfig(
workflowTimeout = definition.getWorkflowTimeout.toScala.map(_.toScala),
failoverTo = failoverTo,
failoverRecoverStrategy = failoverRecoverStrategy,
defaultStepTimeout = stepTimeout,
defaultStepConfig = defaultStepConfig,
defaultStepRecoverStrategy = failoverRecoverStrategy,
stepConfigs = stepConfigs)
}

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Dependencies {
val ProtocolVersionMinor = 1
val RuntimeImage = "gcr.io/kalix-public/kalix-runtime"
// Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-46b2781")
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-940b627")
}
// NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check
// if AkkaVersion and AkkaHttpVersion are aligned
Expand Down

0 comments on commit f0c0759

Please sign in to comment.