Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SpanKind and linked traces as Span options #688

Merged
merged 11 commits into from
Jan 13, 2023
32 changes: 24 additions & 8 deletions modules/core/shared/src/main/scala/EntryPoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,32 @@ import cats.effect.Resource
trait EntryPoint[F[_]] {

/** Resource that creates a new root span in a new trace. */
def root(name: String): Resource[F, Span[F]]
final def root(name: String): Resource[F, Span[F]] = root(name, Span.Options.Defaults)

def root(name: String, options: Span.Options): Resource[F, Span[F]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to add some docs about the use cases for options here. Nothing too extensive.


/** Resource that creates a new span as the child of the span specified by the given kernel,
* which typically arrives via request headers. By this mechanism we can continue a trace that
* began in another system. If the required headers are not present in `kernel` an exception will
* be raised in `F`.
*/
def continue(name: String, kernel: Kernel): Resource[F, Span[F]]
final def continue(name: String, kernel: Kernel): Resource[F, Span[F]] =
continue(name, kernel, Span.Options.Defaults)

def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]]

/** Resource that attempts to creates a new span as with `continue`, but falls back to a new root
* span as with `root` if the kernel does not contain the required headers. In other words, we
* continue the existing span if we can, otherwise we start a new one.
*/
def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]]
final def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] =
continueOrElseRoot(name, kernel, Span.Options.Defaults)

def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[F, Span[F]]

/** Converts this `EntryPoint[F]` to an `EntryPoint[G]` using an `F ~> G`.
*/
Expand All @@ -42,17 +54,21 @@ trait EntryPoint[F[_]] {

new EntryPoint[G] {

override def root(name: String): Resource[G, Span[G]] = aux(outer.root(name))
override def root(name: String, options: Span.Options): Resource[G, Span[G]] = aux(
outer.root(name, options)
)

override def continue(
name: String,
kernel: Kernel
): Resource[G, Span[G]] = aux(outer.continue(name, kernel))
kernel: Kernel,
options: Span.Options
): Resource[G, Span[G]] = aux(outer.continue(name, kernel, options))

override def continueOrElseRoot(
name: String,
kernel: Kernel
): Resource[G, Span[G]] = aux(outer.continueOrElseRoot(name, kernel))
kernel: Kernel,
options: Span.Options
): Resource[G, Span[G]] = aux(outer.continueOrElseRoot(name, kernel, options))
}
}
}
48 changes: 38 additions & 10 deletions modules/core/shared/src/main/scala/Span.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

package natchez

import cats.data.Kleisli
import cats.data.{Chain, Kleisli}
import cats.effect.MonadCancel
import cats.effect.Resource
import cats.effect.Resource.ExitCase
import cats.syntax.applicative._
import cats.{~>, Applicative}
import cats.{Applicative, ~>}

import java.net.URI

/** An span that can be passed around and used to create child spans. */
Expand Down Expand Up @@ -89,10 +90,10 @@ trait Span[F[_]] {
object Span {

abstract class Default[F[_]: Applicative] extends Span[F] {
protected val spanCreationPolicy: Options.SpanCreationPolicy
protected val spanCreationPolicyOverride: Options.SpanCreationPolicy

def span(name: String, options: Options): Resource[F, Span[F]] =
spanCreationPolicy match {
override final def span(name: String, options: Options): Resource[F, Span[F]] =
spanCreationPolicyOverride match {
case Options.SpanCreationPolicy.Suppress => Resource.pure(Span.noop[F])
case Options.SpanCreationPolicy.Coalesce => Resource.pure(this)
case Options.SpanCreationPolicy.Default => makeSpan(name, options)
Expand Down Expand Up @@ -171,10 +172,16 @@ object Span {

/** Specifies how additional span creation requests are handled on the new span. */
def spanCreationPolicy: Options.SpanCreationPolicy
def spanKind: Span.SpanKind
def links: Chain[Kernel]

def withParentKernel(kernel: Kernel): Options
def withoutParentKernel: Options
def withSpanCreationPolicy(p: Options.SpanCreationPolicy): Options

def withSpanKind(spanKind: SpanKind): Options

def withLink(kernel: Kernel): Options
}

object Options {
Expand All @@ -193,17 +200,38 @@ object Span {

private case class OptionsImpl(
parentKernel: Option[Kernel],
spanCreationPolicy: SpanCreationPolicy
spanCreationPolicy: SpanCreationPolicy,
spanKind: SpanKind,
links: Chain[Kernel]
) extends Options {
def withParentKernel(kernel: Kernel): Options = OptionsImpl(Some(kernel), spanCreationPolicy)
def withoutParentKernel: Options = OptionsImpl(None, spanCreationPolicy)
def withSpanCreationPolicy(p: SpanCreationPolicy): Options = OptionsImpl(parentKernel, p)
override def withParentKernel(kernel: Kernel): Options =
OptionsImpl(Some(kernel), spanCreationPolicy, spanKind, links)
override def withoutParentKernel: Options =
OptionsImpl(None, spanCreationPolicy, spanKind, links)
override def withSpanCreationPolicy(p: SpanCreationPolicy): Options =
OptionsImpl(parentKernel, p, spanKind, links)
override def withSpanKind(spanKind: SpanKind): Options =
OptionsImpl(parentKernel, spanCreationPolicy, spanKind, links)
override def withLink(kernel: Kernel): Options =
OptionsImpl(parentKernel, spanCreationPolicy, spanKind, links.append(kernel))
}

val Defaults: Options = OptionsImpl(None, SpanCreationPolicy.Default)
val Defaults: Options =
OptionsImpl(None, SpanCreationPolicy.Default, SpanKind.Internal, Chain.empty)
val Suppress: Options = Defaults.withSpanCreationPolicy(SpanCreationPolicy.Suppress)
val Coalesce: Options = Defaults.withSpanCreationPolicy(SpanCreationPolicy.Coalesce)

def parentKernel(kernel: Kernel): Options = Defaults.withParentKernel(kernel)
}

sealed trait SpanKind

object SpanKind {
case object Internal extends SpanKind
case object Client extends SpanKind
case object Server extends SpanKind
case object Producer extends SpanKind
case object Consumer extends SpanKind
}

}
50 changes: 33 additions & 17 deletions modules/core/shared/src/test/scala/InMemory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ object InMemory {
lineage: Lineage,
k: Kernel,
ref: Ref[IO, Chain[(Lineage, NatchezCommand)]],
val spanCreationPolicy: Options.SpanCreationPolicy
val options: Options
) extends natchez.Span.Default[IO] {
override protected val spanCreationPolicyOverride: Options.SpanCreationPolicy =
options.spanCreationPolicy

def put(fields: (String, natchez.TraceValue)*): IO[Unit] =
ref.update(_.append(lineage -> NatchezCommand.Put(fields.toList)))
Expand All @@ -38,8 +40,8 @@ object InMemory {

def makeSpan(name: String, options: Options): Resource[IO, natchez.Span[IO]] = {
val acquire = ref
.update(_.append(lineage -> NatchezCommand.CreateSpan(name, options.parentKernel)))
.as(new Span(lineage / name, k, ref, options.spanCreationPolicy))
.update(_.append(lineage -> NatchezCommand.CreateSpan(name, options.parentKernel, options)))
.as(new Span(lineage / name, k, ref, options))

val release = ref.update(_.append(lineage -> NatchezCommand.ReleaseSpan(name)))

Expand All @@ -59,19 +61,31 @@ object InMemory {
class EntryPoint(val ref: Ref[IO, Chain[(Lineage, NatchezCommand)]])
extends natchez.EntryPoint[IO] {

def root(name: String): Resource[IO, Span] =
newSpan(name, Kernel(Map.empty))

def continue(name: String, kernel: Kernel): Resource[IO, Span] =
newSpan(name, kernel)

def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span] =
newSpan(name, kernel)

private def newSpan(name: String, kernel: Kernel): Resource[IO, Span] = {
override def root(name: String, options: natchez.Span.Options): Resource[IO, Span] =
newSpan(name, Kernel(Map.empty), options)

override def continue(
name: String,
kernel: Kernel,
options: natchez.Span.Options
): Resource[IO, Span] =
newSpan(name, kernel, options)

override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: natchez.Span.Options
): Resource[IO, Span] =
newSpan(name, kernel, options)

private def newSpan(
name: String,
kernel: Kernel,
options: natchez.Span.Options
): Resource[IO, Span] = {
val acquire = ref
.update(_.append(Lineage.Root -> NatchezCommand.CreateRootSpan(name, kernel)))
.as(new Span(Lineage.Root, kernel, ref, Options.SpanCreationPolicy.Default))
.update(_.append(Lineage.Root -> NatchezCommand.CreateRootSpan(name, kernel, options)))
.as(new Span(Lineage.Root, kernel, ref, options))

val release = ref.update(_.append(Lineage.Root -> NatchezCommand.ReleaseRootSpan(name)))

Expand Down Expand Up @@ -99,13 +113,15 @@ object InMemory {
case object AskTraceId extends NatchezCommand
case object AskTraceUri extends NatchezCommand
case class Put(fields: List[(String, natchez.TraceValue)]) extends NatchezCommand
case class CreateSpan(name: String, kernel: Option[Kernel]) extends NatchezCommand
case class CreateSpan(name: String, kernel: Option[Kernel], options: natchez.Span.Options)
extends NatchezCommand
case class ReleaseSpan(name: String) extends NatchezCommand
case class AttachError(err: Throwable) extends NatchezCommand
case class LogEvent(event: String) extends NatchezCommand
case class LogFields(fields: List[(String, TraceValue)]) extends NatchezCommand
// entry point
case class CreateRootSpan(name: String, kernel: Kernel) extends NatchezCommand
case class CreateRootSpan(name: String, kernel: Kernel, options: natchez.Span.Options)
extends NatchezCommand
case class ReleaseRootSpan(name: String) extends NatchezCommand
}

Expand Down
8 changes: 4 additions & 4 deletions modules/core/shared/src/test/scala/SpanCoalesceTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class SpanCoalesceTest extends InMemorySuite {
}

def expectedHistory = List(
(Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()))),
(Lineage.Root, NatchezCommand.CreateSpan("suppressed", None)),
(Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()), Span.Options.Defaults)),
(Lineage.Root, NatchezCommand.CreateSpan("suppressed", None, Span.Options.Suppress)),
(Lineage.Root, NatchezCommand.ReleaseSpan("suppressed")),
(Lineage.Root, NatchezCommand.ReleaseRootSpan("root"))
)
Expand All @@ -36,8 +36,8 @@ class SpanCoalesceTest extends InMemorySuite {
}

def expectedHistory = List(
(Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()))),
(Lineage.Root, NatchezCommand.CreateSpan("coalesced", None)),
(Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()), Span.Options.Defaults)),
(Lineage.Root, NatchezCommand.CreateSpan("coalesced", None, Span.Options.Coalesce)),
(Lineage.Root / "coalesced", NatchezCommand.Put(List("answer" -> 42))),
(Lineage.Root, NatchezCommand.ReleaseSpan("coalesced")),
(Lineage.Root, NatchezCommand.ReleaseRootSpan("root"))
Expand Down
12 changes: 6 additions & 6 deletions modules/core/shared/src/test/scala/SpanPropagationTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ class SpanPropagationTest extends InMemorySuite {
Trace[F].span("parent")(Trace[F].span("child")(Trace[F].put("answer" -> 42)))

def expectedHistory = List(
(Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()))),
(Lineage.Root, NatchezCommand.CreateSpan("parent", None)),
(Lineage.Root / "parent", NatchezCommand.CreateSpan("child", None)),
(Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()), Span.Options.Defaults)),
(Lineage.Root, NatchezCommand.CreateSpan("parent", None, Span.Options.Defaults)),
(Lineage.Root / "parent", NatchezCommand.CreateSpan("child", None, Span.Options.Defaults)),
(Lineage.Root / "parent" / "child", NatchezCommand.Put(List("answer" -> 42))),
(Lineage.Root / "parent", NatchezCommand.ReleaseSpan("child")),
(Lineage.Root, NatchezCommand.ReleaseSpan("parent")),
Expand All @@ -38,9 +38,9 @@ class SpanPropagationTest extends InMemorySuite {
}

def expectedHistory = List(
(Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()))),
(Lineage.Root, NatchezCommand.CreateSpan("spanR", None)),
(Lineage.Root, NatchezCommand.CreateSpan("span", None)),
(Lineage.Root, NatchezCommand.CreateRootSpan("root", Kernel(Map()), Span.Options.Defaults)),
(Lineage.Root, NatchezCommand.CreateSpan("spanR", None, Span.Options.Defaults)),
(Lineage.Root, NatchezCommand.CreateSpan("span", None, Span.Options.Defaults)),
(Lineage.Root / "spanR", NatchezCommand.Put(List("question" -> "ultimate"))),
(Lineage.Root / "span", NatchezCommand.Put(List("answer" -> 42))),
(Lineage.Root, NatchezCommand.ReleaseSpan("span")),
Expand Down
49 changes: 32 additions & 17 deletions modules/datadog/src/main/scala/DDEntryPoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,47 @@ import cats.effect._
import cats.syntax.all._
import io.opentracing.propagation.{Format, TextMapAdapter}
import io.{opentracing => ot}
import natchez.datadog.DDTracer._

import java.net.URI

final class DDEntryPoint[F[_]: Sync](tracer: ot.Tracer, uriPrefix: Option[URI])
extends EntryPoint[F] {
override def root(name: String): Resource[F, Span[F]] =
override def root(name: String, options: Span.Options): Resource[F, Span[F]] =
Resource
.make(Sync[F].delay(tracer.buildSpan(name).start()))(s => Sync[F].delay(s.finish()))
.map(DDSpan(tracer, _, uriPrefix, Span.Options.SpanCreationPolicy.Default))
.make {
Sync[F]
.delay(tracer.buildSpan(name))
.flatTap(addSpanKind(_, options.spanKind))
.flatMap(options.links.foldM(_)(addLink[F](tracer)))
.flatMap(builder => Sync[F].delay(builder.start()))
}(s => Sync[F].delay(s.finish()))
.map(DDSpan(tracer, _, uriPrefix, options))

override def continue(name: String, kernel: Kernel): Resource[F, Span[F]] =
override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] =
Resource
.make(
Sync[F].delay {
val spanContext = tracer.extract(
Format.Builtin.HTTP_HEADERS,
new TextMapAdapter(kernel.toJava)
)
tracer.buildSpan(name).asChildOf(spanContext).start()
}
)(s => Sync[F].delay(s.finish()))
.map(DDSpan(tracer, _, uriPrefix, Span.Options.SpanCreationPolicy.Default))
.make {
Sync[F]
.delay {
val spanContext = tracer.extract(
Format.Builtin.HTTP_HEADERS,
new TextMapAdapter(kernel.toJava)
)
tracer.buildSpan(name).asChildOf(spanContext)
}
.flatTap(addSpanKind(_, options.spanKind))
.flatMap(options.links.foldM(_)(addLink[F](tracer)))
.flatMap(builder => Sync[F].delay(builder.start()))
}(s => Sync[F].delay(s.finish()))
.map(DDSpan(tracer, _, uriPrefix, options))

override def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] =
continue(name, kernel).flatMap {
case null => root(name) // hurr, means headers are incomplete or invalid
override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[F, Span[F]] =
continue(name, kernel, options).flatMap {
case null => root(name, options) // hurr, means headers are incomplete or invalid
case span => span.pure[Resource[F, *]]
}
}
Loading