Skip to content

Commit

Permalink
Merge pull request #386 from iRevive/sdk-trace/span-processor-update
Browse files Browse the repository at this point in the history
`SpanProcessor` - rethrow a composite failure on multiple errors
  • Loading branch information
iRevive authored Nov 30, 2023
2 parents 77986d7 + 5ab127c commit 5397cd4
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package org.typelevel.otel4s
package sdk.trace

import cats.Applicative
import cats.MonadThrow
import cats.Monoid
import cats.Parallel
import cats.data.NonEmptyList
import cats.syntax.foldable._
import cats.syntax.parallel._
import cats.syntax.all._
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.trace.SpanContext

Expand All @@ -37,6 +37,19 @@ import org.typelevel.otel4s.trace.SpanContext
*/
trait SpanProcessor[F[_]] {

/** The name of the processor.
*
* It will be used in an exception to distinguish individual failures in the
* multi-error scenario.
*
* @see
* [[SpanProcessor.ProcessorFailure]]
*
* @see
* [[SpanProcessor.CompositeProcessorFailure]]
*/
def name: String

/** Called when a span is started, if the `span.isRecording` returns true.
*
* This method is called synchronously on the execution thread, should not
Expand Down Expand Up @@ -75,14 +88,17 @@ trait SpanProcessor[F[_]] {
/** Processes all pending spans (if any).
*/
def forceFlush: F[Unit]

override def toString: String =
name
}

object SpanProcessor {

/** Creates a [[SpanProcessor]] which delegates all processing to the
* processors in order.
*/
def of[F[_]: Applicative: Parallel](
def of[F[_]: MonadThrow: Parallel](
processors: SpanProcessor[F]*
): SpanProcessor[F] =
if (processors.sizeIs == 1) processors.head
Expand All @@ -95,7 +111,7 @@ object SpanProcessor {
def noop[F[_]: Applicative]: SpanProcessor[F] =
new Noop

implicit def spanProcessorMonoid[F[_]: Applicative: Parallel]
implicit def spanProcessorMonoid[F[_]: MonadThrow: Parallel]
: Monoid[SpanProcessor[F]] =
new Monoid[SpanProcessor[F]] {
val empty: SpanProcessor[F] =
Expand All @@ -118,7 +134,39 @@ object SpanProcessor {
}
}

/** An error occurred when invoking a processor.
*
* @param processor
* the name of a processor that failed. See [[SpanProcessor.name]]
*
* @param failure
* the error occurred
*/
final case class ProcessorFailure(processor: String, failure: Throwable)
extends Exception(
s"The processor [$processor] has failed due to ${failure.getMessage}",
failure
)

/** An composite failure, when '''at least 2''' processors have failed.
*
* @param first
* the first occurred error
*
* @param rest
* the rest of errors
*/
final case class CompositeProcessorFailure(
first: ProcessorFailure,
rest: NonEmptyList[ProcessorFailure]
) extends Exception(
s"Multiple processors [${rest.prepend(first).map(_.processor).mkString_(", ")}] have failed",
first
)

private final class Noop[F[_]: Applicative] extends SpanProcessor[F] {
val name: String = "SpanProcessor.Noop"

def isStartRequired: Boolean = false
def isEndRequired: Boolean = false

Expand All @@ -130,11 +178,9 @@ object SpanProcessor {

def forceFlush: F[Unit] =
Applicative[F].unit

override def toString: String = "SpanProcessor.Noop"
}

private final case class Multi[F[_]: Parallel](
private final case class Multi[F[_]: MonadThrow: Parallel](
processors: NonEmptyList[SpanProcessor[F]]
) extends SpanProcessor[F] {
private val startOnly: List[SpanProcessor[F]] =
Expand All @@ -143,19 +189,49 @@ object SpanProcessor {
private val endOnly: List[SpanProcessor[F]] =
processors.filter(_.isEndRequired)

val name: String =
s"SpanProcessor.Multi(${processors.map(_.name).mkString_(", ")})"

def isStartRequired: Boolean = startOnly.nonEmpty
def isEndRequired: Boolean = endOnly.nonEmpty

def onStart(parentCtx: Option[SpanContext], span: SpanRef[F]): F[Unit] =
startOnly.parTraverse_(_.onStart(parentCtx, span))
startOnly
.parTraverse { p =>
p.onStart(parentCtx, span).attempt.tupleLeft(p.name)
}
.flatMap(attempts => handleAttempts(attempts))

def onEnd(span: SpanData): F[Unit] =
endOnly.parTraverse_(_.onEnd(span))
endOnly
.parTraverse(p => p.onEnd(span).attempt.tupleLeft(p.name))
.flatMap(attempts => handleAttempts(attempts))

def forceFlush: F[Unit] =
processors.parTraverse_(_.forceFlush)

override def toString: String =
s"SpanProcessor.Multi(${processors.map(_.toString).mkString_(", ")})"
processors
.parTraverse(p => p.forceFlush.attempt.tupleLeft(p.name))
.flatMap(attempts => handleAttempts(attempts.toList))

private def handleAttempts(
results: List[(String, Either[Throwable, Unit])]
): F[Unit] = {
val failures = results.collect { case (processor, Left(failure)) =>
ProcessorFailure(processor, failure)
}

failures match {
case Nil =>
MonadThrow[F].unit

case head :: Nil =>
MonadThrow[F].raiseError(head)

case head :: tail =>
MonadThrow[F].raiseError(
CompositeProcessorFailure(head, NonEmptyList.fromListUnsafe(tail))
)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.typelevel.otel4s.sdk.trace

import cats.data.NonEmptyList
import cats.effect.IO
import munit.FunSuite
import org.typelevel.otel4s.sdk.trace.data.SpanData
Expand Down Expand Up @@ -65,27 +66,95 @@ class SpanProcessorSuite extends FunSuite {
)
}

private def testProcessor(name: String): SpanProcessor[IO] =
test("of (multiple) - single failure - rethrow a single failure") {
val onStart = new RuntimeException("cannot start")
val onEnd = new RuntimeException("cannot end")
val onFlush = new RuntimeException("cannot flush")

val failing = testProcessor(
processorName = "error-prone",
start = IO.raiseError(onStart),
end = IO.raiseError(onEnd),
flush = IO.raiseError(onFlush)
)

val processor = SpanProcessor.of(testProcessor("success"), failing)

def expected(e: Throwable) =
SpanProcessor.ProcessorFailure("error-prone", e)

for {
start <- processor.onStart(None, null: SpanRef[IO]).attempt
end <- processor.onEnd(null).attempt
flush <- processor.forceFlush.attempt
} yield {
assertEquals(start, Left(expected(onStart)))
assertEquals(end, Left(expected(onEnd)))
assertEquals(flush, Left(expected(onFlush)))
}
}

test("of (multiple) - multiple failures - rethrow a composite failure") {
val onStart = new RuntimeException("cannot start")
val onEnd = new RuntimeException("cannot end")
val onFlush = new RuntimeException("cannot flush")

def failing(name: String) = testProcessor(
processorName = name,
start = IO.raiseError(onStart),
end = IO.raiseError(onEnd),
flush = IO.raiseError(onFlush)
)

val processor = SpanProcessor.of(
failing("error-prone-1"),
failing("error-prone-2")
)

def expected(e: Throwable) =
SpanProcessor.CompositeProcessorFailure(
SpanProcessor.ProcessorFailure("error-prone-1", e),
NonEmptyList.of(SpanProcessor.ProcessorFailure("error-prone-2", e))
)

for {
start <- processor.onStart(None, null).attempt
end <- processor.onEnd(null).attempt
flush <- processor.forceFlush.attempt
} yield {
assertEquals(start, Left(expected(onStart)))
assertEquals(end, Left(expected(onEnd)))
assertEquals(flush, Left(expected(onFlush)))
}
}

private def testProcessor(
processorName: String,
start: IO[Unit] = IO.unit,
end: IO[Unit] = IO.unit,
flush: IO[Unit] = IO.unit,
): SpanProcessor[IO] =
new SpanProcessor[IO] {
def name: String =
processorName

def onStart(
parentContext: Option[SpanContext],
span: SpanRef[IO]
): IO[Unit] =
IO.unit
start

def isStartRequired: Boolean =
false

def onEnd(span: SpanData): IO[Unit] =
IO.unit
end

def isEndRequired: Boolean =
false

def forceFlush: IO[Unit] =
IO.unit

override def toString: String = name
flush
}

}

0 comments on commit 5397cd4

Please sign in to comment.