Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: View SPI #72

Merged
merged 10 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.3.0-f2e86bc</kalix-runtime.version>
<kalix-runtime.version>1.3.0-8e0bc86</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,14 @@ public void verifyFindCounterByValue() {
var emptyCounter = await(
componentClient.forView()
.method(CountersByValue::getCounterByValue)
.invokeAsync(CountersByValue.queryParam(10)));
.invokeAsync(CountersByValue.queryParam(101)));

assertThat(emptyCounter).isEmpty();

await(
componentClient.forEventSourcedEntity("abc")
.method(CounterEntity::increase)
.invokeAsync(10));
.invokeAsync(101));


// the view is eventually updated
Expand All @@ -228,26 +228,27 @@ public void verifyFindCounterByValue() {
var byValue = await(
componentClient.forView()
.method(CountersByValue::getCounterByValue)
.invokeAsync(CountersByValue.queryParam(10)));
.invokeAsync(CountersByValue.queryParam(101)));

assertThat(byValue).hasValue(new Counter(10));
assertThat(byValue).hasValue(new Counter(101));
});
}

@Disabled // pending primitive query parameters working
@Test
public void verifyHierarchyView() {

var emptyCounter = await(
componentClient.forView()
.method(HierarchyCountersByValue::getCounterByValue)
.invokeAsync(10));
.invokeAsync(201));

assertThat(emptyCounter).isEmpty();

await(
componentClient.forEventSourcedEntity("bcd")
.method(CounterEntity::increase)
.invokeAsync(20));
.invokeAsync(201));


// the view is eventually updated
Expand All @@ -259,9 +260,9 @@ public void verifyHierarchyView() {
var byValue = await(
componentClient.forView()
.method(HierarchyCountersByValue::getCounterByValue)
.invokeAsync(20));
.invokeAsync(201));

assertThat(byValue).hasValue(new Counter(20));
assertThat(byValue).hasValue(new Counter(201));
});
}

Expand All @@ -271,12 +272,12 @@ public void verifyCounterViewMultipleSubscriptions() {
await(
componentClient.forEventSourcedEntity("hello2")
.method(CounterEntity::increase)
.invokeAsync(1));
.invokeAsync(74));

await(
componentClient.forEventSourcedEntity("hello3")
.method(CounterEntity::increase)
.invokeAsync(1));
.invokeAsync(74));

Awaitility.await()
.ignoreExceptions()
Expand All @@ -285,15 +286,15 @@ public void verifyCounterViewMultipleSubscriptions() {
() ->
await(componentClient.forView()
.method(CountersByValueSubscriptions::getCounterByValue)
.invokeAsync(new CountersByValueSubscriptions.QueryParameters(1)))
.invokeAsync(new CountersByValueSubscriptions.QueryParameters(74)))
.counters().size(),
new IsEqual<>(2));
}

@Test
public void verifyTransformedUserViewWiring() {

TestUser user = new TestUser("123", "john@doe.com", "JohnDoe");
TestUser user = new TestUser("123", "john123@doe.com", "JohnDoe");

createUser(user);

Expand All @@ -317,7 +318,7 @@ public void verifyTransformedUserViewWiring() {
@Test
public void verifyUserSubscriptionAction() {

TestUser user = new TestUser("123", "john@doe.com", "JohnDoe");
TestUser user = new TestUser("123", "john345@doe.com", "JohnDoe");

createUser(user);

Expand All @@ -337,6 +338,7 @@ public void verifyUserSubscriptionAction() {
}


@Disabled // pending primitive query parameters working
@Test
public void shouldAcceptPrimitivesForViewQueries() {

Expand Down Expand Up @@ -474,7 +476,7 @@ public void verifyFindUsersByEmailAndName() {
// the view is eventually updated
Awaitility.await()
.ignoreExceptions()
.atMost(10, TimeUnit.SECONDS)
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(
() -> {
var request = new UsersByEmailAndName.QueryParameters(user.email(), user.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public interface UpdateContext extends MetadataContext {
*/
Optional<String> eventSubject();

// FIXME is this needed anymore?
/** The name of the event being handled. */
String eventName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import akka.javasdk.impl.reflection.KalixMethod
import akka.javasdk.impl.reflection.NameGenerator
import akka.javasdk.impl.reflection.Reflect
import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.impl.view.ViewDescriptorFactory
import akka.javasdk.keyvalueentity.KeyValueEntity
import akka.javasdk.timedaction.TimedAction
import akka.javasdk.view.TableUpdater
Expand Down Expand Up @@ -304,8 +303,6 @@ private[impl] object ComponentDescriptorFactory {
def getFactoryFor(component: Class[_]): ComponentDescriptorFactory = {
if (Reflect.isEntity(component) || Reflect.isWorkflow(component))
EntityDescriptorFactory
else if (Reflect.isView(component))
ViewDescriptorFactory
else if (Reflect.isConsumer(component))
ConsumerDescriptorFactory
else
Expand Down
33 changes: 12 additions & 21 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import java.lang.reflect.Constructor
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.util.concurrent.CompletionStage

import scala.annotation.nowarn
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand All @@ -18,7 +17,6 @@ import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters.RichOptional
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
Expand Down Expand Up @@ -60,8 +58,7 @@ import akka.javasdk.impl.telemetry.SpanTracingImpl
import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.javasdk.impl.timedaction.TimedActionImpl
import akka.javasdk.impl.timer.TimerSchedulerImpl
import akka.javasdk.impl.view.ViewService
import akka.javasdk.impl.view.ViewsImpl
import akka.javasdk.impl.view.ViewDescriptorFactory
import akka.javasdk.impl.workflow.WorkflowImpl
import akka.javasdk.impl.workflow.WorkflowService
import akka.javasdk.keyvalueentity.KeyValueEntity
Expand All @@ -86,6 +83,7 @@ import akka.runtime.sdk.spi.SpiSettings
import akka.runtime.sdk.spi.SpiWorkflow
import akka.runtime.sdk.spi.StartContext
import akka.runtime.sdk.spi.TimedActionDescriptor
import akka.runtime.sdk.spi.views.SpiViewDescriptor
import akka.runtime.sdk.spi.WorkflowDescriptor
import akka.stream.Materializer
import com.google.protobuf.Descriptors
Expand All @@ -95,7 +93,6 @@ import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.{ Context => OtelContext }
import kalix.protocol.discovery.Discovery
import kalix.protocol.view.Views
import org.slf4j.LoggerFactory

/**
Expand Down Expand Up @@ -330,7 +327,7 @@ private final class Sdk(
Some(keyValueEntityService(clz.asInstanceOf[Class[KeyValueEntity[Nothing]]]))
} else if (Reflect.isView(clz)) {
logger.debug(s"Registering View [${clz.getName}]")
Some(viewService(clz.asInstanceOf[Class[View]]))
None // no factory, handled below
} else throw new IllegalArgumentException(s"Component class of unknown component type [$clz]")

service match {
Expand Down Expand Up @@ -531,6 +528,13 @@ private final class Sdk(
logger.warn("Unknown component [{}]", clz.getName)
}

val viewDescriptors: Seq[SpiViewDescriptor] =
componentClasses
.filter(hasComponentId)
.collect {
case clz if classOf[View].isAssignableFrom(clz) => ViewDescriptorFactory(clz, serializer, sdkExecutionContext)
}

// these are available for injecting in all kinds of component that are primarily
// for side effects
// Note: config is also always available through the combination with user DI way down below
Expand All @@ -544,8 +548,6 @@ private final class Sdk(

def spiComponents: SpiComponents = {

var viewsEndpoint: Option[Views] = None

val classicSystem = system.classicSystem

val services = componentServices.map { case (serviceDescriptor, service) =>
Expand All @@ -563,10 +565,6 @@ private final class Sdk(
case (serviceClass, _: Map[String, WorkflowService[_, _]] @unchecked)
if serviceClass == classOf[WorkflowService[_, _]] =>

case (serviceClass, viewServices: Map[String, ViewService[_]] @unchecked)
if serviceClass == classOf[ViewService[_]] =>
viewsEndpoint = Some(new ViewsImpl(viewServices, sdkDispatcherName))

case (serviceClass, _) =>
sys.error(s"Unknown service type: $serviceClass")
}
Expand Down Expand Up @@ -637,11 +635,11 @@ private final class Sdk(
override def consumersDescriptors: Seq[ConsumerDescriptor] =
Sdk.this.consumerDescriptors

override def viewDescriptors: Seq[SpiViewDescriptor] = Sdk.this.viewDescriptors

override def workflowDescriptors: Seq[WorkflowDescriptor] =
Sdk.this.workflowDescriptors

override def views: Option[Views] = viewsEndpoint

}
}

Expand Down Expand Up @@ -681,13 +679,6 @@ private final class Sdk(
private def keyValueEntityService[S, VE <: KeyValueEntity[S]](clz: Class[VE]): KeyValueEntityService[S, VE] =
new KeyValueEntityService(clz, serializer)

private def viewService[V <: View](clz: Class[V]): ViewService[V] =
new ViewService[V](
clz,
serializer,
// remember to update component type API doc and docs if changing the set of injectables
wiredInstance(_)(PartialFunction.empty))

private def httpEndpointFactory[E](httpEndpointClass: Class[E]): HttpEndpointConstructionContext => E = {
(context: HttpEndpointConstructionContext) =>
lazy val requestContext = new RequestContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private[javasdk] object ViewClientImpl {
// extract view id
val declaringClass = method.getDeclaringClass
val componentId = ComponentDescriptorFactory.readComponentIdIdValue(declaringClass)
val methodName = method.getName.capitalize
val methodName = method.getName
val queryReturnType = getViewQueryReturnType(method)
ViewMethodProperties(componentId, method, methodName, declaringClass, queryReturnType)
}
Expand Down

This file was deleted.

Loading
Loading