Skip to content

Commit

Permalink
Merge pull request #610 from iRevive/sdk-metrics/last-value-aggregato…
Browse files Browse the repository at this point in the history
…r [no ci]

sdk-metrics: add `LastValueAggregator`
  • Loading branch information
iRevive authored Apr 19, 2024
2 parents 8c83174 + 3950ac3 commit c5bdd9d
Show file tree
Hide file tree
Showing 3 changed files with 390 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright 2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.metrics.aggregation

import cats.Applicative
import cats.effect.Concurrent
import cats.syntax.functor._
import org.typelevel.otel4s.Attributes
import org.typelevel.otel4s.metrics.MeasurementValue
import org.typelevel.otel4s.sdk.TelemetryResource
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality
import org.typelevel.otel4s.sdk.metrics.data.MetricData
import org.typelevel.otel4s.sdk.metrics.data.MetricPoints
import org.typelevel.otel4s.sdk.metrics.data.TimeWindow
import org.typelevel.otel4s.sdk.metrics.internal.AsynchronousMeasurement
import org.typelevel.otel4s.sdk.metrics.internal.MetricDescriptor
import org.typelevel.otel4s.sdk.metrics.internal.utils.Current

private object LastValueAggregator {

/** Creates a last value aggregator for synchronous instruments. Retains the
* last seen measurement.
*
* @see
* [[https://opentelemetry.io/docs/specs/otel/metrics/sdk/#last-value-aggregation]]
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*
* @tparam A
* the type of the values to record
*/
def synchronous[
F[_]: Concurrent,
A: MeasurementValue
]: Aggregator.Synchronous[F, A] =
new Synchronous[F, A]

/** Creates a last value aggregator for asynchronous instruments.
*
* @see
* [[https://opentelemetry.io/docs/specs/otel/metrics/sdk/#last-value-aggregation]]
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*
* @tparam A
* the type of the values to record
*/
def asynchronous[
F[_]: Applicative,
A: MeasurementValue
]: Aggregator.Asynchronous[F, A] =
new Asynchronous[F, A]

private final class Synchronous[
F[_]: Concurrent,
A: MeasurementValue
] extends Aggregator.Synchronous[F, A] {
val target: Target[A] = Target[A]

type Point = target.Point

def createAccumulator: F[Aggregator.Accumulator[F, A, Point]] =
for {
current <- Current.create[F, A]
} yield new Accumulator(current)

def toMetricData(
resource: TelemetryResource,
scope: InstrumentationScope,
descriptor: MetricDescriptor,
points: Vector[Point],
temporality: AggregationTemporality
): F[MetricData] =
Concurrent[F].pure(
MetricData(
resource,
scope,
descriptor.name,
descriptor.description,
descriptor.sourceInstrument.unit,
MetricPoints.gauge(points)
)
)

private class Accumulator(
current: Current[F, A]
) extends Aggregator.Accumulator[F, A, Point] {

def aggregate(
timeWindow: TimeWindow,
attributes: Attributes,
reset: Boolean
): F[Option[Point]] =
current.get(reset).map { value =>
value.map { v =>
target.makePointData(
timeWindow,
attributes,
Vector.empty,
v
)
}
}

def record(value: A, attributes: Attributes, context: Context): F[Unit] =
current.set(value)
}
}

private final class Asynchronous[
F[_]: Applicative,
A: MeasurementValue
] extends Aggregator.Asynchronous[F, A] {

private val target: Target[A] = Target[A]

def diff(
previous: AsynchronousMeasurement[A],
current: AsynchronousMeasurement[A]
): AsynchronousMeasurement[A] =
current

def toMetricData(
resource: TelemetryResource,
scope: InstrumentationScope,
descriptor: MetricDescriptor,
measurements: Vector[AsynchronousMeasurement[A]],
temporality: AggregationTemporality
): F[MetricData] = {
val points = measurements.map { m =>
target.makePointData(m.timeWindow, m.attributes, Vector.empty, m.value)
}

Applicative[F].pure(
MetricData(
resource,
scope,
descriptor.name,
descriptor.description,
descriptor.sourceInstrument.unit,
MetricPoints.gauge(points)
)
)
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.metrics.internal.utils

import cats.effect.Concurrent
import cats.syntax.functor._

private[metrics] trait Current[F[_], A] {
def set(a: A): F[Unit]
def get(reset: Boolean): F[Option[A]]
}

private[metrics] object Current {

def create[F[_]: Concurrent, A]: F[Current[F, A]] =
Concurrent[F].ref(Option.empty[A]).map { ref =>
new Current[F, A] {
def set(a: A): F[Unit] =
ref.set(Some(a))

def get(reset: Boolean): F[Option[A]] =
if (reset) ref.getAndSet(None) else ref.get
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright 2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.typelevel.otel4s.sdk.metrics.aggregation

import cats.effect.IO
import cats.syntax.foldable._
import munit.CatsEffectSuite
import munit.ScalaCheckEffectSuite
import org.scalacheck.Gen
import org.scalacheck.Test
import org.scalacheck.effect.PropF
import org.typelevel.otel4s.Attributes
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.metrics.data.MetricData
import org.typelevel.otel4s.sdk.metrics.data.MetricPoints
import org.typelevel.otel4s.sdk.metrics.data.PointData
import org.typelevel.otel4s.sdk.metrics.data.TimeWindow
import org.typelevel.otel4s.sdk.metrics.internal.MetricDescriptor
import org.typelevel.otel4s.sdk.metrics.scalacheck.Gens

import scala.concurrent.duration._

class LastValueAggregatorSuite
extends CatsEffectSuite
with ScalaCheckEffectSuite {

test("synchronous - aggregate with reset - return the last seen value") {
PropF.forAllF(Gen.listOf(Gen.long), Gens.attributes) { (values, attrs) =>
val aggregator =
LastValueAggregator.synchronous[IO, Long]

val timeWindow =
TimeWindow(100.millis, 200.millis)

val expected =
values.lastOption.map { value =>
PointData.longNumber(timeWindow, attrs, Vector.empty, value)
}

for {
accumulator <- aggregator.createAccumulator
_ <- values.traverse_ { value =>
accumulator.record(value, Attributes.empty, Context.root)
}
r1 <- accumulator.aggregate(timeWindow, attrs, reset = true)
r2 <- accumulator.aggregate(timeWindow, attrs, reset = true)
} yield {
assertEquals(r1: Option[PointData], expected)
assertEquals(r2: Option[PointData], None)
}
}
}

test("synchronous - aggregate without reset - return the last stored value") {
PropF.forAllF(Gen.listOf(Gen.long), Gens.attributes) { (values, attrs) =>
val aggregator =
LastValueAggregator.synchronous[IO, Long]

val timeWindow =
TimeWindow(100.millis, 200.millis)

val expected =
values.lastOption.map { value =>
PointData.longNumber(timeWindow, attrs, Vector.empty, value)
}

for {
accumulator <- aggregator.createAccumulator
_ <- values.traverse_ { value =>
accumulator.record(value, Attributes.empty, Context.root)
}
r1 <- accumulator.aggregate(timeWindow, attrs, reset = false)
r2 <- accumulator.aggregate(timeWindow, attrs, reset = false)
} yield {
assertEquals(r1: Option[PointData], expected)
assertEquals(r2: Option[PointData], expected)
}
}
}

test("synchronous - toMetricData") {
PropF.forAllF(
Gens.telemetryResource,
Gens.instrumentationScope,
Gens.instrumentDescriptor,
Gen.listOf(Gens.longNumberPointData),
Gens.aggregationTemporality
) { (resource, scope, descriptor, points, temporality) =>
type LongAggregator = Aggregator.Synchronous[IO, Long] {
type Point = PointData.LongNumber
}

val aggregator =
LastValueAggregator.synchronous[IO, Long].asInstanceOf[LongAggregator]

val expected =
MetricData(
resource = resource,
scope = scope,
name = descriptor.name.toString,
description = descriptor.description,
unit = descriptor.unit,
data = MetricPoints.gauge(points.toVector)
)

for {
metricData <- aggregator.toMetricData(
resource,
scope,
MetricDescriptor(None, descriptor),
points.toVector,
temporality
)
} yield assertEquals(metricData, expected)
}
}

test("asynchronous - diff - return the 'current' value") {
val aggregator = LastValueAggregator.asynchronous[IO, Long]

PropF.forAllF(
Gens.asynchronousMeasurement(Gen.long),
Gens.asynchronousMeasurement(Gen.long)
) { (previous, current) =>
IO(assertEquals(aggregator.diff(previous, current), current))
}
}

test("asynchronous - toMetricData") {
PropF.forAllF(
Gens.telemetryResource,
Gens.instrumentationScope,
Gens.instrumentDescriptor,
Gen.listOf(Gens.asynchronousMeasurement(Gen.long)),
Gens.aggregationTemporality
) { (resource, scope, descriptor, measurements, temporality) =>
val aggregator = LastValueAggregator.asynchronous[IO, Long]

val points = measurements.map { m =>
PointData.longNumber(m.timeWindow, m.attributes, Vector.empty, m.value)
}

val expected =
MetricData(
resource = resource,
scope = scope,
name = descriptor.name.toString,
description = descriptor.description,
unit = descriptor.unit,
data = MetricPoints.gauge(points.toVector)
)

for {
metricData <- aggregator.toMetricData(
resource,
scope,
MetricDescriptor(None, descriptor),
measurements.toVector,
temporality
)
} yield assertEquals(metricData, expected)
}
}

override protected def scalaCheckTestParameters: Test.Parameters =
super.scalaCheckTestParameters
.withMinSuccessfulTests(20)
.withMaxSize(20)

}

0 comments on commit c5bdd9d

Please sign in to comment.