diff --git a/modules/core/shared/src/main/scala/EntryPoint.scala b/modules/core/shared/src/main/scala/EntryPoint.scala index 0ea98648..d77f8c4d 100644 --- a/modules/core/shared/src/main/scala/EntryPoint.scala +++ b/modules/core/shared/src/main/scala/EntryPoint.scala @@ -14,20 +14,32 @@ import cats.effect.Resource trait EntryPoint[F[_]] { /** Resource that creates a new root span in a new trace. */ - def root(name: String): Resource[F, Span[F]] + final def root(name: String): Resource[F, Span[F]] = root(name, Span.Options.Defaults) + + def root(name: String, options: Span.Options): Resource[F, Span[F]] /** Resource that creates a new span as the child of the span specified by the given kernel, * which typically arrives via request headers. By this mechanism we can continue a trace that * began in another system. If the required headers are not present in `kernel` an exception will * be raised in `F`. */ - def continue(name: String, kernel: Kernel): Resource[F, Span[F]] + final def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = + continue(name, kernel, Span.Options.Defaults) + + def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] /** Resource that attempts to creates a new span as with `continue`, but falls back to a new root * span as with `root` if the kernel does not contain the required headers. In other words, we * continue the existing span if we can, otherwise we start a new one. */ - def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] + final def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = + continueOrElseRoot(name, kernel, Span.Options.Defaults) + + def continueOrElseRoot( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] /** Converts this `EntryPoint[F]` to an `EntryPoint[G]` using an `F ~> G`. */ @@ -42,17 +54,21 @@ trait EntryPoint[F[_]] { new EntryPoint[G] { - override def root(name: String): Resource[G, Span[G]] = aux(outer.root(name)) + override def root(name: String, options: Span.Options): Resource[G, Span[G]] = aux( + outer.root(name, options) + ) override def continue( name: String, - kernel: Kernel - ): Resource[G, Span[G]] = aux(outer.continue(name, kernel)) + kernel: Kernel, + options: Span.Options + ): Resource[G, Span[G]] = aux(outer.continue(name, kernel, options)) override def continueOrElseRoot( name: String, - kernel: Kernel - ): Resource[G, Span[G]] = aux(outer.continueOrElseRoot(name, kernel)) + kernel: Kernel, + options: Span.Options + ): Resource[G, Span[G]] = aux(outer.continueOrElseRoot(name, kernel, options)) } } } diff --git a/modules/core/shared/src/main/scala/Span.scala b/modules/core/shared/src/main/scala/Span.scala index cef0640c..ad20c1eb 100644 --- a/modules/core/shared/src/main/scala/Span.scala +++ b/modules/core/shared/src/main/scala/Span.scala @@ -4,12 +4,13 @@ package natchez -import cats.data.Kleisli +import cats.data.{Chain, Kleisli} import cats.effect.MonadCancel import cats.effect.Resource import cats.effect.Resource.ExitCase import cats.syntax.applicative._ -import cats.{~>, Applicative} +import cats.{Applicative, ~>} + import java.net.URI /** An span that can be passed around and used to create child spans. */ @@ -89,10 +90,10 @@ trait Span[F[_]] { object Span { abstract class Default[F[_]: Applicative] extends Span[F] { - protected val spanCreationPolicy: Options.SpanCreationPolicy + protected val spanCreationPolicyOverride: Options.SpanCreationPolicy - def span(name: String, options: Options): Resource[F, Span[F]] = - spanCreationPolicy match { + override final def span(name: String, options: Options): Resource[F, Span[F]] = + spanCreationPolicyOverride match { case Options.SpanCreationPolicy.Suppress => Resource.pure(Span.noop[F]) case Options.SpanCreationPolicy.Coalesce => Resource.pure(this) case Options.SpanCreationPolicy.Default => makeSpan(name, options) @@ -171,10 +172,16 @@ object Span { /** Specifies how additional span creation requests are handled on the new span. */ def spanCreationPolicy: Options.SpanCreationPolicy + def spanKind: Span.SpanKind + def links: Chain[Kernel] def withParentKernel(kernel: Kernel): Options def withoutParentKernel: Options def withSpanCreationPolicy(p: Options.SpanCreationPolicy): Options + + def withSpanKind(spanKind: SpanKind): Options + + def withLink(kernel: Kernel): Options } object Options { @@ -193,17 +200,38 @@ object Span { private case class OptionsImpl( parentKernel: Option[Kernel], - spanCreationPolicy: SpanCreationPolicy + spanCreationPolicy: SpanCreationPolicy, + spanKind: SpanKind, + links: Chain[Kernel] ) extends Options { - def withParentKernel(kernel: Kernel): Options = OptionsImpl(Some(kernel), spanCreationPolicy) - def withoutParentKernel: Options = OptionsImpl(None, spanCreationPolicy) - def withSpanCreationPolicy(p: SpanCreationPolicy): Options = OptionsImpl(parentKernel, p) + override def withParentKernel(kernel: Kernel): Options = + OptionsImpl(Some(kernel), spanCreationPolicy, spanKind, links) + override def withoutParentKernel: Options = + OptionsImpl(None, spanCreationPolicy, spanKind, links) + override def withSpanCreationPolicy(p: SpanCreationPolicy): Options = + OptionsImpl(parentKernel, p, spanKind, links) + override def withSpanKind(spanKind: SpanKind): Options = + OptionsImpl(parentKernel, spanCreationPolicy, spanKind, links) + override def withLink(kernel: Kernel): Options = + OptionsImpl(parentKernel, spanCreationPolicy, spanKind, links.append(kernel)) } - val Defaults: Options = OptionsImpl(None, SpanCreationPolicy.Default) + val Defaults: Options = + OptionsImpl(None, SpanCreationPolicy.Default, SpanKind.Internal, Chain.empty) val Suppress: Options = Defaults.withSpanCreationPolicy(SpanCreationPolicy.Suppress) val Coalesce: Options = Defaults.withSpanCreationPolicy(SpanCreationPolicy.Coalesce) def parentKernel(kernel: Kernel): Options = Defaults.withParentKernel(kernel) } + + sealed trait SpanKind + + object SpanKind { + case object Internal extends SpanKind + case object Client extends SpanKind + case object Server extends SpanKind + case object Producer extends SpanKind + case object Consumer extends SpanKind + } + } diff --git a/modules/core/shared/src/test/scala/InMemory.scala b/modules/core/shared/src/test/scala/InMemory.scala index a2f600a1..5f9b318c 100644 --- a/modules/core/shared/src/test/scala/InMemory.scala +++ b/modules/core/shared/src/test/scala/InMemory.scala @@ -18,8 +18,10 @@ object InMemory { lineage: Lineage, k: Kernel, ref: Ref[IO, Chain[(Lineage, NatchezCommand)]], - val spanCreationPolicy: Options.SpanCreationPolicy + val options: Options ) extends natchez.Span.Default[IO] { + override protected val spanCreationPolicyOverride: Options.SpanCreationPolicy = + options.spanCreationPolicy def put(fields: (String, natchez.TraceValue)*): IO[Unit] = ref.update(_.append(lineage -> NatchezCommand.Put(fields.toList))) @@ -38,8 +40,8 @@ object InMemory { def makeSpan(name: String, options: Options): Resource[IO, natchez.Span[IO]] = { val acquire = ref - .update(_.append(lineage -> NatchezCommand.CreateSpan(name, options.parentKernel))) - .as(new Span(lineage / name, k, ref, options.spanCreationPolicy)) + .update(_.append(lineage -> NatchezCommand.CreateSpan(name, options.parentKernel, options))) + .as(new Span(lineage / name, k, ref, options)) val release = ref.update(_.append(lineage -> NatchezCommand.ReleaseSpan(name))) @@ -59,19 +61,31 @@ object InMemory { class EntryPoint(val ref: Ref[IO, Chain[(Lineage, NatchezCommand)]]) extends natchez.EntryPoint[IO] { - def root(name: String): Resource[IO, Span] = - newSpan(name, Kernel(Map.empty)) - - def continue(name: String, kernel: Kernel): Resource[IO, Span] = - newSpan(name, kernel) - - def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span] = - newSpan(name, kernel) - - private def newSpan(name: String, kernel: Kernel): Resource[IO, Span] = { + override def root(name: String, options: natchez.Span.Options): Resource[IO, Span] = + newSpan(name, Kernel(Map.empty), options) + + override def continue( + name: String, + kernel: Kernel, + options: natchez.Span.Options + ): Resource[IO, Span] = + newSpan(name, kernel, options) + + override def continueOrElseRoot( + name: String, + kernel: Kernel, + options: natchez.Span.Options + ): Resource[IO, Span] = + newSpan(name, kernel, options) + + private def newSpan( + name: String, + kernel: Kernel, + options: natchez.Span.Options + ): Resource[IO, Span] = { val acquire = ref - .update(_.append(Lineage.Root -> NatchezCommand.CreateRootSpan(name, kernel))) - .as(new Span(Lineage.Root, kernel, ref, Options.SpanCreationPolicy.Default)) + .update(_.append(Lineage.Root -> NatchezCommand.CreateRootSpan(name, kernel, options))) + .as(new Span(Lineage.Root, kernel, ref, options)) val release = ref.update(_.append(Lineage.Root -> NatchezCommand.ReleaseRootSpan(name))) @@ -99,13 +113,15 @@ object InMemory { case object AskTraceId extends NatchezCommand case object AskTraceUri extends NatchezCommand case class Put(fields: List[(String, natchez.TraceValue)]) extends NatchezCommand - case class CreateSpan(name: String, kernel: Option[Kernel]) extends NatchezCommand + case class CreateSpan(name: String, kernel: Option[Kernel], options: natchez.Span.Options) + extends NatchezCommand case class ReleaseSpan(name: String) extends NatchezCommand case class AttachError(err: Throwable) extends NatchezCommand case class LogEvent(event: String) extends NatchezCommand case class LogFields(fields: List[(String, TraceValue)]) extends NatchezCommand // entry point - case class CreateRootSpan(name: String, kernel: Kernel) extends NatchezCommand + case class CreateRootSpan(name: String, kernel: Kernel, options: natchez.Span.Options) + extends NatchezCommand case class ReleaseRootSpan(name: String) extends NatchezCommand } diff --git a/modules/core/shared/src/test/scala/SpanCoalesceTest.scala b/modules/core/shared/src/test/scala/SpanCoalesceTest.scala index ac44a0f0..b6fbb889 100644 --- a/modules/core/shared/src/test/scala/SpanCoalesceTest.scala +++ b/modules/core/shared/src/test/scala/SpanCoalesceTest.scala @@ -18,8 +18,8 @@ class SpanCoalesceTest extends InMemorySuite { } def expectedHistory = List( - (Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()))), - (Lineage.Root, NatchezCommand.CreateSpan("suppressed", None)), + (Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()), Span.Options.Defaults)), + (Lineage.Root, NatchezCommand.CreateSpan("suppressed", None, Span.Options.Suppress)), (Lineage.Root, NatchezCommand.ReleaseSpan("suppressed")), (Lineage.Root, NatchezCommand.ReleaseRootSpan("root")) ) @@ -36,8 +36,8 @@ class SpanCoalesceTest extends InMemorySuite { } def expectedHistory = List( - (Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()))), - (Lineage.Root, NatchezCommand.CreateSpan("coalesced", None)), + (Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()), Span.Options.Defaults)), + (Lineage.Root, NatchezCommand.CreateSpan("coalesced", None, Span.Options.Coalesce)), (Lineage.Root / "coalesced", NatchezCommand.Put(List("answer" -> 42))), (Lineage.Root, NatchezCommand.ReleaseSpan("coalesced")), (Lineage.Root, NatchezCommand.ReleaseRootSpan("root")) diff --git a/modules/core/shared/src/test/scala/SpanPropagationTest.scala b/modules/core/shared/src/test/scala/SpanPropagationTest.scala index 9d698121..bef20cef 100644 --- a/modules/core/shared/src/test/scala/SpanPropagationTest.scala +++ b/modules/core/shared/src/test/scala/SpanPropagationTest.scala @@ -16,9 +16,9 @@ class SpanPropagationTest extends InMemorySuite { Trace[F].span("parent")(Trace[F].span("child")(Trace[F].put("answer" -> 42))) def expectedHistory = List( - (Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()))), - (Lineage.Root, NatchezCommand.CreateSpan("parent", None)), - (Lineage.Root / "parent", NatchezCommand.CreateSpan("child", None)), + (Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()), Span.Options.Defaults)), + (Lineage.Root, NatchezCommand.CreateSpan("parent", None, Span.Options.Defaults)), + (Lineage.Root / "parent", NatchezCommand.CreateSpan("child", None, Span.Options.Defaults)), (Lineage.Root / "parent" / "child", NatchezCommand.Put(List("answer" -> 42))), (Lineage.Root / "parent", NatchezCommand.ReleaseSpan("child")), (Lineage.Root, NatchezCommand.ReleaseSpan("parent")), @@ -38,9 +38,9 @@ class SpanPropagationTest extends InMemorySuite { } def expectedHistory = List( - (Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()))), - (Lineage.Root, NatchezCommand.CreateSpan("spanR", None)), - (Lineage.Root, NatchezCommand.CreateSpan("span", None)), + (Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()), Span.Options.Defaults)), + (Lineage.Root, NatchezCommand.CreateSpan("spanR", None, Span.Options.Defaults)), + (Lineage.Root, NatchezCommand.CreateSpan("span", None, Span.Options.Defaults)), (Lineage.Root / "spanR", NatchezCommand.Put(List("question" -> "ultimate"))), (Lineage.Root / "span", NatchezCommand.Put(List("answer" -> 42))), (Lineage.Root, NatchezCommand.ReleaseSpan("span")), diff --git a/modules/datadog/src/main/scala/DDEntryPoint.scala b/modules/datadog/src/main/scala/DDEntryPoint.scala index d444feb4..9afa9dce 100644 --- a/modules/datadog/src/main/scala/DDEntryPoint.scala +++ b/modules/datadog/src/main/scala/DDEntryPoint.scala @@ -9,32 +9,47 @@ import cats.effect._ import cats.syntax.all._ import io.opentracing.propagation.{Format, TextMapAdapter} import io.{opentracing => ot} +import natchez.datadog.DDTracer._ import java.net.URI final class DDEntryPoint[F[_]: Sync](tracer: ot.Tracer, uriPrefix: Option[URI]) extends EntryPoint[F] { - override def root(name: String): Resource[F, Span[F]] = + override def root(name: String, options: Span.Options): Resource[F, Span[F]] = Resource - .make(Sync[F].delay(tracer.buildSpan(name).start()))(s => Sync[F].delay(s.finish())) - .map(DDSpan(tracer, _, uriPrefix, Span.Options.SpanCreationPolicy.Default)) + .make { + Sync[F] + .delay(tracer.buildSpan(name)) + .flatTap(addSpanKind(_, options.spanKind)) + .flatMap(options.links.foldM(_)(addLink[F](tracer))) + .flatMap(builder => Sync[F].delay(builder.start())) + }(s => Sync[F].delay(s.finish())) + .map(DDSpan(tracer, _, uriPrefix, options)) - override def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = + override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] = Resource - .make( - Sync[F].delay { - val spanContext = tracer.extract( - Format.Builtin.HTTP_HEADERS, - new TextMapAdapter(kernel.toJava) - ) - tracer.buildSpan(name).asChildOf(spanContext).start() - } - )(s => Sync[F].delay(s.finish())) - .map(DDSpan(tracer, _, uriPrefix, Span.Options.SpanCreationPolicy.Default)) + .make { + Sync[F] + .delay { + val spanContext = tracer.extract( + Format.Builtin.HTTP_HEADERS, + new TextMapAdapter(kernel.toJava) + ) + tracer.buildSpan(name).asChildOf(spanContext) + } + .flatTap(addSpanKind(_, options.spanKind)) + .flatMap(options.links.foldM(_)(addLink[F](tracer))) + .flatMap(builder => Sync[F].delay(builder.start())) + }(s => Sync[F].delay(s.finish())) + .map(DDSpan(tracer, _, uriPrefix, options)) - override def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = - continue(name, kernel).flatMap { - case null => root(name) // hurr, means headers are incomplete or invalid + override def continueOrElseRoot( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = + continue(name, kernel, options).flatMap { + case null => root(name, options) // hurr, means headers are incomplete or invalid case span => span.pure[Resource[F, *]] } } diff --git a/modules/datadog/src/main/scala/DDSpan.scala b/modules/datadog/src/main/scala/DDSpan.scala index 346b506e..7f6e2238 100644 --- a/modules/datadog/src/main/scala/DDSpan.scala +++ b/modules/datadog/src/main/scala/DDSpan.scala @@ -16,6 +16,8 @@ import io.opentracing.tag.Tags import natchez.TraceValue.{BooleanValue, NumberValue, StringValue} import _root_.datadog.trace.api.DDTags import _root_.datadog.trace.api.interceptor.MutableSpan +import natchez.Span.Options +import natchez.datadog.DDTracer.{addLink, addSpanKind} import scala.jdk.CollectionConverters._ import java.net.URI @@ -24,8 +26,10 @@ final case class DDSpan[F[_]: Sync]( tracer: ot.Tracer, span: ot.Span, uriPrefix: Option[URI], - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ) extends Span.Default[F] { + override protected val spanCreationPolicyOverride: Options.SpanCreationPolicy = + options.spanCreationPolicy def kernel: F[Kernel] = Sync[F].delay { @@ -53,21 +57,26 @@ final case class DDSpan[F[_]: Sync]( override def log(event: String): F[Unit] = Sync[F].delay(span.log(event)).void - override def makeSpan(name: String, options: Span.Options): Resource[F, Span[F]] = { - val parent = options.parentKernel.map(k => - tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(k.toJava)) - ) + override def makeSpan(name: String, options: Span.Options): Resource[F, Span[F]] = Span.putErrorFields( Resource .makeCase( - Sync[F].delay(tracer.buildSpan(name).asChildOf(parent.orNull).asChildOf(span).start) + Sync[F] + .delay { + val parent = options.parentKernel.map(k => + tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(k.toJava)) + ) + tracer.buildSpan(name).asChildOf(parent.orNull).asChildOf(span) + } + .flatTap(addSpanKind(_, options.spanKind)) + .flatMap(options.links.foldM(_)(addLink[F](tracer))) + .flatMap(builder => Sync[F].delay(builder.start())) ) { case (span, ExitCase.Errored(e)) => Sync[F].delay(span.log(e.toString).finish()) case (span, _) => Sync[F].delay(span.finish()) } - .map(DDSpan(tracer, _, uriPrefix, options.spanCreationPolicy)) + .map(DDSpan(tracer, _, uriPrefix, options)) ) - } def traceId: F[Option[String]] = Sync[F].pure { diff --git a/modules/datadog/src/main/scala/DDTracer.scala b/modules/datadog/src/main/scala/DDTracer.scala index 1adcaa91..91e9cda5 100644 --- a/modules/datadog/src/main/scala/DDTracer.scala +++ b/modules/datadog/src/main/scala/DDTracer.scala @@ -6,11 +6,12 @@ package natchez package datadog import java.net.URI - import cats.effect._ import cats.syntax.all._ import _root_.datadog.opentracing.{DDTracer => NativeDDTracer} import _root_.datadog.opentracing.DDTracer.DDTracerBuilder +import io.opentracing.Tracer +import io.opentracing.propagation.{Format, TextMapAdapter} import natchez.opentracing.GlobalTracer object DDTracer { @@ -31,4 +32,31 @@ object DDTracer { def globalTracerEntryPoint[F[_]: Sync](uriPrefix: Option[URI]): F[Option[EntryPoint[F]]] = GlobalTracer.fetch.map(_.map(new DDEntryPoint[F](_, uriPrefix))) + + /** see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/compatibility.md#opentracing + */ + private[datadog] def addLink[F[_]: Sync]( + tracer: Tracer + )(builder: Tracer.SpanBuilder, linkKernel: Kernel): F[Tracer.SpanBuilder] = + Sync[F].delay { + Option(tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(linkKernel.toJava))) + .fold(builder)(builder.addReference("follows_from", _)) + } + + /** see https://github.com/opentracing/specification/blob/master/semantic_conventions.md#span-tags-table + */ + private[datadog] def addSpanKind[F[_]: Sync]( + builder: Tracer.SpanBuilder, + spanKind: Span.SpanKind + ): F[Unit] = + Option(spanKind) + .collect { + case Span.SpanKind.Client => "client" + case Span.SpanKind.Server => "server" + case Span.SpanKind.Producer => "producer" + case Span.SpanKind.Consumer => "consumer" + } + .fold(().pure[F]) { kind => + Sync[F].delay(builder.withTag("span.kind", kind)).void + } } diff --git a/modules/honeycomb/src/main/scala/Honeycomb.scala b/modules/honeycomb/src/main/scala/Honeycomb.scala index 98122911..7f56abcc 100644 --- a/modules/honeycomb/src/main/scala/Honeycomb.scala +++ b/modules/honeycomb/src/main/scala/Honeycomb.scala @@ -29,19 +29,24 @@ object Honeycomb { c <- Sync[F].delay(LibHoney.create(o)) _ <- Sync[F].delay(c.addResponseObserver(responseObserver)) } yield c - }(c => Sync[F].delay(c.close)) + }(c => Sync[F].delay(c.close())) .map { c => new EntryPoint[F] { - def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = - Resource.makeCase(HoneycombSpan.fromKernel(c, name, kernel))(HoneycombSpan.finish).widen + def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] = + Resource + .makeCase(HoneycombSpan.fromKernel(c, name, kernel, options))(HoneycombSpan.finish) + .widen - def root(name: String): Resource[F, Span[F]] = - Resource.makeCase(HoneycombSpan.root(c, name))(HoneycombSpan.finish).widen + def root(name: String, options: Span.Options): Resource[F, Span[F]] = + Resource.makeCase(HoneycombSpan.root(c, name, options))(HoneycombSpan.finish).widen - def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = + def continueOrElseRoot(name: String, kernel: Kernel, options: Span.Options) + : Resource[F, Span[F]] = Resource - .makeCase(HoneycombSpan.fromKernelOrElseRoot(c, name, kernel))(HoneycombSpan.finish) + .makeCase(HoneycombSpan.fromKernelOrElseRoot(c, name, kernel, options))( + HoneycombSpan.finish + ) .widen } diff --git a/modules/honeycomb/src/main/scala/HoneycombSpan.scala b/modules/honeycomb/src/main/scala/HoneycombSpan.scala index c83774af..a34c0e76 100644 --- a/modules/honeycomb/src/main/scala/HoneycombSpan.scala +++ b/modules/honeycomb/src/main/scala/HoneycombSpan.scala @@ -9,10 +9,11 @@ import cats.effect.Resource.ExitCase import cats.effect.Resource.ExitCase._ import cats.syntax.all._ -import io.honeycomb.libhoney.HoneyClient +import io.honeycomb.libhoney.{Event, HoneyClient} import java.time.Instant import java.util.UUID import natchez._ +import natchez.Span.Options import java.net.URI import org.typelevel.ci._ @@ -24,10 +25,13 @@ private[honeycomb] final case class HoneycombSpan[F[_]: Sync]( traceUUID: UUID, timestamp: Instant, fields: Ref[F, Map[String, TraceValue]], - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ) extends Span.Default[F] { import HoneycombSpan._ + override protected val spanCreationPolicyOverride: Options.SpanCreationPolicy = + options.spanCreationPolicy + def get(key: String): F[Option[TraceValue]] = fields.get.map(_.get(key)) @@ -51,7 +55,7 @@ private[honeycomb] final case class HoneycombSpan[F[_]: Sync]( override def makeSpan(name: String, options: Span.Options): Resource[F, Span[F]] = Span.putErrorFields( Resource - .makeCase(HoneycombSpan.child(this, name, options.spanCreationPolicy))( + .makeCase(HoneycombSpan.child(this, name, options))( HoneycombSpan.finish[F] ) .widen @@ -88,9 +92,12 @@ private[honeycomb] object HoneycombSpan { private def now[F[_]: Sync]: F[Instant] = Sync[F].delay(Instant.now) - def finish[F[_]: Sync]: (HoneycombSpan[F], ExitCase) => F[Unit] = { (span, exitCase) => + private def createEvent[F[_]: Sync]( + span: HoneycombSpan[F], + exitCase: ExitCase, + durationMs: Long + ): F[Event] = for { - n <- now fs <- span.fields.get e <- Sync[F].delay { val e = span.client.createEvent() @@ -100,7 +107,11 @@ private[honeycomb] object HoneycombSpan { e.addField("name", span.name) // and other trace fields e.addField("trace.span_id", span.spanUUID) e.addField("trace.trace_id", span.traceUUID) - e.addField("duration_ms", n.toEpochMilli - span.timestamp.toEpochMilli) + e.addField("duration_ms", durationMs) + e.addField( + "span.kind", + span.options.spanKind.toString + ) // see https://github.com/honeycombio/opentelemetry-exporter-python/blob/17864084812ed67e6dc580e4119f7a5a37841d03/opentelemetry/ext/honeycomb/__init__.py#L130 exitCase match { case Succeeded => e.addField("exit.case", "completed") case Canceled => e.addField("exit.case", "canceled") @@ -108,14 +119,36 @@ private[honeycomb] object HoneycombSpan { } e } - _ <- Sync[F].delay(e.send()) + } yield e + + def finish[F[_]: Sync]: (HoneycombSpan[F], ExitCase) => F[Unit] = { (span, exitCase) => + for { + n <- now + durationMs = n.toEpochMilli - span.timestamp.toEpochMilli + e <- createEvent(span, exitCase, durationMs) + links <- span.options.links.traverse { k => + (k.toHeaders.get(Headers.TraceId), k.toHeaders.get(Headers.SpanId)).tupled + .traverse { case (traceId, spanId) => + createEvent(span, exitCase, durationMs) + .flatMap { linkEvent => + Sync[F].delay { + linkEvent.addMetadata("meta.annotation_type", "link") + linkEvent.addField("trace.parent_id", span.spanUUID) + linkEvent.addField("trace.link.span_id", spanId) + linkEvent.addField("trace.link.trace_id", traceId) + } + } + } + } + events = links.collect { case Some(event) => event }.prepend(e) + _ <- events.traverse_(e => Sync[F].delay(e.send())) } yield () } def child[F[_]: Sync]( parent: HoneycombSpan[F], name: String, - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ): F[HoneycombSpan[F]] = for { spanUUID <- uuid[F] @@ -129,12 +162,13 @@ private[honeycomb] object HoneycombSpan { traceUUID = parent.traceUUID, timestamp = timestamp, fields = fields, - spanCreationPolicy = spanCreationPolicy + options = options ) def root[F[_]: Sync]( client: HoneyClient, - name: String + name: String, + options: Span.Options ): F[HoneycombSpan[F]] = for { spanUUID <- uuid[F] @@ -149,13 +183,14 @@ private[honeycomb] object HoneycombSpan { traceUUID = traceUUID, timestamp = timestamp, fields = fields, - spanCreationPolicy = Span.Options.SpanCreationPolicy.Default + options = options ) def fromKernel[F[_]]( client: HoneyClient, name: String, - kernel: Kernel + kernel: Kernel, + options: Span.Options )(implicit ev: Sync[F]): F[HoneycombSpan[F]] = for { traceUUID <- ev.catchNonFatal(UUID.fromString(kernel.toHeaders(Headers.TraceId))) @@ -171,15 +206,16 @@ private[honeycomb] object HoneycombSpan { traceUUID = traceUUID, timestamp = timestamp, fields = fields, - spanCreationPolicy = Span.Options.SpanCreationPolicy.Default + options = options ) def fromKernelOrElseRoot[F[_]]( client: HoneyClient, name: String, - kernel: Kernel + kernel: Kernel, + options: Span.Options )(implicit ev: Sync[F]): F[HoneycombSpan[F]] = - fromKernel(client, name, kernel).recoverWith { case _: NoSuchElementException => - root(client, name) + fromKernel(client, name, kernel, options).recoverWith { case _: NoSuchElementException => + root(client, name, options) } } diff --git a/modules/jaeger/src/main/scala/JaegerEntryPoint.scala b/modules/jaeger/src/main/scala/JaegerEntryPoint.scala index 301c3c0b..cbfeb49a 100644 --- a/modules/jaeger/src/main/scala/JaegerEntryPoint.scala +++ b/modules/jaeger/src/main/scala/JaegerEntryPoint.scala @@ -10,37 +10,85 @@ import cats.syntax.all._ import io.jaegertracing.internal.exceptions.UnsupportedFormatException import io.opentracing.propagation.{Format, TextMapAdapter} import io.{opentracing => ot} +import natchez.Span.SpanKind import java.net.URI final class JaegerEntryPoint[F[_]: Sync](tracer: ot.Tracer, uriPrefix: Option[URI]) extends EntryPoint[F] { - def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = + override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] = Resource .make( - Sync[F].delay { - val p = tracer.extract( - Format.Builtin.HTTP_HEADERS, - new TextMapAdapter(kernel.toJava) - ) - tracer.buildSpan(name).asChildOf(p).start() - } + spanContextFromKernel(kernel) + .flatMap { p => + Sync[F].delay { + tracer + .buildSpan(name) + .asChildOf(p) + } + } + .flatMap(setOptionsAndStart(options)) )(s => Sync[F].delay(s.finish)) - .map(JaegerSpan(tracer, _, uriPrefix, Span.Options.SpanCreationPolicy.Default)) + .map(JaegerSpan(tracer, _, uriPrefix, options.spanCreationPolicy)) - def root(name: String): Resource[F, Span[F]] = + override def root(name: String, options: Span.Options): Resource[F, Span[F]] = Resource - .make(Sync[F].delay(tracer.buildSpan(name).start()))(s => Sync[F].delay(s.finish)) - .map(JaegerSpan(tracer, _, uriPrefix, Span.Options.SpanCreationPolicy.Default)) + .make(Sync[F].delay(tracer.buildSpan(name)).flatMap(setOptionsAndStart(options)))(s => + Sync[F].delay(s.finish) + ) + .map(JaegerSpan(tracer, _, uriPrefix, options.spanCreationPolicy)) - def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = - continue(name, kernel) + override def continueOrElseRoot( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = + continue(name, kernel, options) .flatMap { - case null => root(name) // hurr, means headers are incomplete or invalid + case null => root(name, options) // hurr, means headers are incomplete or invalid case a => Resource.pure[F, Span[F]](a) } .recoverWith { case _: UnsupportedFormatException => root(name) } + private def spanContextFromKernel(kernel: Kernel): F[ot.SpanContext] = + Sync[F].delay { + tracer.extract( + Format.Builtin.HTTP_HEADERS, + new TextMapAdapter(kernel.toJava) + ) + } + + private def setOptionsAndStart( + options: Span.Options + )(spanBuilder: ot.Tracer.SpanBuilder): F[ot.Span] = + options.links + .foldM(spanBuilder)(addLink(_)(_)) + .flatMap(setSpanKind(options.spanKind)) + .flatMap(sb => Sync[F].delay(sb.start())) + + private def addLink( + spanBuilder: ot.Tracer.SpanBuilder + )(kernel: Kernel): F[ot.Tracer.SpanBuilder] = + spanContextFromKernel(kernel).flatMap { spanContext => + Sync[F].delay { + spanBuilder.addReference("FOLLOWS_FROM", spanContext) + } + } + + private def setSpanKind( + spanKind: SpanKind + )(spanBuilder: ot.Tracer.SpanBuilder): F[ot.Tracer.SpanBuilder] = + spanKindTag + .lift(spanKind) + .foldM(spanBuilder)((sb, k) => Sync[F].delay(sb.withTag("span.kind", k))) + + // mapping defined at https://opentelemetry.io/docs/reference/specification/trace/sdk_exporters/jaeger/#spankind + private val spanKindTag: PartialFunction[SpanKind, String] = { + case SpanKind.Client => "client" + case SpanKind.Server => "server" + case SpanKind.Consumer => "consumer" + case SpanKind.Producer => "producer" + } } diff --git a/modules/jaeger/src/main/scala/JaegerSpan.scala b/modules/jaeger/src/main/scala/JaegerSpan.scala index d7c2900e..e4d70e77 100644 --- a/modules/jaeger/src/main/scala/JaegerSpan.scala +++ b/modules/jaeger/src/main/scala/JaegerSpan.scala @@ -23,7 +23,7 @@ private[jaeger] final case class JaegerSpan[F[_]: Sync]( tracer: ot.Tracer, span: ot.Span, prefix: Option[URI], - spanCreationPolicy: Span.Options.SpanCreationPolicy + spanCreationPolicyOverride: Span.Options.SpanCreationPolicy ) extends Span.Default[F] { import TraceValue._ diff --git a/modules/lightstep/src/main/scala/Lightstep.scala b/modules/lightstep/src/main/scala/Lightstep.scala index 350de1f5..a633e357 100644 --- a/modules/lightstep/src/main/scala/Lightstep.scala +++ b/modules/lightstep/src/main/scala/Lightstep.scala @@ -9,6 +9,8 @@ import cats.effect.{Resource, Sync} import cats.syntax.all._ import com.lightstep.tracer.shared.Options.OptionsBuilder import io.opentracing.Tracer +import io.opentracing.propagation.{Format, TextMapAdapter} +import natchez.Span.SpanKind import natchez.opentracing.GlobalTracer object Lightstep { @@ -22,4 +24,31 @@ object Lightstep { def globalTracerEntryPoint[F[_]: Sync]: F[Option[EntryPoint[F]]] = GlobalTracer.fetch.map(_.map(new LightstepEntryPoint[F](_))) + + /** see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/compatibility.md#opentracing + */ + private[lightstep] def addLink[F[_]: Sync]( + tracer: Tracer + )(builder: Tracer.SpanBuilder, linkKernel: Kernel): F[Tracer.SpanBuilder] = + Sync[F].delay { + Option(tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(linkKernel.toJava))) + .fold(builder)(builder.addReference("follows_from", _)) + } + + /** see https://github.com/opentracing/specification/blob/master/semantic_conventions.md#span-tags-table + */ + private[lightstep] def addSpanKind[F[_]: Sync]( + builder: Tracer.SpanBuilder, + spanKind: Span.SpanKind + ): F[Unit] = + Option(spanKind) + .collect { + case SpanKind.Client => "client" + case SpanKind.Server => "server" + case SpanKind.Producer => "producer" + case SpanKind.Consumer => "consumer" + } + .fold(().pure[F]) { kind => + Sync[F].delay(builder.withTag("span.kind", kind)).void + } } diff --git a/modules/lightstep/src/main/scala/LightstepEntryPoint.scala b/modules/lightstep/src/main/scala/LightstepEntryPoint.scala index 51bd2dc4..c11960b1 100644 --- a/modules/lightstep/src/main/scala/LightstepEntryPoint.scala +++ b/modules/lightstep/src/main/scala/LightstepEntryPoint.scala @@ -9,27 +9,42 @@ import cats.effect.{Resource, Sync} import cats.syntax.all._ import io.opentracing.Tracer import io.opentracing.propagation.{Format, TextMapAdapter} +import natchez.lightstep.Lightstep._ final class LightstepEntryPoint[F[_]: Sync](tracer: Tracer) extends EntryPoint[F] { - override def root(name: String): Resource[F, Span[F]] = + override def root(name: String, options: Span.Options): Resource[F, Span[F]] = Resource - .make(Sync[F].delay(tracer.buildSpan(name).start()))(s => Sync[F].delay(s.finish())) - .map(LightstepSpan(tracer, _, Span.Options.SpanCreationPolicy.Default)) + .make { + Sync[F] + .delay(tracer.buildSpan(name)) + .flatTap(addSpanKind(_, options.spanKind)) + .flatMap(options.links.foldM(_)(addLink[F](tracer))) + .flatMap(builder => Sync[F].delay(builder.start())) + }(s => Sync[F].delay(s.finish())) + .map(LightstepSpan(tracer, _, options)) - override def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = + override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] = Resource .make( - Sync[F].delay { - val p = - tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(kernel.toJava)) - tracer.buildSpan(name).asChildOf(p).start() - } + Sync[F] + .delay { + val p = + tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(kernel.toJava)) + tracer.buildSpan(name).asChildOf(p) + } + .flatTap(addSpanKind(_, options.spanKind)) + .flatMap(options.links.foldM(_)(addLink[F](tracer))) + .flatMap(builder => Sync[F].delay(builder.start())) )(s => Sync[F].delay(s.finish())) - .map(LightstepSpan(tracer, _, Span.Options.SpanCreationPolicy.Default)) + .map(LightstepSpan(tracer, _, options)) - override def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = - continue(name, kernel).flatMap { - case null => root(name) + override def continueOrElseRoot( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = + continue(name, kernel, options).flatMap { + case null => root(name, options) case a => a.pure[Resource[F, *]] } } diff --git a/modules/lightstep/src/main/scala/LightstepSpan.scala b/modules/lightstep/src/main/scala/LightstepSpan.scala index ddcf03b0..04ff90df 100644 --- a/modules/lightstep/src/main/scala/LightstepSpan.scala +++ b/modules/lightstep/src/main/scala/LightstepSpan.scala @@ -11,6 +11,8 @@ import io.opentracing.log.Fields import io.{opentracing => ot} import io.opentracing.propagation.{Format, TextMapAdapter} import io.opentracing.tag.Tags +import natchez.Span.Options +import natchez.lightstep.Lightstep._ import scala.jdk.CollectionConverters._ import java.net.URI @@ -18,11 +20,13 @@ import java.net.URI private[lightstep] final case class LightstepSpan[F[_]: Sync]( tracer: ot.Tracer, span: ot.Span, - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ) extends Span.Default[F] { - import TraceValue._ + override protected val spanCreationPolicyOverride: Options.SpanCreationPolicy = + options.spanCreationPolicy + override def kernel: F[Kernel] = Sync[F].delay { val m = new java.util.HashMap[String, String] @@ -67,10 +71,14 @@ private[lightstep] final case class LightstepSpan[F[_]: Sync]( ) Span.putErrorFields( Resource - .make(Sync[F].delay(tracer.buildSpan(name).asChildOf(p.orNull).asChildOf(span).start()))( - s => Sync[F].delay(s.finish()) - ) - .map(LightstepSpan(tracer, _, options.spanCreationPolicy)) + .make { + Sync[F] + .delay(tracer.buildSpan(name).asChildOf(p.orNull).asChildOf(span)) + .flatTap(addSpanKind(_, options.spanKind)) + .flatMap(options.links.foldM(_)(addLink[F](tracer))) + .flatMap(builder => Sync[F].delay(builder.start())) + }(s => Sync[F].delay(s.finish())) + .map(LightstepSpan(tracer, _, options)) ) } diff --git a/modules/log-odin/src/main/scala/Log.scala b/modules/log-odin/src/main/scala/Log.scala index b2d5f0af..d5c45918 100644 --- a/modules/log-odin/src/main/scala/Log.scala +++ b/modules/log-odin/src/main/scala/Log.scala @@ -19,14 +19,22 @@ object Log { def make(span: F[LogSpan[F]]): Resource[F, Span[F]] = Resource.makeCase(span)(LogSpan.finish).widen - def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = - make(LogSpan.fromKernel(service, name, kernel)) - - def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = - make(LogSpan.fromKernelOrElseRoot(service, name, kernel)) - - def root(name: String): Resource[F, Span[F]] = - make(LogSpan.root(service, name)) + override def continue( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = + make(LogSpan.fromKernel(service, name, kernel, options)) + + override def continueOrElseRoot( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = + make(LogSpan.fromKernelOrElseRoot(service, name, kernel, options)) + + override def root(name: String, options: Span.Options): Resource[F, Span[F]] = + make(LogSpan.root(service, name, options)) } diff --git a/modules/log-odin/src/main/scala/LogSpan.scala b/modules/log-odin/src/main/scala/LogSpan.scala index 20f6e5f9..cc6e9b27 100644 --- a/modules/log-odin/src/main/scala/LogSpan.scala +++ b/modules/log-odin/src/main/scala/LogSpan.scala @@ -9,15 +9,16 @@ import cats.effect._ import cats.effect.Resource.ExitCase import cats.effect.Resource.ExitCase._ import cats.implicits._ + import java.time.Instant import java.util.UUID import natchez._ import natchez.TraceValue._ -import io.circe.Json -import io.circe.Encoder +import io.circe.{Encoder, Json, JsonObject, KeyEncoder} import io.circe.syntax._ -import io.circe.JsonObject import io.odin.Logger +import natchez.Span.Options + import java.net.URI import org.typelevel.ci._ @@ -30,10 +31,13 @@ private[logodin] final case class LogSpan[F[_]: Sync: Logger]( timestamp: Instant, fields: Ref[F, Map[String, Json]], children: Ref[F, List[JsonObject]], - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ) extends Span.Default[F] { import LogSpan._ + override protected val spanCreationPolicyOverride: Options.SpanCreationPolicy = + options.spanCreationPolicy + def spanId: F[Option[String]] = sid.toString.some.pure[F] @@ -72,7 +76,7 @@ private[logodin] final case class LogSpan[F[_]: Sync: Logger]( def makeSpan(label: String, options: Span.Options): Resource[F, Span[F]] = Resource - .makeCase(LogSpan.child(this, label, options.spanCreationPolicy))(LogSpan.finish[F]) + .makeCase(LogSpan.child(this, label, options))(LogSpan.finish[F]) .widen def json(finish: Instant, exitCase: ExitCase): F[JsonObject] = @@ -96,7 +100,9 @@ private[logodin] final case class LogSpan[F[_]: Sync: Logger]( "duration_ms" -> (finish.toEpochMilli - timestamp.toEpochMilli).asJson, "trace.span_id" -> sid.asJson, "trace.parent_id" -> parentId.asJson, - "trace.trace_id" -> tid.asJson + "trace.trace_id" -> tid.asJson, + "span.kind" -> options.spanKind.asJson, + "span.links" -> options.links.asJson ) ++ { exitCase match { case Succeeded => List("exit.case" -> "completed".asJson) @@ -130,6 +136,12 @@ private[logodin] object LogSpan { case NumberValue(n) => n.doubleValue.asJson } + implicit val KeyEncodeCIString: KeyEncoder[CIString] = KeyEncoder[String].contramap(_.toString) + + implicit val EncodeSpanKind: Encoder[Span.SpanKind] = Encoder[String].contramap(_.toString) + + implicit val EncodeKernel: Encoder[Kernel] = Encoder[Map[CIString, String]].contramap(_.toHeaders) + object Headers { val TraceId = ci"X-Natchez-Trace-Id" val SpanId = ci"X-Natchez-Parent-Span-Id" @@ -155,7 +167,7 @@ private[logodin] object LogSpan { def child[F[_]: Sync: Logger]( parent: LogSpan[F], name: String, - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ): F[LogSpan[F]] = for { spanId <- uuid[F] @@ -171,12 +183,13 @@ private[logodin] object LogSpan { timestamp = timestamp, fields = fields, children = children, - spanCreationPolicy = spanCreationPolicy + options = options ) def root[F[_]: Sync: Logger]( service: String, - name: String + name: String, + options: Span.Options ): F[LogSpan[F]] = for { spanId <- uuid[F] @@ -193,13 +206,14 @@ private[logodin] object LogSpan { timestamp = timestamp, fields = fields, children = children, - spanCreationPolicy = Span.Options.SpanCreationPolicy.Default + options = options ) def fromKernel[F[_]: Sync: Logger]( service: String, name: String, - kernel: Kernel + kernel: Kernel, + options: Span.Options ): F[LogSpan[F]] = for { traceId <- Sync[F].catchNonFatal(UUID.fromString(kernel.toHeaders(Headers.TraceId))) @@ -217,15 +231,16 @@ private[logodin] object LogSpan { timestamp = timestamp, fields = fields, children = children, - spanCreationPolicy = Span.Options.SpanCreationPolicy.Default + options = options ) def fromKernelOrElseRoot[F[_]: Sync: Logger]( service: String, name: String, - kernel: Kernel + kernel: Kernel, + options: Span.Options ): F[LogSpan[F]] = - fromKernel(service, name, kernel).recoverWith { case _: NoSuchElementException => - root(service, name) + fromKernel(service, name, kernel, options).recoverWith { case _: NoSuchElementException => + root(service, name, options) } } diff --git a/modules/log/shared/src/main/scala/Log.scala b/modules/log/shared/src/main/scala/Log.scala index dcba8c66..4b1e83b1 100644 --- a/modules/log/shared/src/main/scala/Log.scala +++ b/modules/log/shared/src/main/scala/Log.scala @@ -21,14 +21,22 @@ object Log { def make(span: F[LogSpan[F]]): Resource[F, Span[F]] = Resource.makeCase(span)(LogSpan.finish(format)).widen - def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = - make(LogSpan.fromKernel(service, name, kernel)) - - def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = - make(LogSpan.fromKernelOrElseRoot(service, name, kernel)) - - def root(name: String): Resource[F, Span[F]] = - make(LogSpan.root(service, name)) + override def continue( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = + make(LogSpan.fromKernel(service, name, kernel, options)) + + override def continueOrElseRoot( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = + make(LogSpan.fromKernelOrElseRoot(service, name, kernel, options)) + + override def root(name: String, options: Span.Options): Resource[F, Span[F]] = + make(LogSpan.root(service, name, options)) } diff --git a/modules/log/shared/src/main/scala/LogSpan.scala b/modules/log/shared/src/main/scala/LogSpan.scala index 4d96ef50..574fc909 100644 --- a/modules/log/shared/src/main/scala/LogSpan.scala +++ b/modules/log/shared/src/main/scala/LogSpan.scala @@ -15,10 +15,9 @@ import java.time.Instant import java.util.UUID import natchez._ import natchez.TraceValue._ -import io.circe.Json -import io.circe.Encoder +import io.circe.{Encoder, Json, JsonObject, KeyEncoder} import io.circe.syntax._ -import io.circe.JsonObject +import natchez.Span.Options import org.typelevel.log4cats.Logger import org.typelevel.ci._ @@ -34,10 +33,13 @@ private[log] final case class LogSpan[F[_]: Sync: Logger]( timestamp: Instant, fields: Ref[F, Map[String, Json]], children: Ref[F, List[JsonObject]], - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ) extends Span.Default[F] { import LogSpan._ + override protected val spanCreationPolicyOverride: Options.SpanCreationPolicy = + options.spanCreationPolicy + def parentId: Option[String] = parent.map(_.fold(identity, _.sid)) @@ -90,7 +92,9 @@ private[log] final case class LogSpan[F[_]: Sync: Logger]( "duration_ms" -> (finish.toEpochMilli - timestamp.toEpochMilli).asJson, "trace.span_id" -> sid.asJson, "trace.parent_id" -> parentId.asJson, - "trace.trace_id" -> traceID.asJson + "trace.trace_id" -> traceID.asJson, + "span.kind" -> options.spanKind.asJson, + "span.links" -> options.links.asJson ) ++ fs ++ List("children" -> cs.reverse.map(Json.fromJsonObject).asJson) JsonObject.fromIterable(fields) @@ -125,6 +129,12 @@ private[log] object LogSpan { case NumberValue(n) => n.doubleValue.asJson } + implicit val KeyEncodeCIString: KeyEncoder[CIString] = KeyEncoder[String].contramap(_.toString) + + implicit val EncodeKernel: Encoder[Kernel] = Encoder[Map[CIString, String]].contramap(_.toHeaders) + + implicit val EncodeSpanKind: Encoder[Span.SpanKind] = Encoder[String].contramap(_.toString) + object Headers { val TraceId = ci"X-Natchez-Trace-Id" val SpanId = ci"X-Natchez-Parent-Span-Id" @@ -180,12 +190,13 @@ private[log] object LogSpan { timestamp = timestamp, fields = fields, children = children, - spanCreationPolicy = options.spanCreationPolicy + options = options ) def root[F[_]: Sync: Logger]( service: String, - name: String + name: String, + options: Span.Options ): F[LogSpan[F]] = for { spanId <- uuid[F] @@ -203,13 +214,14 @@ private[log] object LogSpan { timestamp = timestamp, fields = fields, children = children, - spanCreationPolicy = Span.Options.SpanCreationPolicy.Default + options = options ) def fromKernel[F[_]: Sync: Logger]( service: String, name: String, - kernel: Kernel + kernel: Kernel, + options: Span.Options ): F[LogSpan[F]] = for { traceID <- Sync[F].catchNonFatal(kernel.toHeaders(Headers.TraceId)) @@ -228,15 +240,16 @@ private[log] object LogSpan { timestamp = timestamp, fields = fields, children = children, - spanCreationPolicy = Span.Options.SpanCreationPolicy.Default + options = options ) def fromKernelOrElseRoot[F[_]: Sync: Logger]( service: String, name: String, - kernel: Kernel + kernel: Kernel, + options: Span.Options ): F[LogSpan[F]] = - fromKernel(service, name, kernel).recoverWith { case _: NoSuchElementException => - root(service, name) + fromKernel(service, name, kernel, options).recoverWith { case _: NoSuchElementException => + root(service, name, options) } } diff --git a/modules/log/shared/src/test/scala/LogSuite.scala b/modules/log/shared/src/test/scala/LogSuite.scala index 5a3391fb..2843e461 100644 --- a/modules/log/shared/src/test/scala/LogSuite.scala +++ b/modules/log/shared/src/test/scala/LogSuite.scala @@ -8,6 +8,7 @@ package log import munit.CatsEffectSuite import cats.effect.IO import io.circe.Json +import natchez.Span.SpanKind class LogSuite extends CatsEffectSuite { @@ -33,15 +34,21 @@ class LogSuite extends CatsEffectSuite { test("log formatter should log things") { MockLogger.newInstance[IO]("test").flatMap { implicit log => - Log.entryPoint[IO]("service", filter(_).spaces2).root("root span").use { root => - root.put("foo" -> 1, "bar" -> true) *> - root.span("child").use { child => - child.put("baz" -> "qux") - } - } *> log.get.assertEquals { + Log + .entryPoint[IO]("service", filter(_).spaces2) + .root("root span", Span.Options.Defaults.withSpanKind(SpanKind.Server)) + .use { root => + root.put("foo" -> 1, "bar" -> true) *> + root.span("child").use { child => + child.put("baz" -> "qux") + } + } *> log.get.assertEquals { """|test: [info] { | "name" : "root span", | "service" : "service", + | "span.kind" : "Server", + | "span.links" : [ + | ], | "foo" : 1, | "bar" : true, | "exit.case" : "succeeded", @@ -49,6 +56,9 @@ class LogSuite extends CatsEffectSuite { | { | "name" : "child", | "service" : "service", + | "span.kind" : "Internal", + | "span.links" : [ + | ], | "baz" : "qux", | "exit.case" : "succeeded", | "children" : [ diff --git a/modules/newrelic/src/main/scala/natchez/newrelic/NewRelic.scala b/modules/newrelic/src/main/scala/natchez/newrelic/NewRelic.scala index 5aed4954..37d7ac23 100644 --- a/modules/newrelic/src/main/scala/natchez/newrelic/NewRelic.scala +++ b/modules/newrelic/src/main/scala/natchez/newrelic/NewRelic.scala @@ -14,21 +14,25 @@ object NewRelic { def entryPoint[F[_]: Sync](system: String)(sender: SpanBatchSender): EntryPoint[F] = new EntryPoint[F] { - def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = + def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] = Resource - .make(NewrelicSpan.fromKernel[F](system, name, kernel)(sender))(s => + .make(NewrelicSpan.fromKernel[F](system, name, kernel, options)(sender))(s => NewrelicSpan.finish[F](s) ) .widen - def root(name: String): Resource[F, Span[F]] = + def root(name: String, options: Span.Options): Resource[F, Span[F]] = Resource - .make(NewrelicSpan.root[F](system, name, sender))(NewrelicSpan.finish[F]) + .make(NewrelicSpan.root[F](system, name, sender, options))(NewrelicSpan.finish[F]) .widen - def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = - continue(name, kernel).recoverWith { case _: NoSuchElementException => - root(name) + def continueOrElseRoot( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = + continue(name, kernel, options).recoverWith { case _: NoSuchElementException => + root(name, options) } } diff --git a/modules/newrelic/src/main/scala/natchez/newrelic/NewrelicSpan.scala b/modules/newrelic/src/main/scala/natchez/newrelic/NewrelicSpan.scala index eae9c08c..ff12be1e 100644 --- a/modules/newrelic/src/main/scala/natchez/newrelic/NewrelicSpan.scala +++ b/modules/newrelic/src/main/scala/natchez/newrelic/NewrelicSpan.scala @@ -6,7 +6,6 @@ package natchez.newrelic import java.net.URI import java.util.UUID - import cats.effect.Ref import cats.effect.{Resource, Sync} import cats.syntax.all._ @@ -29,8 +28,10 @@ private[newrelic] final case class NewrelicSpan[F[_]: Sync]( children: Ref[F, List[Span]], parent: Option[Either[String, NewrelicSpan[F]]], sender: SpanBatchSender, - spanCreationPolicy: natchez.Span.Options.SpanCreationPolicy + options: natchez.Span.Options ) extends natchez.Span.Default[F] { + override protected val spanCreationPolicyOverride: natchez.Span.Options.SpanCreationPolicy = + options.spanCreationPolicy override def kernel: F[Kernel] = Sync[F].delay { @@ -40,7 +41,6 @@ private[newrelic] final case class NewrelicSpan[F[_]: Sync]( Headers.SpanId -> id ) ) - } override def put(fields: (String, TraceValue)*): F[Unit] = @@ -59,7 +59,7 @@ private[newrelic] final case class NewrelicSpan[F[_]: Sync]( override def makeSpan(name: String, options: natchez.Span.Options): Resource[F, natchez.Span[F]] = Resource - .make(NewrelicSpan.child(name, this, options.spanCreationPolicy))(NewrelicSpan.finish[F]) + .make(NewrelicSpan.child(name, this, options))(NewrelicSpan.finish[F]) .widen override def spanId: F[Option[String]] = id.some.pure[F] @@ -67,6 +67,16 @@ private[newrelic] final case class NewrelicSpan[F[_]: Sync]( override def traceId: F[Option[String]] = traceIdS.some.pure[F] override def traceUri: F[Option[URI]] = none[URI].pure[F] + + /** New Relic doesn't seem to have the concept of linked spans in their data dictionary, + * so we just attach them as a string attribute in case they end up being useful. + */ + private def links: Option[String] = + Option { + options.links + .mapFilter(_.toHeaders.get(Headers.SpanId)) + .mkString_(",") + }.filter(_.nonEmpty) } object NewrelicSpan { @@ -78,7 +88,8 @@ object NewrelicSpan { def fromKernel[F[_]: Sync]( service: String, name: String, - kernel: Kernel + kernel: Kernel, + options: natchez.Span.Options )(sender: SpanBatchSender): F[NewrelicSpan[F]] = for { traceId <- Sync[F].catchNonFatal(kernel.toHeaders(Headers.TraceId)) @@ -97,10 +108,15 @@ object NewrelicSpan { attributes = attributes, children = children, sender = sender, - spanCreationPolicy = natchez.Span.Options.SpanCreationPolicy.Default + options = options ) - def root[F[_]: Sync](service: String, name: String, sender: SpanBatchSender): F[NewrelicSpan[F]] = + def root[F[_]: Sync]( + service: String, + name: String, + sender: SpanBatchSender, + options: natchez.Span.Options + ): F[NewrelicSpan[F]] = for { spanId <- Sync[F].delay(UUID.randomUUID().toString) traceId <- Sync[F].delay(UUID.randomUUID().toString) @@ -117,13 +133,13 @@ object NewrelicSpan { children, None, sender, - spanCreationPolicy = natchez.Span.Options.SpanCreationPolicy.Default + options ) def child[F[_]: Sync]( name: String, parent: NewrelicSpan[F], - spanCreationPolicy: natchez.Span.Options.SpanCreationPolicy + options: natchez.Span.Options ): F[NewrelicSpan[F]] = for { spanId <- Sync[F].delay(UUID.randomUUID().toString) @@ -140,46 +156,41 @@ object NewrelicSpan { children, Some(Right(parent)), parent.sender, - spanCreationPolicy = spanCreationPolicy + options ) def finish[F[_]: Sync](nrs: NewrelicSpan[F]): F[Unit] = - nrs.parent match { - case Some(parent) => - for { - attributes <- nrs.attributes.get - finish <- Sync[F].delay(System.currentTimeMillis()) - curChildren <- nrs.children.get - curSpan = Span + for { + attributes <- nrs.attributes.get.map { + nrs.links + .foldl(_)(_.put("span.links", _)) + /* + * see https://docs.newrelic.com/attribute-dictionary/?event=Span&attribute=span.kind + * It's possible that only `"client"` is supported by New Relic, since it's the only value mentioned + */ + .put("span.kind", nrs.options.spanKind.toString.toLowerCase) + } + finish <- Sync[F].delay(System.currentTimeMillis()) + curChildren <- nrs.children.get + curSpan = nrs.parent + .map(_.fold(identity, _.id)) + .foldl { + Span .builder(nrs.id) .traceId(nrs.traceIdS) .name(nrs.name) - .parentId(parent.fold(identity, _.id)) .serviceName(nrs.service) .attributes(attributes) .durationMs((finish - nrs.startTime).toDouble) - .build() - _ <- parent match { - case Left(_) => Sync[F].unit - case Right(p) => p.children.update(curSpan :: curChildren ::: _) - } - } yield () - case None => - for { - attributes <- nrs.attributes.get - finish <- Sync[F].delay(System.currentTimeMillis()) - curChildren <- nrs.children.get - curSpan = Span - .builder(nrs.id) - .traceId(nrs.traceIdS) - .name(nrs.name) - .attributes(attributes) - .durationMs((finish - nrs.startTime).toDouble) - .serviceName(nrs.service) - .build() - batch = new SpanBatch((curSpan :: curChildren).asJava, new Attributes(), nrs.traceIdS) - _ <- Sync[F].delay(nrs.sender.sendBatch(batch)) - } yield () + }(_.parentId(_)) + .build() + _ <- nrs.parent match { + case Some(Left(_)) => Sync[F].unit + case Some(Right(p)) => p.children.update(curSpan :: curChildren ::: _) + case None => + val batch = new SpanBatch((curSpan :: curChildren).asJava, new Attributes(), nrs.traceIdS) + Sync[F].delay(nrs.sender.sendBatch(batch)).void + } + } yield () - } } diff --git a/modules/noop/shared/src/main/scala/NoopEntrypoint.scala b/modules/noop/shared/src/main/scala/NoopEntrypoint.scala index 57a4a663..73e753ef 100644 --- a/modules/noop/shared/src/main/scala/NoopEntrypoint.scala +++ b/modules/noop/shared/src/main/scala/NoopEntrypoint.scala @@ -10,17 +10,19 @@ import cats.effect.Resource final case class NoopEntrypoint[F[_]: Applicative]() extends EntryPoint[F] { - override def root(name: String): Resource[F, Span[F]] = + override def root(name: String, options: Span.Options): Resource[F, Span[F]] = Resource.eval[F, Span[F]](Applicative[F].pure(NoopSpan())) override def continue( name: String, - kernel: Kernel + kernel: Kernel, + options: Span.Options ): Resource[F, Span[F]] = - root(name) + root(name, options) override def continueOrElseRoot( name: String, - kernel: Kernel - ): Resource[F, Span[F]] = root(name) + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = root(name, options) } diff --git a/modules/opencensus/src/main/scala/OpenCensus.scala b/modules/opencensus/src/main/scala/OpenCensus.scala index 1d908597..c655e3c2 100644 --- a/modules/opencensus/src/main/scala/OpenCensus.scala +++ b/modules/opencensus/src/main/scala/OpenCensus.scala @@ -39,17 +39,20 @@ object OpenCensus { .delay(Tracing.getTracer) .map { t => new EntryPoint[F] { - def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = + def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] = Resource - .makeCase(OpenCensusSpan.fromKernel(t, name, kernel))(OpenCensusSpan.finish) + .makeCase(OpenCensusSpan.fromKernel(t, name, kernel, options))(OpenCensusSpan.finish) .widen - def root(name: String): Resource[F, Span[F]] = - Resource.makeCase(OpenCensusSpan.root(t, name, sampler))(OpenCensusSpan.finish).widen + def root(name: String, options: Span.Options): Resource[F, Span[F]] = + Resource + .makeCase(OpenCensusSpan.root(t, name, sampler, options))(OpenCensusSpan.finish) + .widen - def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = + def continueOrElseRoot(name: String, kernel: Kernel, options: Span.Options) + : Resource[F, Span[F]] = Resource - .makeCase(OpenCensusSpan.fromKernelOrElseRoot(t, name, kernel, sampler))( + .makeCase(OpenCensusSpan.fromKernelOrElseRoot(t, name, kernel, sampler, options))( OpenCensusSpan.finish ) .widen diff --git a/modules/opencensus/src/main/scala/OpenCensusSpan.scala b/modules/opencensus/src/main/scala/OpenCensusSpan.scala index 381b0935..8126dba4 100644 --- a/modules/opencensus/src/main/scala/OpenCensusSpan.scala +++ b/modules/opencensus/src/main/scala/OpenCensusSpan.scala @@ -10,9 +10,10 @@ import cats.effect.Resource.ExitCase import cats.effect.Resource.ExitCase._ import cats.syntax.all._ import io.opencensus.trace.propagation.TextFormat.Setter -import io.opencensus.trace.{AttributeValue, Sampler, Tracer, Tracing} +import io.opencensus.trace.{AttributeValue, Sampler, SpanBuilder, Tracer, Tracing} import io.opencensus.trace.propagation.SpanContextParseException import io.opencensus.trace.propagation.TextFormat.Getter +import natchez.Span.{Options, SpanKind} import natchez.TraceValue.{BooleanValue, NumberValue, StringValue} import org.typelevel.ci._ @@ -23,11 +24,14 @@ import scala.jdk.CollectionConverters._ private[opencensus] final case class OpenCensusSpan[F[_]: Sync]( tracer: Tracer, span: io.opencensus.trace.Span, - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ) extends Span.Default[F] { import OpenCensusSpan._ + override protected val spanCreationPolicyOverride: Options.SpanCreationPolicy = + options.spanCreationPolicy + private def traceToAttribute(value: TraceValue): AttributeValue = value match { case StringValue(v) => val safeString = if (v == null) "null" else v @@ -62,9 +66,9 @@ private[opencensus] final case class OpenCensusSpan[F[_]: Sync]( Span.putErrorFields( Resource .makeCase(options.parentKernel match { - case None => OpenCensusSpan.child(this, name, options.spanCreationPolicy) + case None => OpenCensusSpan.child(this, name, options) case Some(k) => - OpenCensusSpan.fromKernelWithSpan(tracer, name, k, span, options.spanCreationPolicy) + OpenCensusSpan.fromKernelWithSpan(tracer, name, k, span, options) })( OpenCensusSpan.finish ) @@ -112,6 +116,19 @@ private[opencensus] object OpenCensusSpan { case Errored(ex) => outer.attachError(ex) } } + _ <- outer.options.links + .traverse { k => + Sync[F].delay { + Tracing.getPropagationComponent.getB3Format + .extract(k, spanContextGetter) + .getSpanId + .toLowerBase16 + } + } + .flatMap { linkedSpanIds => + // OpenCensus only has full support for parent and child links, so attach them as a field + outer.put("span.links" -> linkedSpanIds.mkString_(",")) + } _ <- Sync[F].delay(outer.span.end()) } yield () } @@ -119,75 +136,93 @@ private[opencensus] object OpenCensusSpan { def child[F[_]: Sync]( parent: OpenCensusSpan[F], name: String, - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ): F[OpenCensusSpan[F]] = Sync[F] - .delay( - parent.tracer - .spanBuilderWithExplicitParent(name, parent.span) - .startSpan() - ) - .map(OpenCensusSpan(parent.tracer, _, spanCreationPolicy)) + .delay(parent.tracer.spanBuilderWithExplicitParent(name, parent.span)) + .flatTap(setOptionsOnBuilder[F](options)) + .flatMap(startSpan[F]) + .map(OpenCensusSpan(parent.tracer, _, options)) def root[F[_]: Sync]( tracer: Tracer, name: String, - sampler: Sampler + sampler: Sampler, + options: Span.Options ): F[OpenCensusSpan[F]] = Sync[F] - .delay( - tracer - .spanBuilder(name) - .setSampler(sampler) - .startSpan() - ) - .map(OpenCensusSpan(tracer, _, Span.Options.SpanCreationPolicy.Default)) + .delay(tracer.spanBuilder(name).setSampler(sampler)) + .flatTap(setOptionsOnBuilder[F](options)) + .flatMap(startSpan[F]) + .map(OpenCensusSpan(tracer, _, options)) def fromKernelWithSpan[F[_]: Sync]( tracer: Tracer, name: String, kernel: Kernel, span: io.opencensus.trace.Span, - spanCreationPolicy: Span.Options.SpanCreationPolicy - ): F[OpenCensusSpan[F]] = Sync[F] - .delay { - val ctx = Tracing.getPropagationComponent.getB3Format - .extract(kernel, spanContextGetter) - tracer - .spanBuilderWithRemoteParent(name, ctx) - .setParentLinks(List(span).asJava) - .startSpan() - } - .map(OpenCensusSpan(tracer, _, spanCreationPolicy)) + options: Span.Options + ): F[OpenCensusSpan[F]] = + Sync[F] + .delay { + val ctx = Tracing.getPropagationComponent.getB3Format + .extract(kernel, spanContextGetter) + tracer + .spanBuilderWithRemoteParent(name, ctx) + .setParentLinks(List(span).asJava) + } + .flatTap(setOptionsOnBuilder[F](options)) + .flatMap(startSpan[F]) + .map(OpenCensusSpan(tracer, _, options)) def fromKernel[F[_]: Sync]( tracer: Tracer, name: String, - kernel: Kernel + kernel: Kernel, + options: Span.Options ): F[OpenCensusSpan[F]] = Sync[F] .delay { val ctx = Tracing.getPropagationComponent.getB3Format .extract(kernel, spanContextGetter) - tracer.spanBuilderWithRemoteParent(name, ctx).startSpan() + tracer.spanBuilderWithRemoteParent(name, ctx) } - .map(OpenCensusSpan(tracer, _, Span.Options.SpanCreationPolicy.Default)) + .flatTap(setOptionsOnBuilder[F](options)) + .flatMap(startSpan[F]) + .map(OpenCensusSpan(tracer, _, options)) def fromKernelOrElseRoot[F[_]]( tracer: Tracer, name: String, kernel: Kernel, - sampler: Sampler + sampler: Sampler, + options: Span.Options )(implicit ev: Sync[F]): F[OpenCensusSpan[F]] = - fromKernel(tracer, name, kernel).recoverWith { + fromKernel(tracer, name, kernel, options).recoverWith { case _: SpanContextParseException => - root(tracer, name, sampler) + root(tracer, name, sampler, options) case _: NoSuchElementException => - root(tracer, name, sampler) // means headers are incomplete or invalid + root(tracer, name, sampler, options) // means headers are incomplete or invalid case _: NullPointerException => - root(tracer, name, sampler) // means headers are incomplete or invalid + root(tracer, name, sampler, options) // means headers are incomplete or invalid } private val spanContextGetter: Getter[Kernel] = (carrier: Kernel, key: String) => carrier.toHeaders(CIString(key)) + + private def setOptionsOnBuilder[F[_]: Sync]( + options: Span.Options + )(builder: SpanBuilder): F[Unit] = + options.spanKind match { + case SpanKind.Server => + Sync[F].delay(builder.setSpanKind(io.opencensus.trace.Span.Kind.SERVER)).void + case SpanKind.Client => + Sync[F].delay(builder.setSpanKind(io.opencensus.trace.Span.Kind.CLIENT)).void + case _ => + // OpenCensus only supports Server and Client span types, so ignore other types + ().pure[F] + } + + private def startSpan[F[_]: Sync](builder: SpanBuilder): F[io.opencensus.trace.Span] = + Sync[F].delay(builder.startSpan()) } diff --git a/modules/opentelemetry/src/main/scala/natchez/opentelemetry/OpenTelemetryEntryPoint.scala b/modules/opentelemetry/src/main/scala/natchez/opentelemetry/OpenTelemetryEntryPoint.scala index 514e23ab..de84216e 100644 --- a/modules/opentelemetry/src/main/scala/natchez/opentelemetry/OpenTelemetryEntryPoint.scala +++ b/modules/opentelemetry/src/main/scala/natchez/opentelemetry/OpenTelemetryEntryPoint.scala @@ -14,21 +14,33 @@ import java.net.URI final case class OpenTelemetryEntryPoint[F[_]: Sync](sdk: OTel, tracer: Tracer, prefix: Option[URI]) extends EntryPoint[F] { - override def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = + override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] = Resource - .makeCase(OpenTelemetrySpan.fromKernel(sdk, tracer, prefix, name, kernel))( + .makeCase( + OpenTelemetrySpan + .fromKernel(sdk, tracer, prefix, name, kernel, options) + )( OpenTelemetrySpan.finish ) .widen - override def root(name: String): Resource[F, Span[F]] = + override def root(name: String, options: Span.Options): Resource[F, Span[F]] = Resource - .makeCase(OpenTelemetrySpan.root(sdk, tracer, prefix, name))(OpenTelemetrySpan.finish) + .makeCase(OpenTelemetrySpan.root(sdk, tracer, prefix, name, options))( + OpenTelemetrySpan.finish + ) .widen - override def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = + override def continueOrElseRoot( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = Resource - .makeCase(OpenTelemetrySpan.fromKernelOrElseRoot(sdk, tracer, prefix, name, kernel))( + .makeCase( + OpenTelemetrySpan + .fromKernelOrElseRoot(sdk, tracer, prefix, name, kernel, options) + )( OpenTelemetrySpan.finish ) .widen diff --git a/modules/opentelemetry/src/main/scala/natchez/opentelemetry/OpenTelemetrySpan.scala b/modules/opentelemetry/src/main/scala/natchez/opentelemetry/OpenTelemetrySpan.scala index ec2ac620..a8a9b0b5 100644 --- a/modules/opentelemetry/src/main/scala/natchez/opentelemetry/OpenTelemetrySpan.scala +++ b/modules/opentelemetry/src/main/scala/natchez/opentelemetry/OpenTelemetrySpan.scala @@ -5,17 +5,24 @@ package natchez package opentelemetry -import cats.data.Nested +import cats.data.{Chain, Nested} import cats.effect.{Resource, Sync} import cats.effect.kernel.Resource.ExitCase import cats.effect.kernel.Resource.ExitCase.{Canceled, Errored, Succeeded} import cats.syntax.all._ -import io.opentelemetry.api.trace.StatusCode +import io.opentelemetry.api.trace.{ + SpanBuilder, + SpanContext, + StatusCode, + TraceFlags, + TraceState, + Tracer, + Span => TSpan +} import io.opentelemetry.context.propagation.{TextMapGetter, TextMapSetter} import io.opentelemetry.context.Context import java.lang -import io.opentelemetry.api.trace.{Tracer, Span => TSpan} import io.opentelemetry.api.{OpenTelemetry => OTel} import TraceValue.{BooleanValue, NumberValue, StringValue} @@ -29,7 +36,7 @@ private[opentelemetry] final case class OpenTelemetrySpan[F[_]: Sync]( tracer: Tracer, span: TSpan, prefix: Option[URI], - spanCreationPolicy: Span.Options.SpanCreationPolicy + spanCreationPolicyOverride: Span.Options.SpanCreationPolicy ) extends Span.Default[F] { import OpenTelemetrySpan._ @@ -88,10 +95,20 @@ private[opentelemetry] final case class OpenTelemetrySpan[F[_]: Sync]( Span.putErrorFields( Resource .makeCase(options.parentKernel match { - case None => OpenTelemetrySpan.child(this, name, options.spanCreationPolicy) + case None => + OpenTelemetrySpan + .child(otel, this, name, options) case Some(k) => OpenTelemetrySpan - .fromKernelWithSpan(otel, tracer, name, k, span, prefix, options.spanCreationPolicy) + .fromKernelWithSpan( + otel, + tracer, + name, + k, + span, + prefix, + options + ) })( OpenTelemetrySpan.finish ) @@ -140,32 +157,79 @@ private[opentelemetry] object OpenTelemetrySpan { } def child[F[_]: Sync]( + otel: OTel, parent: OpenTelemetrySpan[F], name: String, - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ): F[OpenTelemetrySpan[F]] = - Sync[F] - .delay( - parent.tracer - .spanBuilder(name) - .setParent(Context.current().`with`(parent.span)) - .startSpan() + createSpan(parent.tracer, name, options.spanKind) + .flatMap { spanBuilder => + Sync[F].delay { + spanBuilder.setParent(Context.current().`with`(parent.span)) + } + } + .flatMap(addLinks[F](otel, options.links)) + .flatMap(startSpan[F]) + .map( + OpenTelemetrySpan(parent.otel, parent.tracer, _, parent.prefix, options.spanCreationPolicy) ) - .map(OpenTelemetrySpan(parent.otel, parent.tracer, _, parent.prefix, spanCreationPolicy)) def root[F[_]: Sync]( otel: OTel, tracer: Tracer, prefix: Option[URI], - name: String + name: String, + options: Span.Options ): F[OpenTelemetrySpan[F]] = - Sync[F] - .delay( - tracer - .spanBuilder(name) - .startSpan() - ) - .map(OpenTelemetrySpan(otel, tracer, _, prefix, Span.Options.SpanCreationPolicy.Default)) + createSpan(tracer, name, options.spanKind) + .flatMap(addLinks[F](otel, options.links)) + .flatMap(startSpan[F]) + .map(OpenTelemetrySpan(otel, tracer, _, prefix, options.spanCreationPolicy)) + + private def startSpan[F[_]: Sync](spanBuilder: SpanBuilder): F[TSpan] = + Sync[F].delay(spanBuilder.startSpan()) + + private def createSpan[F[_]: Sync]( + tracer: Tracer, + name: String, + spanKind: Span.SpanKind + ): F[SpanBuilder] = + Sync[F].delay { + tracer + .spanBuilder(name) + .setSpanKind(spanKind match { + case Span.SpanKind.Internal => io.opentelemetry.api.trace.SpanKind.INTERNAL + case Span.SpanKind.Client => io.opentelemetry.api.trace.SpanKind.CLIENT + case Span.SpanKind.Server => io.opentelemetry.api.trace.SpanKind.SERVER + case Span.SpanKind.Producer => io.opentelemetry.api.trace.SpanKind.PRODUCER + case Span.SpanKind.Consumer => io.opentelemetry.api.trace.SpanKind.CONSUMER + }) + } + + private def addLinks[F[_]: Sync](otel: OTel, links: Chain[Kernel])( + spanBuilder: SpanBuilder + ): F[SpanBuilder] = + links.foldM(spanBuilder) { (builder, kernel) => + Sync[F].delay { + val ctx = otel.getPropagators.getTextMapPropagator.extract( + Context.current(), + kernel, + spanContextGetter + ) + val link = TSpan.fromContext(ctx).getSpanContext + + builder.addLink { + // See https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#isremote + // "When extracting a SpanContext through the Propagators API, IsRemote MUST return true" + SpanContext.createFromRemoteParent( + link.getTraceId, + link.getSpanId, + TraceFlags.getDefault, + TraceState.getDefault + ) + } + } + } def fromKernelWithSpan[F[_]: Sync]( sdk: OTel, @@ -174,42 +238,53 @@ private[opentelemetry] object OpenTelemetrySpan { kernel: Kernel, span: TSpan, prefix: Option[URI], - spanCreationPolicy: Span.Options.SpanCreationPolicy - ): F[OpenTelemetrySpan[F]] = Sync[F] - .delay { - val ctx = sdk.getPropagators.getTextMapPropagator - .extract(Context.current(), kernel, spanContextGetter) - tracer.spanBuilder(name).setParent(ctx).addLink(span.getSpanContext).startSpan - } - .map(OpenTelemetrySpan(sdk, tracer, _, prefix, spanCreationPolicy)) + options: Span.Options + ): F[OpenTelemetrySpan[F]] = + createSpan(tracer, name, options.spanKind) + .flatMap { sb => + Sync[F].delay { + val ctx = sdk.getPropagators.getTextMapPropagator + .extract(Context.current(), kernel, spanContextGetter) + sb.setParent(ctx).addLink(span.getSpanContext) + } + } + .flatMap(addLinks[F](sdk, options.links)) + .flatMap(startSpan[F]) + .map(OpenTelemetrySpan(sdk, tracer, _, prefix, options.spanCreationPolicy)) def fromKernel[F[_]: Sync]( otel: OTel, tracer: Tracer, prefix: Option[URI], name: String, - kernel: Kernel + kernel: Kernel, + options: Span.Options ): F[OpenTelemetrySpan[F]] = - Sync[F] - .delay { - val ctx = otel.getPropagators.getTextMapPropagator - .extract(Context.current(), kernel, spanContextGetter) - tracer.spanBuilder(name).setParent(ctx).startSpan() + createSpan(tracer, name, options.spanKind) + .flatMap { sb => + Sync[F].delay { + val ctx = otel.getPropagators.getTextMapPropagator + .extract(Context.current(), kernel, spanContextGetter) + sb.setParent(ctx) + } } - .map(OpenTelemetrySpan(otel, tracer, _, prefix, Span.Options.SpanCreationPolicy.Default)) + .flatMap(addLinks[F](otel, options.links)) + .flatMap(startSpan[F]) + .map(OpenTelemetrySpan(otel, tracer, _, prefix, options.spanCreationPolicy)) def fromKernelOrElseRoot[F[_]]( otel: OTel, tracer: Tracer, prefix: Option[URI], name: String, - kernel: Kernel + kernel: Kernel, + options: Span.Options )(implicit ev: Sync[F]): F[OpenTelemetrySpan[F]] = - fromKernel(otel, tracer, prefix, name, kernel).recoverWith { + fromKernel(otel, tracer, prefix, name, kernel, options).recoverWith { case _: NoSuchElementException => - root(otel, tracer, prefix, name) // means headers are incomplete or invalid + root(otel, tracer, prefix, name, options) // means headers are incomplete or invalid case _: NullPointerException => - root(otel, tracer, prefix, name) // means headers are incomplete or invalid + root(otel, tracer, prefix, name, options) // means headers are incomplete or invalid } private val spanContextGetter: TextMapGetter[Kernel] = new TextMapGetter[Kernel] { diff --git a/modules/xray/src/main/scala/natchez/xray/XRayEntryPoint.scala b/modules/xray/src/main/scala/natchez/xray/XRayEntryPoint.scala index b6c1a873..3e59d181 100644 --- a/modules/xray/src/main/scala/natchez/xray/XRayEntryPoint.scala +++ b/modules/xray/src/main/scala/natchez/xray/XRayEntryPoint.scala @@ -31,17 +31,21 @@ final class XRayEntryPoint[F[_]: Concurrent: Clock: Random: XRayEnvironment]( def make(span: F[XRaySpan[F]]): Resource[F, Span[F]] = Resource.makeCase(span)(XRaySpan.finish(_, this, _)).widen - def root(name: String): Resource[F, Span[F]] = - make(XRaySpan.root(name, this)) + override def root(name: String, options: Span.Options): Resource[F, Span[F]] = + make(XRaySpan.root(name, this, options)) - def continue(name: String, kernel: Kernel): Resource[F, Span[F]] = + override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] = make( - OptionT(XRaySpan.fromKernel(name, kernel, this, useEnvironmentFallback)) + OptionT(XRaySpan.fromKernel(name, kernel, this, useEnvironmentFallback, options)) .getOrElseF(new NoSuchElementException().raiseError) ) - def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] = - make(XRaySpan.fromKernelOrElseRoot(name, kernel, this, useEnvironmentFallback)) + override def continueOrElseRoot( + name: String, + kernel: Kernel, + options: Span.Options + ): Resource[F, Span[F]] = + make(XRaySpan.fromKernelOrElseRoot(name, kernel, this, useEnvironmentFallback, options)) } object XRayEntryPoint { diff --git a/modules/xray/src/main/scala/natchez/xray/XRaySpan.scala b/modules/xray/src/main/scala/natchez/xray/XRaySpan.scala index 8f5a3f77..83883384 100644 --- a/modules/xray/src/main/scala/natchez/xray/XRaySpan.scala +++ b/modules/xray/src/main/scala/natchez/xray/XRaySpan.scala @@ -6,23 +6,19 @@ package natchez.xray import cats._ import cats.data._ -import cats.effect._ -import cats.syntax.all._ import cats.effect.Resource.ExitCase -import natchez._ -import natchez.TraceValue._ -import cats.effect.Resource - -import java.net.URI -import io.circe.JsonObject +import cats.effect._ +import cats.effect.kernel.Resource.ExitCase.{Canceled, Errored, Succeeded} import cats.effect.std.Random +import cats.syntax.all._ import io.circe._ import io.circe.syntax._ -import cats.effect.kernel.Resource.ExitCase.Canceled -import cats.effect.kernel.Resource.ExitCase.Errored -import cats.effect.kernel.Resource.ExitCase.Succeeded +import natchez.Span.Options +import natchez.TraceValue._ +import natchez._ import org.typelevel.ci._ +import java.net.URI import scala.concurrent.duration._ import scala.util.matching.Regex @@ -36,10 +32,13 @@ private[xray] final case class XRaySpan[F[_]: Concurrent: Clock: Random]( fields: Ref[F, Map[String, Json]], children: Ref[F, List[JsonObject]], sampled: Boolean, - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ) extends Span.Default[F] { import XRaySpan._ + override protected val spanCreationPolicyOverride: Options.SpanCreationPolicy = + options.spanCreationPolicy + def put(fields: (String, TraceValue)*): F[Unit] = { val fieldsToAdd = fields.map { case (k, v) => k -> v.asJson } this.fields.update(_ ++ fieldsToAdd.toMap) @@ -56,7 +55,7 @@ private[xray] final case class XRaySpan[F[_]: Concurrent: Clock: Random]( def log(fields: (String, TraceValue)*): F[Unit] = Applicative[F].unit override def makeSpan(name: String, options: Span.Options): Resource[F, Span[F]] = - Resource.makeCase(XRaySpan.child(this, name, options.spanCreationPolicy))( + Resource.makeCase(XRaySpan.child(this, name, options))( XRaySpan.finish[F](_, entry, _) ) @@ -120,7 +119,11 @@ private[xray] final case class XRaySpan[F[_]: Concurrent: Clock: Random]( "end_time" -> toEpochSeconds(end).asJson, "trace_id" -> xrayTraceId.asJson, "subsegments" -> cs.reverse.map(Json.fromJsonObject).asJson, - "annotations" -> allAnnotations.asJson + "annotations" -> allAnnotations.asJson, + "metadata" -> JsonObject( + "links" -> options.links.asJson, + "span.kind" -> options.spanKind.asJson + ).asJson ).deepMerge(exitCase match { case Canceled => JsonObject.singleton("fault", true.asJson) case Errored(e) => XRayException(id, e).asJsonObject @@ -135,6 +138,12 @@ private[xray] final case class XRaySpan[F[_]: Concurrent: Clock: Random]( private[xray] object XRaySpan { private[XRaySpan] val keyRegex: Regex = """[^A-Za-z0-9_]""".r + private[XRaySpan] implicit val ciStringKeyEncoder: KeyEncoder[CIString] = + KeyEncoder[String].contramap(_.toString) + private[XRaySpan] implicit val kernelEncoder: Encoder[Kernel] = + Encoder[Map[CIString, String]].contramap(_.toHeaders) + private[XRaySpan] implicit val spanKindEncoder: Encoder[Span.SpanKind] = + Encoder[String].contramap(_.toString) final case class XRayException(id: String, ex: Throwable) @@ -206,7 +215,8 @@ private[xray] object XRaySpan { def fromHeader[F[_]: Concurrent: Clock: Random]( name: String, header: XRayHeader, - entry: XRayEntryPoint[F] + entry: XRayEntryPoint[F], + options: Span.Options ): F[XRaySpan[F]] = ( segmentId[F], @@ -225,7 +235,7 @@ private[xray] object XRaySpan { children = children, parent = header.parentId.map(_.asLeft), sampled = header.sampled, - spanCreationPolicy = Span.Options.SpanCreationPolicy.Default + options = options ) } @@ -233,17 +243,18 @@ private[xray] object XRaySpan { name: String, kernel: Kernel, entry: XRayEntryPoint[F], - useEnvironmentFallback: Boolean + useEnvironmentFallback: Boolean, + options: Span.Options ): F[Option[XRaySpan[F]]] = OptionT .fromOption[F](kernel.toHeaders.get(Header)) .subflatMap(parseHeader) - .semiflatMap(fromHeader(name, _, entry)) + .semiflatMap(fromHeader(name, _, entry, options)) .orElse { OptionT .whenF(useEnvironmentFallback) { XRayEnvironment[F].kernelFromEnvironment - .flatMap(XRaySpan.fromKernel(name, _, entry, useEnvironmentFallback = false)) + .flatMap(XRaySpan.fromKernel(name, _, entry, useEnvironmentFallback = false, options)) } .flattenOption } @@ -253,14 +264,16 @@ private[xray] object XRaySpan { name: String, kernel: Kernel, entry: XRayEntryPoint[F], - useEnvironmentFallback: Boolean + useEnvironmentFallback: Boolean, + options: Span.Options ): F[XRaySpan[F]] = - OptionT(fromKernel(name, kernel, entry, useEnvironmentFallback)) - .getOrElseF(root(name, entry)) + OptionT(fromKernel(name, kernel, entry, useEnvironmentFallback, options)) + .getOrElseF(root(name, entry, options)) def root[F[_]: Concurrent: Clock: Random]( name: String, - entry: XRayEntryPoint[F] + entry: XRayEntryPoint[F], + options: Span.Options ): F[XRaySpan[F]] = ( segmentId[F], @@ -280,14 +293,14 @@ private[xray] object XRaySpan { children = children, parent = None, sampled = true, - spanCreationPolicy = Span.Options.SpanCreationPolicy.Default + options = options ) } def child[F[_]: Concurrent: Clock: Random]( parent: XRaySpan[F], name: String, - spanCreationPolicy: Span.Options.SpanCreationPolicy + options: Span.Options ): F[XRaySpan[F]] = ( segmentId[F], @@ -305,7 +318,7 @@ private[xray] object XRaySpan { children = children, parent = Some(Right(parent)), sampled = parent.sampled, - spanCreationPolicy = spanCreationPolicy + options = options ) }