Skip to content

Commit

Permalink
chore: Snapshots and StateSerializer type (#48)
Browse files Browse the repository at this point in the history
* test that verifies snapshots
* pass snapshotEvery in SpiSettings
* use entityStateType when deserializing state (snapshot)
* metadata
  • Loading branch information
patriknw authored Dec 4, 2024
1 parent e11eb2d commit 2b9a1bc
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
package akkajavasdk;

import akka.javasdk.http.StrictResponse;
import akka.javasdk.testkit.TestKit;
import akka.javasdk.testkit.TestKitSupport;
import akkajavasdk.components.eventsourcedentities.counter.Counter;
import akkajavasdk.components.eventsourcedentities.counter.CounterEntity;
import akka.javasdk.client.EventSourcedEntityClient;
import akkajavasdk.components.eventsourcedentities.hierarchy.AbstractTextConsumer;
import akkajavasdk.components.eventsourcedentities.hierarchy.TextEsEntity;
import com.typesafe.config.ConfigFactory;
import org.awaitility.Awaitility;
import org.hamcrest.core.IsEqual;
import org.junit.jupiter.api.Assertions;
Expand All @@ -30,6 +32,13 @@
@ExtendWith(Junit5LogCapturing.class)
public class EventSourcedEntityTest extends TestKitSupport {

@Override
protected TestKit.Settings testKitSettings() {
return TestKit.Settings.DEFAULT.withAdditionalConfig(ConfigFactory.parseString("""
akka.javasdk.event-sourced-entity.snapshot-every = 10
"""));
}

@Test
public void verifyCounterEventSourcedWiring() throws InterruptedException {

Expand Down Expand Up @@ -144,7 +153,7 @@ public void verifyCounterEventSourcedAfterRestart() {
@Test
public void verifyCounterEventSourcedAfterRestartFromSnapshot() {

// snapshotting with kalix.event-sourced-entity.snapshot-every = 10
// snapshotting with akka.javasdk.event-sourced-entity.snapshot-every = 10
var counterId = "restartFromSnapshot";
var client = componentClient.forEventSourcedEntity(counterId);

Expand Down
1 change: 0 additions & 1 deletion akka-javasdk-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
kalix.event-sourced-entity.snapshot-every = 10
# Using a different port to not conflict with parallel tests
akka.javasdk.testkit.http-port = 39391
1 change: 1 addition & 0 deletions akka-javasdk-tests/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<logger name="akka" level="WARN"/>
<logger name="akka.runtime" level="DEBUG"/>
<logger name="kalix.runtime" level="DEBUG"/>
<logger name="akka.javasdk" level="DEBUG"/>
<logger name="kalix.runtime.views" level="INFO"/>
<logger name="akka.http" level="WARN"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,8 @@ private[javasdk] class JsonMessageCodec extends MessageCodec {
value
}

def decodeMessage[T](expectedType: Class[T], bytes: akka.util.ByteString): T = {
// FIXME could we avoid the copy?
JsonSupport.parseBytes(bytes.toArrayUnsafe(), expectedType)
def decodeMessage[T](expectedType: Class[T], pb: ScalaPbAny): T = {
JsonSupport.decodeJson(expectedType, pb)
}

private[akka] def removeVersion(typeName: String) = {
Expand Down
8 changes: 5 additions & 3 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends
@nowarn("msg=deprecated") //TODO remove deprecation once we remove the old constructor
override def getSettings: SpiSettings = {
val applicationConf = applicationConfig

val eventSourcedEntitySnapshotEvery = applicationConfig.getInt("akka.javasdk.event-sourced-entity.snapshot-every")

val devModeSettings =
if (applicationConf.getBoolean("akka.javasdk.dev-mode.enabled"))
Some(
Expand All @@ -133,7 +136,7 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends
else
None

new SpiSettings(devModeSettings)
new SpiSettings(eventSourcedEntitySnapshotEvery, devModeSettings)
}

private def extractBrokerConfig(eventingConf: Config): SpiEventingSupportSettings = {
Expand Down Expand Up @@ -393,8 +396,7 @@ private final class Sdk(
wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) {
// remember to update component type API doc and docs if changing the set of injectables
case p if p == classOf[EventSourcedEntityContext] => context
},
sdkSettings.snapshotEvery)
})
}
new EventSourcedEntityDescriptor(componentId, readOnlyCommandNames, instanceFactory)
}
Expand Down
2 changes: 0 additions & 2 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ private[impl] object Settings {

def apply(sdkConfig: Config): Settings = {
Settings(
snapshotEvery = sdkConfig.getInt("event-sourced-entity.snapshot-every"),
cleanupDeletedEventSourcedEntityAfter = sdkConfig.getDuration("event-sourced-entity.cleanup-deleted-after"),
cleanupDeletedKeyValueEntityAfter = sdkConfig.getDuration("key-value-entity.cleanup-deleted-after"),
devModeSettings = Option.when(sdkConfig.getBoolean("dev-mode.enabled"))(
Expand All @@ -35,7 +34,6 @@ private[impl] object Settings {
*/
@InternalApi
private[impl] final case class Settings(
snapshotEvery: Int,
cleanupDeletedEventSourcedEntityAfter: Duration,
cleanupDeletedKeyValueEntityAfter: Duration,
devModeSettings: Option[DevModeSettings])
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private[impl] final class EventSourcedEntitiesImpl(
if (service.snapshotEvery < 0)
log.warn("Snapshotting disabled for entity [{}], this is not recommended.", service.componentId)
// FIXME overlay configuration provided by _system
(name, if (service.snapshotEvery == 0) service.withSnapshotEvery(configuration.snapshotEvery) else service)
(name, if (service.snapshotEvery == 0) service else service)
}.toMap

private val instrumentations: Map[String, TraceInstrumentation] = services.values.map { s =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@ import com.google.protobuf.ByteString
import com.google.protobuf.any.{ Any => ScalaPbAny }
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.Tracer
import org.slf4j.LoggerFactory
import org.slf4j.MDC

/**
* INTERNAL API
*/
@InternalApi
private[impl] object EventSourcedEntityImpl {
private val log = LoggerFactory.getLogger(this.getClass)

private class CommandContextImpl(
override val entityId: String,
Expand Down Expand Up @@ -86,14 +84,10 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
componentClass: Class[_],
entityId: String,
messageCodec: JsonMessageCodec,
factory: EventSourcedEntityContext => ES,
snapshotEvery: Int)
factory: EventSourcedEntityContext => ES)
extends SpiEventSourcedEntity {
import EventSourcedEntityImpl._

if (snapshotEvery < 0)
log.warn("Snapshotting disabled for entity [{}], this is not recommended.", componentId)

// FIXME
// private val traceInstrumentation = new TraceInstrumentation(componentId, EventSourcedEntityCategory, tracerFactory)

Expand Down Expand Up @@ -126,8 +120,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
command.payload.getOrElse(
// FIXME smuggling 0 arity method called from component client through here
ScalaPbAny.defaultInstance.withTypeUrl(AnySupport.JsonTypeUrlPrefix).withValue(ByteString.empty())))
val metadata: Metadata =
MetadataImpl.of(Nil) // FIXME MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil))
val metadata: Metadata = MetadataImpl.of(command.metadata)
val cmdContext =
new CommandContextImpl(
entityId,
Expand Down Expand Up @@ -161,16 +154,14 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
}

var currentSequence = command.sequenceNumber
var updatedState = state
commandEffect.primaryEffect match {
case EmitEvents(events, deleteEntity) =>
var shouldSnapshot = false
var updatedState = state
events.foreach { event =>
updatedState = entityHandleEvent(updatedState, event.asInstanceOf[AnyRef], entityId, currentSequence)
if (updatedState == null)
throw new IllegalArgumentException("Event handler must not return null as the updated state.")
currentSequence += 1
shouldSnapshot = shouldSnapshot || (snapshotEvery > 0 && currentSequence % snapshotEvery == 0)
}

val (reply, error) = replyOrError(updatedState)
Expand All @@ -179,14 +170,6 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
Future.successful(
new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply = None, error, None))
} else {
// snapshotting final state since that is the "atomic" write
// emptyState can be null but null snapshot should not be stored, but that can't even
// happen since event handler is not allowed to return null as newState
// FIXME
// val snapshot =
// if (shouldSnapshot) Option(updatedState)
// else None

val delete =
if (deleteEntity) Some(configuration.cleanupDeletedEventSourcedEntityAfter)
else None
Expand All @@ -199,7 +182,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
}

case NoPrimaryEffect =>
val (reply, error) = replyOrError(updatedState)
val (reply, error) = replyOrError(state)

Future.successful(
new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply, error, None))
Expand Down Expand Up @@ -273,5 +256,5 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
ScalaPbAny.fromJavaProto(messageCodec.encodeJava(obj))

override def stateFromProto(pb: ScalaPbAny): SpiEventSourcedEntity.State =
messageCodec.decodeMessage(pb).asInstanceOf[SpiEventSourcedEntity.State]
messageCodec.decodeMessage(router.entityStateType, pb)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE
// similar to workflow, we preemptively register the events type to the message codec
Reflect.allKnownEventTypes[S, E, ES](entity).foreach(messageCodec.registerTypeHints)

val entityStateType: Class[S] = Reflect.eventSourcedEntityStateType(entity.getClass).asInstanceOf[Class[S]]

private def commandHandlerLookup(commandName: String) =
commandHandlers.getOrElse(
commandName,
Expand Down Expand Up @@ -90,7 +92,6 @@ private[impl] class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedE
}

private def _setCurrentState(state: S): Unit = {
val entityStateType: Class[S] = Reflect.eventSourcedEntityStateType(this.entity.getClass).asInstanceOf[Class[S]]

// the state: S received can either be of the entity "state" type (if coming from emptyState/memory)
// or PB Any type (if coming from the runtime)
Expand Down

0 comments on commit 2b9a1bc

Please sign in to comment.