Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk-trace: use MapRef in SpanStorage #796

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,84 +17,124 @@
package org.typelevel.otel4s.benchmarks

import cats.effect.IO
import cats.effect.Resource
import cats.effect.unsafe.implicits.global
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.`export`.SimpleSpanProcessor
import org.openjdk.jmh.annotations._
import org.typelevel.otel4s.oteljava.OtelJava
import org.typelevel.otel4s.trace.Tracer

import java.util.concurrent.TimeUnit

// benchmarks/Jmh/run org.typelevel.otel4s.benchmarks.TraceBenchmark -prof gc
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(2)
@Measurement(iterations = 40, time = 1)
@Warmup(iterations = 5, time = 1)
class TraceBenchmark {

import TraceBenchmark._

@Param(Array("noop", "oteljava", "sdk"))
var backend: String = _
var tracer: Tracer[IO] = _
var finalizer: IO[Unit] = _

@Benchmark
def pure(): Unit =
IO.unit.unsafeRunSync()

@Benchmark
def noop(ctx: NoopTracer): Unit = {
import ctx._
def span(): Unit =
tracer.span("span").use_.unsafeRunSync()
}

@Benchmark
def inMemoryDisabled(ctx: InMemoryTracer): Unit = {
import ctx._
tracer
.noopScope(
tracer.span("span").use_
)
.unsafeRunSync()
}
def noopScope(): Unit =
tracer.noopScope(tracer.span("span").use_).unsafeRunSync()

@Benchmark
def inMemoryEnabled(ctx: InMemoryTracer): Unit = {
import ctx._
tracer
.rootScope(
tracer.span("span").use_
)
.unsafeRunSync()
}
def rootScope(): Unit =
tracer.rootScope(tracer.span("span").use_).unsafeRunSync()

@Setup(Level.Trial)
def setup(): Unit =
backend match {
case "noop" =>
tracer = noopTracer
finalizer = IO.unit

case "oteljava" =>
val (t, release) = otelJavaTracer.allocated.unsafeRunSync()

tracer = t
finalizer = release

case "sdk" =>
val (t, release) = sdkTracer.allocated.unsafeRunSync()

tracer = t
finalizer = release

case other =>
sys.error(s"unknown backend [$other]")
}

@TearDown(Level.Trial)
def cleanup(): Unit =
finalizer.unsafeRunSync()
}

object TraceBenchmark {
@State(Scope.Benchmark)
class NoopTracer {
implicit val tracer: Tracer[IO] = Tracer.noop
}

@State(Scope.Benchmark)
class InMemoryTracer {
private def makeTracer: IO[Tracer[IO]] = {
val exporter = InMemorySpanExporter.create()
private def otelJavaTracer: Resource[IO, Tracer[IO]] = {
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.`export`.BatchSpanProcessor
import org.typelevel.otel4s.oteljava.OtelJava

val builder = SdkTracerProvider
.builder()
.addSpanProcessor(SimpleSpanProcessor.create(exporter))
def exporter = InMemorySpanExporter.create()

val tracerProvider: SdkTracerProvider =
builder.build()
def builder = SdkTracerProvider
.builder()
.addSpanProcessor(BatchSpanProcessor.builder(exporter).build())

val otel = OpenTelemetrySdk
.builder()
.setTracerProvider(tracerProvider)
.build()
def tracerProvider: SdkTracerProvider =
builder.build()

OtelJava.forAsync[IO](otel).flatMap {
_.tracerProvider.tracer("trace-benchmark").get
def otel = OpenTelemetrySdk
.builder()
.setTracerProvider(tracerProvider)
.build()

OtelJava
.resource[IO](IO(otel))
.evalMap(_.tracerProvider.tracer("trace-benchmark").get)
}

private def sdkTracer: Resource[IO, Tracer[IO]] = {
import cats.effect.std.Random
import org.typelevel.otel4s.context.LocalProvider
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.testkit.trace.InMemorySpanExporter
import org.typelevel.otel4s.sdk.trace.SdkTracerProvider
import org.typelevel.otel4s.sdk.trace.processor.BatchSpanProcessor

Resource.eval(InMemorySpanExporter.create[IO](None)).flatMap { exporter =>
BatchSpanProcessor.builder(exporter).build.evalMap { processor =>
Random.scalaUtilRandom[IO].flatMap { implicit random =>
LocalProvider[IO, Context].local.flatMap { implicit local =>
for {
tracerProvider <- SdkTracerProvider.builder[IO].addSpanProcessor(processor).build
tracer <- tracerProvider.get("trace-benchmark")
} yield tracer
}
}
}
}

implicit val tracer: Tracer[IO] =
makeTracer.unsafeRunSync()
}

private def noopTracer: Tracer[IO] =
Tracer.noop

}
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ThisBuild / tlBaseVersion := "0.10"
ThisBuild / tlBaseVersion := "0.11"

ThisBuild / organization := "org.typelevel"
ThisBuild / organizationName := "Typelevel"
Expand Down Expand Up @@ -710,7 +710,7 @@ lazy val benchmarks = project
.enablePlugins(NoPublishPlugin)
.enablePlugins(JmhPlugin)
.in(file("benchmarks"))
.dependsOn(core.jvm, sdk.jvm, oteljava)
.dependsOn(core.jvm, sdk.jvm, `sdk-testkit`.jvm, oteljava)
.settings(
name := "otel4s-benchmarks",
libraryDependencies ++= Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,7 @@ class OpenTelemetrySdkSuite extends CatsEffectSuite {
s"OpenTelemetrySdk{meterProvider=$meterProvider, " +
"tracerProvider=" +
s"SdkTracerProvider{resource=$resource, spanLimits=$spanLimits, sampler=$sampler, " +
"spanProcessor=SpanProcessor.Multi(" +
s"BatchSpanProcessor{exporter=$exporter, scheduleDelay=5 seconds, exporterTimeout=30 seconds, maxQueueSize=2048, maxExportBatchSize=512}, " +
"SpanStorage)}, " +
s"spanProcessor=BatchSpanProcessor{exporter=$exporter, scheduleDelay=5 seconds, exporterTimeout=30 seconds, maxQueueSize=2048, maxExportBatchSize=512}}, " +
s"propagators=$propagators}, resource=$resource}"

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import scala.concurrent.duration.FiniteDuration
private final class SdkSpanBackend[F[_]: Monad: Clock: Console] private (
spanLimits: SpanLimits,
spanProcessor: SpanProcessor[F],
spanStorage: SpanStorage[F],
immutableState: SdkSpanBackend.ImmutableState,
mutableState: Ref[F, SdkSpanBackend.MutableState]
) extends Span.Backend[F]
Expand Down Expand Up @@ -174,6 +175,7 @@ private final class SdkSpanBackend[F[_]: Monad: Clock: Console] private (
def end(timestamp: FiniteDuration): F[Unit] =
for {
updated <- updateState("end")(s => s.copy(endTimestamp = Some(timestamp)))
_ <- spanStorage.remove(this)
_ <- toSpanData.flatMap(span => spanProcessor.onEnd(span)).whenA(updated)
} yield ()

Expand Down Expand Up @@ -270,6 +272,9 @@ private object SdkSpanBackend {
* @param processor
* the [[SpanProcessor]] to call on span's start and end
*
* @param spanStorage
* the `SpanStorage` to store the span at
*
* @param attributes
* the [[Attributes]] of the span
*
Expand All @@ -289,6 +294,7 @@ private object SdkSpanBackend {
parentContext: Option[SpanContext],
spanLimits: SpanLimits,
processor: SpanProcessor[F],
spanStorage: SpanStorage[F],
attributes: LimitedData[Attribute[_], Attributes],
links: LimitedData[LinkData, Vector[LinkData]],
userStartTimestamp: Option[FiniteDuration]
Expand All @@ -315,12 +321,8 @@ private object SdkSpanBackend {
for {
start <- userStartTimestamp.fold(Clock[F].realTime)(_.pure)
state <- Ref[F].of(mutableState)
backend = new SdkSpanBackend[F](
spanLimits,
processor,
immutableState(start),
state
)
backend = new SdkSpanBackend[F](spanLimits, processor, spanStorage, immutableState(start), state)
_ <- spanStorage.add(backend)
_ <- processor.onStart(parentContext, backend)
} yield backend
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ private final case class SdkSpanBuilder[F[_]: Temporal: Console] private (
parentContext = parentSpanContext,
spanLimits = tracerSharedState.spanLimits,
processor = tracerSharedState.spanProcessor,
spanStorage = tracerSharedState.spanStorage,
attributes = attributes.appendAll(samplingResult.attributes),
links = links,
userStartTimestamp = state.startTimestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.typelevel.otel4s.context.propagation.TextMapUpdater
import org.typelevel.otel4s.meta.InstrumentMeta
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.trace.processor.SpanStorage
import org.typelevel.otel4s.trace.Span
import org.typelevel.otel4s.trace.SpanBuilder
import org.typelevel.otel4s.trace.SpanContext
Expand All @@ -39,8 +38,7 @@ private final class SdkTracer[F[_]: Temporal: Console] private[trace] (
scopeInfo: InstrumentationScope,
propagators: ContextPropagators[Context],
sharedState: TracerSharedState[F],
traceScope: TraceScope[F, Context],
storage: SpanStorage[F]
traceScope: TraceScope[F, Context]
) extends Tracer[F] {

val meta: InstrumentMeta[F] = InstrumentMeta.enabled[F]
Expand All @@ -49,10 +47,9 @@ private final class SdkTracer[F[_]: Temporal: Console] private[trace] (
traceScope.current.map(current => current.filter(_.isValid))

private[this] def currentBackend: OptionT[F, Span.Backend[F]] =
OptionT(traceScope.current)
.semiflatMap { ctx =>
OptionT(storage.get(ctx)).getOrElse(Span.Backend.propagating(ctx))
}
OptionT(traceScope.current).semiflatMap { ctx =>
OptionT(sharedState.spanStorage.get(ctx)).getOrElse(Span.Backend.propagating(ctx))
}

def currentSpanOrNoop: F[Span[F]] =
currentBackend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.typelevel.otel4s.Attributes
import org.typelevel.otel4s.context.propagation.ContextPropagators
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.trace.processor.SpanStorage
import org.typelevel.otel4s.trace.TraceScope
import org.typelevel.otel4s.trace.Tracer
import org.typelevel.otel4s.trace.TracerBuilder
Expand All @@ -32,7 +31,6 @@ private final case class SdkTracerBuilder[F[_]: Temporal: Console](
propagators: ContextPropagators[Context],
traceScope: TraceScope[F, Context],
sharedState: TracerSharedState[F],
storage: SpanStorage[F],
name: String,
version: Option[String] = None,
schemaUrl: Option[String] = None
Expand All @@ -50,8 +48,7 @@ private final case class SdkTracerBuilder[F[_]: Temporal: Console](
InstrumentationScope(name, version, schemaUrl, Attributes.empty),
propagators,
sharedState,
traceScope,
storage
traceScope
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.typelevel.otel4s.context.propagation.TextMapPropagator
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.context.LocalContext
import org.typelevel.otel4s.sdk.trace.processor.SpanProcessor
import org.typelevel.otel4s.sdk.trace.processor.SpanStorage
import org.typelevel.otel4s.sdk.trace.samplers.Sampler
import org.typelevel.otel4s.trace.TraceScope
import org.typelevel.otel4s.trace.TracerBuilder
Expand All @@ -50,11 +49,12 @@ private class SdkTracerProvider[F[_]: Temporal: Parallel: Console](
resource,
spanLimits,
sampler,
SpanProcessor.of(spanProcessors: _*)
SpanProcessor.of(spanProcessors: _*),
storage
)

def tracer(name: String): TracerBuilder[F] =
new SdkTracerBuilder[F](propagators, traceScope, sharedState, storage, name)
new SdkTracerBuilder[F](propagators, traceScope, sharedState, name)

override def toString: String =
"SdkTracerProvider{" +
Expand Down Expand Up @@ -212,7 +212,7 @@ object SdkTracerProvider {
spanLimits,
sampler,
ContextPropagators.of(propagators: _*),
spanProcessors :+ storage,
spanProcessors,
SdkTraceScope.fromLocal[F],
storage
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
*/

package org.typelevel.otel4s.sdk.trace
package processor

import cats.Applicative
import cats.effect.Concurrent
import cats.effect.Ref
import cats.effect.std.MapRef
import cats.syntax.functor._
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.trace.SpanContext

/** The span storage is used to keep the references of the active SpanRefs.
Expand All @@ -32,30 +29,23 @@ import org.typelevel.otel4s.trace.SpanContext
* @tparam F
* the higher-kinded type of a polymorphic effect
*/
private[trace] class SpanStorage[F[_]: Applicative] private (
storage: Ref[F, Map[SpanContext, SpanRef[F]]]
) extends SpanProcessor[F] {
val name: String = "SpanStorage"
private class SpanStorage[F[_]] private (
storage: MapRef[F, SpanContext, Option[SpanRef[F]]]
) {

def isStartRequired: Boolean = true
def isEndRequired: Boolean = true
def add(span: SpanRef[F]): F[Unit] =
storage(span.context).set(Some(span))

def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] =
storage.update(_.updated(span.context, span))

def onEnd(span: SpanData): F[Unit] =
storage.update(_.removed(span.spanContext))
def remove(span: SpanRef[F]): F[Unit] =
storage(span.context).set(None)

def get(context: SpanContext): F[Option[SpanRef[F]]] =
storage.get.map(_.get(context))

def forceFlush: F[Unit] =
Applicative[F].unit
storage(context).get
}

private[trace] object SpanStorage {
private object SpanStorage {
def create[F[_]: Concurrent]: F[SpanStorage[F]] =
for {
storage <- Ref[F].of(Map.empty[SpanContext, SpanRef[F]])
storage <- MapRef.ofShardedImmutableMap[F, SpanContext, SpanRef[F]](Runtime.getRuntime.availableProcessors())
} yield new SpanStorage[F](storage)
}
Loading