Skip to content

Commit

Permalink
feat: gRPC endpoint support (#161)
Browse files Browse the repository at this point in the history
Initial setup
  • Loading branch information
johanandren authored Jan 23, 2025
1 parent 87a87df commit 4a244fb
Show file tree
Hide file tree
Showing 19 changed files with 326 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package com.example;

import akka.javasdk.annotations.GrpcEndpoint;

@GrpcEndpoint
public class UserGrpcServiceImpl {
// would extend generated service stub and implement grpc methods but that's not important for this test

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ import org.scalatest.wordspec.AnyWordSpec
class EndpointsDescriptorSpec extends AnyWordSpec with Matchers {

"akka-javasdk-components.conf" should {
"contain http endpoints components" in {
val config = ConfigFactory.load("META-INF/akka-javasdk-components.conf")
val config = ConfigFactory.load("META-INF/akka-javasdk-components.conf")

"contain http endpoint components" in {
val endpointComponents = config.getStringList("akka.javasdk.components.http-endpoint")
endpointComponents.size() shouldBe 2
endpointComponents should contain("com.example.HelloController")
endpointComponents should contain("com.example.UserRegistryController")
}

"contain grpc endpoint components" in {
val endpointComponents = config.getStringList("akka.javasdk.components.grpc-endpoint")
endpointComponents.size() shouldBe 1
endpointComponents should contain("com.example.UserGrpcServiceImpl")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
@SupportedAnnotationTypes(
{
"akka.javasdk.annotations.http.HttpEndpoint",
"akka.javasdk.annotations.GrpcEndpoint",
// all components will have this
"akka.javasdk.annotations.ComponentId",
// central config/lifecycle class
Expand All @@ -53,6 +54,7 @@ public class ComponentAnnotationProcessor extends AbstractProcessor {

// key of each component type under that parent path, containing a string list of concrete component classes
private static final String HTTP_ENDPOINT_KEY = "http-endpoint";
private static final String GRPC_ENDPOINT_KEY = "grpc-endpoint";
private static final String EVENT_SOURCED_ENTITY_KEY = "event-sourced-entity";
private static final String VALUE_ENTITY_KEY = "key-value-entity";
private static final String TIMED_ACTION_KEY = "timed-action";
Expand All @@ -61,7 +63,9 @@ public class ComponentAnnotationProcessor extends AbstractProcessor {
private static final String WORKFLOW_KEY = "workflow";
private static final String SERVICE_SETUP_KEY = "service-setup";

private static final List<String> ALL_COMPONENT_TYPES = List.of(HTTP_ENDPOINT_KEY, EVENT_SOURCED_ENTITY_KEY, VALUE_ENTITY_KEY, TIMED_ACTION_KEY, CONSUMER_KEY, VIEW_KEY, WORKFLOW_KEY, SERVICE_SETUP_KEY);
private static final List<String> ALL_COMPONENT_TYPES = List.of(HTTP_ENDPOINT_KEY, GRPC_ENDPOINT_KEY,
EVENT_SOURCED_ENTITY_KEY, VALUE_ENTITY_KEY, TIMED_ACTION_KEY, CONSUMER_KEY, VIEW_KEY, WORKFLOW_KEY,
SERVICE_SETUP_KEY);


private final boolean debugEnabled;
Expand Down Expand Up @@ -124,6 +128,7 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
private String componentTypeFor(Element annotatedClass, TypeElement annotation) {
return switch (annotation.getQualifiedName().toString()) {
case "akka.javasdk.annotations.http.HttpEndpoint" -> HTTP_ENDPOINT_KEY;
case "akka.javasdk.annotations.GrpcEndpoint" -> GRPC_ENDPOINT_KEY;
case "akka.javasdk.annotations.Setup" -> SERVICE_SETUP_KEY;
case "akka.javasdk.annotations.ComponentId" -> componentType(annotatedClass);
default -> throw new IllegalArgumentException("Unknown annotation type: " + annotation.getQualifiedName());
Expand Down
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.3.0</kalix-runtime.version>
<kalix-runtime.version>1.3.1-6cd7992</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akkajavasdk.components.grpc;

import akka.javasdk.annotations.GrpcEndpoint;
import akkajavasdk.protocol.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@GrpcEndpoint
public class TestGrpcServiceImpl implements TestGrpcService {
@Override
public CompletionStage<TestGrpcServiceOuterClass.Out> simple(TestGrpcServiceOuterClass.In in) {
return CompletableFuture.completedFuture(
TestGrpcServiceOuterClass.Out.newBuilder().setData(in.getData()).build()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>

// gRPC interface for a gRPC endpoint component

syntax = "proto3";

package akkajavasdk;

option java_package = "akkajavasdk.protocol";

message In {
string data = 1;
}

message Out {
string data = 1;
}

service TestGrpcService {
rpc Simple(In) returns (Out) {};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.annotations;

import java.lang.annotation.*;

/**
* Mark a class to be made available as a gRPC endpoint. The annotated class should extend a gRPC service interface
* generated using Akka gRPC, be public and have a public constructor.
* <p>
* Annotated classes can accept the following types to the constructor:
* <ul>
* <li>{@link akka.javasdk.client.ComponentClient}</li>
* <li>{@link akka.javasdk.http.HttpClientProvider}</li>
* <li>{@link akka.javasdk.timer.TimerScheduler}</li>
* <li>{@link akka.stream.Materializer}</li>
* <li>{@link com.typesafe.config.Config}</li>
* <li>Custom types provided by a {@link akka.javasdk.DependencyProvider} from the service setup</li>
* </ul>
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GrpcEndpoint {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.impl

import akka.actor.typed.ActorSystem
import akka.grpc.ServiceDescription
import akka.grpc.scaladsl.InstancePerRequestFactory
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.runtime.sdk.spi.GrpcEndpointDescriptor
import akka.runtime.sdk.spi.GrpcEndpointRequestConstructionContext

import scala.concurrent.Future

object GrpcEndpointDescriptorFactory {

def apply[T](grpcEndpointClass: Class[T], factory: () => T)(implicit
system: ActorSystem[_]): GrpcEndpointDescriptor[T] = {
// FIXME now way right now to know that it is a gRPC service interface
val serviceDefinitionClass: Class[_] = {
val interfaces = grpcEndpointClass.getInterfaces
if (interfaces.length != 1) {
throw new IllegalArgumentException(
s"Class [${grpcEndpointClass.getName}] must implement exactly one interface, the gRPC service generated by Akka gRPC.")
}
interfaces(0)
}

// FIXME a derivative should be injectable into user code as well
val instanceFactory = { (_: GrpcEndpointRequestConstructionContext) =>
factory()
}

val handlerFactory =
system.dynamicAccess
.createInstanceFor[InstancePerRequestFactory[T]](serviceDefinitionClass.getName + "ScalaHandlerFactory", Nil)
.get

// Pick up generated companion object for file descriptor (for reflection) and creating router
// static akka.grpc.ServiceDescription description in generated service interface
val description = serviceDefinitionClass.getField("description").get(null).asInstanceOf[ServiceDescription]
if (description eq null)
throw new RuntimeException(
s"Could not access static description from gRPC service interface [${serviceDefinitionClass.getName}]")

val routeFactory: (HttpRequest => T) => PartialFunction[HttpRequest, Future[HttpResponse]] = { serviceFactory =>
handlerFactory.partialInstancePerRequest(
serviceFactory,
description.name,
// FIXME default error handler, is it fine to leave like this, should runtime define?
PartialFunction.empty,
system)
}

new GrpcEndpointDescriptor[T](
grpcEndpointClass.getName,
description.name,
description.descriptor,
instanceFactory,
routeFactory)
}

}
22 changes: 19 additions & 3 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import java.lang.reflect.Method
import java.util
import java.util.Optional
import java.util.concurrent.CompletionStage

import scala.annotation.nowarn
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand All @@ -21,7 +20,6 @@ import scala.jdk.OptionConverters.RichOption
import scala.jdk.OptionConverters.RichOptional
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
Expand All @@ -34,6 +32,7 @@ import akka.javasdk.Principals
import akka.javasdk.ServiceSetup
import akka.javasdk.Tracing
import akka.javasdk.annotations.ComponentId
import akka.javasdk.annotations.GrpcEndpoint
import akka.javasdk.annotations.Setup
import akka.javasdk.annotations.http.HttpEndpoint
import akka.javasdk.client.ComponentClient
Expand Down Expand Up @@ -201,6 +200,7 @@ private object ComponentType {
val KeyValueEntity = "key-value-entity"
val Workflow = "workflow"
val HttpEndpoint = "http-endpoint"
val GrpcEndpoint = "grpc-endpoint"
val Consumer = "consumer"
val TimedAction = "timed-action"
val View = "view"
Expand All @@ -225,6 +225,7 @@ private object ComponentLocator {
val kalixComponentTypeAndBaseClasses: Map[String, Class[_]] =
Map(
ComponentType.HttpEndpoint -> classOf[AnyRef],
ComponentType.GrpcEndpoint -> classOf[AnyRef],
ComponentType.TimedAction -> classOf[TimedAction],
ComponentType.Consumer -> classOf[Consumer],
ComponentType.EventSourcedEntity -> classOf[EventSourcedEntity[_, _]],
Expand Down Expand Up @@ -349,7 +350,7 @@ private final class Sdk(
true
} else {
//additional check to skip logging for endpoints
if (!clz.hasAnnotation[HttpEndpoint]) {
if (!clz.hasAnnotation[HttpEndpoint] && !clz.hasAnnotation[GrpcEndpoint]) {
//this could happen when we remove the @ComponentId annotation from the class,
//the file descriptor generated by annotation processor might still have this class entry,
//for instance when working with IDE and incremental compilation (without clean)
Expand Down Expand Up @@ -415,6 +416,13 @@ private final class Sdk(
HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass))
}

private val grpcEndpointDescriptors = componentClasses
.filter(Reflect.isGrpcEndpoint)
.map { grpcEndpointClass =>
val anyRefClass = grpcEndpointClass.asInstanceOf[Class[AnyRef]]
GrpcEndpointDescriptorFactory(anyRefClass, grpcEndpointFactory(anyRefClass))(system)
}

private var eventSourcedEntityDescriptors = Vector.empty[EventSourcedEntityDescriptor]
private var keyValueEntityDescriptors = Vector.empty[EventSourcedEntityDescriptor]
private var workflowDescriptors = Vector.empty[WorkflowDescriptor]
Expand Down Expand Up @@ -596,6 +604,7 @@ private final class Sdk(
(eventSourcedEntityDescriptors ++
keyValueEntityDescriptors ++
httpEndpointDescriptors ++
grpcEndpointDescriptors ++
timedActionDescriptors ++
consumerDescriptors ++
viewDescriptors ++
Expand Down Expand Up @@ -709,6 +718,13 @@ private final class Sdk(
instance
}

private def grpcEndpointFactory[E](grpcEndpointClass: Class[E]): () => E = () => {
wiredInstance(grpcEndpointClass) {
// FIXME missing span from request
sideEffectingComponentInjects(None)
}
}

private def wiredInstance[T](clz: Class[T])(partial: PartialFunction[Class[_], Any]): T = {
// only one constructor allowed
require(clz.getDeclaredConstructors.length == 1, s"Class [${clz.getSimpleName}] must have only one constructor.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.javasdk.impl.reflection

import akka.annotation.InternalApi
import akka.javasdk.annotations.GrpcEndpoint
import akka.javasdk.annotations.http.HttpEndpoint
import akka.javasdk.client.ComponentClient
import akka.javasdk.consumer.Consumer
Expand All @@ -15,14 +16,14 @@ import akka.javasdk.timedaction.TimedAction
import akka.javasdk.view.TableUpdater
import akka.javasdk.view.View
import akka.javasdk.workflow.Workflow

import java.lang.annotation.Annotation
import java.lang.reflect.AnnotatedElement
import java.lang.reflect.Method
import java.lang.reflect.Modifier
import java.lang.reflect.ParameterizedType
import java.util
import java.util.Optional

import scala.annotation.tailrec
import scala.reflect.ClassTag

Expand Down Expand Up @@ -60,6 +61,9 @@ private[impl] object Reflect {
def isRestEndpoint(cls: Class[_]): Boolean =
cls.getAnnotation(classOf[HttpEndpoint]) != null

def isGrpcEndpoint(cls: Class[_]): Boolean =
cls.getAnnotation(classOf[GrpcEndpoint]) != null

def isEntity(cls: Class[_]): Boolean =
classOf[EventSourcedEntity[_, _]].isAssignableFrom(cls) ||
classOf[KeyValueEntity[_]].isAssignableFrom(cls)
Expand Down
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lazy val akkaJavaSdkTestKit =

lazy val akkaJavaSdkTests =
Project(id = "akka-javasdk-tests", base = file("akka-javasdk-tests"))
.enablePlugins(AkkaGrpcPlugin)
.dependsOn(akkaJavaSdk, akkaJavaSdkTestKit)
.settings(
name := "akka-javasdk-testkit",
Expand All @@ -67,7 +68,11 @@ lazy val akkaJavaSdkTests =
Test / javacOptions ++= Seq("-parameters"),
// only tests here
publish / skip := true,
doc / sources := Seq.empty)
doc / sources := Seq.empty,
// generating test service
Test / akkaGrpcGeneratedLanguages := Seq(AkkaGrpc.Java),
Test / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Client, AkkaGrpc.Server),
Test / PB.protoSources ++= (Compile / PB.protoSources).value)
.settings(inConfig(Test)(JupiterPlugin.scopedSettings))
.settings(Dependencies.tests)

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Dependencies {
val ProtocolVersionMinor = 1
val RuntimeImage = "gcr.io/kalix-public/kalix-runtime"
// Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0")
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.1-6cd7992")
}
// NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check
// if AkkaVersion and AkkaHttpVersion are aligned
Expand Down
3 changes: 2 additions & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
resolvers += "Akka repository".at("https://repo.akka.io/maven")

addSbtPlugin("com.github.sbt" % "sbt-dynver" % "5.0.1")
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.4.4")
// Note: akka-grpc must be carefully kept in sync with the version used in the runtime
addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.5.1")
addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.7.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.7.0")
Expand Down
Loading

0 comments on commit 4a244fb

Please sign in to comment.