diff --git a/akka-javasdk-maven/akka-javasdk-parent/pom.xml b/akka-javasdk-maven/akka-javasdk-parent/pom.xml index cdbd1eb0a..def125ec1 100644 --- a/akka-javasdk-maven/akka-javasdk-parent/pom.xml +++ b/akka-javasdk-maven/akka-javasdk-parent/pom.xml @@ -38,7 +38,7 @@ 21 - 1.2.2 + 1.2.2-7-3e32f0a1-SNAPSHOT UTF-8 false diff --git a/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/TestKit.java b/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/TestKit.java index 17072d564..dc1e4f3df 100644 --- a/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/TestKit.java +++ b/akka-javasdk-testkit/src/main/java/akka/javasdk/testkit/TestKit.java @@ -32,7 +32,7 @@ import akka.stream.SystemMaterializer; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import kalix.runtime.KalixRuntimeMain; +import kalix.runtime.AkkaRuntimeMain; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -474,7 +474,7 @@ public SpiSettings getSettings() { applicationConfig = runner.applicationConfig(); Config runtimeConfig = ConfigFactory.empty(); - runtimeActorSystem = KalixRuntimeMain.start(Some.apply(runtimeConfig), Some.apply(runner)); + runtimeActorSystem = AkkaRuntimeMain.start(Some.apply(runtimeConfig), runner); // wait for SDK to get on start callback (or fail starting), we need it to set up the component client var startupContext = runner.started().toCompletableFuture().get(20, TimeUnit.SECONDS); var componentClients = startupContext.componentClients(); diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala index b39b8b208..641209711 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala @@ -15,6 +15,7 @@ final class TestKitEventSourcedEntityCommandContext( override val commandId: Long = 0L, override val commandName: String = "stubCommandName", override val sequenceNumber: Long = 0L, + override val isDeleted: Boolean = false, override val metadata: Metadata = Metadata.EMPTY) extends CommandContext with InternalContext { diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java b/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java index eecf2670a..04dd69fdd 100644 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/EventSourcedEntityTest.java @@ -24,6 +24,7 @@ import static java.time.temporal.ChronoUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; @ExtendWith(Junit5LogCapturing.class) @@ -47,22 +48,30 @@ public void verifyCounterEventSourcedWiring() { @Test public void verifyCounterErrorEffect() { + var counterId = "hello-error"; + var client = componentClient.forEventSourcedEntity(counterId); + assertThrows(IllegalArgumentException.class, () -> + increaseCounterWithError(client, -1) + ); + } + @Test + public void httpVerifyCounterErrorEffect() { CompletableFuture> call = httpClient.POST("/akka/v1.0/entity/counter-entity/c001/increaseWithError") - .withRequestBody(-10) - .responseBodyAs(String.class) - .invokeAsync() - .toCompletableFuture(); + .withRequestBody(-10) + .responseBodyAs(String.class) + .invokeAsync() + .toCompletableFuture(); Awaitility.await() - .ignoreExceptions() - .atMost(5, TimeUnit.SECONDS) - .untilAsserted(() -> { - - assertThat(call).isCompletedExceptionally(); - assertThat(call.exceptionNow()).isInstanceOf(IllegalArgumentException.class); - assertThat(call.exceptionNow().getMessage()).contains("Value must be greater than 0"); - }); + .ignoreExceptions() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + + assertThat(call).isCompletedExceptionally(); + assertThat(call.exceptionNow()).isInstanceOf(IllegalArgumentException.class); + assertThat(call.exceptionNow().getMessage()).contains("Value must be greater than 0"); + }); } @Test @@ -185,6 +194,12 @@ private Integer increaseCounter(EventSourcedEntityClient client, int value) { .invokeAsync(value)); } + private Counter increaseCounterWithError(EventSourcedEntityClient client, int value) { + return await(client + .method(CounterEntity::increaseWithError) + .invokeAsync(value)); + } + private Integer multiplyCounter(EventSourcedEntityClient client, int value) { return await(client @@ -205,4 +220,4 @@ private Integer getCounter(EventSourcedEntityClient client) { return await(client.method(CounterEntity::get).invokeAsync()); } -} \ No newline at end of file +} diff --git a/akka-javasdk-tests/src/test/resources/logback-test.xml b/akka-javasdk-tests/src/test/resources/logback-test.xml index 52609e398..2eaa38a2a 100644 --- a/akka-javasdk-tests/src/test/resources/logback-test.xml +++ b/akka-javasdk-tests/src/test/resources/logback-test.xml @@ -12,15 +12,17 @@ - - + + + + diff --git a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java index 325596d29..af9fb6cf5 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java +++ b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java @@ -37,6 +37,8 @@ public interface CommandContext extends MetadataContext { */ String entityId(); + boolean isDeleted(); + /** Access to tracing for custom app specific tracing. */ Tracing tracing(); } diff --git a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java index caeafbb5b..3ba864c65 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java +++ b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/EventSourcedEntity.java @@ -134,6 +134,16 @@ public void _internalSetCurrentState(S state) { currentState = Optional.ofNullable(state); } + /** + * INTERNAL API + * @hidden + */ + @InternalApi + public void _internalClearCurrentState() { + handlingCommands = false; + currentState = Optional.empty(); + } + /** * This is the main event handler method. Whenever an event is persisted, this handler will be called. * It should return the new state of the entity. diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index 172ef9880..c590cfc41 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -7,12 +7,14 @@ package akka.javasdk.impl import java.lang.reflect.Constructor import java.lang.reflect.InvocationTargetException import java.util.concurrent.CompletionStage + import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise import scala.jdk.FutureConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal + import akka.Done import akka.actor.typed.ActorSystem import akka.annotation.InternalApi @@ -88,10 +90,12 @@ import kalix.protocol.value_entity.ValueEntities import kalix.protocol.view.Views import kalix.protocol.workflow_entity.WorkflowEntities import org.slf4j.LoggerFactory - import scala.jdk.OptionConverters.RichOptional import scala.jdk.CollectionConverters._ +import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityImpl +import akka.runtime.sdk.spi.EventSourcedEntityDescriptor + /** * INTERNAL API */ @@ -342,12 +346,34 @@ private final class Sdk( } // collect all Endpoints and compose them to build a larger router - private val httpEndpoints = componentClasses + private val httpEndpointDescriptors = componentClasses .filter(Reflect.isRestEndpoint) .map { httpEndpointClass => HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass)) } + private val eventSourcedEntityDescriptors = + componentClasses + .filter(hasComponentId) + .collect { + case clz if classOf[EventSourcedEntity[_, _]].isAssignableFrom(clz) => + val componentId = clz.getAnnotation(classOf[ComponentId]).value + val entitySpi = + new EventSourcedEntityImpl[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]]( + sdkSettings, + sdkTracerFactory, + componentId, + clz, + messageCodec, + context => + wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) { + // remember to update component type API doc and docs if changing the set of injectables + case p if p == classOf[EventSourcedEntityContext] => context + }, + sdkSettings.snapshotEvery) + new EventSourcedEntityDescriptor(componentId, entitySpi) + } + // these are available for injecting in all kinds of component that are primarily // for side effects // Note: config is also always available through the combination with user DI way down below @@ -484,11 +510,15 @@ private final class Sdk( override def discovery: Discovery = discoveryEndpoint override def actions: Option[Actions] = actionsEndpoint override def eventSourcedEntities: Option[EventSourcedEntities] = eventSourcedEntitiesEndpoint + override def eventSourcedEntityDescriptors: Seq[EventSourcedEntityDescriptor] = + Sdk.this.eventSourcedEntityDescriptors override def valueEntities: Option[ValueEntities] = valueEntitiesEndpoint override def views: Option[Views] = viewsEndpoint override def workflowEntities: Option[WorkflowEntities] = workflowEntitiesEndpoint override def replicatedEntities: Option[ReplicatedEntities] = None - override def httpEndpointDescriptors: Seq[HttpEndpointDescriptor] = httpEndpoints + override def httpEndpointDescriptors: Seq[HttpEndpointDescriptor] = + Sdk.this.httpEndpointDescriptors + } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala index 3412e5371..64111dbbd 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala @@ -290,6 +290,7 @@ private[impl] final class EventSourcedEntitiesImpl( with CommandContext with ActivatableContext { override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory) + override def isDeleted: Boolean = false // FIXME not supported by old spi } private class EventSourcedEntityContextImpl(override final val entityId: String) diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala new file mode 100644 index 000000000..639f6d1dc --- /dev/null +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala @@ -0,0 +1,295 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.impl.eventsourcedentity + +import java.util.Optional + +import scala.concurrent.Future +import scala.util.control.NonFatal + +import akka.annotation.InternalApi +import akka.javasdk.Metadata +import akka.javasdk.Tracing +import akka.javasdk.eventsourcedentity.CommandContext +import akka.javasdk.eventsourcedentity.EventContext +import akka.javasdk.eventsourcedentity.EventSourcedEntity +import akka.javasdk.eventsourcedentity.EventSourcedEntityContext +import akka.javasdk.impl.AbstractContext +import akka.javasdk.impl.ActivatableContext +import akka.javasdk.impl.AnySupport +import akka.javasdk.impl.ComponentDescriptor +import akka.javasdk.impl.EntityExceptions +import akka.javasdk.impl.EntityExceptions.EntityException +import akka.javasdk.impl.ErrorHandling.BadRequestException +import akka.javasdk.impl.JsonMessageCodec +import akka.javasdk.impl.MetadataImpl +import akka.javasdk.impl.Settings +import akka.javasdk.impl.effect.ErrorReplyImpl +import akka.javasdk.impl.effect.MessageReplyImpl +import akka.javasdk.impl.effect.NoSecondaryEffectImpl +import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityEffectImpl.EmitEvents +import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityEffectImpl.NoPrimaryEffect +import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityRouter.CommandHandlerNotFound +import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityRouter.EventHandlerNotFound +import akka.javasdk.impl.telemetry.SpanTracingImpl +import akka.javasdk.impl.telemetry.Telemetry +import akka.runtime.sdk.spi.SpiEntity +import akka.runtime.sdk.spi.SpiEventSourcedEntity +import akka.runtime.sdk.spi.SpiSerialization +import akka.runtime.sdk.spi.SpiSerialization.Deserialized +import com.google.protobuf.ByteString +import com.google.protobuf.any.{ Any => ScalaPbAny } +import io.grpc.Status +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.Tracer +import org.slf4j.LoggerFactory +import org.slf4j.MDC + +/** + * INTERNAL API + */ +@InternalApi +private[impl] object EventSourcedEntityImpl { + private val log = LoggerFactory.getLogger(this.getClass) + + private class CommandContextImpl( + override val entityId: String, + override val sequenceNumber: Long, + override val commandName: String, + override val commandId: Long, // FIXME remove + override val isDeleted: Boolean, + override val metadata: Metadata, + span: Option[Span], + tracerFactory: () => Tracer) + extends AbstractContext + with CommandContext + with ActivatableContext { + override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory) + } + + private class EventSourcedEntityContextImpl(override final val entityId: String) + extends AbstractContext + with EventSourcedEntityContext + + private final class EventContextImpl(entityId: String, override val sequenceNumber: Long) + extends EventSourcedEntityContextImpl(entityId) + with EventContext +} + +/** + * INTERNAL API + */ +@InternalApi +private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[S, E]]( + configuration: Settings, + tracerFactory: () => Tracer, + componentId: String, + componentClass: Class[_], + messageCodec: JsonMessageCodec, + factory: EventSourcedEntityContext => ES, + snapshotEvery: Int) + extends SpiEventSourcedEntity { + import EventSourcedEntityImpl._ + + if (snapshotEvery < 0) + log.warn("Snapshotting disabled for entity [{}], this is not recommended.", componentId) + + // FIXME +// private val traceInstrumentation = new TraceInstrumentation(componentId, EventSourcedEntityCategory, tracerFactory) + + private val componentDescriptor = ComponentDescriptor.descriptorFor(componentClass, messageCodec) + + // FIXME remove EventSourcedEntityRouter altogether, and only keep stateless ReflectiveEventSourcedEntityRouter + private def createRouter(context: EventSourcedEntityContext) + : ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]] = + new ReflectiveEventSourcedEntityRouter[S, E, ES]( + factory(context), + componentDescriptor.commandHandlers, + messageCodec) + .asInstanceOf[ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]]] + + override def emptyState(entityId: String): SpiEventSourcedEntity.State = { + // FIXME rather messy with the contexts here + val cmdContext = + new CommandContextImpl(entityId, 0L, "", 0, false, MetadataImpl.of(Nil), None, tracerFactory) + val context = new EventSourcedEntityContextImpl(entityId) + val router = createRouter(context) + router.entity._internalSetCommandContext(Optional.of(cmdContext)) + try { + router.entity.emptyState() + } finally { + router.entity._internalSetCommandContext(Optional.empty()) + } + } + + override def handleCommand( + state: SpiEventSourcedEntity.State, + command: SpiEntity.Command): Future[SpiEventSourcedEntity.Effect] = { + val entityId = command.entityId + + val span: Option[Span] = None // FIXME traceInstrumentation.buildSpan(service, command) + span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId)) + val cmd = + messageCodec.decodeMessage( + command.payload.getOrElse( + // FIXME smuggling 0 arity method called from component client through here + ScalaPbAny.defaultInstance.withTypeUrl(AnySupport.JsonTypeUrlPrefix).withValue(ByteString.empty()))) + val metadata: Metadata = + MetadataImpl.of(Nil) // FIXME MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil)) + val cmdContext = + new CommandContextImpl( + entityId, + command.sequenceNumber, + command.name, + 0, + command.isDeleted, + metadata, + span, + tracerFactory) + + val context = new EventSourcedEntityContextImpl(entityId) + val router = createRouter(context) + router.entity._internalSetCommandContext(Optional.of(cmdContext)) + try { + router.entity._internalSetCurrentState(state) + val commandEffect = router + .handleCommand(command.name, state, cmd, cmdContext) + .asInstanceOf[EventSourcedEntityEffectImpl[AnyRef, E]] // FIXME improve? + + def replyOrError(updatedState: SpiEventSourcedEntity.State): (Option[ScalaPbAny], Option[SpiEntity.Error]) = { + commandEffect.secondaryEffect(updatedState) match { + case ErrorReplyImpl(description, status) => + val errorCode = status.map(_.value).getOrElse(Status.Code.UNKNOWN.value) + (None, Some(new SpiEntity.Error(description, errorCode))) + case MessageReplyImpl(message, _) => + // FIXME metadata? + // FIXME is this encoding correct? + val replyPayload = ScalaPbAny.fromJavaProto(messageCodec.encodeJava(message)) + (Some(replyPayload), None) + case NoSecondaryEffectImpl => + (None, None) + } + } + + var currentSequence = command.sequenceNumber + var updatedState = state + commandEffect.primaryEffect match { + case EmitEvents(events, deleteEntity) => + var shouldSnapshot = false + events.foreach { event => + updatedState = entityHandleEvent(updatedState, event.asInstanceOf[AnyRef], entityId, currentSequence) + if (updatedState == null) + throw new IllegalArgumentException("Event handler must not return null as the updated state.") + currentSequence += 1 + shouldSnapshot = shouldSnapshot || (snapshotEvery > 0 && currentSequence % snapshotEvery == 0) + } + + val (reply, error) = replyOrError(updatedState) + + if (error.isDefined) { + Future.successful( + new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply = None, error, None)) + } else { + // snapshotting final state since that is the "atomic" write + // emptyState can be null but null snapshot should not be stored, but that can't even + // happen since event handler is not allowed to return null as newState + // FIXME +// val snapshot = +// if (shouldSnapshot) Option(updatedState) +// else None + + val delete = + if (deleteEntity) Some(configuration.cleanupDeletedEventSourcedEntityAfter) + else None + + val serializedEvents = + events.map(event => ScalaPbAny.fromJavaProto(messageCodec.encodeJava(event))).toVector + + Future.successful( + new SpiEventSourcedEntity.Effect(events = serializedEvents, updatedState = state, reply, error, delete)) + } + + case NoPrimaryEffect => + val (reply, error) = replyOrError(updatedState) + + Future.successful( + new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply, error, None)) + } + + } catch { + case CommandHandlerNotFound(name) => + throw new EntityExceptions.EntityException( + entityId, + 0, // FIXME remove commandId + command.name, + s"No command handler found for command [$name] on ${router.entity.getClass}") + case BadRequestException(msg) => + Future.successful( + new SpiEventSourcedEntity.Effect( + events = Vector.empty, + updatedState = state, + reply = None, + error = Some(new SpiEntity.Error(msg, Status.Code.INVALID_ARGUMENT.value)), + delete = None)) + case e: EntityException => + throw e + case NonFatal(error) => + throw EntityException( + entityId = entityId, + commandId = 0, + commandName = command.name, + s"Unexpected failure: $error", + Some(error)) + } finally { + router.entity._internalSetCommandContext(Optional.empty()) + router.entity._internalClearCurrentState() + cmdContext.deactivate() // Very important! + + span.foreach { s => + MDC.remove(Telemetry.TRACE_ID) + s.end() + } + } + + } + + override def handleEvent( + state: SpiEventSourcedEntity.State, + eventEnv: SpiEventSourcedEntity.EventEnvelope): SpiEventSourcedEntity.State = { + val event = + messageCodec + .decodeMessage(eventEnv.payload) + .asInstanceOf[AnyRef] // FIXME empty? + entityHandleEvent(state, event, eventEnv.entityId, eventEnv.sequenceNumber) + } + + def entityHandleEvent( + state: SpiEventSourcedEntity.State, + event: AnyRef, + entityId: String, + sequenceNumber: Long): SpiEventSourcedEntity.State = { + val eventContext = new EventContextImpl(entityId, sequenceNumber) + val router = createRouter(eventContext) // FIXME reuse router instance? + router.entity._internalSetEventContext(Optional.of(eventContext)) + try { + router.handleEvent(state, event) + } catch { + case EventHandlerNotFound(eventClass) => + throw new IllegalArgumentException(s"Unknown event type [$eventClass] on ${router.entity.getClass}") + } finally { + router.entity._internalSetEventContext(Optional.empty()) + } + } + + override val stateSerializer: SpiSerialization.Serializer = + new SpiSerialization.Serializer { + + override def toProto(obj: Deserialized): ScalaPbAny = + ScalaPbAny.fromJavaProto(messageCodec.encodeJava(obj)) + + override def fromProto(pb: ScalaPbAny): Deserialized = + messageCodec.decodeMessage(pb).asInstanceOf[Deserialized] + } +} diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala index 788588442..d92629384 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala @@ -5,7 +5,6 @@ package akka.javasdk.impl.eventsourcedentity import akka.annotation.InternalApi -import akka.javasdk.JsonSupport import akka.javasdk.eventsourcedentity.CommandContext import akka.javasdk.eventsourcedentity.EventSourcedEntity import akka.javasdk.impl.AnySupport @@ -22,7 +21,7 @@ import com.google.protobuf.any.{ Any => ScalaPbAny } */ @InternalApi private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedEntity[S, E]]( - override protected val entity: ES, + override val entity: ES, commandHandlers: Map[String, CommandHandler], messageCodec: JsonMessageCodec) extends EventSourcedEntityRouter[S, E, ES](entity) { @@ -39,7 +38,7 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE override def handleEvent(state: S, event: E): S = { - _extractAndSetCurrentState(state) + _setCurrentState(state) event match { case anyPb: ScalaPbAny => // replaying event coming from runtime @@ -60,7 +59,7 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE command: Any, commandContext: CommandContext): EventSourcedEntity.Effect[_] = { - _extractAndSetCurrentState(state) + _setCurrentState(state) val commandHandler = commandHandlerLookup(commandName) @@ -90,7 +89,7 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE } } - private def _extractAndSetCurrentState(state: S): Unit = { + private def _setCurrentState(state: S): Unit = { val entityStateType: Class[S] = Reflect.eventSourcedEntityStateType(this.entity.getClass).asInstanceOf[Class[S]] // the state: S received can either be of the entity "state" type (if coming from emptyState/memory) @@ -101,9 +100,12 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE // be able to call currentState() later entity._internalSetCurrentState(s) case s => - val deserializedState = - JsonSupport.decodeJson(entityStateType, ScalaPbAny.toJavaProto(s.asInstanceOf[ScalaPbAny])) - entity._internalSetCurrentState(deserializedState) + // FIXME this case should not be needed, maybe remove the type check + throw new IllegalArgumentException( + s"Unexpected state type [${s.getClass.getName}], expected [${entityStateType.getName}]") +// val deserializedState = +// JsonSupport.decodeJson(entityStateType, ScalaPbAny.toJavaProto(s.asInstanceOf[ScalaPbAny])) +// entity._internalSetCurrentState(deserializedState) } } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9e8ccfb18..09c328f1f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -8,7 +8,7 @@ object Dependencies { val ProtocolVersionMinor = 1 val RuntimeImage = "gcr.io/kalix-public/kalix-runtime" // Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this - val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.2.2") + val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.2.2-7-3e32f0a1-SNAPSHOT") } // NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check // if AkkaVersion and AkkaHttpVersion are aligned diff --git a/samples/shopping-cart-quickstart/pom.xml b/samples/shopping-cart-quickstart/pom.xml index 25e79b2ff..5bc1159fd 100644 --- a/samples/shopping-cart-quickstart/pom.xml +++ b/samples/shopping-cart-quickstart/pom.xml @@ -5,7 +5,7 @@ io.akka akka-javasdk-parent - 3.0.0 + 3.0.0-b3221ba-11-587b68d4-dev-SNAPSHOT com.example diff --git a/samples/shopping-cart-quickstart/src/main/resources/include-dev-loggers.xml b/samples/shopping-cart-quickstart/src/main/resources/include-dev-loggers.xml index 295045188..9e02b6eb5 100644 --- a/samples/shopping-cart-quickstart/src/main/resources/include-dev-loggers.xml +++ b/samples/shopping-cart-quickstart/src/main/resources/include-dev-loggers.xml @@ -10,4 +10,7 @@ --> + + +