Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: misc cleanups #79

Merged
merged 2 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,13 @@

package akka.javasdk.impl

import akka.annotation.InternalApi
import akka.javasdk.annotations.Acl
import akka.javasdk.annotations.ComponentId
import akka.javasdk.eventsourcedentity.EventSourcedEntity
import akka.javasdk.impl.reflection.CombinedSubscriptionServiceMethod
import akka.javasdk.impl.reflection.KalixMethod
import akka.javasdk.impl.reflection.NameGenerator
import akka.javasdk.impl.reflection.Reflect
import java.lang.reflect.AnnotatedElement
import java.lang.reflect.Method
import java.lang.reflect.ParameterizedType

import akka.annotation.InternalApi
import akka.javasdk.annotations.Acl
import akka.javasdk.annotations.ComponentId
import akka.javasdk.annotations.Consume.FromEventSourcedEntity
import akka.javasdk.annotations.Consume.FromKeyValueEntity
import akka.javasdk.annotations.Consume.FromServiceStream
Expand All @@ -24,6 +19,11 @@ import akka.javasdk.annotations.DeleteHandler
import akka.javasdk.annotations.Produce.ServiceStream
import akka.javasdk.annotations.Produce.ToTopic
import akka.javasdk.consumer.Consumer
import akka.javasdk.eventsourcedentity.EventSourcedEntity
import akka.javasdk.impl.reflection.CombinedSubscriptionServiceMethod
import akka.javasdk.impl.reflection.KalixMethod
import akka.javasdk.impl.reflection.NameGenerator
import akka.javasdk.impl.reflection.Reflect
import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.impl.view.ViewDescriptorFactory
import akka.javasdk.keyvalueentity.KeyValueEntity
Expand All @@ -33,17 +33,13 @@ import akka.javasdk.view.View
import akka.javasdk.workflow.Workflow
import akka.runtime.sdk.spi.ConsumerDestination
import akka.runtime.sdk.spi.ConsumerSource
import kalix.DirectDestination
import kalix.DirectSource
import kalix.EventDestination
import kalix.EventSource
import kalix.Eventing
import kalix.MethodOptions
import kalix.ServiceEventing
import kalix.ServiceEventingOut
import kalix.ServiceOptions
// TODO: abstract away spring dependency
import Reflect.Syntax._
import akka.javasdk.impl.reflection.Reflect.Syntax._

/**
* INTERNAL API
Expand Down Expand Up @@ -204,24 +200,6 @@ private[impl] object ComponentDescriptorFactory {
else false
}

def valueEntityEventSource(clazz: Class[_], handleDeletes: Boolean) = {
val entityType = findValueEntityType(clazz)
EventSource
.newBuilder()
.setValueEntity(entityType)
.setHandleDeletes(handleDeletes)
.build()
}

def topicEventDestination(clazz: Class[_]): Option[EventDestination] = {
if (hasTopicPublication(clazz)) {
val topicName = findPublicationTopicName(clazz)
Some(EventDestination.newBuilder().setTopic(topicName).build())
} else {
None
}
}

def consumerSource(clazz: Class[_]): ConsumerSource = {
if (hasValueEntitySubscription(clazz)) {
val kveType = findValueEntityType(clazz)
Expand Down Expand Up @@ -280,22 +258,12 @@ private[impl] object ComponentDescriptorFactory {
}
}

def topicEventSource(javaMethod: Method): EventSource = {
val topicName = findSubscriptionTopicName(javaMethod)
val consumerGroup = findSubscriptionConsumerGroup(javaMethod)
EventSource.newBuilder().setTopic(topicName).setConsumerGroup(consumerGroup).build()
}

def topicEventSource(clazz: Class[_]): EventSource = {
val topicName = findSubscriptionTopicName(clazz)
val consumerGroup = findSubscriptionConsumerGroup(clazz)
EventSource.newBuilder().setTopic(topicName).setConsumerGroup(consumerGroup).build()
}

def eventingOutForTopic(clazz: Class[_]): Option[Eventing] = {
topicEventDestination(clazz).map(eventSource => Eventing.newBuilder().setOut(eventSource).build())
}

def eventingInForValueEntity(clazz: Class[_], handleDeletes: Boolean): Eventing = {
val entityType = findValueEntityType(clazz)
val eventSource = EventSource
Expand Down Expand Up @@ -330,29 +298,6 @@ private[impl] object ComponentDescriptorFactory {
}
}

def publishToEventStream(component: Class[_]): Option[kalix.ServiceOptions] = {
Option(component.getAnnotation(classOf[ServiceStream])).map { streamAnn =>

val direct = DirectDestination
.newBuilder()
.setEventStreamId(streamAnn.id())

val out = ServiceEventingOut
.newBuilder()
.setDirect(direct)

val eventing =
ServiceEventing
.newBuilder()
.setOut(out)

kalix.ServiceOptions
.newBuilder()
.setEventing(eventing)
.build()
}
}

// TODO: add more validations here
// we should let users know if components are missing required annotations,
// eg: Workflow and Entities require @TypeId, View requires @Consume
Expand All @@ -367,37 +312,6 @@ private[impl] object ComponentDescriptorFactory {
TimedActionDescriptorFactory
}

def combineByES(
subscriptions: Seq[KalixMethod],
serializer: JsonSerializer,
component: Class[_]): Seq[KalixMethod] = {

def groupByES(methods: Seq[KalixMethod]): Map[String, Seq[KalixMethod]] = {
val withEventSourcedIn = methods.filter(kalixMethod =>
kalixMethod.methodOptions.exists(option =>
option.hasEventing && option.getEventing.hasIn && option.getEventing.getIn.hasEventSourcedEntity))
//Assuming there is only one eventing.in annotation per method, therefore head is as good as any other
withEventSourcedIn.groupBy(m => m.methodOptions.head.getEventing.getIn.getEventSourcedEntity)
}

combineBy("ES", groupByES(subscriptions), serializer, component)
}

def combineByTopic(
kalixMethods: Seq[KalixMethod],
serializer: JsonSerializer,
component: Class[_]): Seq[KalixMethod] = {
def groupByTopic(methods: Seq[KalixMethod]): Map[String, Seq[KalixMethod]] = {
val withTopicIn = methods.filter(kalixMethod =>
kalixMethod.methodOptions.exists(option =>
option.hasEventing && option.getEventing.hasIn && option.getEventing.getIn.hasTopic))
//Assuming there is only one topic annotation per method, therefore head is as good as any other
withTopicIn.groupBy(m => m.methodOptions.head.getEventing.getIn.getTopic)
}

combineBy("Topic", groupByTopic(kalixMethods), serializer, component)
}

def combineBy(
sourceName: String,
groupedSubscriptions: Map[String, Seq[KalixMethod]],
Expand Down Expand Up @@ -445,10 +359,6 @@ private[impl] object ComponentDescriptorFactory {
value.replaceAll("[\\._\\-]", "")
}

private[impl] def buildEventingOutOptions(clazz: Class[_]): Option[MethodOptions] =
eventingOutForTopic(clazz)
.map(eventingOut => kalix.MethodOptions.newBuilder().setEventing(eventingOut).build())

def mergeServiceOptions(allOptions: Option[kalix.ServiceOptions]*): Option[ServiceOptions] = {
val mergedOptions =
allOptions.flatten
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,4 @@ private[javasdk] object WorkflowExceptions {
def apply(init: WorkflowEntityInit, message: String): WorkflowException =
ProtocolException(init.entityId, message)
}

def failureMessageForLog(cause: Throwable): String = cause match {
case WorkflowException(workflowId, commandName, _, _) =>
val workflowDescription = if (workflowId.nonEmpty) s" [$workflowId]" else ""
s"Terminating workflow$workflowDescription due to unexpected failure for command [$commandName]"
case _ => "Terminating workflow due to unexpected failure"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private[impl] final class ConsumerImpl[C <: Consumer](
val messageContext = createMessageContext(message, span)
val payload: BytesPayload = message.payload.getOrElse(throw new IllegalArgumentException("No message payload"))
val effect = createRouter()
.handleUnary(message.name, MessageEnvelope.of(payload, messageContext.metadata()), messageContext)
.handleUnary(MessageEnvelope.of(payload, messageContext.metadata()), messageContext)
toSpiEffect(message, effect)
} catch {
case NonFatal(ex) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package akka.javasdk.impl.consumer

import java.util.Optional

import ConsumerRouter.HandlerNotFound
import akka.annotation.InternalApi
import akka.javasdk.consumer.Consumer
import akka.javasdk.consumer.MessageContext
import akka.javasdk.consumer.MessageEnvelope
import akka.runtime.sdk.spi.BytesPayload

/**
* INTERNAL API
Expand All @@ -28,31 +30,27 @@ private[impl] abstract class ConsumerRouter[A <: Consumer](protected val consume
/**
* Handle a unary call.
*
* @param commandName
* The name of the command this call is for.
* @param message
* The message envelope of the message.
* @param context
* The message context.
* @return
* A future of the message to return.
*/
final def handleUnary(commandName: String, message: MessageEnvelope[Any], context: MessageContext): Consumer.Effect =
final def handleUnary(message: MessageEnvelope[BytesPayload], context: MessageContext): Consumer.Effect =
callWithContext(context) { () =>
handleUnary(commandName, message)
handleUnary(message)
}

/**
* Handle a unary call.
*
* @param commandName
* The name of the command this call is for.
* @param message
* The message envelope of the message.
* @return
* A future of the message to return.
*/
def handleUnary(commandName: String, message: MessageEnvelope[Any]): Consumer.Effect
def handleUnary(message: MessageEnvelope[BytesPayload]): Consumer.Effect

//TODO rethink this part
private def callWithContext[T](context: MessageContext)(func: () => T) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ private[impl] class ReflectiveConsumerRouter[A <: Consumer](
ignoreUnknown: Boolean)
extends ConsumerRouter[A](consumer) {

override def handleUnary(commandName: String, message: MessageEnvelope[Any]): Consumer.Effect = {
override def handleUnary(message: MessageEnvelope[BytesPayload]): Consumer.Effect = {

val payload = message.payload().asInstanceOf[BytesPayload]
val payload = message.payload()
// make sure we route based on the new type url if we get an old json type url message
val inputTypeUrl = serializer.removeVersion(AnySupport.replaceLegacyJsonPrefix(payload.contentType))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ private[impl] final class ReflectiveTimedActionRouter[A <: TimedAction](
serializer: JsonSerializer)
extends TimedActionRouter[A](action) {

private def commandHandlerLookup(commandName: String) =
private def commandHandlerLookup(methodName: String) =
commandHandlers.getOrElse(
commandName,
methodName,
throw new RuntimeException(
s"no matching method for '$commandName' on [${action.getClass}], existing are [${commandHandlers.keySet
s"no matching method for '$methodName' on [${action.getClass}], existing are [${commandHandlers.keySet
.mkString(", ")}]"))

override def handleUnary(commandName: String, message: CommandEnvelope[Any]): TimedAction.Effect = {
override def handleUnary(methodName: String, message: CommandEnvelope[BytesPayload]): TimedAction.Effect = {

val commandHandler = commandHandlerLookup(commandName)
val commandHandler = commandHandlerLookup(methodName)

val payload = message.payload().asInstanceOf[BytesPayload]
val payload = message.payload()
// make sure we route based on the new type url if we get an old json type url message
val updatedContentType = AnySupport.replaceLegacyJsonPrefix(payload.contentType)
if ((AnySupport.isJson(updatedContentType) || payload.bytes.isEmpty) && commandHandler.isSingleNameInvoker) {
Expand All @@ -49,7 +49,7 @@ private[impl] final class ReflectiveTimedActionRouter[A <: TimedAction](
result.asInstanceOf[TimedAction.Effect]
} else {
throw new IllegalStateException(
"Could not find a matching command handler for command: " + commandName + ", content type: " + updatedContentType + ", invokers keys: " + commandHandler.methodInvokers.keys
"Could not find a matching command handler for method: " + methodName + ", content type: " + updatedContentType + ", invokers keys: " + commandHandler.methodInvokers.keys
.mkString(", "))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ private[impl] final class TimedActionImpl[TA <: TimedAction](
val fut =
try {
val commandContext = createCommandContext(command, span)
//TODO reverting to previous version, timers payloads are always json.akka.io/object
val payload: BytesPayload = command.payload.getOrElse(throw new IllegalArgumentException("No command payload"))
val effect = createRouter()
.handleUnary(command.name, CommandEnvelope.of(payload, commandContext.metadata()), commandContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import akka.annotation.InternalApi
import akka.javasdk.timedaction.CommandContext
import akka.javasdk.timedaction.CommandEnvelope
import akka.javasdk.timedaction.TimedAction

import java.util.Optional

import akka.runtime.sdk.spi.BytesPayload

/**
* INTERNAL API
*/
Expand All @@ -29,8 +30,8 @@ abstract class TimedActionRouter[A <: TimedAction](protected val action: A) {
/**
* Handle a unary call.
*
* @param commandName
* The name of the command this call is for.
* @param methodName
* The name of the method to call.
* @param message
* The message envelope of the message.
* @param context
Expand All @@ -39,25 +40,25 @@ abstract class TimedActionRouter[A <: TimedAction](protected val action: A) {
* A future of the message to return.
*/
final def handleUnary(
commandName: String,
message: CommandEnvelope[Any],
methodName: String,
message: CommandEnvelope[BytesPayload],
context: CommandContext): TimedAction.Effect =
callWithContext(context) { () =>
handleUnary(commandName, message)
handleUnary(methodName, message)
}

/**
* Handle a unary call.
*
* @param commandName
* The name of the command this call is for.
* @param methodName
* The name of the method to call.
* @param message
* The message envelope of the message.
* @return
* A future of the message to return.
*/
//TODO commandName rename to methodName
def handleUnary(commandName: String, message: CommandEnvelope[Any]): TimedAction.Effect
def handleUnary(methodName: String, message: CommandEnvelope[BytesPayload]): TimedAction.Effect

private def callWithContext[T](context: CommandContext)(func: () => T) = {
// only set, never cleared, to allow access from other threads in async callbacks in the action
Expand All @@ -70,6 +71,4 @@ abstract class TimedActionRouter[A <: TimedAction](protected val action: A) {
throw new RuntimeException(s"No call handler found for call $name on ${action.getClass.getName}")
}
}

def actionClass(): Class[_] = action.getClass
}
Loading