Skip to content

Commit

Permalink
sdk: replace whenA with if, foldMapA with foldMapM
Browse files Browse the repository at this point in the history
  • Loading branch information
iRevive committed Oct 30, 2024
1 parent 73005d4 commit 27c822b
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ private[metrics] object ExemplarReservoir {
): ExemplarReservoir[F, A] =
new ExemplarReservoir[F, A] {
def offer(value: A, attributes: Attributes, context: Context): F[Unit] =
original
.offer(value, attributes, context)
.whenA(
filter.shouldSample(value, attributes, context)
)
if (filter.shouldSample(value, attributes, context)) {
original.offer(value, attributes, context)
} else {
Applicative[F].unit
}

def collectAndReset(attributes: Attributes): F[Vector[Exemplar[A]]] =
original.collectAndReset(attributes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.typelevel.otel4s.sdk.metrics.exporter

import cats.Applicative
import cats.Foldable
import cats.Monad
import cats.effect.std.Console
Expand Down Expand Up @@ -60,7 +59,7 @@ private final class ConsoleMetricExporter[F[_]: Monad: Console](
doExport.whenA(metrics.nonEmpty)
}

def flush: F[Unit] = Applicative[F].unit
def flush: F[Unit] = Monad[F].unit
}

object ConsoleMetricExporter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ private[metrics] final class CallbackRegistration[F[_]: MonadCancelThrow](
* @param timeWindow
* the time window of the measurement
*/
def invokeCallback(
reader: RegisteredReader[F],
timeWindow: TimeWindow
): F[Unit] =
measurements
.traverse_(_.withActiveReader(reader, timeWindow))
.surround(callback)
.whenA(hasStorages)
def invokeCallback(reader: RegisteredReader[F], timeWindow: TimeWindow): F[Unit] =
if (hasStorages) {
measurements
.traverse_(_.withActiveReader(reader, timeWindow))
.surround(callback)
} else {
MonadCancelThrow[F].unit
}

override def toString: String =
s"CallbackRegistration{instrumentDescriptors=${measurements.map(_.descriptor).mkString_("[", ", ", "]")}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private[metrics] final class MeterSharedState[

for {
_ <- currentCallbacks.traverse_(_.invokeCallback(reader, timeWindow))
storages <- registries.get(reader).foldMapA(_.storages)
storages <- registries.get(reader).foldMapM(_.storages)
result <- storages.traverse { storage =>
storage.collect(resource, scope, timeWindow)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,14 @@ private[metrics] object SdkObservableMeasurement {
v => !cast(v).isNaN
}

def withActiveReader(
reader: RegisteredReader[F],
timeWindow: TimeWindow
): Resource[F, Unit] =
def withActiveReader(reader: RegisteredReader[F], timeWindow: TimeWindow): Resource[F, Unit] =
Resource.make(stateRef.set(State.WithReader(reader, timeWindow))) { _ =>
stateRef.set(State.Empty())
}

def record(value: A, attributes: Attributes): F[Unit] =
stateRef.get
.flatMap {
if (isValid(value)) {
stateRef.get.flatMap {
case State.Empty() =>
Console[F].errorln(
"SdkObservableMeasurement: " +
Expand All @@ -143,7 +140,9 @@ private[metrics] object SdkObservableMeasurement {
.filter(_.reader == reader)
.traverse_(storage => storage.record(measurement))
}
.whenA(isValid(value))
} else {
Monad[F].unit
}

def hasStorages: Boolean = storages.nonEmpty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,13 @@ private final class SdkSpanBackend[F[_]: Monad: Clock: Console] private (
updateState("updateName")(_.copy(name = name)).void

def addAttributes(attributes: immutable.Iterable[Attribute[_]]): F[Unit] =
updateState("addAttributes") { s =>
s.copy(attributes = s.attributes.appendAll(attributes.to(Attributes)))
}.unlessA(attributes.isEmpty)
if (attributes.nonEmpty) {
updateState("addAttributes") { s =>
s.copy(attributes = s.attributes.appendAll(attributes.to(Attributes)))
}.void
} else {
Monad[F].unit
}

def addEvent(
name: String,
Expand Down Expand Up @@ -193,11 +197,11 @@ private final class SdkSpanBackend[F[_]: Monad: Clock: Console] private (
else (update(state), true)
}
.flatTap { modified =>
Console[F]
.println(
s"SdkSpanBackend: calling [$method] on the ended span $context"
)
.unlessA(modified)
if (modified) {
Monad[F].unit
} else {
Console[F].println(s"SdkSpanBackend: calling [$method] on the ended span $context")
}
}

// SpanRef interfaces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package processor

import cats.MonadThrow
import cats.effect.std.Console
import cats.syntax.applicative._
import cats.syntax.applicativeError._
import org.typelevel.otel4s.sdk.trace.data.SpanData
import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter
Expand Down Expand Up @@ -53,7 +52,7 @@ private final class SimpleSpanProcessor[F[_]: MonadThrow: Console] private (

def onEnd(span: SpanData): F[Unit] = {
val canExport = !exportOnlySampled || span.spanContext.isSampled
doExport(span).whenA(canExport)
if (canExport) doExport(span) else MonadThrow[F].unit
}

private def doExport(span: SpanData): F[Unit] =
Expand Down

0 comments on commit 27c822b

Please sign in to comment.