Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Use JsonSerializer in EventingTestKit #65

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import akka.javasdk.Metadata;
import akka.javasdk.eventsourcedentity.EventSourcedEntity;
import akka.javasdk.eventsourcedentity.EventSourcedEntityContext;
import akka.javasdk.impl.JsonMessageCodec;
import akka.javasdk.testkit.impl.EventSourcedEntityEffectsRunner;
import akka.javasdk.testkit.impl.TestKitEventSourcedEntityContext;

Expand All @@ -29,13 +28,10 @@ public class EventSourcedTestKit<S, E, ES extends EventSourcedEntity<S, E>>
private final ES entity;
private final String entityId;

private final JsonMessageCodec messageCodec;

private EventSourcedTestKit(ES entity, String entityId) {
super(entity);
this.entity = entity;
this.entityId = entityId;
this.messageCodec = new JsonMessageCodec();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

import akka.actor.typed.ActorSystem;
import akka.annotation.InternalApi;
import akka.javasdk.impl.serialization.JsonSerializer;
import com.google.protobuf.ByteString;
import akka.javasdk.Metadata;
import akka.javasdk.impl.MessageCodec;
import akka.javasdk.testkit.impl.EventingTestKitImpl;
import akka.javasdk.testkit.impl.OutgoingMessagesImpl;
import akka.javasdk.testkit.impl.TestKitMessageImpl;
Expand All @@ -23,8 +23,8 @@ public interface EventingTestKit {
* INTERNAL API
*/
@InternalApi
static EventingTestKit start(ActorSystem<?> system, String host, int port, MessageCodec codec) {
return EventingTestKitImpl.start(system, host, port, codec);
static EventingTestKit start(ActorSystem<?> system, String host, int port, JsonSerializer serializer) {
return EventingTestKitImpl.start(system, host, port, serializer);
}

OutgoingMessages getTopicOutgoingMessages(String topic);
Expand Down Expand Up @@ -198,10 +198,10 @@ interface OutgoingMessages {
}

class MessageBuilder {
private final MessageCodec messageCodec;
private final JsonSerializer serializer;

public MessageBuilder(MessageCodec messageCodec) {
this.messageCodec = messageCodec;
public MessageBuilder(JsonSerializer serializer) {
this.serializer = serializer;
}

/**
Expand All @@ -214,7 +214,7 @@ public MessageBuilder(MessageCodec messageCodec) {
* @return a Message object to be used in the context of the Testkit
*/
public <T> Message<T> of(T payload, String subject) {
return new TestKitMessageImpl<>(payload, TestKitMessageImpl.defaultMetadata(payload, subject, messageCodec));
return new TestKitMessageImpl<>(payload, TestKitMessageImpl.defaultMetadata(payload, subject, serializer));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,19 @@
package akka.javasdk.testkit;

import akka.actor.typed.ActorSystem;
import akka.annotation.InternalApi;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.javasdk.DependencyProvider;
import akka.javasdk.Metadata;
import akka.javasdk.client.ComponentClient;
import akka.javasdk.http.HttpClient;
import akka.javasdk.http.HttpClientProvider;
import akka.javasdk.impl.ApplicationConfig;
import akka.javasdk.impl.ErrorHandling;
import akka.javasdk.impl.JsonMessageCodec;
import akka.javasdk.impl.MessageCodec;
import akka.javasdk.impl.SdkRunner;
import akka.javasdk.impl.client.ComponentClientImpl;
import akka.javasdk.impl.http.HttpClientImpl;
import akka.javasdk.impl.serialization.JsonSerializer;
import akka.javasdk.impl.timer.TimerSchedulerImpl;
import akka.javasdk.testkit.EventingTestKit.IncomingMessages;
import akka.javasdk.timer.TimerScheduler;
Expand Down Expand Up @@ -423,7 +421,7 @@ private void startEventingTestkit() {
if (settings.eventingSupport == TEST_BROKER || settings.mockedEventing.hasConfig()) {
log.info("Eventing TestKit booting up on port: " + eventingTestKitPort);
// actual message codec instance not available until runtime/sdk started, thus this is called after discovery happens
eventingTestKit = EventingTestKit.start(runtimeActorSystem, "0.0.0.0", eventingTestKitPort, new JsonMessageCodec());
eventingTestKit = EventingTestKit.start(runtimeActorSystem, "0.0.0.0", eventingTestKitPort, new JsonSerializer());
}
}

Expand Down Expand Up @@ -518,8 +516,8 @@ public SpiSettings getSettings() {
selfHttpClient = new HttpClientImpl(runtimeActorSystem, "http://" + proxyHost + ":" + proxyPort);
httpClientProvider = startupContext.httpClientProvider();
timerScheduler = new TimerSchedulerImpl(componentClients.timerClient(), Metadata.EMPTY);
var codec = new JsonMessageCodec(); // FIXME replace with JsonSerializer
this.messageBuilder = new EventingTestKit.MessageBuilder(codec);
var serializer = new JsonSerializer();
this.messageBuilder = new EventingTestKit.MessageBuilder(serializer);

} catch (Exception ex) {
throw new RuntimeException("Error while starting testkit", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import akka.http.scaladsl.model.HttpResponse
import akka.javasdk.JsonSupport
import akka.javasdk.Metadata.{ MetadataEntry => SdkMetadataEntry }
import akka.javasdk.impl.AnySupport
import akka.javasdk.impl.MessageCodec
import akka.javasdk.impl.MetadataImpl
import akka.javasdk.testkit.EventingTestKit
import akka.javasdk.testkit.EventingTestKit.IncomingMessages
Expand Down Expand Up @@ -50,11 +49,11 @@ import kalix.testkit.protocol.eventing_test_backend.RunSourceCreate
import kalix.testkit.protocol.eventing_test_backend.SourceElem
import org.slf4j.LoggerFactory
import scalapb.GeneratedMessage

import java.time
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.{ List => JList }

import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import scala.jdk.OptionConverters._
Expand All @@ -66,6 +65,8 @@ import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success

import akka.javasdk.impl.serialization.JsonSerializer

object EventingTestKitImpl {

/**
Expand All @@ -75,10 +76,10 @@ object EventingTestKitImpl {
* The returned testkit can be used to expect and emit events to the proxy as if they came from an actual pub/sub
* event backend.
*/
def start(system: ActorSystem[_], host: String, port: Int, decoder: MessageCodec): EventingTestKit = {
def start(system: ActorSystem[_], host: String, port: Int, serializer: JsonSerializer): EventingTestKit = {

// Create service handlers
val service = new EventingTestServiceImpl(system, host, port, decoder)
val service = new EventingTestServiceImpl(system, host, port, serializer)
val handler: HttpRequest => Future[HttpResponse] =
EventingTestKitServiceHandler(new service.ServiceImpl)(system)

Expand Down Expand Up @@ -140,7 +141,7 @@ object EventingTestKitImpl {
* Implements the EventingTestKit protocol originally defined in proxy
* protocols/testkit/src/main/protobuf/eventing_test_backend.proto
*/
final class EventingTestServiceImpl(system: ActorSystem[_], val host: String, var port: Int, codec: MessageCodec)
final class EventingTestServiceImpl(system: ActorSystem[_], val host: String, var port: Int, serializer: JsonSerializer)
extends EventingTestKit {

private val log = LoggerFactory.getLogger(classOf[EventingTestServiceImpl])
Expand All @@ -159,36 +160,37 @@ final class EventingTestServiceImpl(system: ActorSystem[_], val host: String, va
private def getTopicIncomingMessagesImpl(topic: String): IncomingMessagesImpl =
topicSubscriptions.computeIfAbsent(
topic,
_ => new IncomingMessagesImpl(sys.actorOf(Props[SourcesHolder](), "topic-holder-" + topic), codec))
_ => new IncomingMessagesImpl(sys.actorOf(Props[SourcesHolder](), "topic-holder-" + topic), serializer))

override def getTopicOutgoingMessages(topic: String): OutgoingMessages = getTopicOutgoingMessagesImpl(topic)

private def getTopicOutgoingMessagesImpl(topic: String): OutgoingMessagesImpl =
topicDestinations.computeIfAbsent(topic, _ => new OutgoingMessagesImpl(TestProbe(), codec))
topicDestinations.computeIfAbsent(topic, _ => new OutgoingMessagesImpl(TestProbe(), serializer))

override def getKeyValueEntityIncomingMessages(typeId: String): IncomingMessages = getValueEntityIncomingMessagesImpl(
typeId)

private def getValueEntityIncomingMessagesImpl(typeId: String): VeIncomingMessagesImpl =
veSubscriptions.computeIfAbsent(
typeId,
_ => new VeIncomingMessagesImpl(sys.actorOf(Props[SourcesHolder](), "ve-holder-" + typeId), codec))
_ => new VeIncomingMessagesImpl(sys.actorOf(Props[SourcesHolder](), "ve-holder-" + typeId), serializer))

override def getEventSourcedEntityIncomingMessages(typeId: String): IncomingMessages =
getEventSourcedSubscriptionImpl(typeId)

private def getEventSourcedSubscriptionImpl(typeId: String): IncomingMessagesImpl =
esSubscriptions.computeIfAbsent(
typeId,
_ => new IncomingMessagesImpl(sys.actorOf(Props[SourcesHolder](), "es-holder-" + typeId), codec))
_ => new IncomingMessagesImpl(sys.actorOf(Props[SourcesHolder](), "es-holder-" + typeId), serializer))

override def getStreamIncomingMessages(service: String, streamId: String): IncomingMessages =
getStreamIncomingMessagesImpl(service, streamId)

private def getStreamIncomingMessagesImpl(service: String, streamId: String): IncomingMessagesImpl =
streamSubscriptions.computeIfAbsent(
service + "/" + streamId,
_ => new IncomingMessagesImpl(sys.actorOf(Props[SourcesHolder](), s"stream-holder-$service-$streamId"), codec))
_ =>
new IncomingMessagesImpl(sys.actorOf(Props[SourcesHolder](), s"stream-holder-$service-$streamId"), serializer))

final class ServiceImpl extends EventingTestKitService {
override def emitSingle(in: EmitSingleCommand): Future[EmitSingleResult] = {
Expand Down Expand Up @@ -264,7 +266,7 @@ final class EventingTestServiceImpl(system: ActorSystem[_], val host: String, va
}
}

private[testkit] class IncomingMessagesImpl(val sourcesHolder: ActorRef, val codec: MessageCodec)
private[testkit] class IncomingMessagesImpl(val sourcesHolder: ActorRef, val serializer: JsonSerializer)
extends IncomingMessages {

def addSourceProbe(runningSourceProbe: RunningSourceProbe): Unit = {
Expand Down Expand Up @@ -292,7 +294,7 @@ private[testkit] class IncomingMessagesImpl(val sourcesHolder: ActorRef, val cod
}

override def publish[T](message: T, subject: String): Unit = {
val md = defaultMetadata(message, subject, codec)
val md = defaultMetadata(message, subject, serializer)
publish(TestKitMessageImpl(message, md))
}

Expand All @@ -303,8 +305,10 @@ private[testkit] class IncomingMessagesImpl(val sourcesHolder: ActorRef, val cod
"Publishing a delete message is supported only for ValueEntity messages.")
}

private[testkit] class VeIncomingMessagesImpl(override val sourcesHolder: ActorRef, override val codec: MessageCodec)
extends IncomingMessagesImpl(sourcesHolder, codec) {
private[testkit] class VeIncomingMessagesImpl(
override val sourcesHolder: ActorRef,
override val serializer: JsonSerializer)
extends IncomingMessagesImpl(sourcesHolder, serializer) {

override def publishDelete(subject: String): Unit = {
publish(
Expand All @@ -319,7 +323,7 @@ private[testkit] class VeIncomingMessagesImpl(override val sourcesHolder: ActorR

private[testkit] class OutgoingMessagesImpl(
private[testkit] val destinationProbe: TestProbe,
protected val codec: MessageCodec)
protected val serializer: JsonSerializer)
extends OutgoingMessages {

val DefaultTimeout: time.Duration = time.Duration.ofSeconds(3)
Expand Down Expand Up @@ -350,14 +354,15 @@ private[testkit] class OutgoingMessagesImpl(
override def expectOneTyped[T](clazz: Class[T], timeout: time.Duration): TestKitMessage[T] = {
val msg = expectMsgInternal(destinationProbe, timeout, Some(clazz))
val metadata = MetadataImpl.of(msg.getMessage.getMetadata.entries)
// FIXME don't use proto
val scalaPb = ScalaPbAny(typeUrlFor(metadata), msg.getMessage.payload)

val decodedMsg = if (AnySupport.isJsonTypeUrl(typeUrlFor(metadata))) {
JsonSupport.getObjectMapper
.readerFor(clazz)
.readValue(msg.getMessage.payload.toByteArray)
val decodedMsg = if (serializer.isJsonContentType(typeUrlFor(metadata))) {
val bytesPayload = AnySupport.toSpiBytesPayload(scalaPb)
serializer.fromBytes(clazz, bytesPayload)
} else {
codec.decodeMessage(scalaPb)
val anySupport = new AnySupport(Array(), getClass.getClassLoader)
anySupport.decodeMessage(scalaPb)
}

val concreteType = TestKitMessageImpl.expectType(decodedMsg, clazz)
Expand All @@ -367,9 +372,10 @@ private[testkit] class OutgoingMessagesImpl(
private def anyFromMessage(m: kalix.testkit.protocol.eventing_test_backend.Message): TestKitMessage[_] = {
val metadata = MetadataImpl.of(m.metadata.getOrElse(Metadata.defaultInstance).entries)
val anyMsg = if (AnySupport.isJsonTypeUrl(typeUrlFor(metadata))) {
m.payload.toStringUtf8
m.payload.toStringUtf8 // FIXME isn't this strange?
} else {
codec.decodeMessage(ScalaPbAny(typeUrlFor(metadata), m.payload))
val anySupport = new AnySupport(Array(), getClass.getClassLoader)
anySupport.decodeMessage(ScalaPbAny(typeUrlFor(metadata), m.payload))
}
TestKitMessageImpl(anyMsg, metadata)
}
Expand Down Expand Up @@ -425,7 +431,7 @@ private[testkit] object TestKitMessageImpl {
TestKitMessageImpl[ByteString](m.payload, metadata).asInstanceOf[TestKitMessage[ByteString]]
}

def defaultMetadata(message: Any, subject: String, messageCodec: MessageCodec): SdkMetadata = {
def defaultMetadata(message: Any, subject: String, serializer: JsonSerializer): SdkMetadata = {
val (contentType, ceType) = message match {
case pbMsg: GeneratedMessageV3 =>
val desc = pbMsg.getDescriptorForType
Expand All @@ -436,7 +442,7 @@ private[testkit] object TestKitMessageImpl {
case _: String =>
("text/plain; charset=utf-8", "")
case _ =>
("application/json", AnySupport.stripJsonTypeUrlPrefix(messageCodec.typeUrlFor(message.getClass)))
("application/json", serializer.stripJsonContentTypePrefix(serializer.contentTypeFor(message.getClass)))
}

defaultMetadata(subject, contentType, ceType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.javasdk.testkit.impl

import akka.actor.ActorSystem
import akka.actor.Props
import akka.javasdk.impl.AnySupport
import akka.javasdk.testkit.impl.EventingTestKitImpl.RunningSourceProbe
import akka.stream.BoundedSourceQueue
import akka.stream.QueueOfferResult
Expand All @@ -21,19 +20,20 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.mutable

import akka.javasdk.impl.serialization.JsonSerializer

class IncomingMessagesImplSpec
extends TestKit(ActorSystem("MySpec"))
with AnyWordSpecLike
with Matchers
with BeforeAndAfterEach
with BeforeAndAfterAll {

private val anySupport = new AnySupport(Array(), getClass.getClassLoader)
private val serializer = new JsonSerializer
private val subscription =
new IncomingMessagesImpl(system.actorOf(Props[SourcesHolder](), "holder"), anySupport)
new IncomingMessagesImpl(system.actorOf(Props[SourcesHolder](), "holder"), serializer)
val queue = new DummyQueue(mutable.Queue.empty)

private val runningSourceProbe: RunningSourceProbe =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.mutable
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.language.existentials

import akka.javasdk.impl.serialization.JsonSerializer

class OutgoingMessagesImplSpec
extends TestKit(ActorSystem("MySpec"))
with AnyWordSpecLike
Expand All @@ -33,15 +34,19 @@ class OutgoingMessagesImplSpec
with BeforeAndAfterAll {

private val anySupport = new AnySupport(Array(), getClass.getClassLoader)
private val serializer = new JsonSerializer
private val outProbe = TestProbe()(system)
private val destination = new OutgoingMessagesImpl(outProbe, anySupport)
private val destination = new OutgoingMessagesImpl(outProbe, serializer)
val queue = new DummyQueue(mutable.Queue.empty)

private val textPlainHeader = MetadataEntry("Content-Type", StringValue("text/plain; charset=utf-8"))
private val bytesHeader = MetadataEntry("Content-Type", StringValue("application/octet-stream"))
private def msgWithMetadata(any: Any, mdEntry: MetadataEntry*) = EmitSingleCommand(
Some(EventDestination(Topic("test-topic"))),
Some(Message(anySupport.encodeScala(any).value, Some(Metadata(mdEntry)))))
private def msgWithMetadata(any: Any, mdEntry: MetadataEntry*): EmitSingleCommand = {
// FIXME don't use proto
EmitSingleCommand(
Some(EventDestination(Topic("test-topic"))),
Some(Message(anySupport.encodeScala(any).value, Some(Metadata(mdEntry)))))
}

"TopicImpl" must {
"provide utility to read typed messages - string" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ class JsonSerializer {
}

def isJson(bytesPayload: BytesPayload): Boolean =
isJsonTypeUrl(bytesPayload.contentType)
isJsonContentType(bytesPayload.contentType)

private def isJsonTypeUrl(contentType: String): Boolean =
def isJsonContentType(contentType: String): Boolean =
// check both new and old typeurl for compatibility, in case there are services with old type url stored in database
contentType.startsWith(JsonContentTypePrefix) || contentType.startsWith(KalixJsonContentTypePrefix)

Expand All @@ -166,7 +166,7 @@ class JsonSerializer {
// JsonContentTypePrefix + typeUrl.stripPrefix(KalixJsonContentTypePrefix)
// else typeUrl

private def stripJsonContentTypePrefix(contentType: String): String =
def stripJsonContentTypePrefix(contentType: String): String =
contentType.stripPrefix(JsonContentTypePrefix).stripPrefix(KalixJsonContentTypePrefix)

private def lookupTypeHintWithVersion(value: Any): String =
Expand Down
Loading