Skip to content

Commit

Permalink
chore: Align routers
Browse files Browse the repository at this point in the history
* remove router abstraction, but keep the ReflectiveRouter
* align HandlerNotFoundException
* misc cleanup
  • Loading branch information
patriknw committed Dec 17, 2024
1 parent dd80d30 commit 699a81b
Show file tree
Hide file tree
Showing 16 changed files with 120 additions and 351 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.google.protobuf.Descriptors
*/
@InternalApi
private[impl] final case class CommandHandler(
grpcMethodName: String,
methodName: String,
serializer: JsonSerializer,
requestMessageDescriptor: Descriptors.Descriptor,
methodInvokers: Map[String, MethodInvoker]) {
Expand Down Expand Up @@ -60,7 +60,7 @@ private[impl] final case class CommandHandler(

// for embedded SDK we expect components to be either zero or one arity
def getSingleNameInvoker(): MethodInvoker =
if (methodInvokers.size != 1) throw new IllegalStateException(s"More than one method defined for $grpcMethodName")
if (methodInvokers.size != 1) throw new IllegalStateException(s"More than one method defined for $methodName")
else methodInvokers.head._2
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@
package akka.javasdk.impl

import akka.annotation.InternalApi
import akka.javasdk.JsonSupport
import com.google.protobuf.any.Any.toJavaProto
import com.google.protobuf.any.{ Any => ScalaPbAny }
import java.lang.reflect.Method
import java.lang.reflect.ParameterizedType
import java.util
Expand All @@ -23,49 +20,6 @@ import akka.runtime.sdk.spi.BytesPayload
@InternalApi
object CommandSerialization {

// FIXME remove or convert ScalaPbAny => BytesPayload
def deserializeComponentClientCommand(method: Method, command: ScalaPbAny): Option[AnyRef] = {
// special cased component client calls, lets json commands through all the way
val parameterTypes = method.getGenericParameterTypes
if (parameterTypes.isEmpty) None
else if (parameterTypes.size > 1)
throw new IllegalStateException(
s"Passing more than one parameter to the command handler [${method.getDeclaringClass.getName}.${method.getName}] is not supported, parameter types: [${parameterTypes.mkString}]")
else {
// we used to dispatch based on the type, since that is how it works in protobuf for eventing
// but here we have a concrete command name, and can pick up the expected serialized type from there

try {
parameterTypes.head match {
case paramClass: Class[_] =>
Some(JsonSupport.decodeJson(paramClass, command).asInstanceOf[AnyRef])
case parameterizedType: ParameterizedType =>
if (classOf[java.util.Collection[_]]
.isAssignableFrom(parameterizedType.getRawType.asInstanceOf[Class[_]])) {
val elementType = parameterizedType.getActualTypeArguments.head match {
case typeParamClass: Class[_] => typeParamClass
case _ =>
throw new RuntimeException(
s"Command handler [${method.getDeclaringClass.getName}.${method.getName}] accepts a parameter that is a collection with a generic type inside, this is not supported.")
}
Some(
JsonSupport.decodeJsonCollection(
elementType.asInstanceOf[Class[AnyRef]],
parameterizedType.getRawType.asInstanceOf[Class[util.Collection[AnyRef]]],
toJavaProto(command)))
} else
throw new RuntimeException(
s"Command handler [${method.getDeclaringClass.getName}.${method.getName}] handler accepts a parameter that is a generic type [$parameterizedType], this is not supported.")
}
} catch {
case NonFatal(ex) =>
throw new IllegalArgumentException(
s"Could not deserialize message for [${method.getDeclaringClass.getName}.${method.getName}]",
ex)
}
}
}

def deserializeComponentClientCommand(
method: Method,
command: BytesPayload,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.impl

import akka.annotation.InternalApi

/**
* INTERNAL API
*/
@InternalApi
private[impl] final class HandlerNotFoundException(
handlerType: String,
val name: String,
componentClass: Class[_],
availableHandlers: Set[String])
extends RuntimeException(
s"no matching [$handlerType] handler for [$name] on [${componentClass.getName}]. " +
s"Available handlers are: [${availableHandlers.mkString(", ")}]")
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ private[impl] final class ConsumerImpl[C <: Consumer](
private implicit val executionContext: ExecutionContext = sdkExecutionContext
implicit val system: ActorSystem = _system

// FIXME remove router altogether
private def createRouter(): ReflectiveConsumerRouter[C] =
new ReflectiveConsumerRouter[C](
factory(),
Expand All @@ -78,12 +77,11 @@ private[impl] final class ConsumerImpl[C <: Consumer](
val messageContext = createMessageContext(message, span)
val payload: BytesPayload = message.payload.getOrElse(throw new IllegalArgumentException("No message payload"))
val effect = createRouter()
.handleUnary(MessageEnvelope.of(payload, messageContext.metadata()), messageContext)
.handleCommand(MessageEnvelope.of(payload, messageContext.metadata()), messageContext)
toSpiEffect(message, effect)
} catch {
case NonFatal(ex) =>
// command handler threw an "unexpected" error
span.foreach(_.end())
// command handler threw an "unexpected" error, also covers HandlerNotFoundException
Future.successful(handleUnexpectedException(message, ex))
} finally {
MDC.remove(Telemetry.TRACE_ID)
Expand Down Expand Up @@ -122,14 +120,11 @@ private[impl] final class ConsumerImpl[C <: Consumer](
}

private def handleUnexpectedException(message: Message, ex: Throwable): Effect = {
ex match {
case _ =>
ErrorHandling.withCorrelationId { correlationId =>
log.error(
s"Failure during handling message [${message.name}] from Consumer component [${consumerClass.getSimpleName}].",
ex)
protocolFailure(correlationId)
}
ErrorHandling.withCorrelationId { correlationId =>
log.error(
s"Failure during handling message [${message.name}] from Consumer component [${consumerClass.getSimpleName}].",
ex)
protocolFailure(correlationId)
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

package akka.javasdk.impl.consumer

import java.util.Optional

import akka.annotation.InternalApi
import akka.javasdk.consumer.Consumer
import akka.javasdk.consumer.MessageContext
import akka.javasdk.consumer.MessageEnvelope
import akka.javasdk.impl.AnySupport
import akka.javasdk.impl.AnySupport.ProtobufEmptyTypeUrl
import akka.javasdk.impl.MethodInvoker
import akka.javasdk.impl.reflection.ParameterExtractors
Expand All @@ -23,14 +25,16 @@ private[impl] class ReflectiveConsumerRouter[A <: Consumer](
consumer: A,
methodInvokers: Map[String, MethodInvoker],
serializer: JsonSerializer,
ignoreUnknown: Boolean)
extends ConsumerRouter[A](consumer) {
ignoreUnknown: Boolean) {

override def handleUnary(message: MessageEnvelope[BytesPayload]): Consumer.Effect = {
def handleCommand(message: MessageEnvelope[BytesPayload], context: MessageContext): Consumer.Effect = {
// only set, never cleared, to allow access from other threads in async callbacks in the consumer
// the same handler and consumer instance is expected to only ever be invoked for a single message
consumer._internalSetMessageContext(Optional.of(context))

val payload = message.payload()
// make sure we route based on the new type url if we get an old json type url message
val inputTypeUrl = serializer.removeVersion(AnySupport.replaceLegacyJsonPrefix(payload.contentType))
val inputTypeUrl = serializer.removeVersion(serializer.replaceLegacyJsonPrefix(payload.contentType))

// lookup ComponentClient
val componentClients = Reflect.lookupComponentClientFields(consumer)
Expand All @@ -41,7 +45,7 @@ private[impl] class ReflectiveConsumerRouter[A <: Consumer](
methodInvoker match {
case Some(invoker) =>
inputTypeUrl match {
case ProtobufEmptyTypeUrl =>
case BytesPayload.EmptyContentType | ProtobufEmptyTypeUrl =>
invoker
.invoke(consumer)
.asInstanceOf[Consumer.Effect]
Expand All @@ -55,9 +59,10 @@ private[impl] class ReflectiveConsumerRouter[A <: Consumer](
.asInstanceOf[Consumer.Effect]
}
case None if ignoreUnknown => ConsumerEffectImpl.Builder.ignore()
case None =>
case None =>
// FIXME IllegalStateException vs NoSuchElementException?
throw new NoSuchElementException(
s"Couldn't find any method with input type [$inputTypeUrl] in Consumer [$consumer].")
s"Couldn't find any method with input type [$inputTypeUrl] in Consumer [${consumer.getClass.getName}].")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ import akka.javasdk.eventsourcedentity.EventSourcedEntity
import akka.javasdk.eventsourcedentity.EventSourcedEntityContext
import akka.javasdk.impl.AbstractContext
import akka.javasdk.impl.ActivatableContext
import akka.javasdk.impl.AnySupport
import akka.javasdk.impl.ComponentDescriptor
import akka.javasdk.impl.ComponentType
import akka.javasdk.impl.EntityExceptions
import akka.javasdk.impl.EntityExceptions.EntityException
import akka.javasdk.impl.ErrorHandling.BadRequestException
import akka.javasdk.impl.MetadataImpl
Expand All @@ -39,7 +37,6 @@ import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.SpiEntity
import akka.runtime.sdk.spi.SpiEventSourcedEntity
import akka.util.ByteString
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.Tracer
import org.slf4j.MDC
Expand Down Expand Up @@ -74,8 +71,6 @@ private[impl] object EventSourcedEntityImpl {
extends EventSourcedEntityContextImpl(entityId)
with EventContext

// 0 arity method
private val NoCommandPayload = new BytesPayload(ByteString.empty, AnySupport.JsonTypeUrlPrefix)
}

/**
Expand Down Expand Up @@ -114,9 +109,8 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
val span: Option[Span] =
traceInstrumentation.buildSpan(ComponentType.EventSourcedEntity, componentId, entityId, command)
span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
val cmdPayload = command.payload.getOrElse(
// smuggling 0 arity method called from component client through here
NoCommandPayload)
// smuggling 0 arity method called from component client through here
val cmdPayload = command.payload.getOrElse(BytesPayload.empty)
val metadata: Metadata = MetadataImpl.of(command.metadata)
val cmdContext =
new CommandContextImpl(
Expand All @@ -132,7 +126,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
entity._internalSetCommandContext(Optional.of(cmdContext))
entity._internalSetCurrentState(state)
val commandEffect = router
.handleCommand(command.name, cmdPayload, cmdContext)
.handleCommand(command.name, cmdPayload)
.asInstanceOf[EventSourcedEntityEffectImpl[AnyRef, E]] // FIXME improve?

def errorOrReply(updatedState: SpiEventSourcedEntity.State): Either[SpiEntity.Error, BytesPayload] = {
Expand Down Expand Up @@ -183,16 +177,12 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
}

} catch {
case e: HandlerNotFoundException =>
throw new EntityExceptions.EntityException(
entityId,
command.name,
s"No command handler found for command [${e.name}] on ${entity.getClass}")
case BadRequestException(msg) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(error = new SpiEntity.Error(msg)))
case e: EntityException =>
throw e
case NonFatal(error) =>
// also covers HandlerNotFoundException
throw EntityException(
entityId = entityId,
commandName = command.name,
Expand Down
Loading

0 comments on commit 699a81b

Please sign in to comment.