Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exemplars to the metric SDK as an experimental feature #4455

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
593a45b
Add internal/exemplar pkg
MrAlias Aug 16, 2023
c37db57
Update internal/aggregate to support exemplars
MrAlias Aug 16, 2023
becb3a0
Add experimental feature functionality
MrAlias Aug 16, 2023
141afc0
Update metric SDK to support exemplars
MrAlias Aug 16, 2023
778c62e
Add exemplar pkg tests
MrAlias Aug 17, 2023
99f6b14
Use Algorithm L for rand res
MrAlias Aug 18, 2023
0486576
Move experimental support to internal pkg
MrAlias Aug 18, 2023
c423127
Merge branch 'main' into exemplar-x
MrAlias Dec 15, 2023
1d571d5
Update default fixed bucket exemplar sizes
MrAlias Dec 15, 2023
b997bd3
Merge branch 'main' into exemplar-x
MrAlias Dec 15, 2023
ac0cf70
Merge branch 'main' into exemplar-x
MrAlias Jan 17, 2024
ebe1807
Revert name changes unrelated to PR
MrAlias Jan 17, 2024
e694fb8
Document Offer params in Reservoir iface
MrAlias Jan 17, 2024
623ed1b
Add end-to-end benchmark for rand and hist
MrAlias Jan 17, 2024
6a94b22
Move to recording dropped at Offer
MrAlias Jan 17, 2024
c4d8381
Remove the unneeded exemplar Filter
MrAlias Jan 19, 2024
0741f43
Merge branch 'main' into exemplar-x
MrAlias Jan 19, 2024
f77205f
Comment drop
MrAlias Jan 19, 2024
11bace0
Reorder filter_test.go
MrAlias Jan 19, 2024
d2171e7
Rename measurement.Empty to Valid
MrAlias Jan 19, 2024
27bc3e7
Doc fixed.go
MrAlias Jan 19, 2024
b044f34
Doc rand.go
MrAlias Jan 19, 2024
b729954
Fix doc for Reservoir
MrAlias Jan 19, 2024
6d2dfaa
Move adminTrue to only use scope
MrAlias Jan 19, 2024
3b4a1b5
Add internal/exemplar/doc.go
MrAlias Jan 19, 2024
4460389
Add link to res sampling comparison repo
MrAlias Jan 19, 2024
925675f
Rename fixedRes to storage
MrAlias Jan 19, 2024
bb1c495
Rename testReservoir to ReservoirTest
MrAlias Jan 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"runtime"
"strconv"
"testing"

Expand All @@ -24,6 +26,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/trace"
)

var viewBenchmarks = []struct {
Expand Down Expand Up @@ -369,3 +372,89 @@ func benchCollectAttrs(setup func(attribute.Set) Reader) func(*testing.B) {
b.Run("Attributes/10", run(setup(attribute.NewSet(attrs...))))
}
}

// BenchmarkExemplars measures recording measurements and collecting them
// end-to-end while the experimental exemplar feature is enabled via the
// OTEL_GO_X_EXEMPLAR environment variable.
//
// All measurements are made with a context carrying a sampled span context
// and with an attribute set holding a "user" and an "admin" attribute. The
// view installed by setup keeps only the "user" attribute.
//
// Each sub-benchmark iteration records 2*runtime.NumCPU() measurements and
// then collects them, asserting the number of exemplars on the single data
// point: runtime.NumCPU() for the counter and 1 for the histogram.
func BenchmarkExemplars(b *testing.B) {
	sc := trace.NewSpanContext(trace.SpanContextConfig{
		SpanID:     trace.SpanID{01},
		TraceID:    trace.TraceID{01},
		TraceFlags: trace.FlagsSampled,
	})
	ctx := trace.ContextWithSpanContext(context.Background(), sc)

	attr := attribute.NewSet(
		attribute.String("user", "Alice"),
		attribute.Bool("admin", true),
	)

	// setup returns a Meter with the given name and the ManualReader used to
	// collect from it. It is only called from within the sub-benchmarks, i.e.
	// after the environment variable below has been set.
	setup := func(name string) (metric.Meter, Reader) {
		r := NewManualReader()
		v := NewView(Instrument{Name: "*"}, Stream{
			AttributeFilter: func(kv attribute.KeyValue) bool {
				return kv.Key == attribute.Key("user")
			},
		})
		mp := NewMeterProvider(WithReader(r), WithView(v))
		return mp.Meter(name), r
	}
	nCPU := runtime.NumCPU()

	b.Setenv("OTEL_GO_X_EXEMPLAR", "true")

	name := fmt.Sprintf("Int64Counter/%d", nCPU)
	b.Run(name, func(b *testing.B) {
		m, r := setup("Int64Counter")
		i, err := m.Int64Counter("int64-counter")
		assert.NoError(b, err)

		// Pre-allocate the destination so collection can reuse it.
		rm := newRM(metricdata.Sum[int64]{
			DataPoints: []metricdata.DataPoint[int64]{
				{Exemplars: make([]metricdata.Exemplar[int64], 0, nCPU)},
			},
		})
		e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Exemplars)

		b.ReportAllocs()
		b.ResetTimer()
		for n := 0; n < b.N; n++ {
			for j := 0; j < 2*nCPU; j++ {
				i.Add(ctx, 1, metric.WithAttributeSet(attr))
			}

			// A failed collection invalidates the measurement; do not
			// silently keep timing it.
			if err := r.Collect(ctx, rm); err != nil {
				b.Fatal(err)
			}
			assert.Len(b, *e, nCPU)
		}
	})

	name = fmt.Sprintf("Int64Histogram/%d", nCPU)
	b.Run(name, func(b *testing.B) {
		m, r := setup("Int64Histogram")
		i, err := m.Int64Histogram("int64-histogram")
		assert.NoError(b, err)

		// Pre-allocate the destination so collection can reuse it.
		rm := newRM(metricdata.Histogram[int64]{
			DataPoints: []metricdata.HistogramDataPoint[int64]{
				{Exemplars: make([]metricdata.Exemplar[int64], 0, 1)},
			},
		})
		e := &(rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64]).DataPoints[0].Exemplars)

		b.ReportAllocs()
		b.ResetTimer()
		for n := 0; n < b.N; n++ {
			for j := 0; j < 2*nCPU; j++ {
				i.Record(ctx, 1, metric.WithAttributeSet(attr))
			}

			// A failed collection invalidates the measurement; do not
			// silently keep timing it.
			if err := r.Collect(ctx, rm); err != nil {
				b.Fatal(err)
			}
			assert.Len(b, *e, 1)
		}
	})
}

// newRM returns a ResourceMetrics holding exactly one ScopeMetrics, which in
// turn holds exactly one Metrics whose Data is a.
func newRM(a metricdata.Aggregation) *metricdata.ResourceMetrics {
	m := metricdata.Metrics{Data: a}
	sm := metricdata.ScopeMetrics{Metrics: []metricdata.Metrics{m}}
	return &metricdata.ResourceMetrics{
		ScopeMetrics: []metricdata.ScopeMetrics{sm},
	}
}
96 changes: 96 additions & 0 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"os"
"runtime"

"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
)

// reservoirFunc returns the exemplar reservoir factory to use for the passed
// Aggregation, taking the user defined environment variables into account.
//
// It returns nil when the experimental exemplar feature is not enabled.
// Otherwise the OTEL_METRICS_EXEMPLAR_FILTER environment variable selects the
// returned factory:
//
//   - "always_off": exemplar.Drop is returned.
//   - "always_on": the default reservoir factory for agg is returned as is.
//   - "trace_based", unset, or any other value: the default reservoir factory
//     for agg is returned with every reservoir wrapped by
//     exemplar.SampledFilter.
//
// See https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] {
	if !x.Exemplars.Enabled() {
		return nil
	}

	const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

	filter := os.Getenv(filterEnvKey)
	if filter == "always_off" {
		return exemplar.Drop[N]
	}

	// Select the default reservoir for the aggregation.
	//
	// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
	var factory func() exemplar.Reservoir[N]
	switch a := agg.(type) {
	case AggregationExplicitBucketHistogram:
		// Explicit bucket histogram aggregation with more than 1 bucket
		// will use AlignedHistogramBucketExemplarReservoir.
		if len(a.Boundaries) > 1 {
			// Copy so later changes to the aggregation's boundaries are not
			// observed by the reservoirs.
			bounds := make([]float64, len(a.Boundaries))
			copy(bounds, a.Boundaries)
			factory = func() exemplar.Reservoir[N] {
				return exemplar.Histogram[N](bounds)
			}
		}
	case AggregationBase2ExponentialHistogram:
		// Base2 Exponential Histogram Aggregation SHOULD use a
		// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
		// smaller of the maximum number of buckets configured on the
		// aggregation or twenty (e.g. min(20, max_buckets)).
		size := int(a.MaxSize)
		if size > 20 {
			size = 20
		}
		factory = func() exemplar.Reservoir[N] {
			return exemplar.FixedSize[N](size)
		}
	}

	if factory == nil {
		// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
		// This Exemplar reservoir MAY take a configuration parameter for
		// the size of the reservoir. If no size configuration is provided,
		// the default size MAY be the number of possible concurrent threads
		// (e.g. number of CPUs) to help reduce contention. Otherwise, a
		// default size of 1 SHOULD be used.
		size := runtime.NumCPU()
		if size < 1 {
			// Should never be the case, but be defensive.
			size = 1
		}
		factory = func() exemplar.Reservoir[N] {
			return exemplar.FixedSize[N](size)
		}
	}

	if filter == "always_on" {
		return factory
	}

	// "trace_based" is the default and is used for all unrecognized values.
	return func() exemplar.Reservoir[N] {
		return exemplar.SampledFilter(factory())
	}
}
2 changes: 1 addition & 1 deletion sdk/metric/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ require (
go.opentelemetry.io/otel v1.23.0-rc.1
go.opentelemetry.io/otel/metric v1.23.0-rc.1
go.opentelemetry.io/otel/sdk v1.23.0-rc.1
go.opentelemetry.io/otel/trace v1.23.0-rc.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/trace v1.23.0-rc.1 // indirect
golang.org/x/sys v0.16.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
37 changes: 28 additions & 9 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -44,6 +45,12 @@ type Builder[N int64 | float64] struct {
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
// ReservoirFunc is the factory function used by aggregate functions to
// create new exemplar reservoirs for a newly seen attribute set.
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -54,15 +61,27 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) filter(f Measure[N]) Measure[N] {
// resFunc returns the reservoir factory configured on b. When
// b.ReservoirFunc is not set, exemplar.Drop is returned instead so callers
// always receive a non-nil factory.
func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
	newRes := b.ReservoirFunc
	if newRes == nil {
		newRes = exemplar.Drop[N]
	}
	return newRes
}

// fltrMeasure is the signature of a measure function that receives the
// attributes of a measurement already split into the set kept by the
// attribute filter (fltrAttr) and the key-values it dropped (droppedAttr).
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)

// filter adapts f to the Measure signature.
//
// When b.Filter is set, the attributes of every measurement are split with
// it: f receives the filtered attribute set together with the key-values that
// were dropped. When b.Filter is nil, f receives the measurement's attribute
// set unmodified and nil for the dropped attributes.
func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
	if b.Filter != nil {
		fltr := b.Filter // Copy to make it immutable after assignment.
		return func(ctx context.Context, n N, a attribute.Set) {
			fAttr, dropped := a.Filter(fltr)
			f(ctx, n, fAttr, dropped)
		}
	}
	return func(ctx context.Context, n N, a attribute.Set) {
		f(ctx, n, a, nil)
	}
}

// LastValue returns a last-value aggregate function input and output.
Expand All @@ -71,7 +90,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N](b.AggregationLimit)
lv := newLastValue[N](b.AggregationLimit, b.resFunc())

return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
Expand All @@ -87,7 +106,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newPrecomputedSum[N](monotonic, b.AggregationLimit)
s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -98,7 +117,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic, b.AggregationLimit)
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -110,7 +129,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit)
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand All @@ -122,7 +141,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
// ExponentialBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit)
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand Down
16 changes: 11 additions & 5 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
Expand Down Expand Up @@ -59,6 +60,10 @@ var (
}
)

// dropExemplars returns the reservoir created by exemplar.Drop for N.
func dropExemplars[N int64 | float64]() exemplar.Reservoir[N] {
	res := exemplar.Drop[N]()
	return res
}

func TestBuilderFilter(t *testing.T) {
t.Run("Int64", testBuilderFilter[int64]())
t.Run("Float64", testBuilderFilter[float64]())
Expand All @@ -69,20 +74,21 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
t.Helper()

value, attr := N(1), alice
run := func(b Builder[N], wantA attribute.Set) func(*testing.T) {
run := func(b Builder[N], wantF attribute.Set, wantD []attribute.KeyValue) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

meas := b.filter(func(_ context.Context, v N, a attribute.Set) {
meas := b.filter(func(_ context.Context, v N, f attribute.Set, d []attribute.KeyValue) {
assert.Equal(t, value, v, "measured incorrect value")
assert.Equal(t, wantA, a, "measured incorrect attributes")
assert.Equal(t, wantF, f, "measured incorrect filtered attributes")
assert.ElementsMatch(t, wantD, d, "measured incorrect dropped attributes")
})
meas(context.Background(), value, attr)
}
}

t.Run("NoFilter", run(Builder[N]{}, attr))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice))
t.Run("NoFilter", run(Builder[N]{}, attr, nil))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
}
}

Expand Down
Loading
Loading