Skip to content

Commit

Permalink
Merge pull request #788 from iRevive/sdk-trace/span-processor-visibility
Browse files Browse the repository at this point in the history
sdk-trace: make `SimpleSpanProcessor` and `BatchSpanProcessor` private
  • Loading branch information
iRevive authored Sep 24, 2024
2 parents 16fdfc5 + 94c5578 commit 9c176c2
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import org.typelevel.otel4s.sdk.autoconfigure.AutoConfigure
import org.typelevel.otel4s.sdk.autoconfigure.Config
import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter
import org.typelevel.otel4s.sdk.trace.processor.BatchSpanProcessor
import org.typelevel.otel4s.sdk.trace.processor.SpanProcessor

import scala.concurrent.duration.FiniteDuration
import scala.util.chaining._

/** Autoconfigures [[BatchSpanProcessor]].
*
Expand All @@ -48,41 +50,27 @@ import scala.concurrent.duration.FiniteDuration
*/
private final class BatchSpanProcessorAutoConfigure[F[_]: Temporal: Console](
exporter: SpanExporter[F]
) extends AutoConfigure.WithHint[F, BatchSpanProcessor[F]](
) extends AutoConfigure.WithHint[F, SpanProcessor[F]](
"BatchSpanProcessor",
BatchSpanProcessorAutoConfigure.ConfigKeys.All
) {

import BatchSpanProcessorAutoConfigure.ConfigKeys

def fromConfig(config: Config): Resource[F, BatchSpanProcessor[F]] = {
def fromConfig(config: Config): Resource[F, SpanProcessor[F]] = {
def configure =
for {
scheduleDelay <- config.get(ConfigKeys.ScheduleDelay)
maxQueueSize <- config.get(ConfigKeys.MaxQueueSize)
maxExportBatchSize <- config.get(ConfigKeys.MaxExportBatchSize)
exporterTimeout <- config.get(ConfigKeys.ExporterTimeout)
} yield {
val builder = BatchSpanProcessor.builder(exporter)

val withScheduleDelay =
scheduleDelay.foldLeft(builder)(_.withScheduleDelay(_))

val withMaxQueueSize =
maxQueueSize.foldLeft(withScheduleDelay)(_.withMaxQueueSize(_))

val withMaxExportBatchSize =
maxExportBatchSize.foldLeft(withMaxQueueSize)(
_.withMaxExportBatchSize(_)
)

val withExporterTimeout =
exporterTimeout.foldLeft(withMaxExportBatchSize)(
_.withExporterTimeout(_)
)

withExporterTimeout.build
}
} yield BatchSpanProcessor
.builder(exporter)
.pipe(b => scheduleDelay.foldLeft(b)(_.withScheduleDelay(_)))
.pipe(b => maxQueueSize.foldLeft(b)(_.withMaxQueueSize(_)))
.pipe(b => maxExportBatchSize.foldLeft(b)(_.withMaxExportBatchSize(_)))
.pipe(b => exporterTimeout.foldLeft(b)(_.withExporterTimeout(_)))
.build

Resource.suspend(
Temporal[F]
Expand Down Expand Up @@ -130,9 +118,7 @@ private[sdk] object BatchSpanProcessorAutoConfigure {
* @param exporter
* the exporter to use with the configured batch span processor
*/
def apply[F[_]: Temporal: Console](
exporter: SpanExporter[F]
): AutoConfigure[F, BatchSpanProcessor[F]] =
def apply[F[_]: Temporal: Console](exporter: SpanExporter[F]): AutoConfigure[F, SpanProcessor[F]] =
new BatchSpanProcessorAutoConfigure[F](exporter)

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import scala.concurrent.duration._
* @tparam F
* the higher-kinded type of a polymorphic effect
*/
final class BatchSpanProcessor[F[_]: Temporal: Console] private (
private final class BatchSpanProcessor[F[_]: Temporal: Console] private (
queue: Queue[F, SpanData],
signal: CountDownLatch[F],
state: Ref[F, BatchSpanProcessor.State],
Expand Down Expand Up @@ -110,11 +110,7 @@ final class BatchSpanProcessor[F[_]: Temporal: Console] private (
// wait either for:
// 1) the signal - it means the queue has enough spans
// 2) the timeout - it means we can export all remaining spans
def pollMore(
now: FiniteDuration,
nextExportTime: FiniteDuration,
currentBatchSize: Int
): F[Unit] = {
def pollMore(now: FiniteDuration, nextExportTime: FiniteDuration, currentBatchSize: Int): F[Unit] = {
val pollWaitTime = nextExportTime - now
val spansNeeded = config.maxExportBatchSize - currentBatchSize

Expand Down Expand Up @@ -148,8 +144,7 @@ final class BatchSpanProcessor[F[_]: Temporal: Console] private (
// two reasons to export:
// 1) the current batch size exceeds the limit
// 2) the worker is behind the scheduled export time
val canExport =
batch.size >= config.maxExportBatchSize || now >= nextExportTime
val canExport = batch.size >= config.maxExportBatchSize || now >= nextExportTime

if (canExport) doExport(now, batch)
else pollMore(now, nextExportTime, batch.size)
Expand Down Expand Up @@ -216,7 +211,7 @@ object BatchSpanProcessor {

/** Creates a [[BatchSpanProcessor]] with the configuration of this builder.
*/
def build: Resource[F, BatchSpanProcessor[F]]
def build: Resource[F, SpanProcessor[F]]
}

/** Create a [[Builder]] for [[BatchSpanProcessor]].
Expand Down Expand Up @@ -249,7 +244,7 @@ object BatchSpanProcessor {
* how long the export can run before it is cancelled
*
* @param maxQueueSize
* the maximum queue size. Once the the limit is reached new spans will be dropped
* the maximum queue size. Once the limit is reached new spans will be dropped
*
* @param maxExportBatchSize
* the maximum batch size of every export
Expand Down Expand Up @@ -287,7 +282,7 @@ object BatchSpanProcessor {
def withMaxExportBatchSize(maxExportBatchSize: Int): Builder[F] =
copy(maxExportBatchSize = maxExportBatchSize)

def build: Resource[F, BatchSpanProcessor[F]] = {
def build: Resource[F, SpanProcessor[F]] = {
val config = Config(
scheduleDelay,
exporterTimeout,
Expand All @@ -311,7 +306,7 @@ object BatchSpanProcessor {

for {
processor <- Resource.eval(create)
_ <- Resource.make(Temporal[F].unit)(_ => processor.exportAll)
_ <- Resource.onFinalize(processor.exportAll)
_ <- processor.worker.background
} yield processor
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.typelevel.otel4s.trace.SpanContext
* @tparam F
* the higher-kinded type of a polymorphic effect
*/
final class SimpleSpanProcessor[F[_]: MonadThrow: Console] private (
private final class SimpleSpanProcessor[F[_]: MonadThrow: Console] private (
exporter: SpanExporter[F],
exportOnlySampled: Boolean
) extends SpanProcessor[F] {
Expand Down Expand Up @@ -74,9 +74,7 @@ object SimpleSpanProcessor {
* @param exporter
* the [[exporter.SpanExporter SpanExporter]] to use
*/
def apply[F[_]: MonadThrow: Console](
exporter: SpanExporter[F]
): SimpleSpanProcessor[F] =
def apply[F[_]: MonadThrow: Console](exporter: SpanExporter[F]): SpanProcessor[F] =
apply(exporter, exportOnlySampled = true)

/** Creates a [[SimpleSpanProcessor]] that passes ended spans to the given `exporter`.
Expand All @@ -87,10 +85,7 @@ object SimpleSpanProcessor {
* @param exportOnlySampled
* whether to export only sampled spans
*/
def apply[F[_]: MonadThrow: Console](
exporter: SpanExporter[F],
exportOnlySampled: Boolean
): SimpleSpanProcessor[F] =
def apply[F[_]: MonadThrow: Console](exporter: SpanExporter[F], exportOnlySampled: Boolean): SpanProcessor[F] =
new SimpleSpanProcessor[F](exporter, exportOnlySampled)

}

0 comments on commit 9c176c2

Please sign in to comment.