Skip to content

Commit

Permalink
chore: timed action & consumer effect spi
Browse files Browse the repository at this point in the history
  • Loading branch information
aludwiko committed Dec 17, 2024
1 parent 0f55d7c commit 9bfd656
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import akka.javasdk.consumer.Consumer
private[impl] object ConsumerEffectImpl {
sealed abstract class PrimaryEffect extends Consumer.Effect {}

final case class ReplyEffect[T](msg: T, metadata: Option[Metadata]) extends PrimaryEffect {
final case class ProduceEffect[T](msg: T, metadata: Option[Metadata]) extends PrimaryEffect {
def isEmpty: Boolean = false
}

Expand All @@ -33,10 +33,10 @@ private[impl] object ConsumerEffectImpl {
}

object Builder extends Consumer.Effect.Builder {
def produce[S](message: S): Consumer.Effect = ReplyEffect(message, None)
def produce[S](message: S): Consumer.Effect = ProduceEffect(message, None)

def produce[S](message: S, metadata: Metadata): Consumer.Effect =
ReplyEffect(message, Some(metadata))
ProduceEffect(message, Some(metadata))

def asyncProduce[S](futureMessage: CompletionStage[S]): Consumer.Effect =
asyncProduce(futureMessage, Metadata.EMPTY)
Expand All @@ -48,7 +48,7 @@ private[impl] object ConsumerEffectImpl {
IgnoreEffect

override def done(): Consumer.Effect =
ReplyEffect(Done, None)
ProduceEffect(Done, None)

override def asyncDone(futureMessage: CompletionStage[Done]): Consumer.Effect =
AsyncEffect(futureMessage.asScala.map(done => Builder.produce(done))(ExecutionContext.parasitic))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import akka.javasdk.impl.ErrorHandling
import akka.javasdk.impl.MetadataImpl
import akka.javasdk.impl.consumer.ConsumerEffectImpl.AsyncEffect
import akka.javasdk.impl.consumer.ConsumerEffectImpl.IgnoreEffect
import akka.javasdk.impl.consumer.ConsumerEffectImpl.ReplyEffect
import akka.javasdk.impl.consumer.ConsumerEffectImpl.ProduceEffect
import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.impl.telemetry.SpanTracingImpl
import akka.javasdk.impl.telemetry.Telemetry
Expand All @@ -33,7 +33,6 @@ import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.SpiConsumer
import akka.runtime.sdk.spi.SpiConsumer.Effect
import akka.runtime.sdk.spi.SpiConsumer.Message
import akka.runtime.sdk.spi.SpiMetadata
import akka.runtime.sdk.spi.SpiMetadataEntry
import akka.runtime.sdk.spi.TimerClient
import io.opentelemetry.api.trace.Span
Expand Down Expand Up @@ -101,21 +100,19 @@ private[impl] final class ConsumerImpl[C <: Consumer](

private def toSpiEffect(message: Message, effect: Consumer.Effect): Future[Effect] = {
effect match {
case ReplyEffect(msg, metadata) =>
case ProduceEffect(msg, metadata) =>
Future.successful(
new Effect(
ignore = false,
reply = Some(serializer.toBytes(msg)),
metadata = MetadataImpl.toSpi(metadata),
error = None))
new SpiConsumer.ProduceEffect(
payload = Some(serializer.toBytes(msg)),
metadata = MetadataImpl.toSpi(metadata)))
case AsyncEffect(futureEffect) =>
futureEffect
.flatMap { effect => toSpiEffect(message, effect) }
.recover { case NonFatal(ex) =>
handleUnexpectedException(message, ex)
}
case IgnoreEffect =>
Future.successful(new Effect(ignore = true, reply = None, metadata = SpiMetadata.empty, error = None))
Future.successful(new SpiConsumer.IgnoreEffect)
case unknown =>
throw new IllegalArgumentException(s"Unknown TimedAction.Effect type ${unknown.getClass}")
}
Expand All @@ -134,11 +131,7 @@ private[impl] final class ConsumerImpl[C <: Consumer](
}

private def protocolFailure(correlationId: String): Effect = {
new Effect(
ignore = false,
reply = None,
metadata = SpiMetadata.empty,
error = Some(new SpiConsumer.Error(s"Unexpected error [$correlationId]")))
new SpiConsumer.ErrorEffect(error = new SpiConsumer.Error(s"Unexpected error [$correlationId]"))
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,15 @@ private[impl] final class TimedActionImpl[TA <: TimedAction](
private def toSpiEffect(command: Command, effect: TimedAction.Effect): Future[Effect] = {
effect match {
case ReplyEffect(_) => //FIXME remove meta, not used in the reply
Future.successful(new Effect(None))
Future.successful(new SpiTimedAction.SuccessEffect)
case AsyncEffect(futureEffect) =>
futureEffect
.flatMap { effect => toSpiEffect(command, effect) }
.recover { case NonFatal(ex) =>
handleUnexpectedException(command, ex)
}
case ErrorEffect(description) =>
Future.successful(new Effect(Some(new SpiTimedAction.Error(description))))
Future.successful(new SpiTimedAction.ErrorEffect(new SpiTimedAction.Error(description)))
case unknown =>
throw new IllegalArgumentException(s"Unknown TimedAction.Effect type ${unknown.getClass}")
}
Expand All @@ -152,7 +152,7 @@ private[impl] final class TimedActionImpl[TA <: TimedAction](
}

private def protocolFailure(correlationId: String): Effect = {
new Effect(Some(new SpiTimedAction.Error(s"Unexpected error [$correlationId]")))
new SpiTimedAction.ErrorEffect(new SpiTimedAction.Error(s"Unexpected error [$correlationId]"))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class TimedActionImplSpec
new SpiTimedAction.Command("MyMethod", Some(new BytesPayload(ByteString.empty, "")), SpiMetadata.empty))
.futureValue

reply.error shouldBe empty
reply shouldBe an[SpiTimedAction.SuccessEffect]
}

"turn thrown command handler exceptions into failure responses" in {
Expand All @@ -104,9 +104,10 @@ class TimedActionImplSpec
Some(new BytesPayload(ByteString.empty, "")),
SpiMetadata.empty))
.futureValue
.asInstanceOf[SpiTimedAction.ErrorEffect]
}

reply.error.value.description should startWith("Unexpected error")
reply.error.description should startWith("Unexpected error")
}

}
Expand Down

0 comments on commit 9bfd656

Please sign in to comment.