From 2f662dbe131bb499e63dc99b66605722fcb4d761 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 7 May 2024 08:12:59 -0700 Subject: [PATCH] Refactor exemplars to not use generic argument (#5285) * Refactor exemplars to not use generic argument * Update internal/aggregate * Update metric SDK * Test exemplar value type * Add TestCollectExemplars * Fix lint --------- Co-authored-by: Sam Xie --- sdk/metric/exemplar.go | 16 +++--- sdk/metric/internal/aggregate/aggregate.go | 6 +- .../internal/aggregate/aggregate_test.go | 4 +- sdk/metric/internal/aggregate/exemplar.go | 42 ++++++++++++++ .../internal/aggregate/exemplar_test.go | 50 ++++++++++++++++ .../aggregate/exponential_histogram.go | 12 ++-- sdk/metric/internal/aggregate/histogram.go | 14 ++--- sdk/metric/internal/aggregate/lastvalue.go | 10 ++-- sdk/metric/internal/aggregate/sum.go | 20 +++---- sdk/metric/internal/exemplar/drop.go | 9 ++- sdk/metric/internal/exemplar/drop_test.go | 8 +-- sdk/metric/internal/exemplar/exemplar.go | 29 ++++++++++ sdk/metric/internal/exemplar/filter.go | 10 ++-- sdk/metric/internal/exemplar/filter_test.go | 15 +++-- sdk/metric/internal/exemplar/hist.go | 23 +++++--- sdk/metric/internal/exemplar/hist_test.go | 8 +-- sdk/metric/internal/exemplar/rand.go | 17 +++--- sdk/metric/internal/exemplar/rand_test.go | 16 +++--- sdk/metric/internal/exemplar/reservoir.go | 7 +-- .../internal/exemplar/reservoir_test.go | 35 ++++++------ sdk/metric/internal/exemplar/storage.go | 23 ++++---- sdk/metric/internal/exemplar/value.go | 57 +++++++++++++++++++ sdk/metric/internal/exemplar/value_test.go | 27 +++++++++ sdk/metric/pipeline.go | 2 +- 24 files changed, 334 insertions(+), 126 deletions(-) create mode 100644 sdk/metric/internal/aggregate/exemplar.go create mode 100644 sdk/metric/internal/aggregate/exemplar_test.go create mode 100644 sdk/metric/internal/exemplar/exemplar.go create mode 100644 sdk/metric/internal/exemplar/value.go create mode 100644 sdk/metric/internal/exemplar/value_test.go diff --git a/sdk/metric/exemplar.go b/sdk/metric/exemplar.go index 9155c242c71..c774a4684f2 100644 --- a/sdk/metric/exemplar.go +++ b/sdk/metric/exemplar.go @@ -19,21 +19,21 @@ import ( // Note: This will only return non-nil values when the experimental exemplar // feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable // is not set to always_off. -func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] { +func reservoirFunc(agg Aggregation) func() exemplar.Reservoir { if !x.Exemplars.Enabled() { return nil } // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults - resF := func() func() exemplar.Reservoir[N] { + resF := func() func() exemplar.Reservoir { // Explicit bucket histogram aggregation with more than 1 bucket will // use AlignedHistogramBucketExemplarReservoir. a, ok := agg.(AggregationExplicitBucketHistogram) if ok && len(a.Boundaries) > 0 { cp := slices.Clone(a.Boundaries) - return func() exemplar.Reservoir[N] { + return func() exemplar.Reservoir { bounds := cp - return exemplar.Histogram[N](bounds) + return exemplar.Histogram(bounds) } } @@ -61,8 +61,8 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir } } - return func() exemplar.Reservoir[N] { - return exemplar.FixedSize[N](n) + return func() exemplar.Reservoir { + return exemplar.FixedSize(n) } } @@ -73,12 +73,12 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir case "always_on": return resF() case "always_off": - return exemplar.Drop[N] + return exemplar.Drop case "trace_based": fallthrough default: newR := resF() - return func() exemplar.Reservoir[N] { + return func() exemplar.Reservoir { return exemplar.SampledFilter(newR()) } } diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index aa5229b09e4..0a97444a4be 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -39,7 +39,7 @@ type Builder[N int64 | float64] struct { // // If this is not provided a default factory function that returns an // exemplar.Drop reservoir will be used. - ReservoirFunc func() exemplar.Reservoir[N] + ReservoirFunc func() exemplar.Reservoir // AggregationLimit is the cardinality limit of measurement attributes. Any // measurement for new attributes once the limit has been reached will be // aggregated into a single aggregate for the "otel.metric.overflow" @@ -50,12 +50,12 @@ type Builder[N int64 | float64] struct { AggregationLimit int } -func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] { +func (b Builder[N]) resFunc() func() exemplar.Reservoir { if b.ReservoirFunc != nil { return b.ReservoirFunc } - return exemplar.Drop[N] + return exemplar.Drop } type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index 29481c506cd..1afb8a99258 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -49,8 +49,8 @@ var ( } ) -func dropExemplars[N int64 | float64]() exemplar.Reservoir[N] { - return exemplar.Drop[N]() +func dropExemplars[N int64 | float64]() exemplar.Reservoir { + return exemplar.Drop() } func TestBuilderFilter(t *testing.T) { diff --git a/sdk/metric/internal/aggregate/exemplar.go b/sdk/metric/internal/aggregate/exemplar.go new file mode 100644 index 00000000000..170ae8e58e2 --- /dev/null +++ b/sdk/metric/internal/aggregate/exemplar.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "sync" + + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +var exemplarPool = sync.Pool{ + New: func() any { return new([]exemplar.Exemplar) }, +} + +func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) { + dest := exemplarPool.Get().(*[]exemplar.Exemplar) + defer func() { + *dest = (*dest)[:0] + exemplarPool.Put(dest) + }() + + *dest = reset(*dest, len(*out), cap(*out)) + + f(dest) + + *out = reset(*out, len(*dest), cap(*dest)) + for i, e := range *dest { + (*out)[i].FilteredAttributes = e.FilteredAttributes + (*out)[i].Time = e.Time + (*out)[i].SpanID = e.SpanID + (*out)[i].TraceID = e.TraceID + + switch e.Value.Type() { + case exemplar.Int64ValueType: + (*out)[i].Value = N(e.Value.Int64()) + case exemplar.Float64ValueType: + (*out)[i].Value = N(e.Value.Float64()) + } + } +} diff --git a/sdk/metric/internal/aggregate/exemplar_test.go b/sdk/metric/internal/aggregate/exemplar_test.go new file mode 100644 index 00000000000..df1d125fa66 --- /dev/null +++ b/sdk/metric/internal/aggregate/exemplar_test.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregate + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +func TestCollectExemplars(t *testing.T) { + t.Run("Int64", testCollectExemplars[int64]()) + t.Run("Float64", testCollectExemplars[float64]()) +} + +func testCollectExemplars[N int64 | float64]() func(t *testing.T) { + return func(t *testing.T) { + now := time.Now() + alice := attribute.String("user", "Alice") + value := N(1) + spanID := [8]byte{0x1} + traceID := [16]byte{0x1} + + out := new([]metricdata.Exemplar[N]) + collectExemplars(out, func(in *[]exemplar.Exemplar) { + *in = reset(*in, 1, 1) + (*in)[0] = exemplar.Exemplar{ + FilteredAttributes: []attribute.KeyValue{alice}, + Time: now, + Value: exemplar.NewValue(value), + SpanID: spanID[:], + TraceID: traceID[:], + } + }) + + assert.Equal(t, []metricdata.Exemplar[N]{{ + FilteredAttributes: []attribute.KeyValue{alice}, + Time: now, + Value: value, + SpanID: spanID[:], + TraceID: traceID[:], + }}, *out) + } +} diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index a6629ee3125..902074b5bfd 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -31,7 +31,7 @@ const ( // expoHistogramDataPoint is a single data point in an exponential histogram. type expoHistogramDataPoint[N int64 | float64] struct { attrs attribute.Set - res exemplar.Reservoir[N] + res exemplar.Reservoir count uint64 min N @@ -282,7 +282,7 @@ func (b *expoBuckets) downscale(delta int) { // newExponentialHistogram returns an Aggregator that summarizes a set of // measurements as an exponential histogram. Each histogram is scoped by attributes // and the aggregation cycle the measurements were made in. -func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *expoHistogram[N] { +func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *expoHistogram[N] { return &expoHistogram[N]{ noSum: noSum, noMinMax: noMinMax, @@ -305,7 +305,7 @@ type expoHistogram[N int64 | float64] struct { maxSize int maxScale int - newRes func() exemplar.Reservoir[N] + newRes func() exemplar.Reservoir limit limiter[*expoHistogramDataPoint[N]] values map[attribute.Distinct]*expoHistogramDataPoint[N] valuesMu sync.Mutex @@ -333,7 +333,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib e.values[attr.Equivalent()] = v } v.record(value) - v.res.Offer(ctx, t, value, droppedAttr) + v.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr) } func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int { @@ -376,7 +376,7 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int { hDPts[i].Max = metricdata.NewExtrema(val.max) } - val.res.Collect(&hDPts[i].Exemplars) + collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ } @@ -429,7 +429,7 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int { hDPts[i].Max = metricdata.NewExtrema(val.max) } - val.res.Collect(&hDPts[i].Exemplars) + collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ // TODO (#3006): This will use an unbounded amount of memory if there diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 911d7c18691..213baf50f53 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -17,7 +17,7 @@ import ( type buckets[N int64 | float64] struct { attrs attribute.Set - res exemplar.Reservoir[N] + res exemplar.Reservoir counts []uint64 count uint64 @@ -48,13 +48,13 @@ type histValues[N int64 | float64] struct { noSum bool bounds []float64 - newRes func() exemplar.Reservoir[N] + newRes func() exemplar.Reservoir limit limiter[*buckets[N]] values map[attribute.Distinct]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -106,12 +106,12 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute if !s.noSum { b.sum(value) } - b.res.Offer(ctx, t, value, droppedAttr) + b.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr) } // newHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. -func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histogram[N] { +func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *histogram[N] { return &histogram[N]{ histValues: newHistValues[N](boundaries, noSum, limit, r), noMinMax: noMinMax, @@ -163,7 +163,7 @@ func (s *histogram[N]) delta(dest *metricdata.Aggregation) int { hDPts[i].Max = metricdata.NewExtrema(val.max) } - val.res.Collect(&hDPts[i].Exemplars) + collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ } @@ -219,7 +219,7 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int { hDPts[i].Max = metricdata.NewExtrema(val.max) } - val.res.Collect(&hDPts[i].Exemplars) + collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ // TODO (#3006): This will use an unbounded amount of memory if there diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 73cf98c7599..f3238974c6a 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -18,10 +18,10 @@ type datapoint[N int64 | float64] struct { attrs attribute.Set timestamp time.Time value N - res exemplar.Reservoir[N] + res exemplar.Reservoir } -func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *lastValue[N] { +func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *lastValue[N] { return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), @@ -33,7 +33,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) type lastValue[N int64 | float64] struct { sync.Mutex - newRes func() exemplar.Reservoir[N] + newRes func() exemplar.Reservoir limit limiter[datapoint[N]] values map[attribute.Distinct]datapoint[N] } @@ -53,7 +53,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. d.attrs = attr d.timestamp = t d.value = value - d.res.Offer(ctx, t, value, droppedAttr) + d.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr) s.values[attr.Equivalent()] = d } @@ -72,7 +72,7 @@ func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) { // ignored. (*dest)[i].Time = v.timestamp (*dest)[i].Value = v.value - v.res.Collect(&(*dest)[i].Exemplars) + collectExemplars(&(*dest)[i].Exemplars, v.res.Collect) i++ } // Do not report stale values. diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 7514b95e698..babe76aba9b 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -15,19 +15,19 @@ import ( type sumValue[N int64 | float64] struct { n N - res exemplar.Reservoir[N] + res exemplar.Reservoir attrs attribute.Set } // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex - newRes func() exemplar.Reservoir[N] + newRes func() exemplar.Reservoir limit limiter[sumValue[N]] values map[attribute.Distinct]sumValue[N] } -func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *valueMap[N] { +func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir) *valueMap[N] { return &valueMap[N]{ newRes: r, limit: newLimiter[sumValue[N]](limit), @@ -49,7 +49,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S v.attrs = attr v.n += value - v.res.Offer(ctx, t, value, droppedAttr) + v.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr) s.values[attr.Equivalent()] = v } @@ -57,7 +57,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. -func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *sum[N] { +func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir) *sum[N] { return &sum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, @@ -94,7 +94,7 @@ func (s *sum[N]) delta(dest *metricdata.Aggregation) int { dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = val.n - val.res.Collect(&dPts[i].Exemplars) + collectExemplars(&dPts[i].Exemplars, val.res.Collect) i++ } // Do not report stale values. @@ -129,7 +129,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = value.n - value.res.Collect(&dPts[i].Exemplars) + collectExemplars(&dPts[i].Exemplars, value.res.Collect) // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not @@ -146,7 +146,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { // newPrecomputedSum returns an aggregator that summarizes a set of // observatrions as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. -func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *precomputedSum[N] { +func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir) *precomputedSum[N] { return &precomputedSum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, @@ -188,7 +188,7 @@ func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int { dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = delta - value.res.Collect(&dPts[i].Exemplars) + collectExemplars(&dPts[i].Exemplars, value.res.Collect) newReported[key] = value.n i++ @@ -226,7 +226,7 @@ func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int { dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = val.n - val.res.Collect(&dPts[i].Exemplars) + collectExemplars(&dPts[i].Exemplars, val.res.Collect) i++ } diff --git a/sdk/metric/internal/exemplar/drop.go b/sdk/metric/internal/exemplar/drop.go index 729c2793ee1..bf21e45dfaf 100644 --- a/sdk/metric/internal/exemplar/drop.go +++ b/sdk/metric/internal/exemplar/drop.go @@ -8,18 +8,17 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" ) // Drop returns a [Reservoir] that drops all measurements it is offered. -func Drop[N int64 | float64]() Reservoir[N] { return &dropRes[N]{} } +func Drop() Reservoir { return &dropRes{} } -type dropRes[N int64 | float64] struct{} +type dropRes struct{} // Offer does nothing, all measurements offered will be dropped. -func (r *dropRes[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) {} +func (r *dropRes) Offer(context.Context, time.Time, Value, []attribute.KeyValue) {} // Collect resets dest. No exemplars will ever be returned. -func (r *dropRes[N]) Collect(dest *[]metricdata.Exemplar[N]) { +func (r *dropRes) Collect(dest *[]Exemplar) { *dest = (*dest)[:0] } diff --git a/sdk/metric/internal/exemplar/drop_test.go b/sdk/metric/internal/exemplar/drop_test.go index eb07e9a10e4..9140f9e276e 100644 --- a/sdk/metric/internal/exemplar/drop_test.go +++ b/sdk/metric/internal/exemplar/drop_test.go @@ -8,11 +8,11 @@ import ( ) func TestDrop(t *testing.T) { - t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir[int64], int) { - return Drop[int64](), 0 + t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) { + return Drop(), 0 })) - t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir[float64], int) { - return Drop[float64](), 0 + t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) { + return Drop(), 0 })) } diff --git a/sdk/metric/internal/exemplar/exemplar.go b/sdk/metric/internal/exemplar/exemplar.go new file mode 100644 index 00000000000..fcaa6a4697c --- /dev/null +++ b/sdk/metric/internal/exemplar/exemplar.go @@ -0,0 +1,29 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "time" + + "go.opentelemetry.io/otel/attribute" +) + +// Exemplar is a measurement sampled from a timeseries providing a typical +// example. +type Exemplar struct { + // FilteredAttributes are the attributes recorded with the measurement but + // filtered out of the timeseries' aggregated data. + FilteredAttributes []attribute.KeyValue + // Time is the time when the measurement was recorded. + Time time.Time + // Value is the measured value. + Value Value + // SpanID is the ID of the span that was active during the measurement. If + // no span was active or the span was not sampled this will be empty. + SpanID []byte `json:",omitempty"` + // TraceID is the ID of the trace the active span belonged to during the + // measurement. If no span was active or the span was not sampled this will + // be empty. + TraceID []byte `json:",omitempty"` +} diff --git a/sdk/metric/internal/exemplar/filter.go b/sdk/metric/internal/exemplar/filter.go index 53c86d5cc24..d96aacc281a 100644 --- a/sdk/metric/internal/exemplar/filter.go +++ b/sdk/metric/internal/exemplar/filter.go @@ -14,15 +14,15 @@ import ( // SampledFilter returns a [Reservoir] wrapping r that will only offer measurements // to r if the passed context associated with the measurement contains a sampled // [go.opentelemetry.io/otel/trace.SpanContext]. -func SampledFilter[N int64 | float64](r Reservoir[N]) Reservoir[N] { - return filtered[N]{Reservoir: r} +func SampledFilter(r Reservoir) Reservoir { + return filtered{Reservoir: r} } -type filtered[N int64 | float64] struct { - Reservoir[N] +type filtered struct { + Reservoir } -func (f filtered[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) { +func (f filtered) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) { if trace.SpanContextFromContext(ctx).IsSampled() { f.Reservoir.Offer(ctx, t, n, a) } diff --git a/sdk/metric/internal/exemplar/filter_test.go b/sdk/metric/internal/exemplar/filter_test.go index b52f5b79049..eadcc667a8b 100644 --- a/sdk/metric/internal/exemplar/filter_test.go +++ b/sdk/metric/internal/exemplar/filter_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/trace" ) @@ -21,14 +20,14 @@ func TestSampledFilter(t *testing.T) { } func testSampledFiltered[N int64 | float64](t *testing.T) { - under := &res[N]{} + under := &res{} - r := SampledFilter[N](under) + r := SampledFilter(under) ctx := context.Background() - r.Offer(ctx, staticTime, 0, nil) + r.Offer(ctx, staticTime, NewValue(N(0)), nil) assert.False(t, under.OfferCalled, "underlying Reservoir Offer called") - r.Offer(sample(ctx), staticTime, 0, nil) + r.Offer(sample(ctx), staticTime, NewValue(N(0)), nil) assert.True(t, under.OfferCalled, "underlying Reservoir Offer not called") r.Collect(nil) @@ -44,15 +43,15 @@ func sample(parent context.Context) context.Context { return trace.ContextWithSpanContext(parent, sc) } -type res[N int64 | float64] struct { +type res struct { OfferCalled bool CollectCalled bool } -func (r *res[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) { +func (r *res) Offer(context.Context, time.Time, Value, []attribute.KeyValue) { r.OfferCalled = true } -func (r *res[N]) Collect(*[]metricdata.Exemplar[N]) { +func (r *res) Collect(*[]Exemplar) { r.CollectCalled = true } diff --git a/sdk/metric/internal/exemplar/hist.go b/sdk/metric/internal/exemplar/hist.go index 463c8a7d313..a6ff86d0271 100644 --- a/sdk/metric/internal/exemplar/hist.go +++ b/sdk/metric/internal/exemplar/hist.go @@ -17,21 +17,30 @@ import ( // by bounds. // // The passed bounds will be sorted by this function. -func Histogram[N int64 | float64](bounds []float64) Reservoir[N] { +func Histogram(bounds []float64) Reservoir { slices.Sort(bounds) - return &histRes[N]{ + return &histRes{ bounds: bounds, - storage: newStorage[N](len(bounds) + 1), + storage: newStorage(len(bounds) + 1), } } -type histRes[N int64 | float64] struct { - *storage[N] +type histRes struct { + *storage // bounds are bucket bounds in ascending order. bounds []float64 } -func (r *histRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) { - r.store[sort.SearchFloat64s(r.bounds, float64(n))] = newMeasurement(ctx, t, n, a) +func (r *histRes) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) { + var x float64 + switch v.Type() { + case Int64ValueType: + x = float64(v.Int64()) + case Float64ValueType: + x = v.Float64() + default: + panic("unknown value type") + } + r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a) } diff --git a/sdk/metric/internal/exemplar/hist_test.go b/sdk/metric/internal/exemplar/hist_test.go index b22d6c22a01..499c9a3a2d3 100644 --- a/sdk/metric/internal/exemplar/hist_test.go +++ b/sdk/metric/internal/exemplar/hist_test.go @@ -7,11 +7,11 @@ import "testing" func TestHist(t *testing.T) { bounds := []float64{0, 100} - t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir[int64], int) { - return Histogram[int64](bounds), len(bounds) + t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) { + return Histogram(bounds), len(bounds) })) - t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir[float64], int) { - return Histogram[float64](bounds), len(bounds) + t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) { + return Histogram(bounds), len(bounds) })) } diff --git a/sdk/metric/internal/exemplar/rand.go b/sdk/metric/internal/exemplar/rand.go index 923953cb75b..6753e116646 100644 --- a/sdk/metric/internal/exemplar/rand.go +++ b/sdk/metric/internal/exemplar/rand.go @@ -10,7 +10,6 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" ) // rng is used to make sampling decisions. @@ -50,14 +49,14 @@ func random() float64 { // are k or less measurements made, the Reservoir will sample each one. If // there are more than k, the Reservoir will then randomly sample all // additional measurement with a decreasing probability. -func FixedSize[N int64 | float64](k int) Reservoir[N] { - r := &randRes[N]{storage: newStorage[N](k)} +func FixedSize(k int) Reservoir { + r := &randRes{storage: newStorage(k)} r.reset() return r } -type randRes[N int64 | float64] struct { - *storage[N] +type randRes struct { + *storage // count is the number of measurement seen. count int64 @@ -69,7 +68,7 @@ type randRes[N int64 | float64] struct { w float64 } -func (r *randRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) { +func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) { // The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December // 1994). "Reservoir-Sampling Algorithms of Time Complexity // O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4): @@ -125,7 +124,7 @@ func (r *randRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute. } // reset resets r to the initial state. -func (r *randRes[N]) reset() { +func (r *randRes) reset() { // This resets the number of exemplars known. r.count = 0 // Random index inserts should only happen after the storage is full. @@ -147,7 +146,7 @@ func (r *randRes[N]) reset() { // advance updates the count at which the offered measurement will overwrite an // existing exemplar. -func (r *randRes[N]) advance() { +func (r *randRes) advance() { // Calculate the next value in the random number series. // // The current value of r.w is based on the max of a distribution of random @@ -174,7 +173,7 @@ func (r *randRes[N]) advance() { r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1 } -func (r *randRes[N]) Collect(dest *[]metricdata.Exemplar[N]) { +func (r *randRes) Collect(dest *[]Exemplar) { r.storage.Collect(dest) // Call reset here even though it will reset r.count and restart the random // number series. This will persist any old exemplars as long as no new diff --git a/sdk/metric/internal/exemplar/rand_test.go b/sdk/metric/internal/exemplar/rand_test.go index 5b6fd426fb4..7668d3e215a 100644 --- a/sdk/metric/internal/exemplar/rand_test.go +++ b/sdk/metric/internal/exemplar/rand_test.go @@ -13,12 +13,12 @@ import ( ) func TestFixedSize(t *testing.T) { - t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir[int64], int) { - return FixedSize[int64](n), n + t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir, int) { + return FixedSize(n), n })) - t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir[float64], int) { - return FixedSize[float64](n), n + t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir, int) { + return FixedSize(n), n })) } @@ -34,14 +34,14 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) { // Sort to test position bias. slices.Sort(data) - r := FixedSize[float64](sampleSize) + r := FixedSize(sampleSize) for _, value := range data { - r.Offer(context.Background(), staticTime, value, nil) + r.Offer(context.Background(), staticTime, NewValue(value), nil) } var sum float64 - for _, m := range r.(*randRes[float64]).store { - sum += m.Value + for _, m := range r.(*randRes).store { + sum += m.Value.Float64() } mean := sum / float64(sampleSize) diff --git a/sdk/metric/internal/exemplar/reservoir.go b/sdk/metric/internal/exemplar/reservoir.go index a663aa22970..80fa59554f2 100644 --- a/sdk/metric/internal/exemplar/reservoir.go +++ b/sdk/metric/internal/exemplar/reservoir.go @@ -8,11 +8,10 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" ) // Reservoir holds the sampled exemplar of measurements made. -type Reservoir[N int64 | float64] interface { +type Reservoir interface { // Offer accepts the parameters associated with a measurement. The // parameters will be stored as an exemplar if the Reservoir decides to // sample the measurement. @@ -24,10 +23,10 @@ type Reservoir[N int64 | float64] interface { // The time t is the time when the measurement was made. The val and attr // parameters are the value and dropped (filtered) attributes of the // measurement respectively. - Offer(ctx context.Context, t time.Time, val N, attr []attribute.KeyValue) + Offer(ctx context.Context, t time.Time, val Value, attr []attribute.KeyValue) // Collect returns all the held exemplars. // // The Reservoir state is preserved after this call. - Collect(dest *[]metricdata.Exemplar[N]) + Collect(dest *[]Exemplar) } diff --git a/sdk/metric/internal/exemplar/reservoir_test.go b/sdk/metric/internal/exemplar/reservoir_test.go index f2cdfb0f542..f6d2d884453 100644 --- a/sdk/metric/internal/exemplar/reservoir_test.go +++ b/sdk/metric/internal/exemplar/reservoir_test.go @@ -12,16 +12,15 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/trace" ) // Sat Jan 01 2000 00:00:00 GMT+0000. var staticTime = time.Unix(946684800, 0) -type factory[N int64 | float64] func(requstedCap int) (r Reservoir[N], actualCap int) +type factory func(requstedCap int) (r Reservoir, actualCap int) -func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) { +func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { return func(t *testing.T) { t.Helper() @@ -43,14 +42,14 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) { }) ctx := trace.ContextWithSpanContext(ctx, sc) - r.Offer(ctx, staticTime, 10, nil) + r.Offer(ctx, staticTime, NewValue(N(10)), nil) - var dest []metricdata.Exemplar[N] + var dest []Exemplar r.Collect(&dest) - want := metricdata.Exemplar[N]{ + want := Exemplar{ Time: staticTime, - Value: 10, + Value: NewValue(N(10)), SpanID: []byte(sID[:]), TraceID: []byte(tID[:]), } @@ -67,15 +66,15 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) { } adminTrue := attribute.Bool("admin", true) - r.Offer(ctx, staticTime, 10, []attribute.KeyValue{adminTrue}) + r.Offer(ctx, staticTime, NewValue(N(10)), []attribute.KeyValue{adminTrue}) - var dest []metricdata.Exemplar[N] + var dest []Exemplar r.Collect(&dest) - want := metricdata.Exemplar[N]{ + want := Exemplar{ FilteredAttributes: []attribute.KeyValue{adminTrue}, Time: staticTime, - Value: 10, + Value: NewValue(N(10)), } require.Len(t, dest, 1, "number of collected exemplars") assert.Equal(t, want, dest[0]) @@ -89,9 +88,9 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) { t.Skip("skipping, reservoir capacity less than 2:", n) } - r.Offer(ctx, staticTime, 10, nil) + r.Offer(ctx, staticTime, NewValue(N(10)), nil) - var dest []metricdata.Exemplar[N] + var dest []Exemplar r.Collect(&dest) // No empty exemplars are exported. require.Len(t, dest, 1, "number of collected exemplars") @@ -106,17 +105,17 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) { } for i := 0; i < n+1; i++ { - v := N(i) + v := NewValue(N(i)) r.Offer(ctx, staticTime, v, nil) } - var dest []metricdata.Exemplar[N] + var dest []Exemplar r.Collect(&dest) assert.Len(t, dest, n, "multiple offers did not fill reservoir") // Ensure the collect reset also resets any counting state. for i := 0; i < n+1; i++ { - v := N(i) + v := NewValue(N(i)) r.Offer(ctx, staticTime, v, nil) } @@ -133,9 +132,9 @@ func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) { t.Skip("skipping, reservoir capacity greater than 0:", n) } - r.Offer(context.Background(), staticTime, 10, nil) + r.Offer(context.Background(), staticTime, NewValue(N(10)), nil) - dest := []metricdata.Exemplar[N]{{}} // Should be reset to empty. + dest := []Exemplar{{}} // Should be reset to empty. r.Collect(&dest) assert.Len(t, dest, 0, "no exemplars should be collected") }) diff --git a/sdk/metric/internal/exemplar/storage.go b/sdk/metric/internal/exemplar/storage.go index 994ab10736b..10b2976f796 100644 --- a/sdk/metric/internal/exemplar/storage.go +++ b/sdk/metric/internal/exemplar/storage.go @@ -8,27 +8,26 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/trace" ) // storage is an exemplar storage for [Reservoir] implementations. -type storage[N int64 | float64] struct { +type storage struct { // store are the measurements sampled. // // This does not use []metricdata.Exemplar because it potentially would // require an allocation for trace and span IDs in the hot path of Offer. - store []measurement[N] + store []measurement } -func newStorage[N int64 | float64](n int) *storage[N] { - return &storage[N]{store: make([]measurement[N], n)} +func newStorage(n int) *storage { + return &storage{store: make([]measurement, n)} } // Collect returns all the held exemplars. // // The Reservoir state is preserved after this call. -func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) { +func (r *storage) Collect(dest *[]Exemplar) { *dest = reset(*dest, len(r.store), len(r.store)) var n int for _, m := range r.store { @@ -43,13 +42,13 @@ func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) { } // measurement is a measurement made by a telemetry system. -type measurement[N int64 | float64] struct { +type measurement struct { // FilteredAttributes are the attributes dropped during the measurement. FilteredAttributes []attribute.KeyValue // Time is the time when the measurement was made. Time time.Time // Value is the value of the measurement. - Value N + Value Value // SpanContext is the SpanContext active when a measurement was made. SpanContext trace.SpanContext @@ -57,8 +56,8 @@ type measurement[N int64 | float64] struct { } // newMeasurement returns a new non-empty Measurement. -func newMeasurement[N int64 | float64](ctx context.Context, ts time.Time, v N, droppedAttr []attribute.KeyValue) measurement[N] { - return measurement[N]{ +func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement { + return measurement{ FilteredAttributes: droppedAttr, Time: ts, Value: v, @@ -67,8 +66,8 @@ func newMeasurement[N int64 | float64](ctx context.Context, ts time.Time, v N, d } } -// Exemplar returns m as a [metricdata.Exemplar]. -func (m measurement[N]) Exemplar(dest *metricdata.Exemplar[N]) { +// Exemplar returns m as an [Exemplar]. +func (m measurement) Exemplar(dest *Exemplar) { dest.FilteredAttributes = m.FilteredAttributes dest.Time = m.Time dest.Value = m.Value diff --git a/sdk/metric/internal/exemplar/value.go b/sdk/metric/internal/exemplar/value.go new file mode 100644 index 00000000000..9daf27dc006 --- /dev/null +++ b/sdk/metric/internal/exemplar/value.go @@ -0,0 +1,57 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import "math" + +// ValueType identifies the type of value used in exemplar data. +type ValueType uint8 + +const ( + // UnknownValueType should not be used. It represents a misconfigured + // Value. + UnknownValueType ValueType = 0 + // Int64ValueType represents a Value with int64 data. + Int64ValueType ValueType = 1 + // Float64ValueType represents a Value with float64 data. + Float64ValueType ValueType = 2 +) + +// Value is the value of data held by an exemplar. +type Value struct { + t ValueType + val uint64 +} + +// NewValue returns a new [Value] for the provided value. +func NewValue[N int64 | float64](value N) Value { + switch v := any(value).(type) { + case int64: + return Value{t: Int64ValueType, val: uint64(v)} + case float64: + return Value{t: Float64ValueType, val: math.Float64bits(v)} + } + return Value{} +} + +// Type returns the [ValueType] of data held by v. +func (v Value) Type() ValueType { return v.t } + +// Int64 returns the value of v as an int64. If the ValueType of v is not an +// Int64ValueType, 0 is returned. +func (v Value) Int64() int64 { + if v.t == Int64ValueType { + return int64(v.val) + } + return 0 +} + +// Float64 returns the value of v as an float64. If the ValueType of v is not +// an Float64ValueType, 0 is returned. +func (v Value) Float64() float64 { + if v.t == Float64ValueType { + return math.Float64frombits(v.val) + } + return 0 +} diff --git a/sdk/metric/internal/exemplar/value_test.go b/sdk/metric/internal/exemplar/value_test.go new file mode 100644 index 00000000000..835879bdc05 --- /dev/null +++ b/sdk/metric/internal/exemplar/value_test.go @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exemplar + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValue(t *testing.T) { + const iVal, fVal = int64(43), float64(0.3) + i, f, bad := NewValue[int64](iVal), NewValue[float64](fVal), Value{} + + assert.Equal(t, Int64ValueType, i.Type()) + assert.Equal(t, iVal, i.Int64()) + assert.Equal(t, float64(0), i.Float64()) + + assert.Equal(t, Float64ValueType, f.Type()) + assert.Equal(t, fVal, f.Float64()) + assert.Equal(t, int64(0), f.Int64()) + + assert.Equal(t, UnknownValueType, bad.Type()) + assert.Equal(t, float64(0), bad.Float64()) + assert.Equal(t, int64(0), bad.Int64()) +} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index c3e2d9cc012..f2167974689 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -349,7 +349,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum cv := i.aggregators.Lookup(normID, func() aggVal[N] { b := aggregate.Builder[N]{ Temporality: i.pipeline.reader.temporality(kind), - ReservoirFunc: reservoirFunc[N](stream.Aggregation), + ReservoirFunc: reservoirFunc(stream.Aggregation), } b.Filter = stream.AttributeFilter // A value less than or equal to zero will disable the aggregation