Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor exemplars to not use generic argument #5285

Merged
merged 10 commits
May 7, 2024
16 changes: 8 additions & 8 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ import (
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] {
func reservoirFunc(agg Aggregation) func() exemplar.Reservoir {
if !x.Exemplars.Enabled() {
return nil
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
resF := func() func() exemplar.Reservoir[N] {
resF := func() func() exemplar.Reservoir {
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.Reservoir[N] {
return func() exemplar.Reservoir {
bounds := cp
return exemplar.Histogram[N](bounds)
return exemplar.Histogram(bounds)
}
}

Expand Down Expand Up @@ -61,8 +61,8 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir
}
}

return func() exemplar.Reservoir[N] {
return exemplar.FixedSize[N](n)
return func() exemplar.Reservoir {
return exemplar.FixedSize(n)
}
}

Expand All @@ -73,12 +73,12 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir
case "always_on":
return resF()
case "always_off":
return exemplar.Drop[N]
return exemplar.Drop
case "trace_based":
fallthrough
default:
newR := resF()
return func() exemplar.Reservoir[N] {
return func() exemplar.Reservoir {
return exemplar.SampledFilter(newR())
}
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Builder[N int64 | float64] struct {
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir[N]
ReservoirFunc func() exemplar.Reservoir
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -50,12 +50,12 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
func (b Builder[N]) resFunc() func() exemplar.Reservoir {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}

return exemplar.Drop[N]
return exemplar.Drop
}

type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ var (
}
)

func dropExemplars[N int64 | float64]() exemplar.Reservoir[N] {
return exemplar.Drop[N]()
func dropExemplars[N int64 | float64]() exemplar.Reservoir {
return exemplar.Drop()
}

func TestBuilderFilter(t *testing.T) {
Expand Down
42 changes: 42 additions & 0 deletions sdk/metric/internal/aggregate/exemplar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// exemplarPool pools *[]exemplar.Exemplar scratch buffers so that
// collectExemplars can reuse slice storage across collection cycles
// instead of allocating a fresh slice each time.
var exemplarPool = sync.Pool{
	New: func() any { return new([]exemplar.Exemplar) },
}

// collectExemplars passes a pooled scratch slice to f so it can deposit the
// reservoir's exemplars, then converts each collected exemplar into the
// metricdata representation stored in out. The scratch slice is emptied and
// returned to the pool before this function returns.
func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) {
	// Borrow a scratch buffer; hand it back emptied so pooled storage is
	// never retained with stale exemplar data.
	scratch := exemplarPool.Get().(*[]exemplar.Exemplar)
	defer func() {
		*scratch = (*scratch)[:0]
		exemplarPool.Put(scratch)
	}()

	// Mirror out's length/capacity onto the scratch buffer for f to fill.
	*scratch = reset(*scratch, len(*out), cap(*out))

	f(scratch)

	// Resize out to match what was collected, then convert entry by entry.
	*out = reset(*out, len(*scratch), cap(*scratch))
	for i := range *scratch {
		src := (*scratch)[i]
		dst := &(*out)[i]

		dst.FilteredAttributes = src.FilteredAttributes
		dst.Time = src.Time
		dst.SpanID = src.SpanID
		dst.TraceID = src.TraceID

		switch src.Value.Type() {
		case exemplar.Int64ValueType:
			dst.Value = N(src.Value.Int64())
		case exemplar.Float64ValueType:
			dst.Value = N(src.Value.Float64())
		}
	}
}
50 changes: 50 additions & 0 deletions sdk/metric/internal/aggregate/exemplar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// TestCollectExemplars runs the collectExemplars conversion test for both
// measurement value types the aggregators support.
func TestCollectExemplars(t *testing.T) {
	t.Run("Int64", testCollectExemplars[int64]())
	t.Run("Float64", testCollectExemplars[float64]())
}

// testCollectExemplars returns a subtest asserting that collectExemplars
// converts a single internal exemplar.Exemplar into the equivalent
// metricdata.Exemplar[N], preserving attributes, time, value, and IDs.
func testCollectExemplars[N int64 | float64]() func(t *testing.T) {
	return func(t *testing.T) {
		var (
			ts      = time.Now()
			user    = attribute.String("user", "Alice")
			val     = N(1)
			spanID  = [8]byte{0x1}
			traceID = [16]byte{0x1}
		)

		got := new([]metricdata.Exemplar[N])
		collectExemplars(got, func(in *[]exemplar.Exemplar) {
			// Simulate a reservoir depositing exactly one exemplar.
			*in = reset(*in, 1, 1)
			(*in)[0] = exemplar.Exemplar{
				FilteredAttributes: []attribute.KeyValue{user},
				Time:               ts,
				Value:              exemplar.NewValue(val),
				SpanID:             spanID[:],
				TraceID:            traceID[:],
			}
		})

		want := []metricdata.Exemplar[N]{{
			FilteredAttributes: []attribute.KeyValue{user},
			Time:               ts,
			Value:              val,
			SpanID:             spanID[:],
			TraceID:            traceID[:],
		}}
		assert.Equal(t, want, *got)
	}
}
12 changes: 6 additions & 6 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res exemplar.Reservoir[N]
res exemplar.Reservoir

count uint64
min N
Expand Down Expand Up @@ -282,7 +282,7 @@ func (b *expoBuckets) downscale(delta int) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
Expand All @@ -305,7 +305,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int

newRes func() exemplar.Reservoir[N]
newRes func() exemplar.Reservoir
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
Expand Down Expand Up @@ -333,7 +333,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
e.values[attr.Equivalent()] = v
}
v.record(value)
v.res.Offer(ctx, t, value, droppedAttr)
v.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
}

func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
Expand Down Expand Up @@ -376,7 +376,7 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

i++
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

i++
// TODO (#3006): This will use an unbounded amount of memory if there
Expand Down
14 changes: 7 additions & 7 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

type buckets[N int64 | float64] struct {
attrs attribute.Set
res exemplar.Reservoir[N]
res exemplar.Reservoir

counts []uint64
count uint64
Expand Down Expand Up @@ -48,13 +48,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

newRes func() exemplar.Reservoir[N]
newRes func() exemplar.Reservoir
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand Down Expand Up @@ -106,12 +106,12 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
if !s.noSum {
b.sum(value)
}
b.res.Offer(ctx, t, value, droppedAttr)
b.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
}

// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,
Expand Down Expand Up @@ -163,7 +163,7 @@ func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

i++
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

i++
// TODO (#3006): This will use an unbounded amount of memory if there
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ type datapoint[N int64 | float64] struct {
attrs attribute.Set
timestamp time.Time
value N
res exemplar.Reservoir[N]
res exemplar.Reservoir
}

func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *lastValue[N] {
func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
Expand All @@ -33,7 +33,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N])
type lastValue[N int64 | float64] struct {
sync.Mutex

newRes func() exemplar.Reservoir[N]
newRes func() exemplar.Reservoir
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
}
Expand All @@ -53,7 +53,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
d.attrs = attr
d.timestamp = t
d.value = value
d.res.Offer(ctx, t, value, droppedAttr)
d.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)

s.values[attr.Equivalent()] = d
}
Expand All @@ -72,7 +72,7 @@ func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
// ignored.
(*dest)[i].Time = v.timestamp
(*dest)[i].Value = v.value
v.res.Collect(&(*dest)[i].Exemplars)
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
i++
}
// Do not report stale values.
Expand Down
Loading