From b57c2da94e6f8b5a8035e4e6f2e23c32da32cd27 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Mon, 27 Nov 2023 10:48:41 +0200 Subject: [PATCH 1/2] `SpanProcessor` - rethrow a composite failure on multiple errors --- .../otel4s/sdk/trace/SpanProcessor.scala | 102 +++++++++++++++--- .../otel4s/sdk/trace/SpanProcessorSuite.scala | 81 ++++++++++++-- 2 files changed, 164 insertions(+), 19 deletions(-) diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SpanProcessor.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SpanProcessor.scala index b9f477a6d..87251a305 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SpanProcessor.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SpanProcessor.scala @@ -18,11 +18,11 @@ package org.typelevel.otel4s package sdk.trace import cats.Applicative +import cats.MonadThrow import cats.Monoid import cats.Parallel import cats.data.NonEmptyList -import cats.syntax.foldable._ -import cats.syntax.parallel._ +import cats.syntax.all._ import org.typelevel.otel4s.sdk.trace.data.SpanData import org.typelevel.otel4s.trace.SpanContext @@ -37,6 +37,19 @@ import org.typelevel.otel4s.trace.SpanContext */ trait SpanProcessor[F[_]] { + /** The name of the processor. + * + * It will be used in an exception to distinguish individual failures in the + * multi-error scenario. + * + * @see + * [[SpanProcessor.ProcessorFailure]] + * + * @see + * [[SpanProcessor.CompositeProcessorFailure]] + */ + def name: String + /** Called when a span is started, if the `span.isRecording` returns true. * * This method is called synchronously on the execution thread, should not @@ -75,6 +88,9 @@ trait SpanProcessor[F[_]] { /** Processes all pending spans (if any). */ def forceFlush: F[Unit] + + override def toString: String = + name } object SpanProcessor { @@ -82,7 +98,7 @@ object SpanProcessor { /** Creates a [[SpanProcessor]] which delegates all processing to the * processors in order. */ - def of[F[_]: Applicative: Parallel]( + def of[F[_]: MonadThrow: Parallel]( processors: SpanProcessor[F]* ): SpanProcessor[F] = if (processors.sizeIs == 1) processors.head @@ -95,7 +111,7 @@ object SpanProcessor { def noop[F[_]: Applicative]: SpanProcessor[F] = new Noop - implicit def spanProcessorMonoid[F[_]: Applicative: Parallel] + implicit def spanProcessorMonoid[F[_]: MonadThrow: Parallel] : Monoid[SpanProcessor[F]] = new Monoid[SpanProcessor[F]] { val empty: SpanProcessor[F] = @@ -118,7 +134,39 @@ object SpanProcessor { } } + /** An error occurred when invoking a processor. + * + * @param processor + * the name of a processor that failed. See [[SpanProcessor.name]] + * + * @param failure + * the error occurred + */ + final case class ProcessorFailure(processor: String, failure: Throwable) + extends Throwable( + s"The processor [$processor] has failed due to ${failure.getMessage}", + failure + ) + + /** An composite failure, when '''at least 2''' processors have failed. + * + * @param first + * the first occurred error + * + * @param rest + * the rest of errors + */ + final case class CompositeProcessorFailure( + first: ProcessorFailure, + rest: NonEmptyList[ProcessorFailure] + ) extends Throwable( + s"Multiple processors [${rest.prepend(first).map(_.processor).mkString_(", ")}] have failed", + first + ) + private final class Noop[F[_]: Applicative] extends SpanProcessor[F] { + val name: String = "SpanProcessor.Noop" + def isStartRequired: Boolean = false def isEndRequired: Boolean = false @@ -130,11 +178,9 @@ object SpanProcessor { def forceFlush: F[Unit] = Applicative[F].unit - - override def toString: String = "SpanProcessor.Noop" } - private final case class Multi[F[_]: Parallel]( + private final case class Multi[F[_]: MonadThrow: Parallel]( processors: NonEmptyList[SpanProcessor[F]] ) extends SpanProcessor[F] { private val startOnly: List[SpanProcessor[F]] = @@ -143,19 +189,49 @@ object SpanProcessor { private val endOnly: List[SpanProcessor[F]] = processors.filter(_.isEndRequired) + val name: String = + s"SpanProcessor.Multi(${processors.map(_.name).mkString_(", ")})" + def isStartRequired: Boolean = startOnly.nonEmpty def isEndRequired: Boolean = endOnly.nonEmpty def onStart(parentCtx: Option[SpanContext], span: SpanRef[F]): F[Unit] = - startOnly.parTraverse_(_.onStart(parentCtx, span)) + startOnly + .parTraverse { p => + p.onStart(parentCtx, span).attempt.tupleLeft(p.name) + } + .flatMap(attempts => handleAttempts(attempts)) def onEnd(span: SpanData): F[Unit] = - endOnly.parTraverse_(_.onEnd(span)) + endOnly + .parTraverse(p => p.onEnd(span).attempt.tupleLeft(p.name)) + .flatMap(attempts => handleAttempts(attempts)) def forceFlush: F[Unit] = - processors.parTraverse_(_.forceFlush) - - override def toString: String = - s"SpanProcessor.Multi(${processors.map(_.toString).mkString_(", ")})" + processors + .parTraverse(p => p.forceFlush.attempt.tupleLeft(p.name)) + .flatMap(attempts => handleAttempts(attempts.toList)) + + private def handleAttempts( + results: List[(String, Either[Throwable, Unit])] + ): F[Unit] = { + val failures = results.collect { case (processor, Left(failure)) => + ProcessorFailure(processor, failure) + } + + failures match { + case Nil => + MonadThrow[F].unit + + case head :: Nil => + MonadThrow[F].raiseError(head) + + case head :: tail => + MonadThrow[F].raiseError( + CompositeProcessorFailure(head, NonEmptyList.fromListUnsafe(tail)) + ) + } + } } + } diff --git a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SpanProcessorSuite.scala b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SpanProcessorSuite.scala index 3fbafd86a..a536793a7 100644 --- a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SpanProcessorSuite.scala +++ b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SpanProcessorSuite.scala @@ -16,6 +16,7 @@ package org.typelevel.otel4s.sdk.trace +import cats.data.NonEmptyList import cats.effect.IO import munit.FunSuite import org.typelevel.otel4s.sdk.trace.data.SpanData @@ -65,27 +66,95 @@ class SpanProcessorSuite extends FunSuite { ) } - private def testProcessor(name: String): SpanProcessor[IO] = + test("of (multiple) - single failure - rethrow a single failure") { + val onStart = new RuntimeException("cannot start") + val onEnd = new RuntimeException("cannot end") + val onFlush = new RuntimeException("cannot flush") + + val failing = testProcessor( + processorName = "error-prone", + start = IO.raiseError(onStart), + end = IO.raiseError(onEnd), + flush = IO.raiseError(onFlush) + ) + + val processor = SpanProcessor.of(testProcessor("success"), failing) + + def expected(e: Throwable) = + SpanProcessor.ProcessorFailure("error-prone", e) + + for { + start <- processor.onStart(None, null: SpanRef[IO]).attempt + end <- processor.onEnd(null).attempt + flush <- processor.forceFlush.attempt + } yield { + assertEquals(start, Left(expected(onStart))) + assertEquals(end, Left(expected(onEnd))) + assertEquals(flush, Left(expected(onFlush))) + } + } + + test("of (multiple) - multiple failures - rethrow a composite failure") { + val onStart = new RuntimeException("cannot start") + val onEnd = new RuntimeException("cannot end") + val onFlush = new RuntimeException("cannot flush") + + def failing(name: String) = testProcessor( + processorName = name, + start = IO.raiseError(onStart), + end = IO.raiseError(onEnd), + flush = IO.raiseError(onFlush) + ) + + val processor = SpanProcessor.of( + failing("error-prone-1"), + failing("error-prone-2") + ) + + def expected(e: Throwable) = + SpanProcessor.CompositeProcessorFailure( + SpanProcessor.ProcessorFailure("error-prone-1", e), + NonEmptyList.of(SpanProcessor.ProcessorFailure("error-prone-2", e)) + ) + + for { + start <- processor.onStart(None, null).attempt + end <- processor.onEnd(null).attempt + flush <- processor.forceFlush.attempt + } yield { + assertEquals(start, Left(expected(onStart))) + assertEquals(end, Left(expected(onEnd))) + assertEquals(flush, Left(expected(onFlush))) + } + } + + private def testProcessor( + processorName: String, + start: IO[Unit] = IO.unit, + end: IO[Unit] = IO.unit, + flush: IO[Unit] = IO.unit, + ): SpanProcessor[IO] = new SpanProcessor[IO] { + def name: String = + processorName + def onStart( parentContext: Option[SpanContext], span: SpanRef[IO] ): IO[Unit] = - IO.unit + start def isStartRequired: Boolean = false def onEnd(span: SpanData): IO[Unit] = - IO.unit + end def isEndRequired: Boolean = false def forceFlush: IO[Unit] = - IO.unit - - override def toString: String = name + flush } } From 5ab127cbd411b6d73ffe322ef3c4e67c8a0a49d3 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Mon, 27 Nov 2023 14:35:35 +0200 Subject: [PATCH 2/2] Make errors extend `Exception` --- .../scala/org/typelevel/otel4s/sdk/trace/SpanProcessor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SpanProcessor.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SpanProcessor.scala index 87251a305..4f1beca4d 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SpanProcessor.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SpanProcessor.scala @@ -143,7 +143,7 @@ object SpanProcessor { * the error occurred */ final case class ProcessorFailure(processor: String, failure: Throwable) - extends Throwable( + extends Exception( s"The processor [$processor] has failed due to ${failure.getMessage}", failure ) @@ -159,7 +159,7 @@ object SpanProcessor { final case class CompositeProcessorFailure( first: ProcessorFailure, rest: NonEmptyList[ProcessorFailure] - ) extends Throwable( + ) extends Exception( s"Multiple processors [${rest.prepend(first).map(_.processor).mkString_(", ")}] have failed", first )