From 9285e4284ba87933160ce550024b67ffe2e51a93 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 13 Dec 2024 14:02:44 +0100 Subject: [PATCH] replace more protocol Metadata --- .../testkit/impl/EventingTestKitImpl.scala | 31 ++++++- .../src/main/java/akka/javasdk/Metadata.java | 8 ++ .../akka/javasdk/impl/MetadataImpl.scala | 88 +++++++------------ .../scala/akka/javasdk/impl/SdkRunner.scala | 30 +++++-- .../javasdk/impl/consumer/ConsumerImpl.scala | 7 +- .../EventSourcedEntityImpl.scala | 9 +- .../keyvalueentity/KeyValueEntityImpl.scala | 9 +- .../javasdk/impl/telemetry/Telemetry.scala | 57 ++++++------ .../impl/timedaction/TimedActionImpl.scala | 7 +- 9 files changed, 131 insertions(+), 115 deletions(-) diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventingTestKitImpl.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventingTestKitImpl.scala index dccbf8fdb..37b3bfef8 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventingTestKitImpl.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/EventingTestKitImpl.scala @@ -54,6 +54,7 @@ import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.{ List => JList } +import scala.annotation.nowarn import scala.jdk.CollectionConverters._ import scala.jdk.DurationConverters._ import scala.jdk.OptionConverters._ @@ -66,6 +67,8 @@ import scala.util.Failure import scala.util.Success import akka.javasdk.impl.serialization.JsonSerializer +import akka.runtime.sdk.spi.SpiMetadata +import akka.runtime.sdk.spi.SpiMetadataEntry object EventingTestKitImpl { @@ -118,7 +121,9 @@ object EventingTestKitImpl { if (sdkMetadataEntry.isText) { mde.withStringValue(sdkMetadataEntry.getValue) } else { - mde.withBytesValue(ByteString.copyFrom(sdkMetadataEntry.getBinaryValue)) + @nowarn("msg=deprecated") + val binary = sdkMetadataEntry.getBinaryValue; + mde.withBytesValue(ByteString.copyFrom(binary)) } } @@ -135,6 +140,21 @@ object EventingTestKitImpl { } } + + def metadataToSpi(metadata: Option[Metadata]): SpiMetadata = + metadata.map(metadataToSpi).getOrElse(SpiMetadata.empty) + + def metadataToSpi(metadata: Metadata): SpiMetadata = { + import kalix.protocol.component.MetadataEntry.Value + val entries = metadata.entries.map(entry => + entry.value match { + case Value.Empty => new SpiMetadataEntry(entry.key, "") + case Value.StringValue(value) => new SpiMetadataEntry(entry.key, value) + case Value.BytesValue(value) => + new SpiMetadataEntry(entry.key, value.toStringUtf8) //FIXME binary not supported + }) + new SpiMetadata(entries) + } } /** @@ -325,6 +345,7 @@ private[testkit] class OutgoingMessagesImpl( private[testkit] val destinationProbe: TestProbe, protected val serializer: JsonSerializer) extends OutgoingMessages { + import EventingTestKitImpl.metadataToSpi val DefaultTimeout: time.Duration = time.Duration.ofSeconds(3) @@ -353,7 +374,7 @@ private[testkit] class OutgoingMessagesImpl( override def expectOneTyped[T](clazz: Class[T], timeout: time.Duration): TestKitMessage[T] = { val msg = expectMsgInternal(destinationProbe, timeout, Some(clazz)) - val metadata = MetadataImpl.of(msg.getMessage.getMetadata.entries) + val metadata = MetadataImpl.of(metadataToSpi(msg.getMessage.getMetadata)) // FIXME don't use proto val scalaPb = ScalaPbAny(typeUrlFor(metadata), msg.getMessage.payload) @@ -371,7 +392,7 @@ private[testkit] class OutgoingMessagesImpl( } private def anyFromMessage(m: kalix.testkit.protocol.eventing_test_backend.Message): TestKitMessage[_] = { - val metadata = MetadataImpl.of(m.metadata.getOrElse(Metadata.defaultInstance).entries) + val metadata = MetadataImpl.of(metadataToSpi(m.metadata)) val anyMsg = if (AnySupport.isJsonTypeUrl(typeUrlFor(metadata))) { m.payload.toStringUtf8 // FIXME isn't this strange? } else { @@ -427,8 +448,10 @@ private[testkit] case class TestKitMessageImpl[P](payload: P, metadata: SdkMetad } private[testkit] object TestKitMessageImpl { + import EventingTestKitImpl.metadataToSpi + def ofProtocolMessage(m: kalix.testkit.protocol.eventing_test_backend.Message): TestKitMessage[ByteString] = { - val metadata = MetadataImpl.of(m.metadata.getOrElse(Metadata()).entries) + val metadata = MetadataImpl.of(metadataToSpi(m.metadata)) TestKitMessageImpl[ByteString](m.payload, metadata).asInstanceOf[TestKitMessage[ByteString]] } diff --git a/akka-javasdk/src/main/java/akka/javasdk/Metadata.java b/akka-javasdk/src/main/java/akka/javasdk/Metadata.java index 6ff6ba981..48a7fbf91 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/Metadata.java +++ b/akka-javasdk/src/main/java/akka/javasdk/Metadata.java @@ -124,7 +124,9 @@ public interface Metadata extends Iterable { * @param key The key to set. * @param value The value to set. * @return A copy of this Metadata object with the entry set. + * @deprecated binary not supported, use {@link #set(String, String)} */ + @Deprecated Metadata setBinary(String key, ByteBuffer value); /** @@ -154,7 +156,9 @@ public interface Metadata extends Iterable { * @param key The key to add. * @param value The value to add. * @return A copy of this Metadata object with the entry added. + * @deprecated binary not supported, use {@link #add(String, String)} */ + @Deprecated Metadata addBinary(String key, ByteBuffer value); /** @@ -243,7 +247,9 @@ interface MetadataEntry { * The binary value for the metadata entry. * * @return The binary value, or null if this entry is not a string Metadata entry. + * @deprecated binary not supported, use {@link #getValue()} */ + @Deprecated ByteBuffer getBinaryValue(); /** @@ -257,7 +263,9 @@ interface MetadataEntry { * Whether this entry is a binary entry. * * @return True if this entry is a binary entry. + * @deprecated binary not supported, use {@link #getValue()} */ + @Deprecated boolean isBinary(); } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/MetadataImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/MetadataImpl.scala index efd229f4b..b87dd14b1 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/MetadataImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/MetadataImpl.scala @@ -22,19 +22,16 @@ import akka.javasdk.impl.telemetry.Telemetry import akka.javasdk.impl.telemetry.Telemetry.metadataGetter import akka.runtime.sdk.spi.SpiMetadata import akka.runtime.sdk.spi.SpiMetadataEntry -import com.google.protobuf.ByteString import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.SpanContext import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator import io.opentelemetry.context.{ Context => OtelContext } -import kalix.protocol.component.MetadataEntry -import kalix.protocol.component.MetadataEntry.Value /** * INTERNAL API */ @InternalApi -private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) extends Metadata with CloudEvent { +private[javasdk] class MetadataImpl private (val entries: Seq[SpiMetadataEntry]) extends Metadata with CloudEvent { override def has(key: String): Boolean = entries.exists(_.key.equalsIgnoreCase(key)) @@ -43,8 +40,7 @@ private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) ex private[akka] def getScala(key: String): Option[String] = entries.collectFirst { - case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) => - value + case entry if key.equalsIgnoreCase(entry.key) => entry.value } def withTracing(spanContext: SpanContext): Metadata = { @@ -53,7 +49,7 @@ private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) ex def withTracing(span: Span): MetadataImpl = { // remove parent trace parent and trace state from the metadata so they can be re-injected with current span context - val builder = Vector.newBuilder[MetadataEntry] + val builder = Vector.newBuilder[SpiMetadataEntry] builder.addAll( entries.iterator.filter(m => m.key != Telemetry.TRACE_PARENT_KEY && m.key != Telemetry.TRACE_STATE_KEY)) W3CTraceContextPropagator @@ -67,27 +63,14 @@ private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) ex private[akka] def getAllScala(key: String): Seq[String] = entries.collect { - case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) => - value + case entry if key.equalsIgnoreCase(entry.key) => entry.value } override def getBinary(key: String): Optional[ByteBuffer] = - getBinaryScala(key).toJava - - private[akka] def getBinaryScala(key: String): Option[ByteBuffer] = - entries.collectFirst { - case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) => - value.asReadOnlyByteBuffer() - } + Optional.empty[ByteBuffer] // binary not supported override def getBinaryAll(key: String): util.List[ByteBuffer] = - getBinaryAllScala(key).asJava - - private[akka] def getBinaryAllScala(key: String): Seq[ByteBuffer] = - entries.collect { - case MetadataEntry(k, MetadataEntry.Value.BytesValue(value), _) if key.equalsIgnoreCase(k) => - value.asReadOnlyByteBuffer() - } + util.Collections.emptyList() override def getAllKeys: util.List[String] = getAllKeysScala.asJava private[akka] def getAllKeysScala: Seq[String] = entries.map(_.key) @@ -95,43 +78,44 @@ private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) ex override def set(key: String, value: String): MetadataImpl = { Objects.requireNonNull(key, "Key must not be null") Objects.requireNonNull(value, "Value must not be null") - MetadataImpl.of(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value))) + MetadataImpl.of(removeKey(key) :+ new SpiMetadataEntry(key, value)) } override def setBinary(key: String, value: ByteBuffer): MetadataImpl = { Objects.requireNonNull(key, "Key must not be null") Objects.requireNonNull(value, "Value must not be null") - MetadataImpl.of(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value)))) + // binary not supported + this } override def add(key: String, value: String): MetadataImpl = { Objects.requireNonNull(key, "Key must not be null") Objects.requireNonNull(value, "Value must not be null") - MetadataImpl.of(entries :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value))) + MetadataImpl.of(entries :+ new SpiMetadataEntry(key, value)) } override def addBinary(key: String, value: ByteBuffer): MetadataImpl = { Objects.requireNonNull(key, "Key must not be null") Objects.requireNonNull(value, "Value must not be null") - MetadataImpl.of(entries :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value)))) + // binary not supported + this } override def remove(key: String): MetadataImpl = MetadataImpl.of(removeKey(key)) override def clear(): MetadataImpl = MetadataImpl.Empty - private[akka] def iteratorScala[R](f: MetadataEntry => R): Iterator[R] = - entries.iterator.map(f) - - override def iterator(): util.Iterator[Metadata.MetadataEntry] = - iteratorScala(entry => + override def iterator(): util.Iterator[Metadata.MetadataEntry] = { + entries.iterator.map { entry => new Metadata.MetadataEntry { override def getKey: String = entry.key - override def getValue: String = entry.value.stringValue.orNull - override def getBinaryValue: ByteBuffer = entry.value.bytesValue.map(_.asReadOnlyByteBuffer()).orNull - override def isText: Boolean = entry.value.isStringValue - override def isBinary: Boolean = entry.value.isBytesValue - }).asJava + override def getValue: String = entry.value + override def getBinaryValue: ByteBuffer = null + override def isText: Boolean = true + override def isBinary: Boolean = false + } + }.asJava + } private def removeKey(key: String) = entries.filterNot(_.key.equalsIgnoreCase(key)) @@ -146,16 +130,15 @@ private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) ex MetadataImpl.of( entries.filterNot(e => MetadataImpl.CeRequired(e.key)) ++ Seq( - MetadataEntry(MetadataImpl.CeSpecversion, MetadataEntry.Value.StringValue(MetadataImpl.CeSpecversionValue)), - MetadataEntry(MetadataImpl.CeId, MetadataEntry.Value.StringValue(id)), - MetadataEntry(MetadataImpl.CeSource, MetadataEntry.Value.StringValue(source.toString)), - MetadataEntry(MetadataImpl.CeType, MetadataEntry.Value.StringValue(`type`)))) + new SpiMetadataEntry(MetadataImpl.CeSpecversion, MetadataImpl.CeSpecversionValue), + new SpiMetadataEntry(MetadataImpl.CeId, id), + new SpiMetadataEntry(MetadataImpl.CeSource, source.toString), + new SpiMetadataEntry(MetadataImpl.CeType, `type`))) private def getRequiredCloudEventField(key: String) = entries .collectFirst { - case MetadataEntry(k, MetadataEntry.Value.StringValue(value), _) if key.equalsIgnoreCase(k) => - value + case entry if key.equalsIgnoreCase(entry.key) => entry.value } .getOrElse { throw new IllegalStateException(s"Metadata is not a CloudEvent because it does not have required field $key") @@ -258,27 +241,21 @@ object MetadataImpl { def toSpi(metadata: Metadata): SpiMetadata = { metadata match { case impl: MetadataImpl if impl.entries.nonEmpty => - val entries = impl.entries.map(entry => - entry.value match { - case Value.Empty => new SpiMetadataEntry(entry.key, "") - case Value.StringValue(value) => new SpiMetadataEntry(entry.key, value) - case Value.BytesValue(value) => - new SpiMetadataEntry(entry.key, value.toStringUtf8) //FIXME support bytes values or not - }) - new SpiMetadata(entries) - case _: MetadataImpl => SpiMetadata.empty + new SpiMetadata(impl.entries) + case _: MetadataImpl => + SpiMetadata.empty case other => throw new RuntimeException(s"Unknown metadata implementation: ${other.getClass}, cannot send") } } - def of(entries: Seq[MetadataEntry]): MetadataImpl = { + def of(entries: Seq[SpiMetadataEntry]): MetadataImpl = { val transformedEntries = entries.map { entry => // is incoming ce key in one of the alternative formats? // if so, convert key to our internal default key format alternativeKeyFormats.get(entry.key) match { - case Some(defaultKey) => MetadataEntry(defaultKey, entry.value) + case Some(defaultKey) => new SpiMetadataEntry(defaultKey, entry.value) case _ => entry } } @@ -287,8 +264,7 @@ object MetadataImpl { } def of(metadata: SpiMetadata): MetadataImpl = { - val entries = metadata.entries.map(e => MetadataEntry(e.key, MetadataEntry.Value.StringValue(e.value))) - new MetadataImpl(entries) + of(metadata.entries) } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index bc7650ded..d6d8a95f6 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -168,6 +168,22 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider]) extends } +/** + * INTERNAL API + */ +@InternalApi +private object ComponentType { + // Those are also defined in ComponentAnnotationProcessor, and must be the same + + val EventSourcedEntity = "event-sourced-entity" + val KeyValueEntity = "key-value-entity" + val Workflow = "workflow" + val HttpEndpoint = "http-endpoint" + val Consumer = "consumer" + val TimedAction = "timed-action" + val View = "view" +} + /** * INTERNAL API */ @@ -186,13 +202,13 @@ private object ComponentLocator { def locateUserComponents(system: ActorSystem[_]): LocatedClasses = { val kalixComponentTypeAndBaseClasses: Map[String, Class[_]] = Map( - "http-endpoint" -> classOf[AnyRef], - "timed-action" -> classOf[TimedAction], - "consumer" -> classOf[Consumer], - "event-sourced-entity" -> classOf[EventSourcedEntity[_, _]], - "workflow" -> classOf[Workflow[_]], - "key-value-entity" -> classOf[KeyValueEntity[_]], - "view" -> classOf[AnyRef]) + ComponentType.HttpEndpoint -> classOf[AnyRef], + ComponentType.TimedAction -> classOf[TimedAction], + ComponentType.Consumer -> classOf[Consumer], + ComponentType.EventSourcedEntity -> classOf[EventSourcedEntity[_, _]], + ComponentType.Workflow -> classOf[Workflow[_]], + ComponentType.KeyValueEntity -> classOf[KeyValueEntity[_]], + ComponentType.View -> classOf[AnyRef]) // Alternative to but inspired by the stdlib SPI style of registering in META-INF/services // since we don't always have top supertypes and want to inject things into component constructors diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala index c860478e9..620a01d44 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumerImpl.scala @@ -34,10 +34,10 @@ import akka.runtime.sdk.spi.SpiConsumer import akka.runtime.sdk.spi.SpiConsumer.Effect import akka.runtime.sdk.spi.SpiConsumer.Message import akka.runtime.sdk.spi.SpiMetadata +import akka.runtime.sdk.spi.SpiMetadataEntry import akka.runtime.sdk.spi.TimerClient import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.Tracer -import kalix.protocol.component.MetadataEntry import org.slf4j.Logger import org.slf4j.LoggerFactory import org.slf4j.MDC @@ -172,10 +172,7 @@ private[impl] final class MessageContextImpl( override def componentCallMetadata: MetadataImpl = { if (metadata.has(Telemetry.TRACE_PARENT_KEY)) { MetadataImpl.of( - List( - MetadataEntry( - Telemetry.TRACE_PARENT_KEY, - MetadataEntry.Value.StringValue(metadata.get(Telemetry.TRACE_PARENT_KEY).get())))) + List(new SpiMetadataEntry(Telemetry.TRACE_PARENT_KEY, metadata.get(Telemetry.TRACE_PARENT_KEY).get()))) } else { MetadataImpl.Empty } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala index 529aa436c..97b01e640 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntityImpl.scala @@ -20,6 +20,7 @@ import akka.javasdk.impl.AbstractContext import akka.javasdk.impl.ActivatableContext import akka.javasdk.impl.AnySupport import akka.javasdk.impl.ComponentDescriptor +import akka.javasdk.impl.ComponentType import akka.javasdk.impl.EntityExceptions import akka.javasdk.impl.EntityExceptions.EntityException import akka.javasdk.impl.ErrorHandling.BadRequestException @@ -31,8 +32,10 @@ import akka.javasdk.impl.effect.NoSecondaryEffectImpl import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityEffectImpl.EmitEvents import akka.javasdk.impl.eventsourcedentity.EventSourcedEntityEffectImpl.NoPrimaryEffect import akka.javasdk.impl.serialization.JsonSerializer +import akka.javasdk.impl.telemetry.EventSourcedEntityCategory import akka.javasdk.impl.telemetry.SpanTracingImpl import akka.javasdk.impl.telemetry.Telemetry +import akka.javasdk.impl.telemetry.TraceInstrumentation import akka.runtime.sdk.spi.BytesPayload import akka.runtime.sdk.spi.SpiEntity import akka.runtime.sdk.spi.SpiEventSourcedEntity @@ -90,8 +93,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ extends SpiEventSourcedEntity { import EventSourcedEntityImpl._ - // FIXME -// private val traceInstrumentation = new TraceInstrumentation(componentId, EventSourcedEntityCategory, tracerFactory) + private val traceInstrumentation = new TraceInstrumentation(componentId, EventSourcedEntityCategory, tracerFactory) private val router: ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]] = { val context = new EventSourcedEntityContextImpl(entityId) @@ -109,7 +111,8 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[ state: SpiEventSourcedEntity.State, command: SpiEntity.Command): Future[SpiEventSourcedEntity.Effect] = { - val span: Option[Span] = None // FIXME traceInstrumentation.buildSpan(service, command) + val span: Option[Span] = + traceInstrumentation.buildSpan(ComponentType.EventSourcedEntity, componentId, entityId, command) span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId)) val cmdPayload = command.payload.getOrElse( // smuggling 0 arity method called from component client through here diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala index c09b624d7..ec42382d5 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntityImpl.scala @@ -16,6 +16,7 @@ import akka.javasdk.impl.AbstractContext import akka.javasdk.impl.ActivatableContext import akka.javasdk.impl.AnySupport import akka.javasdk.impl.ComponentDescriptor +import akka.javasdk.impl.ComponentType import akka.javasdk.impl.EntityExceptions import akka.javasdk.impl.EntityExceptions.EntityException import akka.javasdk.impl.ErrorHandling.BadRequestException @@ -25,8 +26,10 @@ import akka.javasdk.impl.effect.ErrorReplyImpl import akka.javasdk.impl.effect.MessageReplyImpl import akka.javasdk.impl.effect.NoSecondaryEffectImpl import akka.javasdk.impl.serialization.JsonSerializer +import akka.javasdk.impl.telemetry.KeyValueEntityCategory import akka.javasdk.impl.telemetry.SpanTracingImpl import akka.javasdk.impl.telemetry.Telemetry +import akka.javasdk.impl.telemetry.TraceInstrumentation import akka.javasdk.keyvalueentity.CommandContext import akka.javasdk.keyvalueentity.KeyValueEntity import akka.javasdk.keyvalueentity.KeyValueEntityContext @@ -84,8 +87,7 @@ private[impl] final class KeyValueEntityImpl[S, KV <: KeyValueEntity[S]]( import KeyValueEntityEffectImpl._ import KeyValueEntityImpl._ - // FIXME -// private val traceInstrumentation = new TraceInstrumentation(componentId, EventSourcedEntityCategory, tracerFactory) + private val traceInstrumentation = new TraceInstrumentation(componentId, KeyValueEntityCategory, tracerFactory) private val router: ReflectiveKeyValueEntityRouter[AnyRef, KeyValueEntity[AnyRef]] = { val context = new KeyValueEntityContextImpl(entityId) @@ -103,7 +105,8 @@ private[impl] final class KeyValueEntityImpl[S, KV <: KeyValueEntity[S]]( state: SpiEventSourcedEntity.State, command: SpiEntity.Command): Future[SpiEventSourcedEntity.Effect] = { - val span: Option[Span] = None // FIXME traceInstrumentation.buildSpan(service, command) + val span: Option[Span] = + traceInstrumentation.buildSpan(ComponentType.KeyValueEntity, componentId, entityId, command) span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId)) val cmdPayload = command.payload.getOrElse( // smuggling 0 arity method called from component client through here diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/Telemetry.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/Telemetry.scala index a8b8669d8..628b68473 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/Telemetry.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/Telemetry.scala @@ -6,8 +6,6 @@ package akka.javasdk.impl.telemetry import akka.annotation.InternalApi import akka.javasdk.Metadata -import akka.javasdk.impl.MetadataImpl -import akka.javasdk.impl.Service import io.opentelemetry.api.OpenTelemetry import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.SpanKind @@ -17,19 +15,18 @@ import io.opentelemetry.context.propagation.ContextPropagators import io.opentelemetry.context.propagation.TextMapGetter import io.opentelemetry.context.propagation.TextMapSetter import io.opentelemetry.context.{ Context => OtelContext } -import kalix.protocol.action.ActionCommand -import kalix.protocol.component.MetadataEntry -import kalix.protocol.component.MetadataEntry.Value.StringValue -import kalix.protocol.component.{ Metadata => ProtocolMetadata } -import kalix.protocol.entity.Command import org.slf4j.Logger import org.slf4j.LoggerFactory - import java.lang import java.util.Collections + import scala.collection.mutable import scala.jdk.OptionConverters._ +import akka.runtime.sdk.spi.SpiEntity +import akka.runtime.sdk.spi.SpiMetadata +import akka.runtime.sdk.spi.SpiMetadataEntry + /** * INTERNAL API */ @@ -92,8 +89,8 @@ private[akka] object Telemetry { carrier.getAllKeys } - lazy val builderSetter: TextMapSetter[mutable.Builder[MetadataEntry, _]] = (carrier, key, value) => { - carrier.addOne(new MetadataEntry(key, StringValue(value))) + lazy val builderSetter: TextMapSetter[mutable.Builder[SpiMetadataEntry, _]] = (carrier, key, value) => { + carrier.addOne(new SpiMetadataEntry(key, value)) } } @@ -103,14 +100,15 @@ private[akka] object Telemetry { */ @InternalApi private[akka] object TraceInstrumentation { - // Trick to extract trace parent from a single protocol metadata entry and using the W3C decoding from OTEL - private val metadataEntryTraceParentGetter = new TextMapGetter[MetadataEntry]() { + // Trick to extract trace parent from a single metadata entry and using the W3C decoding from OTEL + private val metadataEntryTraceParentGetter = new TextMapGetter[SpiMetadataEntry]() { - override def get(carrier: MetadataEntry, key: String): String = - if (key == Telemetry.TRACE_PARENT_KEY) carrier.getStringValue + override def get(carrier: SpiMetadataEntry, key: String): String = + if (key == Telemetry.TRACE_PARENT_KEY) carrier.value else null - override def keys(carrier: MetadataEntry): lang.Iterable[String] = Collections.singleton(Telemetry.TRACE_PARENT_KEY) + override def keys(carrier: SpiMetadataEntry): lang.Iterable[String] = + Collections.singleton(Telemetry.TRACE_PARENT_KEY) } val InstrumentationScopeName: String = "akka-javasdk" @@ -141,27 +139,22 @@ private[akka] final class TraceInstrumentation( /** * Creates a span if it finds a trace parent in the command's metadata */ - def buildSpan(service: Service, command: Command): Option[Span] = - if (enabled) internalBuildSpan(service, command.name, command.metadata, Some(command.entityId)) + def buildSpan( + componentType: String, + componentId: String, + entityId: String, + command: SpiEntity.Command): Option[Span] = + if (enabled) internalBuildSpan(componentType, componentId, command.name, command.metadata, Some(entityId)) else None - /** - * Creates a span if it finds a trace parent in the command's metadata - */ - def buildSpan(service: Service, command: ActionCommand): Option[Span] = - if (enabled) { - val subject = - command.metadata.flatMap(_.entries.find(_.key == MetadataImpl.CeSubject).flatMap(_.value.stringValue)) - internalBuildSpan(service, command.name, command.metadata, subject) - } else None - private def internalBuildSpan( - service: Service, + componentType: String, + componentId: String, commandName: String, - commandMetadata: Option[ProtocolMetadata], + commandMetadata: SpiMetadata, subjectId: Option[String]): Option[Span] = { // only if there is a trace parent in the metadata - val traceParent = commandMetadata.flatMap(_.entries.find(_.key == TRACE_PARENT_KEY)) + val traceParent = commandMetadata.entries.find(_.key == TRACE_PARENT_KEY) traceParent.map { traceParentMetadataEntry => val parentContext = propagator.getTextMapPropagator .extract(OtelContext.current(), traceParentMetadataEntry, metadataEntryTraceParentGetter) @@ -172,8 +165,8 @@ private[akka] final class TraceInstrumentation( .spanBuilder(spanName) .setParent(parentContext) .setSpanKind(SpanKind.SERVER) - .setAttribute("component.type", service.componentType) - .setAttribute("component.type_id", service.componentId) + .setAttribute("component.type", componentType) + .setAttribute("component.type_id", componentId) subjectId.foreach(id => spanBuilder = spanBuilder.setAttribute("component.id", id)) spanBuilder.startSpan() } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala index 324cecf02..40bb92e19 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/timedaction/TimedActionImpl.scala @@ -28,13 +28,13 @@ import akka.javasdk.timedaction.CommandEnvelope import akka.javasdk.timedaction.TimedAction import akka.javasdk.timer.TimerScheduler import akka.runtime.sdk.spi.BytesPayload +import akka.runtime.sdk.spi.SpiMetadataEntry import akka.runtime.sdk.spi.SpiTimedAction import akka.runtime.sdk.spi.SpiTimedAction.Command import akka.runtime.sdk.spi.SpiTimedAction.Effect import akka.runtime.sdk.spi.TimerClient import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.Tracer -import kalix.protocol.component.MetadataEntry import org.slf4j.Logger import org.slf4j.LoggerFactory import org.slf4j.MDC @@ -57,10 +57,7 @@ object TimedActionImpl { override def componentCallMetadata: MetadataImpl = { if (metadata.has(Telemetry.TRACE_PARENT_KEY)) { MetadataImpl.of( - List( - MetadataEntry( - Telemetry.TRACE_PARENT_KEY, - MetadataEntry.Value.StringValue(metadata.get(Telemetry.TRACE_PARENT_KEY).get())))) + List(new SpiMetadataEntry(Telemetry.TRACE_PARENT_KEY, metadata.get(Telemetry.TRACE_PARENT_KEY).get()))) } else { MetadataImpl.Empty }