diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala index 9f424fa5e..0b5da20fb 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerEffectImpl.scala @@ -20,7 +20,7 @@ import akka.javasdk.consumer.Consumer private[impl] object ConsumerEffectImpl { sealed abstract class PrimaryEffect extends Consumer.Effect {} - final case class ReplyEffect[T](msg: T, metadata: Option[Metadata]) extends PrimaryEffect { + final case class ProduceEffect[T](msg: T, metadata: Option[Metadata]) extends PrimaryEffect { def isEmpty: Boolean = false } @@ -33,10 +33,10 @@ private[impl] object ConsumerEffectImpl { } object Builder extends Consumer.Effect.Builder { - def produce[S](message: S): Consumer.Effect = ReplyEffect(message, None) + def produce[S](message: S): Consumer.Effect = ProduceEffect(message, None) def produce[S](message: S, metadata: Metadata): Consumer.Effect = - ReplyEffect(message, Some(metadata)) + ProduceEffect(message, Some(metadata)) def asyncProduce[S](futureMessage: CompletionStage[S]): Consumer.Effect = asyncProduce(futureMessage, Metadata.EMPTY) @@ -48,7 +48,7 @@ private[impl] object ConsumerEffectImpl { IgnoreEffect override def done(): Consumer.Effect = - ReplyEffect(Done, None) + ProduceEffect(Done, None) override def asyncDone(futureMessage: CompletionStage[Done]): Consumer.Effect = AsyncEffect(futureMessage.asScala.map(done => Builder.produce(done))(ExecutionContext.parasitic)) diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala index 620a01d44..cd0a99032 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala @@ -23,7 +23,7 @@ import akka.javasdk.impl.ErrorHandling import akka.javasdk.impl.MetadataImpl import akka.javasdk.impl.consumer.ConsumerEffectImpl.AsyncEffect import akka.javasdk.impl.consumer.ConsumerEffectImpl.IgnoreEffect -import akka.javasdk.impl.consumer.ConsumerEffectImpl.ReplyEffect +import akka.javasdk.impl.consumer.ConsumerEffectImpl.ProduceEffect import akka.javasdk.impl.serialization.JsonSerializer import akka.javasdk.impl.telemetry.SpanTracingImpl import akka.javasdk.impl.telemetry.Telemetry @@ -33,7 +33,6 @@ import akka.runtime.sdk.spi.BytesPayload import akka.runtime.sdk.spi.SpiConsumer import akka.runtime.sdk.spi.SpiConsumer.Effect import akka.runtime.sdk.spi.SpiConsumer.Message -import akka.runtime.sdk.spi.SpiMetadata import akka.runtime.sdk.spi.SpiMetadataEntry import akka.runtime.sdk.spi.TimerClient import io.opentelemetry.api.trace.Span @@ -101,13 +100,11 @@ private[impl] final class ConsumerImpl[C <: Consumer]( private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = { effect match { - case ReplyEffect(msg, metadata) => + case ProduceEffect(msg, metadata) => Future.successful( - new Effect( - ignore = false, - reply = Some(serializer.toBytes(msg)), - metadata = MetadataImpl.toSpi(metadata), - error = None)) + new SpiConsumer.ProduceEffect( + payload = Some(serializer.toBytes(msg)), + metadata = MetadataImpl.toSpi(metadata))) case AsyncEffect(futureEffect) => futureEffect .flatMap { effect => toSpiEffect(message, effect) } @@ -115,7 +112,7 @@ private[impl] final class ConsumerImpl[C <: Consumer]( handleUnexpectedException(message, ex) } case IgnoreEffect => - Future.successful(new Effect(ignore = true, reply = None, metadata = SpiMetadata.empty, error = None)) + Future.successful(SpiConsumer.IgnoreEffect) case unknown => throw new IllegalArgumentException(s"Unknown TimedAction.Effect type ${unknown.getClass}") } @@ -134,11 +131,7 @@ private[impl] final class ConsumerImpl[C <: Consumer]( } private def protocolFailure(correlationId: String): Effect = { - new Effect( - ignore = false, - reply = None, - metadata = SpiMetadata.empty, - error = Some(new SpiConsumer.Error(s"Unexpected error [$correlationId]"))) + new SpiConsumer.ErrorEffect(error = new SpiConsumer.Error(s"Unexpected error [$correlationId]")) } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala index 40bb92e19..a12624d4f 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala @@ -125,7 +125,7 @@ private[impl] final class TimedActionImpl[TA <: TimedAction]( private def toSpiEffect(command: Command, effect: TimedAction.Effect): Future[Effect] = { effect match { case ReplyEffect(_) => //FIXME remove meta, not used in the reply - Future.successful(new Effect(None)) + Future.successful(SpiTimedAction.SuccessEffect) case AsyncEffect(futureEffect) => futureEffect .flatMap { effect => toSpiEffect(command, effect) } @@ -133,7 +133,7 @@ private[impl] final class TimedActionImpl[TA <: TimedAction]( handleUnexpectedException(command, ex) } case ErrorEffect(description) => - Future.successful(new Effect(Some(new SpiTimedAction.Error(description)))) + Future.successful(new SpiTimedAction.ErrorEffect(new SpiTimedAction.Error(description))) case unknown => throw new IllegalArgumentException(s"Unknown TimedAction.Effect type ${unknown.getClass}") } @@ -152,7 +152,7 @@ private[impl] final class TimedActionImpl[TA <: TimedAction]( } private def protocolFailure(correlationId: String): Effect = { - new Effect(Some(new SpiTimedAction.Error(s"Unexpected error [$correlationId]"))) + new SpiTimedAction.ErrorEffect(new SpiTimedAction.Error(s"Unexpected error [$correlationId]")) } } diff --git a/akka-javasdk/src/test/scala/akka/javasdk/impl/timedaction/TimedActionImplSpec.scala b/akka-javasdk/src/test/scala/akka/javasdk/impl/timedaction/TimedActionImplSpec.scala index fb66e0583..6c9daa327 100644 --- a/akka-javasdk/src/test/scala/akka/javasdk/impl/timedaction/TimedActionImplSpec.scala +++ b/akka-javasdk/src/test/scala/akka/javasdk/impl/timedaction/TimedActionImplSpec.scala @@ -87,7 +87,7 @@ class TimedActionImplSpec new SpiTimedAction.Command("MyMethod", Some(new BytesPayload(ByteString.empty, "")), SpiMetadata.empty)) .futureValue - reply.error shouldBe empty + reply shouldBe an[SpiTimedAction.SuccessEffect.type] } "turn thrown command handler exceptions into failure responses" in { @@ -104,9 +104,10 @@ class TimedActionImplSpec Some(new BytesPayload(ByteString.empty, "")), SpiMetadata.empty)) .futureValue + .asInstanceOf[SpiTimedAction.ErrorEffect] } - reply.error.value.description should startWith("Unexpected error") + reply.error.description should startWith("Unexpected error") } }