Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SpanProcessor - rethrow a composite failure on multiple errors #386

Merged
merged 2 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package org.typelevel.otel4s
package sdk.trace

import cats.Applicative
import cats.MonadThrow
import cats.Monoid
import cats.Parallel
import cats.data.NonEmptyList
import cats.syntax.foldable._
import cats.syntax.parallel._
import cats.syntax.all._
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.trace.SpanContext

Expand All @@ -37,6 +37,19 @@ import org.typelevel.otel4s.trace.SpanContext
*/
trait SpanProcessor[F[_]] {

/** The name of the processor.
*
* It will be used in an exception to distinguish individual failures in the
* multi-error scenario.
*
* @see
* [[SpanProcessor.ProcessorFailure]]
*
* @see
* [[SpanProcessor.CompositeProcessorFailure]]
*/
def name: String

/** Called when a span is started, if the `span.isRecording` returns true.
*
* This method is called synchronously on the execution thread, should not
Expand Down Expand Up @@ -75,14 +88,17 @@ trait SpanProcessor[F[_]] {
/** Processes all pending spans (if any).
*/
def forceFlush: F[Unit]

override def toString: String =
name
}

object SpanProcessor {

/** Creates a [[SpanProcessor]] which delegates all processing to the
* processors in order.
*/
def of[F[_]: Applicative: Parallel](
def of[F[_]: MonadThrow: Parallel](
processors: SpanProcessor[F]*
): SpanProcessor[F] =
if (processors.sizeIs == 1) processors.head
Expand All @@ -95,7 +111,7 @@ object SpanProcessor {
def noop[F[_]: Applicative]: SpanProcessor[F] =
new Noop

implicit def spanProcessorMonoid[F[_]: Applicative: Parallel]
implicit def spanProcessorMonoid[F[_]: MonadThrow: Parallel]
: Monoid[SpanProcessor[F]] =
new Monoid[SpanProcessor[F]] {
val empty: SpanProcessor[F] =
Expand All @@ -118,7 +134,39 @@ object SpanProcessor {
}
}

/** An error occurred when invoking a processor.
*
* @param processor
* the name of a processor that failed. See [[SpanProcessor.name]]
*
* @param failure
* the error occurred
*/
final case class ProcessorFailure(processor: String, failure: Throwable)
extends Exception(
s"The processor [$processor] has failed due to ${failure.getMessage}",
failure
)

/** An composite failure, when '''at least 2''' processors have failed.
*
* @param first
* the first occurred error
*
* @param rest
* the rest of errors
*/
final case class CompositeProcessorFailure(
first: ProcessorFailure,
rest: NonEmptyList[ProcessorFailure]
) extends Exception(
s"Multiple processors [${rest.prepend(first).map(_.processor).mkString_(", ")}] have failed",
first
)

private final class Noop[F[_]: Applicative] extends SpanProcessor[F] {
val name: String = "SpanProcessor.Noop"

def isStartRequired: Boolean = false
def isEndRequired: Boolean = false

Expand All @@ -130,11 +178,9 @@ object SpanProcessor {

def forceFlush: F[Unit] =
Applicative[F].unit

override def toString: String = "SpanProcessor.Noop"
}

private final case class Multi[F[_]: Parallel](
private final case class Multi[F[_]: MonadThrow: Parallel](
processors: NonEmptyList[SpanProcessor[F]]
) extends SpanProcessor[F] {
private val startOnly: List[SpanProcessor[F]] =
Expand All @@ -143,19 +189,49 @@ object SpanProcessor {
private val endOnly: List[SpanProcessor[F]] =
processors.filter(_.isEndRequired)

val name: String =
s"SpanProcessor.Multi(${processors.map(_.name).mkString_(", ")})"

def isStartRequired: Boolean = startOnly.nonEmpty
def isEndRequired: Boolean = endOnly.nonEmpty

def onStart(parentCtx: Option[SpanContext], span: SpanRef[F]): F[Unit] =
startOnly.parTraverse_(_.onStart(parentCtx, span))
startOnly
.parTraverse { p =>
p.onStart(parentCtx, span).attempt.tupleLeft(p.name)
}
.flatMap(attempts => handleAttempts(attempts))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit nitpicky (also below), but

Suggested change
.flatMap(attempts => handleAttempts(attempts))
.flatMap(handleAttempts)


def onEnd(span: SpanData): F[Unit] =
endOnly.parTraverse_(_.onEnd(span))
endOnly
.parTraverse(p => p.onEnd(span).attempt.tupleLeft(p.name))
.flatMap(attempts => handleAttempts(attempts))

def forceFlush: F[Unit] =
processors.parTraverse_(_.forceFlush)

override def toString: String =
s"SpanProcessor.Multi(${processors.map(_.toString).mkString_(", ")})"
processors
.parTraverse(p => p.forceFlush.attempt.tupleLeft(p.name))
.flatMap(attempts => handleAttempts(attempts.toList))

private def handleAttempts(
results: List[(String, Either[Throwable, Unit])]
): F[Unit] = {
val failures = results.collect { case (processor, Left(failure)) =>
ProcessorFailure(processor, failure)
}

failures match {
case Nil =>
MonadThrow[F].unit

case head :: Nil =>
MonadThrow[F].raiseError(head)

case head :: tail =>
MonadThrow[F].raiseError(
CompositeProcessorFailure(head, NonEmptyList.fromListUnsafe(tail))
)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.typelevel.otel4s.sdk.trace

import cats.data.NonEmptyList
import cats.effect.IO
import munit.FunSuite
import org.typelevel.otel4s.sdk.trace.data.SpanData
Expand Down Expand Up @@ -65,27 +66,95 @@ class SpanProcessorSuite extends FunSuite {
)
}

private def testProcessor(name: String): SpanProcessor[IO] =
test("of (multiple) - single failure - rethrow a single failure") {
val onStart = new RuntimeException("cannot start")
val onEnd = new RuntimeException("cannot end")
val onFlush = new RuntimeException("cannot flush")

val failing = testProcessor(
processorName = "error-prone",
start = IO.raiseError(onStart),
end = IO.raiseError(onEnd),
flush = IO.raiseError(onFlush)
)

val processor = SpanProcessor.of(testProcessor("success"), failing)

def expected(e: Throwable) =
SpanProcessor.ProcessorFailure("error-prone", e)

for {
start <- processor.onStart(None, null: SpanRef[IO]).attempt
end <- processor.onEnd(null).attempt
flush <- processor.forceFlush.attempt
} yield {
assertEquals(start, Left(expected(onStart)))
assertEquals(end, Left(expected(onEnd)))
assertEquals(flush, Left(expected(onFlush)))
}
}

test("of (multiple) - multiple failures - rethrow a composite failure") {
val onStart = new RuntimeException("cannot start")
val onEnd = new RuntimeException("cannot end")
val onFlush = new RuntimeException("cannot flush")

def failing(name: String) = testProcessor(
processorName = name,
start = IO.raiseError(onStart),
end = IO.raiseError(onEnd),
flush = IO.raiseError(onFlush)
)

val processor = SpanProcessor.of(
failing("error-prone-1"),
failing("error-prone-2")
)

def expected(e: Throwable) =
SpanProcessor.CompositeProcessorFailure(
SpanProcessor.ProcessorFailure("error-prone-1", e),
NonEmptyList.of(SpanProcessor.ProcessorFailure("error-prone-2", e))
)

for {
start <- processor.onStart(None, null).attempt
end <- processor.onEnd(null).attempt
flush <- processor.forceFlush.attempt
} yield {
assertEquals(start, Left(expected(onStart)))
assertEquals(end, Left(expected(onEnd)))
assertEquals(flush, Left(expected(onFlush)))
}
}

private def testProcessor(
processorName: String,
start: IO[Unit] = IO.unit,
end: IO[Unit] = IO.unit,
flush: IO[Unit] = IO.unit,
): SpanProcessor[IO] =
new SpanProcessor[IO] {
def name: String =
processorName

def onStart(
parentContext: Option[SpanContext],
span: SpanRef[IO]
): IO[Unit] =
IO.unit
start

def isStartRequired: Boolean =
false

def onEnd(span: SpanData): IO[Unit] =
IO.unit
end

def isEndRequired: Boolean =
false

def forceFlush: IO[Unit] =
IO.unit

override def toString: String = name
flush
}

}