Skip to content

Commit

Permalink
replace more protocol Metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 16, 2024
1 parent 79cb2d2 commit 9285e42
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.{ List => JList }

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import scala.jdk.OptionConverters._
Expand All @@ -66,6 +67,8 @@ import scala.util.Failure
import scala.util.Success

import akka.javasdk.impl.serialization.JsonSerializer
import akka.runtime.sdk.spi.SpiMetadata
import akka.runtime.sdk.spi.SpiMetadataEntry

object EventingTestKitImpl {

Expand Down Expand Up @@ -118,7 +121,9 @@ object EventingTestKitImpl {
if (sdkMetadataEntry.isText) {
mde.withStringValue(sdkMetadataEntry.getValue)
} else {
mde.withBytesValue(ByteString.copyFrom(sdkMetadataEntry.getBinaryValue))
@nowarn("msg=deprecated")
val binary = sdkMetadataEntry.getBinaryValue;
mde.withBytesValue(ByteString.copyFrom(binary))
}
}

Expand All @@ -135,6 +140,21 @@ object EventingTestKitImpl {
}

}

def metadataToSpi(metadata: Option[Metadata]): SpiMetadata =
metadata.map(metadataToSpi).getOrElse(SpiMetadata.empty)

def metadataToSpi(metadata: Metadata): SpiMetadata = {
import kalix.protocol.component.MetadataEntry.Value
val entries = metadata.entries.map(entry =>
entry.value match {
case Value.Empty => new SpiMetadataEntry(entry.key, "")
case Value.StringValue(value) => new SpiMetadataEntry(entry.key, value)
case Value.BytesValue(value) =>
new SpiMetadataEntry(entry.key, value.toStringUtf8) //FIXME binary not supported
})
new SpiMetadata(entries)
}
}

/**
Expand Down Expand Up @@ -325,6 +345,7 @@ private[testkit] class OutgoingMessagesImpl(
private[testkit] val destinationProbe: TestProbe,
protected val serializer: JsonSerializer)
extends OutgoingMessages {
import EventingTestKitImpl.metadataToSpi

val DefaultTimeout: time.Duration = time.Duration.ofSeconds(3)

Expand Down Expand Up @@ -353,7 +374,7 @@ private[testkit] class OutgoingMessagesImpl(

override def expectOneTyped[T](clazz: Class[T], timeout: time.Duration): TestKitMessage[T] = {
val msg = expectMsgInternal(destinationProbe, timeout, Some(clazz))
val metadata = MetadataImpl.of(msg.getMessage.getMetadata.entries)
val metadata = MetadataImpl.of(metadataToSpi(msg.getMessage.getMetadata))
// FIXME don't use proto
val scalaPb = ScalaPbAny(typeUrlFor(metadata), msg.getMessage.payload)

Expand All @@ -371,7 +392,7 @@ private[testkit] class OutgoingMessagesImpl(
}

private def anyFromMessage(m: kalix.testkit.protocol.eventing_test_backend.Message): TestKitMessage[_] = {
val metadata = MetadataImpl.of(m.metadata.getOrElse(Metadata.defaultInstance).entries)
val metadata = MetadataImpl.of(metadataToSpi(m.metadata))
val anyMsg = if (AnySupport.isJsonTypeUrl(typeUrlFor(metadata))) {
m.payload.toStringUtf8 // FIXME isn't this strange?
} else {
Expand Down Expand Up @@ -427,8 +448,10 @@ private[testkit] case class TestKitMessageImpl[P](payload: P, metadata: SdkMetad
}

private[testkit] object TestKitMessageImpl {
import EventingTestKitImpl.metadataToSpi

def ofProtocolMessage(m: kalix.testkit.protocol.eventing_test_backend.Message): TestKitMessage[ByteString] = {
val metadata = MetadataImpl.of(m.metadata.getOrElse(Metadata()).entries)
val metadata = MetadataImpl.of(metadataToSpi(m.metadata))
TestKitMessageImpl[ByteString](m.payload, metadata).asInstanceOf[TestKitMessage[ByteString]]
}

Expand Down
8 changes: 8 additions & 0 deletions akka-javasdk/src/main/java/akka/javasdk/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ public interface Metadata extends Iterable<Metadata.MetadataEntry> {
* @param key The key to set.
* @param value The value to set.
* @return A copy of this Metadata object with the entry set.
* @deprecated binary not supported, use {@link #set(String, String)}
*/
@Deprecated
Metadata setBinary(String key, ByteBuffer value);

/**
Expand Down Expand Up @@ -154,7 +156,9 @@ public interface Metadata extends Iterable<Metadata.MetadataEntry> {
* @param key The key to add.
* @param value The value to add.
* @return A copy of this Metadata object with the entry added.
* @deprecated binary not supported, use {@link #add(String, String)}
*/
@Deprecated
Metadata addBinary(String key, ByteBuffer value);

/**
Expand Down Expand Up @@ -243,7 +247,9 @@ interface MetadataEntry {
* The binary value for the metadata entry.
*
* @return The binary value, or null if this entry is not a string Metadata entry.
* @deprecated binary not supported, use {@link #getValue()}
*/
@Deprecated
ByteBuffer getBinaryValue();

/**
Expand All @@ -257,7 +263,9 @@ interface MetadataEntry {
* Whether this entry is a binary entry.
*
* @return True if this entry is a binary entry.
* @deprecated binary not supported, use {@link #getValue()}
*/
@Deprecated
boolean isBinary();
}

Expand Down
88 changes: 32 additions & 56 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/MetadataImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,16 @@ import akka.javasdk.impl.telemetry.Telemetry
import akka.javasdk.impl.telemetry.Telemetry.metadataGetter
import akka.runtime.sdk.spi.SpiMetadata
import akka.runtime.sdk.spi.SpiMetadataEntry
import com.google.protobuf.ByteString
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanContext
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.{ Context => OtelContext }
import kalix.protocol.component.MetadataEntry
import kalix.protocol.component.MetadataEntry.Value

/**
* INTERNAL API
*/
@InternalApi
private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) extends Metadata with CloudEvent {
private[javasdk] class MetadataImpl private (val entries: Seq[SpiMetadataEntry]) extends Metadata with CloudEvent {

override def has(key: String): Boolean = entries.exists(_.key.equalsIgnoreCase(key))

Expand All @@ -43,8 +40,7 @@ private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) ex

private[akka] def getScala(key: String): Option[String] =
entries.collectFirst {
case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) =>
value
case entry if key.equalsIgnoreCase(entry.key) => entry.value
}

def withTracing(spanContext: SpanContext): Metadata = {
Expand All @@ -53,7 +49,7 @@ private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) ex

def withTracing(span: Span): MetadataImpl = {
// remove parent trace parent and trace state from the metadata so they can be re-injected with current span context
val builder = Vector.newBuilder[MetadataEntry]
val builder = Vector.newBuilder[SpiMetadataEntry]
builder.addAll(
entries.iterator.filter(m => m.key != Telemetry.TRACE_PARENT_KEY && m.key != Telemetry.TRACE_STATE_KEY))
W3CTraceContextPropagator
Expand All @@ -67,71 +63,59 @@ private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) ex

private[akka] def getAllScala(key: String): Seq[String] =
entries.collect {
case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) =>
value
case entry if key.equalsIgnoreCase(entry.key) => entry.value
}

override def getBinary(key: String): Optional[ByteBuffer] =
getBinaryScala(key).toJava

private[akka] def getBinaryScala(key: String): Option[ByteBuffer] =
entries.collectFirst {
case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) =>
value.asReadOnlyByteBuffer()
}
Optional.empty[ByteBuffer] // binary not supported

override def getBinaryAll(key: String): util.List[ByteBuffer] =
getBinaryAllScala(key).asJava

private[akka] def getBinaryAllScala(key: String): Seq[ByteBuffer] =
entries.collect {
case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) =>
value.asReadOnlyByteBuffer()
}
util.Collections.emptyList()

override def getAllKeys: util.List[String] = getAllKeysScala.asJava
private[akka] def getAllKeysScala: Seq[String] = entries.map(_.key)

override def set(key: String, value: String): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
MetadataImpl.of(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value)))
MetadataImpl.of(removeKey(key) :+ new SpiMetadataEntry(key, value))
}

override def setBinary(key: String, value: ByteBuffer): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
MetadataImpl.of(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value))))
// binary not supported
this
}

override def add(key: String, value: String): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
MetadataImpl.of(entries :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value)))
MetadataImpl.of(entries :+ new SpiMetadataEntry(key, value))
}

override def addBinary(key: String, value: ByteBuffer): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
MetadataImpl.of(entries :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value))))
// binary not supported
this
}

override def remove(key: String): MetadataImpl = MetadataImpl.of(removeKey(key))

override def clear(): MetadataImpl = MetadataImpl.Empty

private[akka] def iteratorScala[R](f: MetadataEntry => R): Iterator[R] =
entries.iterator.map(f)

override def iterator(): util.Iterator[Metadata.MetadataEntry] =
iteratorScala(entry =>
override def iterator(): util.Iterator[Metadata.MetadataEntry] = {
entries.iterator.map { entry =>
new Metadata.MetadataEntry {
override def getKey: String = entry.key
override def getValue: String = entry.value.stringValue.orNull
override def getBinaryValue: ByteBuffer = entry.value.bytesValue.map(_.asReadOnlyByteBuffer()).orNull
override def isText: Boolean = entry.value.isStringValue
override def isBinary: Boolean = entry.value.isBytesValue
}).asJava
override def getValue: String = entry.value
override def getBinaryValue: ByteBuffer = null
override def isText: Boolean = true
override def isBinary: Boolean = false
}
}.asJava
}

private def removeKey(key: String) = entries.filterNot(_.key.equalsIgnoreCase(key))

Expand All @@ -146,16 +130,15 @@ private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) ex
MetadataImpl.of(
entries.filterNot(e => MetadataImpl.CeRequired(e.key)) ++
Seq(
MetadataEntry(MetadataImpl.CeSpecversion, MetadataEntry.Value.StringValue(MetadataImpl.CeSpecversionValue)),
MetadataEntry(MetadataImpl.CeId, MetadataEntry.Value.StringValue(id)),
MetadataEntry(MetadataImpl.CeSource, MetadataEntry.Value.StringValue(source.toString)),
MetadataEntry(MetadataImpl.CeType, MetadataEntry.Value.StringValue(`type`))))
new SpiMetadataEntry(MetadataImpl.CeSpecversion, MetadataImpl.CeSpecversionValue),
new SpiMetadataEntry(MetadataImpl.CeId, id),
new SpiMetadataEntry(MetadataImpl.CeSource, source.toString),
new SpiMetadataEntry(MetadataImpl.CeType, `type`)))

private def getRequiredCloudEventField(key: String) =
entries
.collectFirst {
case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) =>
value
case entry if key.equalsIgnoreCase(entry.key) => entry.value
}
.getOrElse {
throw new IllegalStateException(s"Metadata is not a CloudEvent because it does not have required field $key")
Expand Down Expand Up @@ -258,27 +241,21 @@ object MetadataImpl {
def toSpi(metadata: Metadata): SpiMetadata = {
metadata match {
case impl: MetadataImpl if impl.entries.nonEmpty =>
val entries = impl.entries.map(entry =>
entry.value match {
case Value.Empty => new SpiMetadataEntry(entry.key, "")
case Value.StringValue(value) => new SpiMetadataEntry(entry.key, value)
case Value.BytesValue(value) =>
new SpiMetadataEntry(entry.key, value.toStringUtf8) //FIXME support bytes values or not
})
new SpiMetadata(entries)
case _: MetadataImpl => SpiMetadata.empty
new SpiMetadata(impl.entries)
case _: MetadataImpl =>
SpiMetadata.empty
case other =>
throw new RuntimeException(s"Unknown metadata implementation: ${other.getClass}, cannot send")
}
}

def of(entries: Seq[MetadataEntry]): MetadataImpl = {
def of(entries: Seq[SpiMetadataEntry]): MetadataImpl = {
val transformedEntries =
entries.map { entry =>
// is incoming ce key in one of the alternative formats?
// if so, convert key to our internal default key format
alternativeKeyFormats.get(entry.key) match {
case Some(defaultKey) => MetadataEntry(defaultKey, entry.value)
case Some(defaultKey) => new SpiMetadataEntry(defaultKey, entry.value)
case _ => entry
}
}
Expand All @@ -287,8 +264,7 @@ object MetadataImpl {
}

def of(metadata: SpiMetadata): MetadataImpl = {
val entries = metadata.entries.map(e => MetadataEntry(e.key, MetadataEntry.Value.StringValue(e.value)))
new MetadataImpl(entries)
of(metadata.entries)
}

}
30 changes: 23 additions & 7 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends

}

/**
* INTERNAL API
*/
@InternalApi
private object ComponentType {
// Those are also defined in ComponentAnnotationProcessor, and must be the same

val EventSourcedEntity = "event-sourced-entity"
val KeyValueEntity = "key-value-entity"
val Workflow = "workflow"
val HttpEndpoint = "http-endpoint"
val Consumer = "consumer"
val TimedAction = "timed-action"
val View = "view"
}

/**
* INTERNAL API
*/
Expand All @@ -186,13 +202,13 @@ private object ComponentLocator {
def locateUserComponents(system: ActorSystem[_]): LocatedClasses = {
val kalixComponentTypeAndBaseClasses: Map[String, Class[_]] =
Map(
"http-endpoint" -> classOf[AnyRef],
"timed-action" -> classOf[TimedAction],
"consumer" -> classOf[Consumer],
"event-sourced-entity" -> classOf[EventSourcedEntity[_, _]],
"workflow" -> classOf[Workflow[_]],
"key-value-entity" -> classOf[KeyValueEntity[_]],
"view" -> classOf[AnyRef])
ComponentType.HttpEndpoint -> classOf[AnyRef],
ComponentType.TimedAction -> classOf[TimedAction],
ComponentType.Consumer -> classOf[Consumer],
ComponentType.EventSourcedEntity -> classOf[EventSourcedEntity[_, _]],
ComponentType.Workflow -> classOf[Workflow[_]],
ComponentType.KeyValueEntity -> classOf[KeyValueEntity[_]],
ComponentType.View -> classOf[AnyRef])

// Alternative to but inspired by the stdlib SPI style of registering in META-INF/services
// since we don't always have top supertypes and want to inject things into component constructors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import akka.runtime.sdk.spi.SpiConsumer
import akka.runtime.sdk.spi.SpiConsumer.Effect
import akka.runtime.sdk.spi.SpiConsumer.Message
import akka.runtime.sdk.spi.SpiMetadata
import akka.runtime.sdk.spi.SpiMetadataEntry
import akka.runtime.sdk.spi.TimerClient
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.Tracer
import kalix.protocol.component.MetadataEntry
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
Expand Down Expand Up @@ -172,10 +172,7 @@ private[impl] final class MessageContextImpl(
override def componentCallMetadata: MetadataImpl = {
if (metadata.has(Telemetry.TRACE_PARENT_KEY)) {
MetadataImpl.of(
List(
MetadataEntry(
Telemetry.TRACE_PARENT_KEY,
MetadataEntry.Value.StringValue(metadata.get(Telemetry.TRACE_PARENT_KEY).get()))))
List(new SpiMetadataEntry(Telemetry.TRACE_PARENT_KEY, metadata.get(Telemetry.TRACE_PARENT_KEY).get())))
} else {
MetadataImpl.Empty
}
Expand Down
Loading

0 comments on commit 9285e42

Please sign in to comment.