From 4eb8068db47b55829ef4c3d5dc00a64517cffd94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 16 Jan 2025 17:24:47 +0100 Subject: [PATCH] feat: gRPC endpoint support --- .../java/com/example/UserGrpcServiceImpl.java | 13 ++++ .../com/example/EndpointsDescriptorSpec.scala | 10 ++- .../ComponentAnnotationProcessor.java | 7 ++- .../akka-javasdk-parent/pom.xml | 4 ++ .../components/grpc/TestGrpcServiceImpl.java | 21 +++++++ .../akkajavasdk/test_grpc_service.proto | 21 +++++++ .../javasdk/annotations/GrpcEndpoint.java | 27 ++++++++ .../impl/GrpcEndpointDescriptorFactory.scala | 61 +++++++++++++++++++ .../scala/akka/javasdk/impl/SdkRunner.scala | 20 +++++- .../javasdk/impl/reflection/Reflect.scala | 6 +- build.sbt | 8 ++- project/Dependencies.scala | 4 ++ project/plugins.sbt | 2 +- 13 files changed, 196 insertions(+), 8 deletions(-) create mode 100644 akka-javasdk-annotation-processor-tests/rest-endpoints-descriptors/src/main/java/com/example/UserGrpcServiceImpl.java create mode 100644 akka-javasdk-tests/src/test/java/akkajavasdk/components/grpc/TestGrpcServiceImpl.java create mode 100644 akka-javasdk-tests/src/test/protobuf/akkajavasdk/test_grpc_service.proto create mode 100644 akka-javasdk/src/main/java/akka/javasdk/annotations/GrpcEndpoint.java create mode 100644 akka-javasdk/src/main/scala/akka/javasdk/impl/GrpcEndpointDescriptorFactory.scala diff --git a/akka-javasdk-annotation-processor-tests/rest-endpoints-descriptors/src/main/java/com/example/UserGrpcServiceImpl.java b/akka-javasdk-annotation-processor-tests/rest-endpoints-descriptors/src/main/java/com/example/UserGrpcServiceImpl.java new file mode 100644 index 000000000..e0a259937 --- /dev/null +++ b/akka-javasdk-annotation-processor-tests/rest-endpoints-descriptors/src/main/java/com/example/UserGrpcServiceImpl.java @@ -0,0 +1,13 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package com.example; + +import akka.javasdk.annotations.GrpcEndpoint; + +@GrpcEndpoint +public class UserGrpcServiceImpl { + // would extend generated service stub and implement grpc methods but that's not important for this test + +} diff --git a/akka-javasdk-annotation-processor-tests/rest-endpoints-descriptors/src/test/scala/com/example/EndpointsDescriptorSpec.scala b/akka-javasdk-annotation-processor-tests/rest-endpoints-descriptors/src/test/scala/com/example/EndpointsDescriptorSpec.scala index 66703520d..ad28b88e2 100644 --- a/akka-javasdk-annotation-processor-tests/rest-endpoints-descriptors/src/test/scala/com/example/EndpointsDescriptorSpec.scala +++ b/akka-javasdk-annotation-processor-tests/rest-endpoints-descriptors/src/test/scala/com/example/EndpointsDescriptorSpec.scala @@ -11,13 +11,19 @@ import org.scalatest.wordspec.AnyWordSpec class EndpointsDescriptorSpec extends AnyWordSpec with Matchers { "akka-javasdk-components.conf" should { - "contain http endpoints components" in { - val config = ConfigFactory.load("META-INF/akka-javasdk-components.conf") + val config = ConfigFactory.load("META-INF/akka-javasdk-components.conf") + "contain http endpoint components" in { val endpointComponents = config.getStringList("akka.javasdk.components.http-endpoint") endpointComponents.size() shouldBe 2 endpointComponents should contain("com.example.HelloController") endpointComponents should contain("com.example.UserRegistryController") } + + "contain grpc endpoint components" in { + val endpointComponents = config.getStringList("akka.javasdk.components.grpc-endpoint") + endpointComponents.size() shouldBe 1 + endpointComponents should contain("com.example.UserGrpcServiceImpl") + } } } diff --git a/akka-javasdk-annotation-processor/src/main/java/akka/javasdk/tooling/processor/ComponentAnnotationProcessor.java b/akka-javasdk-annotation-processor/src/main/java/akka/javasdk/tooling/processor/ComponentAnnotationProcessor.java index 1b5567181..0b5da1e41 100644 --- a/akka-javasdk-annotation-processor/src/main/java/akka/javasdk/tooling/processor/ComponentAnnotationProcessor.java +++ b/akka-javasdk-annotation-processor/src/main/java/akka/javasdk/tooling/processor/ComponentAnnotationProcessor.java @@ -37,6 +37,7 @@ @SupportedAnnotationTypes( { "akka.javasdk.annotations.http.HttpEndpoint", + "akka.javasdk.annotations.GrpcEndpoint", // all components will have this "akka.javasdk.annotations.ComponentId", // central config/lifecycle class @@ -53,6 +54,7 @@ public class ComponentAnnotationProcessor extends AbstractProcessor { // key of each component type under that parent path, containing a string list of concrete component classes private static final String HTTP_ENDPOINT_KEY = "http-endpoint"; + private static final String GRPC_ENDPOINT_KEY = "grpc-endpoint"; private static final String EVENT_SOURCED_ENTITY_KEY = "event-sourced-entity"; private static final String VALUE_ENTITY_KEY = "key-value-entity"; private static final String TIMED_ACTION_KEY = "timed-action"; @@ -61,7 +63,9 @@ public class ComponentAnnotationProcessor extends AbstractProcessor { private static final String WORKFLOW_KEY = "workflow"; private static final String SERVICE_SETUP_KEY = "service-setup"; - private static final List ALL_COMPONENT_TYPES = List.of(HTTP_ENDPOINT_KEY, EVENT_SOURCED_ENTITY_KEY, VALUE_ENTITY_KEY, TIMED_ACTION_KEY, CONSUMER_KEY, VIEW_KEY, WORKFLOW_KEY, SERVICE_SETUP_KEY); + private static final List ALL_COMPONENT_TYPES = List.of(HTTP_ENDPOINT_KEY, GRPC_ENDPOINT_KEY, + EVENT_SOURCED_ENTITY_KEY, VALUE_ENTITY_KEY, TIMED_ACTION_KEY, CONSUMER_KEY, VIEW_KEY, WORKFLOW_KEY, + SERVICE_SETUP_KEY); private final boolean debugEnabled; @@ -124,6 +128,7 @@ public boolean process(Set annotations, RoundEnvironment private String componentTypeFor(Element annotatedClass, TypeElement annotation) { return switch (annotation.getQualifiedName().toString()) { case "akka.javasdk.annotations.http.HttpEndpoint" -> HTTP_ENDPOINT_KEY; + case "akka.javasdk.annotations.GrpcEndpoint" -> GRPC_ENDPOINT_KEY; case "akka.javasdk.annotations.Setup" -> SERVICE_SETUP_KEY; case "akka.javasdk.annotations.ComponentId" -> componentType(annotatedClass); default -> throw new IllegalArgumentException("Unknown annotation type: " + annotation.getQualifiedName()); diff --git a/akka-javasdk-maven/akka-javasdk-parent/pom.xml b/akka-javasdk-maven/akka-javasdk-parent/pom.xml index 2cf701eaf..480d78c79 100644 --- a/akka-javasdk-maven/akka-javasdk-parent/pom.xml +++ b/akka-javasdk-maven/akka-javasdk-parent/pom.xml @@ -38,7 +38,11 @@ 21 +<<<<<<< Updated upstream 1.3.0 +======= + 1.3.0-1-dd515bee-SNAPSHOT +>>>>>>> Stashed changes UTF-8 false diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/components/grpc/TestGrpcServiceImpl.java b/akka-javasdk-tests/src/test/java/akkajavasdk/components/grpc/TestGrpcServiceImpl.java new file mode 100644 index 000000000..7485cbd26 --- /dev/null +++ b/akka-javasdk-tests/src/test/java/akkajavasdk/components/grpc/TestGrpcServiceImpl.java @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akkajavasdk.components.grpc; + +import akka.javasdk.annotations.GrpcEndpoint; +import akkajavasdk.protocol.*; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +@GrpcEndpoint +public class TestGrpcServiceImpl implements TestGrpcService { + @Override + public CompletionStage simple(TestGrpcServiceOuterClass.In in) { + return CompletableFuture.completedFuture( + TestGrpcServiceOuterClass.Out.newBuilder().setData(in.getData()).build() + ); + } +} diff --git a/akka-javasdk-tests/src/test/protobuf/akkajavasdk/test_grpc_service.proto b/akka-javasdk-tests/src/test/protobuf/akkajavasdk/test_grpc_service.proto new file mode 100644 index 000000000..e101c827f --- /dev/null +++ b/akka-javasdk-tests/src/test/protobuf/akkajavasdk/test_grpc_service.proto @@ -0,0 +1,21 @@ +// Copyright (C) 2021-2024 Lightbend Inc. + +// gRPC interface for a gRPC endpoint component + +syntax = "proto3"; + +package akkajavasdk; + +option java_package = "akkajavasdk.protocol"; + +message In { + string data = 1; +} + +message Out { + string data = 1; +} + +service TestGrpcService { + rpc Simple(In) returns (Out) {}; +} \ No newline at end of file diff --git a/akka-javasdk/src/main/java/akka/javasdk/annotations/GrpcEndpoint.java b/akka-javasdk/src/main/java/akka/javasdk/annotations/GrpcEndpoint.java new file mode 100644 index 000000000..bef015a0a --- /dev/null +++ b/akka-javasdk/src/main/java/akka/javasdk/annotations/GrpcEndpoint.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.annotations; + +import java.lang.annotation.*; + +/** + * Mark a class to be made available as a gRPC endpoint. The annotated class should extend a gRPC service interface + * generated using Akka gRPC, be public and have a public constructor. + *

+ * Annotated classes can accept the following types to the constructor: + *

    + *
  • {@link akka.javasdk.client.ComponentClient}
  • + *
  • {@link akka.javasdk.http.HttpClientProvider}
  • + *
  • {@link akka.javasdk.timer.TimerScheduler}
  • + *
  • {@link akka.stream.Materializer}
  • + *
  • {@link com.typesafe.config.Config}
  • + *
  • Custom types provided by a {@link akka.javasdk.DependencyProvider} from the service setup
  • + *
+ */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface GrpcEndpoint { +} diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/GrpcEndpointDescriptorFactory.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/GrpcEndpointDescriptorFactory.scala new file mode 100644 index 000000000..b2a058fff --- /dev/null +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/GrpcEndpointDescriptorFactory.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.impl + +import akka.actor.typed.ActorSystem +import akka.grpc.AkkaGrpcGenerated +import akka.grpc.scaladsl.InstancePerRequestFactory +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.HttpResponse +import akka.runtime.sdk.spi.GrpcEndpointDescriptor +import akka.runtime.sdk.spi.GrpcEndpointRequestConstructionContext + +import scala.concurrent.Future + +object GrpcEndpointDescriptorFactory { + + def apply[T](grpcEndpointClass: Class[T], factory: () => T)(implicit + system: ActorSystem[_]): GrpcEndpointDescriptor[T] = { + val serviceDefinitionClass: Class[_] = grpcEndpointClass.getSuperclass + + // Validate that it is a grpc service (no supertype so this is the best we can do) + if (serviceDefinitionClass.getAnnotation(classOf[AkkaGrpcGenerated]) == null) { + throw new IllegalArgumentException( + s"Class [${grpcEndpointClass.getName}] is with @GrpcEndpoint but the direct supertype [${serviceDefinitionClass.getName}] generated by Akka gRPC. " + + "This is not supported.") + } + + val instanceFactory = { (_: GrpcEndpointRequestConstructionContext) => + factory() + } + + // FIXME creating router is concrete method, not any interface, would be better to have an interface and not do reflectively + // FIXME service lifecycle - one instance vs per request for HTTP endpoint, should we align? + val handlerFactory = + system.dynamicAccess + .createInstanceFor[InstancePerRequestFactory[T]](serviceDefinitionClass.getName + "ScalaHandlerFactory", Nil) + .get + + val routeFactory: (HttpRequest => T) => PartialFunction[HttpRequest, Future[HttpResponse]] = { serviceFactory => + handlerFactory.partialInstancePerRequest( + serviceFactory, + "", + // FIXME default error handler, is it fine to leave like this, should runtime define? + PartialFunction.empty, + system) + } + + // Pick up generated companion object for file descriptor (for reflection) and creating router + val companion = system.dynamicAccess.getObjectFor[akka.grpc.ServiceDescription](grpcEndpointClass.getName).get + + new GrpcEndpointDescriptor[T]( + grpcEndpointClass.getName, + companion.name, + companion.descriptor, + instanceFactory, + routeFactory) + } + +} diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index e34364887..74f79ce2c 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -10,7 +10,6 @@ import java.lang.reflect.Method import java.util import java.util.Optional import java.util.concurrent.CompletionStage - import scala.annotation.nowarn import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -21,7 +20,6 @@ import scala.jdk.OptionConverters.RichOption import scala.jdk.OptionConverters.RichOptional import scala.reflect.ClassTag import scala.util.control.NonFatal - import akka.Done import akka.actor.typed.ActorSystem import akka.annotation.InternalApi @@ -415,6 +413,12 @@ private final class Sdk( HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass)) } + private val grpcEndpointDescriptors = componentClasses + .filter(Reflect.isGrpcEndpoint) + .filterNot(isDisabled) + .map(grpcEndpointClass => + GrpcEndpointDescriptorFactory(grpcEndpointClass, grpcEndpointFactory(grpcEndpointClass))(system)) + private var eventSourcedEntityDescriptors = Vector.empty[EventSourcedEntityDescriptor] private var keyValueEntityDescriptors = Vector.empty[EventSourcedEntityDescriptor] private var workflowDescriptors = Vector.empty[WorkflowDescriptor] @@ -593,6 +597,7 @@ private final class Sdk( (serviceSetup.map(_.disabledComponents().asScala.toSet).getOrElse(Set.empty) ++ disabledComponents).map(_.getName) val descriptors = +<<<<<<< Updated upstream (eventSourcedEntityDescriptors ++ keyValueEntityDescriptors ++ httpEndpointDescriptors ++ @@ -601,6 +606,10 @@ private final class Sdk( viewDescriptors ++ workflowDescriptors) .filterNot(isDisabled(combinedDisabledComponents)) +======= + eventSourcedEntityDescriptors ++ keyValueEntityDescriptors ++ httpEndpointDescriptors ++ timedActionDescriptors ++ + consumerDescriptors ++ viewDescriptors ++ workflowDescriptors ++ grpcEndpointDescriptors +>>>>>>> Stashed changes val preStart = { (_: ActorSystem[_]) => serviceSetup match { @@ -709,6 +718,13 @@ private final class Sdk( instance } + private def grpcEndpointFactory[E](grpcEndpointClass: Class[E]): () => E = () => { + wiredInstance(grpcEndpointClass) { + // FIXME missing span from request + sideEffectingComponentInjects(None) + } + } + private def wiredInstance[T](clz: Class[T])(partial: PartialFunction[Class[_], Any]): T = { // only one constructor allowed require(clz.getDeclaredConstructors.length == 1, s"Class [${clz.getSimpleName}] must have only one constructor.") diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala index c3eec0dd2..57d180c14 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/reflection/Reflect.scala @@ -5,6 +5,7 @@ package akka.javasdk.impl.reflection import akka.annotation.InternalApi +import akka.javasdk.annotations.GrpcEndpoint import akka.javasdk.annotations.http.HttpEndpoint import akka.javasdk.client.ComponentClient import akka.javasdk.consumer.Consumer @@ -15,6 +16,7 @@ import akka.javasdk.timedaction.TimedAction import akka.javasdk.view.TableUpdater import akka.javasdk.view.View import akka.javasdk.workflow.Workflow + import java.lang.annotation.Annotation import java.lang.reflect.AnnotatedElement import java.lang.reflect.Method @@ -22,7 +24,6 @@ import java.lang.reflect.Modifier import java.lang.reflect.ParameterizedType import java.util import java.util.Optional - import scala.annotation.tailrec import scala.reflect.ClassTag @@ -60,6 +61,9 @@ private[impl] object Reflect { def isRestEndpoint(cls: Class[_]): Boolean = cls.getAnnotation(classOf[HttpEndpoint]) != null + def isGrpcEndpoint(cls: Class[_]): Boolean = + cls.getAnnotation(classOf[GrpcEndpoint]) != null + def isEntity(cls: Class[_]): Boolean = classOf[EventSourcedEntity[_, _]].isAssignableFrom(cls) || classOf[KeyValueEntity[_]].isAssignableFrom(cls) diff --git a/build.sbt b/build.sbt index 92785971b..882aef67d 100644 --- a/build.sbt +++ b/build.sbt @@ -57,6 +57,7 @@ lazy val akkaJavaSdkTestKit = lazy val akkaJavaSdkTests = Project(id = "akka-javasdk-tests", base = file("akka-javasdk-tests")) + .enablePlugins(AkkaGrpcPlugin) .dependsOn(akkaJavaSdk, akkaJavaSdkTestKit) .settings( name := "akka-javasdk-testkit", @@ -67,7 +68,12 @@ lazy val akkaJavaSdkTests = Test / javacOptions ++= Seq("-parameters"), // only tests here publish / skip := true, - doc / sources := Seq.empty) + doc / sources := Seq.empty, + // generating test service + Test / akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Java), + Test / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Client, AkkaGrpc.Server), + Test / PB.protoSources ++= (Compile / PB.protoSources).value + ) .settings(inConfig(Test)(JupiterPlugin.scopedSettings)) .settings(Dependencies.tests) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b240a3a17..55ce20a4c 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -8,7 +8,11 @@ object Dependencies { val ProtocolVersionMinor = 1 val RuntimeImage = "gcr.io/kalix-public/kalix-runtime" // Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this +<<<<<<< Updated upstream val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0") +======= + val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-1-dd515bee-SNAPSHOT") +>>>>>>> Stashed changes } // NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check // if AkkaVersion and AkkaHttpVersion are aligned diff --git a/project/plugins.sbt b/project/plugins.sbt index 500bf9dc3..9ab9b08a8 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,7 +1,7 @@ resolvers += "Akka repository".at("https://repo.akka.io/maven") addSbtPlugin("com.github.sbt" % "sbt-dynver" % "5.0.1") -addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.4.4") +addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.5.0-14-cac946c2-SNAPSHOT") addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.7.0") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.7.0")