Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: SDK implementation of SpiEventSourcedEntity #39

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.2.2</kalix-runtime.version>
<kalix-runtime.version>1.2.2-7-3e32f0a1-SNAPSHOT</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import akka.stream.SystemMaterializer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import kalix.runtime.KalixRuntimeMain;
import kalix.runtime.AkkaRuntimeMain;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -474,7 +474,7 @@ public SpiSettings getSettings() {
applicationConfig = runner.applicationConfig();

Config runtimeConfig = ConfigFactory.empty();
runtimeActorSystem = KalixRuntimeMain.start(Some.apply(runtimeConfig), Some.apply(runner));
runtimeActorSystem = AkkaRuntimeMain.start(Some.apply(runtimeConfig), runner);
// wait for SDK to get on start callback (or fail starting), we need it to set up the component client
var startupContext = runner.started().toCompletableFuture().get(20, TimeUnit.SECONDS);
var componentClients = startupContext.componentClients();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ final class TestKitEventSourcedEntityCommandContext(
override val commandId: Long = 0L,
override val commandName: String = "stubCommandName",
override val sequenceNumber: Long = 0L,
override val isDeleted: Boolean = false,
override val metadata: Metadata = Metadata.EMPTY)
extends CommandContext
with InternalContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import static java.time.temporal.ChronoUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

@ExtendWith(Junit5LogCapturing.class)
Expand All @@ -47,22 +48,30 @@ public void verifyCounterEventSourcedWiring() {

@Test
public void verifyCounterErrorEffect() {
var counterId = "hello-error";
var client = componentClient.forEventSourcedEntity(counterId);
assertThrows(IllegalArgumentException.class, () ->
increaseCounterWithError(client, -1)
);
}

@Test
public void httpVerifyCounterErrorEffect() {
CompletableFuture<StrictResponse<String>> call = httpClient.POST("/akka/v1.0/entity/counter-entity/c001/increaseWithError")
.withRequestBody(-10)
.responseBodyAs(String.class)
.invokeAsync()
.toCompletableFuture();
.withRequestBody(-10)
.responseBodyAs(String.class)
.invokeAsync()
.toCompletableFuture();

Awaitility.await()
.ignoreExceptions()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {

assertThat(call).isCompletedExceptionally();
assertThat(call.exceptionNow()).isInstanceOf(IllegalArgumentException.class);
assertThat(call.exceptionNow().getMessage()).contains("Value must be greater than 0");
});
.ignoreExceptions()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {

assertThat(call).isCompletedExceptionally();
assertThat(call.exceptionNow()).isInstanceOf(IllegalArgumentException.class);
assertThat(call.exceptionNow().getMessage()).contains("Value must be greater than 0");
});
}

@Test
Expand Down Expand Up @@ -185,6 +194,12 @@ private Integer increaseCounter(EventSourcedEntityClient client, int value) {
.invokeAsync(value));
}

private Counter increaseCounterWithError(EventSourcedEntityClient client, int value) {
return await(client
.method(CounterEntity::increaseWithError)
.invokeAsync(value));
}


private Integer multiplyCounter(EventSourcedEntityClient client, int value) {
return await(client
Expand All @@ -205,4 +220,4 @@ private Integer getCounter(EventSourcedEntityClient client) {
return await(client.method(CounterEntity::get).invokeAsync());
}

}
}
6 changes: 4 additions & 2 deletions akka-javasdk-tests/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
</logger>

<logger name="akka" level="WARN"/>
<logger name="akka.runtime" level="INFO"/>
<logger name="akka.javasdk" level="INFO"/>
<logger name="akka.runtime" level="DEBUG"/>
<logger name="akka.javasdk" level="DEBUG"/>
<logger name="kalix.runtime.views" level="INFO"/>
<logger name="akka.http" level="WARN"/>
<logger name="io.grpc" level="WARN"/>
<logger name="io.r2dbc" level="WARN"/>
<logger name="akka.javasdk.impl" level="INFO"/>
<logger name="akka.javasdk.testkit" level="INFO"/>

<root level="DEBUG">
<appender-ref ref="CapturingAppender"/>
<!-- <appender-ref ref="STDOUT"/>-->
</root>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public interface CommandContext extends MetadataContext {
*/
String entityId();

boolean isDeleted();

/** Access to tracing for custom app specific tracing. */
Tracing tracing();
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ public void _internalSetCurrentState(S state) {
currentState = Optional.ofNullable(state);
}

/**
* INTERNAL API
* @hidden
*/
@InternalApi
public void _internalClearCurrentState() {
handlingCommands = false;
currentState = Optional.empty();
}

/**
* This is the main event handler method. Whenever an event is persisted, this handler will be called.
* It should return the new state of the entity.
Expand Down
36 changes: 33 additions & 3 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ package akka.javasdk.impl
import java.lang.reflect.Constructor
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.CompletionStage

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.jdk.FutureConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
Expand Down Expand Up @@ -88,10 +90,12 @@ import kalix.protocol.value_entity.ValueEntities
import kalix.protocol.view.Views
import kalix.protocol.workflow_entity.WorkflowEntities
import org.slf4j.LoggerFactory

import scala.jdk.OptionConverters.RichOptional
import scala.jdk.CollectionConverters._

import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityImpl
import akka.runtime.sdk.spi.EventSourcedEntityDescriptor

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -342,12 +346,34 @@ private final class Sdk(
}

// collect all Endpoints and compose them to build a larger router
private val httpEndpoints = componentClasses
private val httpEndpointDescriptors = componentClasses
.filter(Reflect.isRestEndpoint)
.map { httpEndpointClass =>
HttpEndpointDescriptorFactory(httpEndpointClass, httpEndpointFactory(httpEndpointClass))
}

private val eventSourcedEntityDescriptors =
componentClasses
.filter(hasComponentId)
.collect {
case clz if classOf[EventSourcedEntity[_, _]].isAssignableFrom(clz) =>
val componentId = clz.getAnnotation(classOf[ComponentId]).value
val entitySpi =
new EventSourcedEntityImpl[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]](
sdkSettings,
sdkTracerFactory,
componentId,
clz,
messageCodec,
context =>
wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) {
// remember to update component type API doc and docs if changing the set of injectables
case p if p == classOf[EventSourcedEntityContext] => context
},
sdkSettings.snapshotEvery)
new EventSourcedEntityDescriptor(componentId, entitySpi)
}

// these are available for injecting in all kinds of component that are primarily
// for side effects
// Note: config is also always available through the combination with user DI way down below
Expand Down Expand Up @@ -484,11 +510,15 @@ private final class Sdk(
override def discovery: Discovery = discoveryEndpoint
override def actions: Option[Actions] = actionsEndpoint
override def eventSourcedEntities: Option[EventSourcedEntities] = eventSourcedEntitiesEndpoint
override def eventSourcedEntityDescriptors: Seq[EventSourcedEntityDescriptor] =
Sdk.this.eventSourcedEntityDescriptors
override def valueEntities: Option[ValueEntities] = valueEntitiesEndpoint
override def views: Option[Views] = viewsEndpoint
override def workflowEntities: Option[WorkflowEntities] = workflowEntitiesEndpoint
override def replicatedEntities: Option[ReplicatedEntities] = None
override def httpEndpointDescriptors: Seq[HttpEndpointDescriptor] = httpEndpoints
override def httpEndpointDescriptors: Seq[HttpEndpointDescriptor] =
Sdk.this.httpEndpointDescriptors

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ private[impl] final class EventSourcedEntitiesImpl(
with CommandContext
with ActivatableContext {
override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory)
override def isDeleted: Boolean = false // FIXME not supported by old spi
}

private class EventSourcedEntityContextImpl(override final val entityId: String)
Expand Down
Loading
Loading