From 94c557898a967c6e37b95196227394781c0cc079 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Tue, 24 Sep 2024 10:26:02 +0300 Subject: [PATCH] sdk-trace: make `SimpleSpanProcessor` and `BatchSpanProcessor` private --- .../BatchSpanProcessorAutoConfigure.scala | 38 ++++++------------- .../trace/processor/BatchSpanProcessor.scala | 19 ++++------ .../trace/processor/SimpleSpanProcessor.scala | 11 ++---- 3 files changed, 22 insertions(+), 46 deletions(-) diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/BatchSpanProcessorAutoConfigure.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/BatchSpanProcessorAutoConfigure.scala index 23e12aab9..96785213d 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/BatchSpanProcessorAutoConfigure.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/autoconfigure/BatchSpanProcessorAutoConfigure.scala @@ -25,8 +25,10 @@ import org.typelevel.otel4s.sdk.autoconfigure.AutoConfigure import org.typelevel.otel4s.sdk.autoconfigure.Config import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter import org.typelevel.otel4s.sdk.trace.processor.BatchSpanProcessor +import org.typelevel.otel4s.sdk.trace.processor.SpanProcessor import scala.concurrent.duration.FiniteDuration +import scala.util.chaining._ /** Autoconfigures [[BatchSpanProcessor]]. * @@ -48,41 +50,27 @@ import scala.concurrent.duration.FiniteDuration */ private final class BatchSpanProcessorAutoConfigure[F[_]: Temporal: Console]( exporter: SpanExporter[F] -) extends AutoConfigure.WithHint[F, BatchSpanProcessor[F]]( +) extends AutoConfigure.WithHint[F, SpanProcessor[F]]( "BatchSpanProcessor", BatchSpanProcessorAutoConfigure.ConfigKeys.All ) { import BatchSpanProcessorAutoConfigure.ConfigKeys - def fromConfig(config: Config): Resource[F, BatchSpanProcessor[F]] = { + def fromConfig(config: Config): Resource[F, SpanProcessor[F]] = { def configure = for { scheduleDelay <- config.get(ConfigKeys.ScheduleDelay) maxQueueSize <- config.get(ConfigKeys.MaxQueueSize) maxExportBatchSize <- config.get(ConfigKeys.MaxExportBatchSize) exporterTimeout <- config.get(ConfigKeys.ExporterTimeout) - } yield { - val builder = BatchSpanProcessor.builder(exporter) - - val withScheduleDelay = - scheduleDelay.foldLeft(builder)(_.withScheduleDelay(_)) - - val withMaxQueueSize = - maxQueueSize.foldLeft(withScheduleDelay)(_.withMaxQueueSize(_)) - - val withMaxExportBatchSize = - maxExportBatchSize.foldLeft(withMaxQueueSize)( - _.withMaxExportBatchSize(_) - ) - - val withExporterTimeout = - exporterTimeout.foldLeft(withMaxExportBatchSize)( - _.withExporterTimeout(_) - ) - - withExporterTimeout.build - } + } yield BatchSpanProcessor + .builder(exporter) + .pipe(b => scheduleDelay.foldLeft(b)(_.withScheduleDelay(_))) + .pipe(b => maxQueueSize.foldLeft(b)(_.withMaxQueueSize(_))) + .pipe(b => maxExportBatchSize.foldLeft(b)(_.withMaxExportBatchSize(_))) + .pipe(b => exporterTimeout.foldLeft(b)(_.withExporterTimeout(_))) + .build Resource.suspend( Temporal[F] @@ -130,9 +118,7 @@ private[sdk] object BatchSpanProcessorAutoConfigure { * @param exporter * the exporter to use with the configured batch span processor */ - def apply[F[_]: Temporal: Console]( - exporter: SpanExporter[F] - ): AutoConfigure[F, BatchSpanProcessor[F]] = + def apply[F[_]: Temporal: Console](exporter: SpanExporter[F]): AutoConfigure[F, SpanProcessor[F]] = new BatchSpanProcessorAutoConfigure[F](exporter) } diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala index 58f5e5c3f..b661aee8d 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala @@ -49,7 +49,7 @@ import scala.concurrent.duration._ * @tparam F * the higher-kinded type of a polymorphic effect */ -final class BatchSpanProcessor[F[_]: Temporal: Console] private ( +private final class BatchSpanProcessor[F[_]: Temporal: Console] private ( queue: Queue[F, SpanData], signal: CountDownLatch[F], state: Ref[F, BatchSpanProcessor.State], @@ -110,11 +110,7 @@ final class BatchSpanProcessor[F[_]: Temporal: Console] private ( // wait either for: // 1) the signal - it means the queue has enough spans // 2) the timeout - it means we can export all remaining spans - def pollMore( - now: FiniteDuration, - nextExportTime: FiniteDuration, - currentBatchSize: Int - ): F[Unit] = { + def pollMore(now: FiniteDuration, nextExportTime: FiniteDuration, currentBatchSize: Int): F[Unit] = { val pollWaitTime = nextExportTime - now val spansNeeded = config.maxExportBatchSize - currentBatchSize @@ -148,8 +144,7 @@ final class BatchSpanProcessor[F[_]: Temporal: Console] private ( // two reasons to export: // 1) the current batch size exceeds the limit // 2) the worker is behind the scheduled export time - val canExport = - batch.size >= config.maxExportBatchSize || now >= nextExportTime + val canExport = batch.size >= config.maxExportBatchSize || now >= nextExportTime if (canExport) doExport(now, batch) else pollMore(now, nextExportTime, batch.size) @@ -216,7 +211,7 @@ object BatchSpanProcessor { /** Creates a [[BatchSpanProcessor]] with the configuration of this builder. */ - def build: Resource[F, BatchSpanProcessor[F]] + def build: Resource[F, SpanProcessor[F]] } /** Create a [[Builder]] for [[BatchSpanProcessor]]. @@ -249,7 +244,7 @@ object BatchSpanProcessor { * how long the export can run before it is cancelled * * @param maxQueueSize - * the maximum queue size. Once the the limit is reached new spans will be dropped + * the maximum queue size. Once the limit is reached new spans will be dropped * * @param maxExportBatchSize * the maximum batch size of every export @@ -287,7 +282,7 @@ object BatchSpanProcessor { def withMaxExportBatchSize(maxExportBatchSize: Int): Builder[F] = copy(maxExportBatchSize = maxExportBatchSize) - def build: Resource[F, BatchSpanProcessor[F]] = { + def build: Resource[F, SpanProcessor[F]] = { val config = Config( scheduleDelay, exporterTimeout, @@ -311,7 +306,7 @@ object BatchSpanProcessor { for { processor <- Resource.eval(create) - _ <- Resource.make(Temporal[F].unit)(_ => processor.exportAll) + _ <- Resource.onFinalize(processor.exportAll) _ <- processor.worker.background } yield processor } diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SimpleSpanProcessor.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SimpleSpanProcessor.scala index fb6f7618e..e6d998c31 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SimpleSpanProcessor.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SimpleSpanProcessor.scala @@ -37,7 +37,7 @@ import org.typelevel.otel4s.trace.SpanContext * @tparam F * the higher-kinded type of a polymorphic effect */ -final class SimpleSpanProcessor[F[_]: MonadThrow: Console] private ( +private final class SimpleSpanProcessor[F[_]: MonadThrow: Console] private ( exporter: SpanExporter[F], exportOnlySampled: Boolean ) extends SpanProcessor[F] { @@ -74,9 +74,7 @@ object SimpleSpanProcessor { * @param exporter * the [[exporter.SpanExporter SpanExporter]] to use */ - def apply[F[_]: MonadThrow: Console]( - exporter: SpanExporter[F] - ): SimpleSpanProcessor[F] = + def apply[F[_]: MonadThrow: Console](exporter: SpanExporter[F]): SpanProcessor[F] = apply(exporter, exportOnlySampled = true) /** Creates a [[SimpleSpanProcessor]] that passes ended spans to the given `exporter`. @@ -87,10 +85,7 @@ object SimpleSpanProcessor { * @param exportOnlySampled * whether to export only sampled spans */ - def apply[F[_]: MonadThrow: Console]( - exporter: SpanExporter[F], - exportOnlySampled: Boolean - ): SimpleSpanProcessor[F] = + def apply[F[_]: MonadThrow: Console](exporter: SpanExporter[F], exportOnlySampled: Boolean): SpanProcessor[F] = new SimpleSpanProcessor[F](exporter, exportOnlySampled) }