Skip to content

Commit

Permalink
feat: gRPC endpoint support
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Jan 20, 2025
1 parent 87a87df commit 4eb8068
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package com.example;

import akka.javasdk.annotations.GrpcEndpoint;

@GrpcEndpoint
public class UserGrpcServiceImpl {
// would extend generated service stub and implement grpc methods but that's not important for this test

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ import org.scalatest.wordspec.AnyWordSpec
class EndpointsDescriptorSpec extends AnyWordSpec with Matchers {

"akka-javasdk-components.conf" should {
"contain http endpoints components" in {
val config = ConfigFactory.load("META-INF/akka-javasdk-components.conf")
val config = ConfigFactory.load("META-INF/akka-javasdk-components.conf")

"contain http endpoint components" in {
val endpointComponents = config.getStringList("akka.javasdk.components.http-endpoint")
endpointComponents.size() shouldBe 2
endpointComponents should contain("com.example.HelloController")
endpointComponents should contain("com.example.UserRegistryController")
}

"contain grpc endpoint components" in {
val endpointComponents = config.getStringList("akka.javasdk.components.grpc-endpoint")
endpointComponents.size() shouldBe 1
endpointComponents should contain("com.example.UserGrpcServiceImpl")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
@SupportedAnnotationTypes(
{
"akka.javasdk.annotations.http.HttpEndpoint",
"akka.javasdk.annotations.GrpcEndpoint",
// all components will have this
"akka.javasdk.annotations.ComponentId",
// central config/lifecycle class
Expand All @@ -53,6 +54,7 @@ public class ComponentAnnotationProcessor extends AbstractProcessor {

// key of each component type under that parent path, containing a string list of concrete component classes
private static final String HTTP_ENDPOINT_KEY = "http-endpoint";
private static final String GRPC_ENDPOINT_KEY = "grpc-endpoint";
private static final String EVENT_SOURCED_ENTITY_KEY = "event-sourced-entity";
private static final String VALUE_ENTITY_KEY = "key-value-entity";
private static final String TIMED_ACTION_KEY = "timed-action";
Expand All @@ -61,7 +63,9 @@ public class ComponentAnnotationProcessor extends AbstractProcessor {
private static final String WORKFLOW_KEY = "workflow";
private static final String SERVICE_SETUP_KEY = "service-setup";

private static final List<String> ALL_COMPONENT_TYPES = List.of(HTTP_ENDPOINT_KEY, EVENT_SOURCED_ENTITY_KEY, VALUE_ENTITY_KEY, TIMED_ACTION_KEY, CONSUMER_KEY, VIEW_KEY, WORKFLOW_KEY, SERVICE_SETUP_KEY);
private static final List<String> ALL_COMPONENT_TYPES = List.of(HTTP_ENDPOINT_KEY, GRPC_ENDPOINT_KEY,
EVENT_SOURCED_ENTITY_KEY, VALUE_ENTITY_KEY, TIMED_ACTION_KEY, CONSUMER_KEY, VIEW_KEY, WORKFLOW_KEY,
SERVICE_SETUP_KEY);


private final boolean debugEnabled;
Expand Down Expand Up @@ -124,6 +128,7 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
private String componentTypeFor(Element annotatedClass, TypeElement annotation) {
return switch (annotation.getQualifiedName().toString()) {
case "akka.javasdk.annotations.http.HttpEndpoint" -> HTTP_ENDPOINT_KEY;
case "akka.javasdk.annotations.GrpcEndpoint" -> GRPC_ENDPOINT_KEY;
case "akka.javasdk.annotations.Setup" -> SERVICE_SETUP_KEY;
case "akka.javasdk.annotations.ComponentId" -> componentType(annotatedClass);
default -> throw new IllegalArgumentException("Unknown annotation type: " + annotation.getQualifiedName());
Expand Down
4 changes: 4 additions & 0 deletions akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<<<<<<< Updated upstream
<kalix-runtime.version>1.3.0</kalix-runtime.version>
=======
<kalix-runtime.version>1.3.0-1-dd515bee-SNAPSHOT</kalix-runtime.version>
>>>>>>> Stashed changes

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akkajavasdk.components.grpc;

import akka.javasdk.annotations.GrpcEndpoint;
import akkajavasdk.protocol.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@GrpcEndpoint
public class TestGrpcServiceImpl implements TestGrpcService {
@Override
public CompletionStage<TestGrpcServiceOuterClass.Out> simple(TestGrpcServiceOuterClass.In in) {
return CompletableFuture.completedFuture(
TestGrpcServiceOuterClass.Out.newBuilder().setData(in.getData()).build()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>

// gRPC interface for a gRPC endpoint component

syntax = "proto3";

package akkajavasdk;

option java_package = "akkajavasdk.protocol";

message In {
string data = 1;
}

message Out {
string data = 1;
}

service TestGrpcService {
rpc Simple(In) returns (Out) {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.annotations;

import java.lang.annotation.*;

/**
* Mark a class to be made available as a gRPC endpoint. The annotated class should extend a gRPC service interface
* generated using Akka gRPC, be public and have a public constructor.
* <p>
* Annotated classes can accept the following types to the constructor:
* <ul>
* <li>{@link akka.javasdk.client.ComponentClient}</li>
* <li>{@link akka.javasdk.http.HttpClientProvider}</li>
* <li>{@link akka.javasdk.timer.TimerScheduler}</li>
* <li>{@link akka.stream.Materializer}</li>
* <li>{@link com.typesafe.config.Config}</li>
* <li>Custom types provided by a {@link akka.javasdk.DependencyProvider} from the service setup</li>
* </ul>
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GrpcEndpoint {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.impl

import akka.actor.typed.ActorSystem
import akka.grpc.AkkaGrpcGenerated
import akka.grpc.scaladsl.InstancePerRequestFactory
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.runtime.sdk.spi.GrpcEndpointDescriptor
import akka.runtime.sdk.spi.GrpcEndpointRequestConstructionContext

import scala.concurrent.Future

object GrpcEndpointDescriptorFactory {

def apply[T](grpcEndpointClass: Class[T], factory: () => T)(implicit
system: ActorSystem[_]): GrpcEndpointDescriptor[T] = {
val serviceDefinitionClass: Class[_] = grpcEndpointClass.getSuperclass

// Validate that it is a grpc service (no supertype so this is the best we can do)
if (serviceDefinitionClass.getAnnotation(classOf[AkkaGrpcGenerated]) == null) {
throw new IllegalArgumentException(
s"Class [${grpcEndpointClass.getName}] is with @GrpcEndpoint but the direct supertype [${serviceDefinitionClass.getName}] generated by Akka gRPC. " +
"This is not supported.")
}

val instanceFactory = { (_: GrpcEndpointRequestConstructionContext) =>
factory()
}

// FIXME creating router is concrete method, not any interface, would be better to have an interface and not do reflectively
// FIXME service lifecycle - one instance vs per request for HTTP endpoint, should we align?
val handlerFactory =
system.dynamicAccess
.createInstanceFor[InstancePerRequestFactory[T]](serviceDefinitionClass.getName + "ScalaHandlerFactory", Nil)
.get

val routeFactory: (HttpRequest => T) => PartialFunction[HttpRequest, Future[HttpResponse]] = { serviceFactory =>
handlerFactory.partialInstancePerRequest(
serviceFactory,
"",
// FIXME default error handler, is it fine to leave like this, should runtime define?
PartialFunction.empty,
system)
}

// Pick up generated companion object for file descriptor (for reflection) and creating router
val companion = system.dynamicAccess.getObjectFor[akka.grpc.ServiceDescription](grpcEndpointClass.getName).get

new GrpcEndpointDescriptor[T](
grpcEndpointClass.getName,
companion.name,
companion.descriptor,
instanceFactory,
routeFactory)
}

}
20 changes: 18 additions & 2 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import java.lang.reflect.Method
import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.nowarn
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand All @@ -21,7 +20,6 @@ import scala.jdk.OptionConverters.RichOption
import scala.jdk.OptionConverters.RichOptional
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
Expand Down Expand Up @@ -415,6 +413,12 @@ private final class Sdk(
HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass))
}

private val grpcEndpointDescriptors = componentClasses
.filter(Reflect.isGrpcEndpoint)
.filterNot(isDisabled)
.map(grpcEndpointClass =>
GrpcEndpointDescriptorFactory(grpcEndpointClass, grpcEndpointFactory(grpcEndpointClass))(system))

private var eventSourcedEntityDescriptors = Vector.empty[EventSourcedEntityDescriptor]
private var keyValueEntityDescriptors = Vector.empty[EventSourcedEntityDescriptor]
private var workflowDescriptors = Vector.empty[WorkflowDescriptor]
Expand Down Expand Up @@ -593,6 +597,7 @@ private final class Sdk(
(serviceSetup.map(_.disabledComponents().asScala.toSet).getOrElse(Set.empty) ++ disabledComponents).map(_.getName)

val descriptors =
<<<<<<< Updated upstream
(eventSourcedEntityDescriptors ++
keyValueEntityDescriptors ++
httpEndpointDescriptors ++
Expand All @@ -601,6 +606,10 @@ private final class Sdk(
viewDescriptors ++
workflowDescriptors)
.filterNot(isDisabled(combinedDisabledComponents))
=======
eventSourcedEntityDescriptors ++ keyValueEntityDescriptors ++ httpEndpointDescriptors ++ timedActionDescriptors ++
consumerDescriptors ++ viewDescriptors ++ workflowDescriptors ++ grpcEndpointDescriptors
>>>>>>> Stashed changes

val preStart = { (_: ActorSystem[_]) =>
serviceSetup match {
Expand Down Expand Up @@ -709,6 +718,13 @@ private final class Sdk(
instance
}

private def grpcEndpointFactory[E](grpcEndpointClass: Class[E]): () => E = () => {
wiredInstance(grpcEndpointClass) {
// FIXME missing span from request
sideEffectingComponentInjects(None)
}
}

private def wiredInstance[T](clz: Class[T])(partial: PartialFunction[Class[_], Any]): T = {
// only one constructor allowed
require(clz.getDeclaredConstructors.length == 1, s"Class [${clz.getSimpleName}] must have only one constructor.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.javasdk.impl.reflection

import akka.annotation.InternalApi
import akka.javasdk.annotations.GrpcEndpoint
import akka.javasdk.annotations.http.HttpEndpoint
import akka.javasdk.client.ComponentClient
import akka.javasdk.consumer.Consumer
Expand All @@ -15,14 +16,14 @@ import akka.javasdk.timedaction.TimedAction
import akka.javasdk.view.TableUpdater
import akka.javasdk.view.View
import akka.javasdk.workflow.Workflow

import java.lang.annotation.Annotation
import java.lang.reflect.AnnotatedElement
import java.lang.reflect.Method
import java.lang.reflect.Modifier
import java.lang.reflect.ParameterizedType
import java.util
import java.util.Optional

import scala.annotation.tailrec
import scala.reflect.ClassTag

Expand Down Expand Up @@ -60,6 +61,9 @@ private[impl] object Reflect {
def isRestEndpoint(cls: Class[_]): Boolean =
cls.getAnnotation(classOf[HttpEndpoint]) != null

def isGrpcEndpoint(cls: Class[_]): Boolean =
cls.getAnnotation(classOf[GrpcEndpoint]) != null

def isEntity(cls: Class[_]): Boolean =
classOf[EventSourcedEntity[_, _]].isAssignableFrom(cls) ||
classOf[KeyValueEntity[_]].isAssignableFrom(cls)
Expand Down
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lazy val akkaJavaSdkTestKit =

lazy val akkaJavaSdkTests =
Project(id = "akka-javasdk-tests", base = file("akka-javasdk-tests"))
.enablePlugins(AkkaGrpcPlugin)
.dependsOn(akkaJavaSdk, akkaJavaSdkTestKit)
.settings(
name := "akka-javasdk-testkit",
Expand All @@ -67,7 +68,12 @@ lazy val akkaJavaSdkTests =
Test / javacOptions ++= Seq("-parameters"),
// only tests here
publish / skip := true,
doc / sources := Seq.empty)
doc / sources := Seq.empty,
// generating test service
Test / akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Java),
Test / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Client, AkkaGrpc.Server),
Test / PB.protoSources ++= (Compile / PB.protoSources).value
)
.settings(inConfig(Test)(JupiterPlugin.scopedSettings))
.settings(Dependencies.tests)

Expand Down
4 changes: 4 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ object Dependencies {
val ProtocolVersionMinor = 1
val RuntimeImage = "gcr.io/kalix-public/kalix-runtime"
// Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this
<<<<<<< Updated upstream
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0")
=======
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-1-dd515bee-SNAPSHOT")
>>>>>>> Stashed changes
}
// NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check
// if AkkaVersion and AkkaHttpVersion are aligned
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
resolvers += "Akka repository".at("https://repo.akka.io/maven")

addSbtPlugin("com.github.sbt" % "sbt-dynver" % "5.0.1")
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.4.4")
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.5.0-14-cac946c2-SNAPSHOT")
addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.7.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.7.0")
Expand Down

0 comments on commit 4eb8068

Please sign in to comment.