From 63dfc4c2f152389d71d0ffff43af795bb1fb6bd3 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 17 Jul 2023 11:17:35 -0700 Subject: [PATCH 1/8] Update the instrument agg comp table (#4330) Use a check mark to indicate compatible instead of an "X" which can be misinterpreted as incompatible. --- sdk/metric/pipeline.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index a6a6c8ca44c..05fe29ce4f9 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -425,12 +425,12 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr // // | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram | // |--------------------------|------|-----------|-----|-----------|-----------------------| -// | Counter | X | | X | X | X | -// | UpDownCounter | X | | X | | | -// | Histogram | X | | X | X | X | -// | Observable Counter | X | | X | | | -// | Observable UpDownCounter | X | | X | | | -// | Observable Gauge | X | X | | | |. +// | Counter | ✓ | | ✓ | ✓ | ✓ | +// | UpDownCounter | ✓ | | ✓ | | | +// | Histogram | ✓ | | ✓ | ✓ | ✓ | +// | Observable Counter | ✓ | | ✓ | | | +// | Observable UpDownCounter | ✓ | | ✓ | | | +// | Observable Gauge | ✓ | ✓ | | | |. func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) error { switch agg.(type) { case aggregation.Default: From f6a658c6c2f2909e93f064b2962aa9d3114b3e27 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Tue, 18 Jul 2023 03:10:34 -0500 Subject: [PATCH 2/8] Remove out of date example of internal usage. (#4334) --- .../aggregate/aggregator_example_test.go | 111 ------------------ 1 file changed, 111 deletions(-) delete mode 100644 sdk/metric/internal/aggregate/aggregator_example_test.go diff --git a/sdk/metric/internal/aggregate/aggregator_example_test.go b/sdk/metric/internal/aggregate/aggregator_example_test.go deleted file mode 100644 index 7b566b957d9..00000000000 --- a/sdk/metric/internal/aggregate/aggregator_example_test.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package aggregate - -import ( - "context" - "fmt" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/embedded" - "go.opentelemetry.io/otel/sdk/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -type meter struct { - // When a reader initiates a collection, the meter would collect - // aggregations from each of these functions. - aggregations []metricdata.Aggregation -} - -func (p *meter) Int64Counter(string, ...metric.Int64CounterOption) (metric.Int64Counter, error) { - // This is an example of how a meter would create an aggregator for a new - // counter. At this point the provider would determine the aggregation and - // temporality to used based on the Reader and View configuration. Assume - // here these are determined to be a cumulative sum. - - aggregator := newCumulativeSum[int64](true) - count := inst{aggregateFunc: aggregator.Aggregate} - - p.aggregations = append(p.aggregations, aggregator.Aggregation()) - - fmt.Printf("using %T aggregator for counter\n", aggregator) - - return count, nil -} - -func (p *meter) Int64UpDownCounter(string, ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) { - // This is an example of how a meter would create an aggregator for a new - // up-down counter. At this point the provider would determine the - // aggregation and temporality to used based on the Reader and View - // configuration. Assume here these are determined to be a last-value - // aggregation (the temporality does not affect the produced aggregations). - - aggregator := newLastValue[int64]() - upDownCount := inst{aggregateFunc: aggregator.Aggregate} - - p.aggregations = append(p.aggregations, aggregator.Aggregation()) - - fmt.Printf("using %T aggregator for up-down counter\n", aggregator) - - return upDownCount, nil -} - -func (p *meter) Int64Histogram(string, ...metric.Int64HistogramOption) (metric.Int64Histogram, error) { - // This is an example of how a meter would create an aggregator for a new - // histogram. At this point the provider would determine the aggregation - // and temporality to used based on the Reader and View configuration. - // Assume here these are determined to be a delta explicit-bucket - // histogram. - - aggregator := newDeltaHistogram[int64](aggregation.ExplicitBucketHistogram{ - Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}, - NoMinMax: false, - }) - hist := inst{aggregateFunc: aggregator.Aggregate} - - p.aggregations = append(p.aggregations, aggregator.Aggregation()) - - fmt.Printf("using %T aggregator for histogram\n", aggregator) - - return hist, nil -} - -// inst is a generalized int64 synchronous counter, up-down counter, and -// histogram used for demonstration purposes only. -type inst struct { - aggregateFunc func(int64, attribute.Set) - - embedded.Int64Counter - embedded.Int64UpDownCounter - embedded.Int64Histogram -} - -func (inst) Add(context.Context, int64, ...metric.AddOption) {} -func (inst) Record(context.Context, int64, ...metric.RecordOption) {} - -func Example() { - m := meter{} - - _, _ = m.Int64Counter("counter example") - _, _ = m.Int64UpDownCounter("up-down counter example") - _, _ = m.Int64Histogram("histogram example") - - // Output: - // using *aggregate.cumulativeSum[int64] aggregator for counter - // using *aggregate.lastValue[int64] aggregator for up-down counter - // using *aggregate.deltaHistogram[int64] aggregator for histogram -} From 9b0c4d2caf7a5fb88714e4c64fe61d5fbf1c0b8a Mon Sep 17 00:00:00 2001 From: Matthew Wear Date: Tue, 18 Jul 2023 08:37:20 -0700 Subject: [PATCH 3/8] Fix empty host.id (#4317) --- CHANGELOG.md | 1 + sdk/resource/host_id_readfile.go | 2 +- sdk/resource/host_id_readfile_test.go | 53 +++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 sdk/resource/host_id_readfile_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 770c6250764..15731d8c22b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143) - Log an error for calls to `NewView` in `go.opentelemetry.io/otel/sdk/metric` that have empty criteria. (#4307) +- Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317) ## [1.16.0/0.39.0] 2023-05-18 diff --git a/sdk/resource/host_id_readfile.go b/sdk/resource/host_id_readfile.go index f92c6dad0f9..721e3ca6e7d 100644 --- a/sdk/resource/host_id_readfile.go +++ b/sdk/resource/host_id_readfile.go @@ -21,7 +21,7 @@ import "os" func readFile(filename string) (string, error) { b, err := os.ReadFile(filename) if err != nil { - return "", nil + return "", err } return string(b), nil diff --git a/sdk/resource/host_id_readfile_test.go b/sdk/resource/host_id_readfile_test.go new file mode 100644 index 00000000000..f071a05cc0c --- /dev/null +++ b/sdk/resource/host_id_readfile_test.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux || dragonfly || freebsd || netbsd || openbsd || solaris + +package resource + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReadFileExistent(t *testing.T) { + fileContents := "foo" + + f, err := os.CreateTemp("", "readfile_") + require.NoError(t, err) + + defer os.Remove(f.Name()) + + _, err = f.WriteString(fileContents) + require.NoError(t, err) + require.NoError(t, f.Close()) + + result, err := readFile(f.Name()) + require.NoError(t, err) + require.Equal(t, result, fileContents) +} + +func TestReadFileNonExistent(t *testing.T) { + // create unique filename + f, err := os.CreateTemp("", "readfile_") + require.NoError(t, err) + + // make file non-existent + require.NoError(t, os.Remove(f.Name())) + + _, err = readFile(f.Name()) + require.ErrorIs(t, err, os.ErrNotExist) +} From f194fb0c6c0596cb95325bb66070c2fdec0357fa Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 19 Jul 2023 07:12:00 -0700 Subject: [PATCH 4/8] Allow histogram for all instruments (#4332) * Allow histogram for all instruments Any instrument that can record negative values, do not include a sum in the produced aggregation (like the specification recommends). Resolves #4161 * Add changes to changelog * Fix TestBucketsSum --- CHANGELOG.md | 1 + sdk/metric/internal/aggregate/aggregate.go | 6 +- sdk/metric/internal/aggregate/histogram.go | 28 +++-- .../internal/aggregate/histogram_test.go | 110 ++++++++++++++---- sdk/metric/pipeline.go | 31 +++-- sdk/metric/pipeline_registry_test.go | 6 +- 6 files changed, 135 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15731d8c22b..64d34115a39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view. This change is made to ensure compatibility with the OpenTelemetry specification. (#4288) - If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290) +- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332) ### Fixed diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index f386c337135..2c501f45f13 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -109,13 +109,13 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { // ExplicitBucketHistogram returns a histogram aggregate function input and // output. -func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram) (Measure[N], ComputeAggregation) { +func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram, noSum bool) (Measure[N], ComputeAggregation) { var h aggregator[N] switch b.Temporality { case metricdata.DeltaTemporality: - h = newDeltaHistogram[N](cfg) + h = newDeltaHistogram[N](cfg, noSum) default: - h = newCumulativeHistogram[N](cfg) + h = newCumulativeHistogram[N](cfg, noSum) } return b.input(h), func(dest *metricdata.Aggregation) int { // TODO (#4220): optimize memory reuse here. diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index a7a7780c307..0683ff2eb23 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -27,7 +27,7 @@ import ( type buckets[N int64 | float64] struct { counts []uint64 count uint64 - sum N + total N min, max N } @@ -36,10 +36,11 @@ func newBuckets[N int64 | float64](n int) *buckets[N] { return &buckets[N]{counts: make([]uint64, n)} } +func (b *buckets[N]) sum(value N) { b.total += value } + func (b *buckets[N]) bin(idx int, value N) { b.counts[idx]++ b.count++ - b.sum += value if value < b.min { b.min = value } else if value > b.max { @@ -50,13 +51,14 @@ func (b *buckets[N]) bin(idx int, value N) { // histValues summarizes a set of measurements as an histValues with // explicitly defined buckets. type histValues[N int64 | float64] struct { + noSum bool bounds []float64 values map[attribute.Set]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -65,6 +67,7 @@ func newHistValues[N int64 | float64](bounds []float64) *histValues[N] { copy(b, bounds) sort.Float64s(b) return &histValues[N]{ + noSum: noSum, bounds: b, values: make(map[attribute.Set]*buckets[N]), } @@ -98,6 +101,9 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) { s.values[attr] = b } b.bin(idx, value) + if !s.noSum { + b.sum(value) + } } // newDeltaHistogram returns an Aggregator that summarizes a set of @@ -107,9 +113,9 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) { // Each aggregation cycle is treated independently. When the returned // Aggregator's Aggregations method is called it will reset all histogram // counts to zero. -func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] { +func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] { return &deltaHistogram[N]{ - histValues: newHistValues[N](cfg.Boundaries), + histValues: newHistValues[N](cfg.Boundaries, noSum), noMinMax: cfg.NoMinMax, start: now(), } @@ -148,7 +154,9 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation { Count: b.count, Bounds: bounds, BucketCounts: b.counts, - Sum: b.sum, + } + if !s.noSum { + hdp.Sum = b.total } if !s.noMinMax { hdp.Min = metricdata.NewExtrema(b.min) @@ -170,9 +178,9 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation { // Each aggregation cycle builds from the previous, the histogram counts are // the bucketed counts of all values aggregated since the returned Aggregator // was created. -func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] { +func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] { return &cumulativeHistogram[N]{ - histValues: newHistValues[N](cfg.Boundaries), + histValues: newHistValues[N](cfg.Boundaries, noSum), noMinMax: cfg.NoMinMax, start: now(), } @@ -219,7 +227,9 @@ func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation { Count: b.count, Bounds: bounds, BucketCounts: counts, - Sum: b.sum, + } + if !s.noSum { + hdp.Sum = b.total } if !s.noMinMax { hdp.Min = metricdata.NewExtrema(b.min) diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 3656be9e988..8c75562198d 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -49,10 +49,25 @@ func testHistogram[N int64 | float64](t *testing.T) { } incr := monoIncr[N]() - eFunc := deltaHistExpecter[N](incr) - t.Run("Delta", tester.Run(newDeltaHistogram[N](histConf), incr, eFunc)) + eFunc := deltaHistSummedExpecter[N](incr) + t.Run("Delta/Summed", tester.Run(newDeltaHistogram[N](histConf, false), incr, eFunc)) + eFunc = deltaHistExpecter[N](incr) + t.Run("Delta/NoSum", tester.Run(newDeltaHistogram[N](histConf, true), incr, eFunc)) + eFunc = cumuHistSummedExpecter[N](incr) + t.Run("Cumulative/Summed", tester.Run(newCumulativeHistogram[N](histConf, false), incr, eFunc)) eFunc = cumuHistExpecter[N](incr) - t.Run("Cumulative", tester.Run(newCumulativeHistogram[N](histConf), incr, eFunc)) + t.Run("Cumulative/NoSum", tester.Run(newCumulativeHistogram[N](histConf, true), incr, eFunc)) +} + +func deltaHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc { + h := metricdata.Histogram[N]{Temporality: metricdata.DeltaTemporality} + return func(m int) metricdata.Aggregation { + h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr)) + for a, v := range incr { + h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(m))) + } + return h + } } func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { @@ -66,6 +81,19 @@ func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { } } +func cumuHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc { + var cycle int + h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality} + return func(m int) metricdata.Aggregation { + cycle++ + h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr)) + for a, v := range incr { + h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(cycle*m))) + } + return h + } +} + func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { var cycle int h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality} @@ -79,6 +107,25 @@ func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { } } +// hPointSummed returns an HistogramDataPoint that started and ended now with +// multi number of measurements values v. It includes a min and max (set to v). +func hPointSummed[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.HistogramDataPoint[N] { + idx := sort.SearchFloat64s(bounds, float64(v)) + counts := make([]uint64, len(bounds)+1) + counts[idx] += multi + return metricdata.HistogramDataPoint[N]{ + Attributes: a, + StartTime: now(), + Time: now(), + Count: multi, + Bounds: bounds, + BucketCounts: counts, + Min: metricdata.NewExtrema(v), + Max: metricdata.NewExtrema(v), + Sum: v * N(multi), + } +} + // hPoint returns an HistogramDataPoint that started and ended now with multi // number of measurements values v. It includes a min and max (set to v). func hPoint[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.HistogramDataPoint[N] { @@ -94,7 +141,6 @@ func hPoint[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.Hi BucketCounts: counts, Min: metricdata.NewExtrema(v), Max: metricdata.NewExtrema(v), - Sum: v * N(multi), } } @@ -106,28 +152,50 @@ func TestBucketsBin(t *testing.T) { func testBucketsBin[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { b := newBuckets[N](3) - assertB := func(counts []uint64, count uint64, sum, min, max N) { + assertB := func(counts []uint64, count uint64, min, max N) { + t.Helper() assert.Equal(t, counts, b.counts) assert.Equal(t, count, b.count) - assert.Equal(t, sum, b.sum) assert.Equal(t, min, b.min) assert.Equal(t, max, b.max) } - assertB([]uint64{0, 0, 0}, 0, 0, 0, 0) + assertB([]uint64{0, 0, 0}, 0, 0, 0) b.bin(1, 2) - assertB([]uint64{0, 1, 0}, 1, 2, 0, 2) + assertB([]uint64{0, 1, 0}, 1, 0, 2) b.bin(0, -1) - assertB([]uint64{1, 1, 0}, 2, 1, -1, 2) + assertB([]uint64{1, 1, 0}, 2, -1, 2) + } +} + +func TestBucketsSum(t *testing.T) { + t.Run("Int64", testBucketsSum[int64]()) + t.Run("Float64", testBucketsSum[float64]()) +} + +func testBucketsSum[N int64 | float64]() func(t *testing.T) { + return func(t *testing.T) { + b := newBuckets[N](3) + + var want N + assert.Equal(t, want, b.total) + + b.sum(2) + want = 2 + assert.Equal(t, want, b.total) + + b.sum(-1) + want = 1 + assert.Equal(t, want, b.total) } } -func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) { +func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram, bool) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) { b := []float64{0, 1, 2} cpB := make([]float64, len(b)) copy(cpB, b) - a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b}) + a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b}, false) return func(t *testing.T) { require.Equal(t, cpB, getBounds(a)) @@ -160,7 +228,7 @@ func TestHistogramImmutableBounds(t *testing.T) { } func TestCumulativeHistogramImutableCounts(t *testing.T) { - a := newCumulativeHistogram[int64](histConf) + a := newCumulativeHistogram[int64](histConf, false) a.Aggregate(5, alice) hdp := a.Aggregation().(metricdata.Histogram[int64]).DataPoints[0] @@ -176,12 +244,12 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) { func TestDeltaHistogramReset(t *testing.T) { t.Cleanup(mockTime(now)) - a := newDeltaHistogram[int64](histConf) + a := newDeltaHistogram[int64](histConf, false) assert.Nil(t, a.Aggregation()) a.Aggregate(1, alice) expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality} - expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPoint[int64](alice, 1, 1)} + expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1)} metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) // The attr set should be forgotten once Aggregations is called. @@ -190,15 +258,15 @@ func TestDeltaHistogramReset(t *testing.T) { // Aggregating another set should not affect the original (alice). a.Aggregate(1, bob) - expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPoint[int64](bob, 1, 1)} + expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1)} metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) } func TestEmptyHistogramNilAggregation(t *testing.T) { - assert.Nil(t, newCumulativeHistogram[int64](histConf).Aggregation()) - assert.Nil(t, newCumulativeHistogram[float64](histConf).Aggregation()) - assert.Nil(t, newDeltaHistogram[int64](histConf).Aggregation()) - assert.Nil(t, newDeltaHistogram[float64](histConf).Aggregation()) + assert.Nil(t, newCumulativeHistogram[int64](histConf, false).Aggregation()) + assert.Nil(t, newCumulativeHistogram[float64](histConf, false).Aggregation()) + assert.Nil(t, newDeltaHistogram[int64](histConf, false).Aggregation()) + assert.Nil(t, newDeltaHistogram[float64](histConf, false).Aggregation()) } func BenchmarkHistogram(b *testing.B) { @@ -207,8 +275,8 @@ func BenchmarkHistogram(b *testing.B) { } func benchmarkHistogram[N int64 | float64](b *testing.B) { - factory := func() aggregator[N] { return newDeltaHistogram[N](histConf) } + factory := func() aggregator[N] { return newDeltaHistogram[N](histConf, false) } b.Run("Delta", benchmarkAggregator(factory)) - factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf) } + factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf, false) } b.Run("Cumulative", benchmarkAggregator(factory)) } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 05fe29ce4f9..5989e0c9575 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -412,7 +412,15 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr meas, comp = b.Sum(false) } case aggregation.ExplicitBucketHistogram: - meas, comp = b.ExplicitBucketHistogram(a) + var noSum bool + switch kind { + case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge: + // The sum should not be collected for any instrument that can make + // negative measurements: + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations + noSum = true + } + meas, comp = b.ExplicitBucketHistogram(a, noSum) default: err = errUnknownAggregation } @@ -426,22 +434,27 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr // | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram | // |--------------------------|------|-----------|-----|-----------|-----------------------| // | Counter | ✓ | | ✓ | ✓ | ✓ | -// | UpDownCounter | ✓ | | ✓ | | | +// | UpDownCounter | ✓ | | ✓ | ✓ | | // | Histogram | ✓ | | ✓ | ✓ | ✓ | -// | Observable Counter | ✓ | | ✓ | | | -// | Observable UpDownCounter | ✓ | | ✓ | | | -// | Observable Gauge | ✓ | ✓ | | | |. +// | Observable Counter | ✓ | | ✓ | ✓ | | +// | Observable UpDownCounter | ✓ | | ✓ | ✓ | | +// | Observable Gauge | ✓ | ✓ | | ✓ | |. func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) error { switch agg.(type) { case aggregation.Default: return nil case aggregation.ExplicitBucketHistogram: - if kind == InstrumentKindCounter || kind == InstrumentKindHistogram { + switch kind { + case InstrumentKindCounter, + InstrumentKindUpDownCounter, + InstrumentKindHistogram, + InstrumentKindObservableCounter, + InstrumentKindObservableUpDownCounter, + InstrumentKindObservableGauge: return nil + default: + return errIncompatibleAggregation } - // TODO: review need for aggregation check after - // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 - return errIncompatibleAggregation case aggregation.Sum: switch kind { case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter, InstrumentKindCounter, InstrumentKindHistogram, InstrumentKindUpDownCounter: diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 3e1c86dc654..c10bbc36803 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -498,7 +498,7 @@ func TestPipelineRegistryResource(t *testing.T) { } func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { - testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) + testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Sum{} })) readers := []Reader{testRdrHistogram} views := []View{defaultView} @@ -643,7 +643,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "SyncUpDownCounter and ExplicitBucketHistogram", kind: InstrumentKindUpDownCounter, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "SyncHistogram and Drop", @@ -686,7 +685,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "ObservableCounter and ExplicitBucketHistogram", kind: InstrumentKindObservableCounter, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "ObservableUpDownCounter and Drop", @@ -708,7 +706,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "ObservableUpDownCounter and ExplicitBucketHistogram", kind: InstrumentKindObservableUpDownCounter, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "ObservableGauge and Drop", @@ -730,7 +727,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "ObservableGauge and ExplicitBucketHistogram", kind: InstrumentKindObservableGauge, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "unknown kind with Sum should error", From f2a9f2f2beaefa7c5e0b22026a2b0ea77e4da7af Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 19 Jul 2023 07:31:11 -0700 Subject: [PATCH 5/8] Restrict Meters to only register and collect instruments it created (#4333) * Add acceptance test * Update Meter Register and collect only inst from itself * Add change to changelog * Fix spelling error * Update changelog entry wording * Simplify the partial success code path --- CHANGELOG.md | 1 + sdk/metric/instrument.go | 20 ++++--- sdk/metric/meter.go | 106 ++++++++++++++++-------------------- sdk/metric/provider_test.go | 51 +++++++++++++++++ 4 files changed, 109 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 64d34115a39..75d463e4eb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm This change is made to ensure compatibility with the OpenTelemetry specification. (#4288) - If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290) - Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332) +- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333) ### Fixed diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index b4d6fa8b35c..83652c6e97f 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -277,9 +277,9 @@ var _ metric.Float64ObservableCounter = float64Observable{} var _ metric.Float64ObservableUpDownCounter = float64Observable{} var _ metric.Float64ObservableGauge = float64Observable{} -func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable { +func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable { return float64Observable{ - observable: newObservable(scope, kind, name, desc, u, meas), + observable: newObservable(m, kind, name, desc, u, meas), } } @@ -296,9 +296,9 @@ var _ metric.Int64ObservableCounter = int64Observable{} var _ metric.Int64ObservableUpDownCounter = int64Observable{} var _ metric.Int64ObservableGauge = int64Observable{} -func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable { +func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable { return int64Observable{ - observable: newObservable(scope, kind, name, desc, u, meas), + observable: newObservable(m, kind, name, desc, u, meas), } } @@ -306,18 +306,20 @@ type observable[N int64 | float64] struct { metric.Observable observablID[N] + meter *meter measures []aggregate.Measure[N] } -func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] { +func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] { return &observable[N]{ observablID: observablID[N]{ name: name, description: desc, kind: kind, unit: u, - scope: scope, + scope: m.scope, }, + meter: m, measures: meas, } } @@ -335,16 +337,16 @@ var errEmptyAgg = errors.New("no aggregators for observable instrument") // and nil if it should. An errEmptyAgg error is returned if o is effectively a // no-op because it does not have any aggregators. Also, an error is returned // if scope defines a Meter other than the one o was created by. -func (o *observable[N]) registerable(scope instrumentation.Scope) error { +func (o *observable[N]) registerable(m *meter) error { if len(o.measures) == 0 { return errEmptyAgg } - if scope != o.scope { + if m != o.meter { return fmt.Errorf( "invalid registration: observable %q from Meter %q, registered with Meter %q", o.name, o.scope.Name, - scope.Name, + m.scope.Name, ) } return nil diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 7e1d32be249..f76d5190413 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -42,8 +42,8 @@ type meter struct { scope instrumentation.Scope pipes pipelines - int64IP *int64InstProvider - float64IP *float64InstProvider + int64Resolver resolver[int64] + float64Resolver resolver[float64] } func newMeter(s instrumentation.Scope, p pipelines) *meter { @@ -52,10 +52,10 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter { var viewCache cache[string, streamID] return &meter{ - scope: s, - pipes: p, - int64IP: newInt64InstProvider(s, p, &viewCache), - float64IP: newFloat64InstProvider(s, p, &viewCache), + scope: s, + pipes: p, + int64Resolver: newResolver[int64](p, &viewCache), + float64Resolver: newResolver[float64](p, &viewCache), } } @@ -68,7 +68,8 @@ var _ metric.Meter = (*meter)(nil) func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) { cfg := metric.NewInt64CounterConfig(options...) const kind = InstrumentKindCounter - i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := int64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -82,7 +83,8 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) { cfg := metric.NewInt64UpDownCounterConfig(options...) const kind = InstrumentKindUpDownCounter - i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := int64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -96,7 +98,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) { cfg := metric.NewInt64HistogramConfig(options...) const kind = InstrumentKindHistogram - i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := int64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -111,7 +114,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) { cfg := metric.NewInt64ObservableCounterConfig(options...) const kind = InstrumentKindObservableCounter - p := int64ObservProvider{m.int64IP} + p := int64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -127,7 +130,7 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) { cfg := metric.NewInt64ObservableUpDownCounterConfig(options...) const kind = InstrumentKindObservableUpDownCounter - p := int64ObservProvider{m.int64IP} + p := int64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -143,7 +146,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6 func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { cfg := metric.NewInt64ObservableGaugeConfig(options...) const kind = InstrumentKindObservableGauge - p := int64ObservProvider{m.int64IP} + p := int64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -158,7 +161,8 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) { cfg := metric.NewFloat64CounterConfig(options...) const kind = InstrumentKindCounter - i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := float64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -172,7 +176,8 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) { cfg := metric.NewFloat64UpDownCounterConfig(options...) const kind = InstrumentKindUpDownCounter - i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := float64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -186,7 +191,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { cfg := metric.NewFloat64HistogramConfig(options...) const kind = InstrumentKindHistogram - i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := float64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -201,7 +207,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { cfg := metric.NewFloat64ObservableCounterConfig(options...) const kind = InstrumentKindObservableCounter - p := float64ObservProvider{m.float64IP} + p := float64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -217,7 +223,7 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) { cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...) const kind = InstrumentKindObservableUpDownCounter - p := float64ObservProvider{m.float64IP} + p := float64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -233,7 +239,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) { cfg := metric.NewFloat64ObservableGaugeConfig(options...) const kind = InstrumentKindObservableGauge - p := float64ObservProvider{m.float64IP} + p := float64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -301,7 +307,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) switch o := inst.(type) { case int64Observable: - if err := o.registerable(m.scope); err != nil { + if err := o.registerable(m); err != nil { if !errors.Is(err, errEmptyAgg) { errs.append(err) } @@ -309,7 +315,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) } reg.registerInt64(o.observablID) case float64Observable: - if err := o.registerable(m.scope); err != nil { + if err := o.registerable(m); err != nil { if !errors.Is(err, errEmptyAgg) { errs.append(err) } @@ -322,19 +328,15 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) } } - if err := errs.errorOrNil(); err != nil { - return nil, err - } - + err := errs.errorOrNil() if reg.len() == 0 { - // All insts use drop aggregation. - return noopRegister{}, nil + // All insts use drop aggregation or are invalid. + return noopRegister{}, err } - cback := func(ctx context.Context) error { - return f(ctx, reg) - } - return m.pipes.registerMultiCallback(cback), nil + // Some or all instruments were valid. + cback := func(ctx context.Context) error { return f(ctx, reg) } + return m.pipes.registerMultiCallback(cback), err } type observer struct { @@ -441,17 +443,9 @@ func (noopRegister) Unregister() error { } // int64InstProvider provides int64 OpenTelemetry instruments. -type int64InstProvider struct { - scope instrumentation.Scope - pipes pipelines - resolve resolver[int64] -} +type int64InstProvider struct{ *meter } -func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *int64InstProvider { - return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)} -} - -func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) { +func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) { inst := Instrument{ Name: name, Description: desc, @@ -459,27 +453,19 @@ func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]a Kind: kind, Scope: p.scope, } - return p.resolve.Aggregators(inst) + return p.int64Resolver.Aggregators(inst) } // lookup returns the resolved instrumentImpl. -func (p *int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) { +func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) { aggs, err := p.aggs(kind, name, desc, u) return &int64Inst{measures: aggs}, err } // float64InstProvider provides float64 OpenTelemetry instruments. -type float64InstProvider struct { - scope instrumentation.Scope - pipes pipelines - resolve resolver[float64] -} - -func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *float64InstProvider { - return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)} -} +type float64InstProvider struct{ *meter } -func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) { +func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) { inst := Instrument{ Name: name, Description: desc, @@ -487,20 +473,20 @@ func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([ Kind: kind, Scope: p.scope, } - return p.resolve.Aggregators(inst) + return p.float64Resolver.Aggregators(inst) } // lookup returns the resolved instrumentImpl. -func (p *float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) { +func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) { aggs, err := p.aggs(kind, name, desc, u) return &float64Inst{measures: aggs}, err } -type int64ObservProvider struct{ *int64InstProvider } +type int64ObservProvider struct{ *meter } func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) { - aggs, err := p.aggs(kind, name, desc, u) - return newInt64Observable(p.scope, kind, name, desc, u, aggs), err + aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u) + return newInt64Observable(p.meter, kind, name, desc, u, aggs), err } func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) { @@ -529,11 +515,11 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) { o.observe(val, c.Attributes()) } -type float64ObservProvider struct{ *float64InstProvider } +type float64ObservProvider struct{ *meter } func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) { - aggs, err := p.aggs(kind, name, desc, u) - return newFloat64Observable(p.scope, kind, name, desc, u, aggs), err + aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u) + return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err } func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) { diff --git a/sdk/metric/provider_test.go b/sdk/metric/provider_test.go index 5f02b130570..774e026ad87 100644 --- a/sdk/metric/provider_test.go +++ b/sdk/metric/provider_test.go @@ -21,11 +21,14 @@ import ( "testing" "github.com/go-logr/logr/funcr" + "github.com/go-logr/logr/testr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + api "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/metric/metricdata" ) func TestMeterConcurrentSafe(t *testing.T) { @@ -120,3 +123,51 @@ func TestMeterProviderReturnsNoopMeterAfterShutdown(t *testing.T) { _, ok = m.(noop.Meter) assert.Truef(t, ok, "Meter from shutdown MeterProvider is not NoOp: %T", m) } + +func TestMeterProviderMixingOnRegisterErrors(t *testing.T) { + otel.SetLogger(testr.New(t)) + + rdr0 := NewManualReader() + mp0 := NewMeterProvider(WithReader(rdr0)) + + rdr1 := NewManualReader() + mp1 := NewMeterProvider(WithReader(rdr1)) + + // Meters with the same scope but different MeterProviders. + m0 := mp0.Meter("TestMeterProviderMixingOnRegisterErrors") + m1 := mp1.Meter("TestMeterProviderMixingOnRegisterErrors") + + m0Gauge, err := m0.Float64ObservableGauge("float64Gauge") + require.NoError(t, err) + + m1Gauge, err := m1.Int64ObservableGauge("int64Gauge") + require.NoError(t, err) + + _, err = m0.RegisterCallback( + func(_ context.Context, o api.Observer) error { + o.ObserveFloat64(m0Gauge, 2) + // Observe an instrument from a different MeterProvider. + o.ObserveInt64(m1Gauge, 1) + + return nil + }, + m0Gauge, m1Gauge, + ) + assert.Error( + t, + err, + "Instrument registered with Meter from different MeterProvider", + ) + + var data metricdata.ResourceMetrics + _ = rdr0.Collect(context.Background(), &data) + // Only the metrics from mp0 should be produced. + assert.Len(t, data.ScopeMetrics, 1) + + err = rdr1.Collect(context.Background(), &data) + assert.NoError(t, err, "Errored when collect should be a noop") + assert.Len( + t, data.ScopeMetrics, 0, + "Metrics produced for instrument collected by different MeterProvider", + ) +} From c197fe93057875323a2e4f07af6d5707de52f1fd Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 19 Jul 2023 11:52:11 -0400 Subject: [PATCH 6/8] Metric SDK: Sum duplicate async observations regardless of filtering (#4289) * Metric SDK: Remove the distinction between filtered and unfiltered attributes. --- CHANGELOG.md | 1 + sdk/metric/internal/aggregate/aggregator.go | 19 --- sdk/metric/internal/aggregate/filter.go | 43 ------- sdk/metric/internal/aggregate/filter_test.go | 89 -------------- sdk/metric/internal/aggregate/sum.go | 82 ++----------- sdk/metric/internal/aggregate/sum_test.go | 116 +++++-------------- sdk/metric/meter_test.go | 3 - 7 files changed, 45 insertions(+), 308 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 75d463e4eb4..fd2e673fb42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view. This change is made to ensure compatibility with the OpenTelemetry specification. (#4288) - If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290) +- If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning. (#4289) - Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332) - Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333) diff --git a/sdk/metric/internal/aggregate/aggregator.go b/sdk/metric/internal/aggregate/aggregator.go index 03d814d9b4a..fac0dfd901a 100644 --- a/sdk/metric/internal/aggregate/aggregator.go +++ b/sdk/metric/internal/aggregate/aggregator.go @@ -38,22 +38,3 @@ type aggregator[N int64 | float64] interface { // measurements made and ends an aggregation cycle. Aggregation() metricdata.Aggregation } - -// precomputeAggregator is an Aggregator that receives values to aggregate that -// have been pre-computed by the caller. -type precomputeAggregator[N int64 | float64] interface { - // The Aggregate method of the embedded Aggregator is used to record - // pre-computed measurements, scoped by attributes that have not been - // filtered by an attribute filter. - aggregator[N] - - // aggregateFiltered records measurements scoped by attributes that have - // been filtered by an attribute filter. - // - // Pre-computed measurements of filtered attributes need to be recorded - // separate from those that haven't been filtered so they can be added to - // the non-filtered pre-computed measurements in a collection cycle and - // then resets after the cycle (the non-filtered pre-computed measurements - // are not reset). - aggregateFiltered(N, attribute.Set) -} diff --git a/sdk/metric/internal/aggregate/filter.go b/sdk/metric/internal/aggregate/filter.go index 782c28a85df..ea471149e75 100644 --- a/sdk/metric/internal/aggregate/filter.go +++ b/sdk/metric/internal/aggregate/filter.go @@ -27,9 +27,6 @@ func newFilter[N int64 | float64](agg aggregator[N], fn attribute.Filter) aggreg if fn == nil { return agg } - if fa, ok := agg.(precomputeAggregator[N]); ok { - return newPrecomputedFilter(fa, fn) - } return &filter[N]{ filter: fn, aggregator: agg, @@ -59,43 +56,3 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) { func (f *filter[N]) Aggregation() metricdata.Aggregation { return f.aggregator.Aggregation() } - -// precomputedFilter is an aggregator that applies attribute filter when -// Aggregating for pre-computed Aggregations. The pre-computed Aggregations -// need to operate normally when no attribute filtering is done (for sums this -// means setting the value), but when attribute filtering is done it needs to -// be added to any set value. -type precomputedFilter[N int64 | float64] struct { - filter attribute.Filter - aggregator precomputeAggregator[N] -} - -// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg -// with the attribute filter fn. -// -// This should not be used to wrap a non-pre-computed Aggregator. Use a -// precomputedFilter instead. -func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] { - return &precomputedFilter[N]{ - filter: fn, - aggregator: agg, - } -} - -// Aggregate records the measurement, scoped by attr, and aggregates it -// into an aggregation. -func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) { - fAttr, _ := attr.Filter(f.filter) - if fAttr.Equals(&attr) { - // No filtering done. - f.aggregator.Aggregate(measurement, fAttr) - } else { - f.aggregator.aggregateFiltered(measurement, fAttr) - } -} - -// Aggregation returns an Aggregation, for all the aggregated -// measurements made and ends an aggregation cycle. -func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation { - return f.aggregator.Aggregation() -} diff --git a/sdk/metric/internal/aggregate/filter_test.go b/sdk/metric/internal/aggregate/filter_test.go index e348e7582ea..b6544e3706b 100644 --- a/sdk/metric/internal/aggregate/filter_test.go +++ b/sdk/metric/internal/aggregate/filter_test.go @@ -15,8 +15,6 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( - "fmt" - "strings" "sync" "testing" @@ -196,90 +194,3 @@ func TestFilterConcurrent(t *testing.T) { testFilterConcurrent[float64](t) }) } - -func TestPrecomputedFilter(t *testing.T) { - t.Run("Int64", testPrecomputedFilter[int64]()) - t.Run("Float64", testPrecomputedFilter[float64]()) -} - -func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) { - return func(t *testing.T) { - agg := newTestFilterAgg[N]() - f := newFilter[N](agg, testAttributeFilter) - require.IsType(t, &precomputedFilter[N]{}, f) - - var ( - powerLevel = attribute.Int("power-level", 9000) - user = attribute.String("user", "Alice") - admin = attribute.Bool("admin", true) - ) - a := attribute.NewSet(powerLevel) - key := a - f.Aggregate(1, a) - assert.Equal(t, N(1), agg.values[key].measured, str(a)) - assert.Equal(t, N(0), agg.values[key].filtered, str(a)) - - a = attribute.NewSet(powerLevel, user) - f.Aggregate(2, a) - assert.Equal(t, N(1), agg.values[key].measured, str(a)) - assert.Equal(t, N(2), agg.values[key].filtered, str(a)) - - a = attribute.NewSet(powerLevel, user, admin) - f.Aggregate(3, a) - assert.Equal(t, N(1), agg.values[key].measured, str(a)) - assert.Equal(t, N(5), agg.values[key].filtered, str(a)) - - a = attribute.NewSet(powerLevel) - f.Aggregate(2, a) - assert.Equal(t, N(2), agg.values[key].measured, str(a)) - assert.Equal(t, N(5), agg.values[key].filtered, str(a)) - - a = attribute.NewSet(user) - f.Aggregate(3, a) - assert.Equal(t, N(2), agg.values[key].measured, str(a)) - assert.Equal(t, N(5), agg.values[key].filtered, str(a)) - assert.Equal(t, N(3), agg.values[*attribute.EmptySet()].filtered, str(a)) - - _ = f.Aggregation() - assert.Equal(t, 1, agg.aggregationN, "failed to propagate Aggregation") - } -} - -func str(a attribute.Set) string { - iter := a.Iter() - out := make([]string, 0, iter.Len()) - for iter.Next() { - kv := iter.Attribute() - out = append(out, fmt.Sprintf("%s:%#v", kv.Key, kv.Value.AsInterface())) - } - return strings.Join(out, ",") -} - -type testFilterAgg[N int64 | float64] struct { - values map[attribute.Set]precomputedValue[N] - aggregationN int -} - -func newTestFilterAgg[N int64 | float64]() *testFilterAgg[N] { - return &testFilterAgg[N]{ - values: make(map[attribute.Set]precomputedValue[N]), - } -} - -func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) { - v := a.values[attr] - v.measured = val - a.values[attr] = v -} - -// nolint: unused // Used to agg filtered. -func (a *testFilterAgg[N]) aggregateFiltered(val N, attr attribute.Set) { - v := a.values[attr] - v.filtered += val - a.values[attr] = v -} - -func (a *testFilterAgg[N]) Aggregation() metricdata.Aggregation { - a.aggregationN++ - return nil -} diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 14af6273cbd..594068c4354 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -150,63 +150,6 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation { return out } -// precomputedValue is the recorded measurement value for a set of attributes. -type precomputedValue[N int64 | float64] struct { - // measured is the last value measured for a set of attributes that were - // not filtered. - measured N - // filtered is the sum of values from measurements that had their - // attributes filtered. - filtered N -} - -// precomputedMap is the storage for precomputed sums. -type precomputedMap[N int64 | float64] struct { - sync.Mutex - values map[attribute.Set]precomputedValue[N] -} - -func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] { - return &precomputedMap[N]{ - values: make(map[attribute.Set]precomputedValue[N]), - } -} - -// Aggregate records value with the unfiltered attributes attr. -// -// If a previous measurement was made for the same attribute set: -// -// - If that measurement's attributes were not filtered, this value overwrite -// that value. -// - If that measurement's attributes were filtered, this value will be -// recorded along side that value. -func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) { - s.Lock() - v := s.values[attr] - v.measured = value - s.values[attr] = v - s.Unlock() -} - -// aggregateFiltered records value with the filtered attributes attr. -// -// If a previous measurement was made for the same attribute set: -// -// - If that measurement's attributes were not filtered, this value will be -// recorded along side that value. -// - If that measurement's attributes were filtered, this value will be -// added to it. -// -// This method should not be used if attr have not been reduced by an attribute -// filter. -func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered. - s.Lock() - v := s.values[attr] - v.filtered += value - s.values[attr] = v - s.Unlock() -} - // newPrecomputedDeltaSum returns an Aggregator that summarizes a set of // pre-computed sums. Each sum is scoped by attributes and the aggregation // cycle the measurements were made in. @@ -218,17 +161,17 @@ func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // // The output Aggregation will report recorded values as delta temporality. func newPrecomputedDeltaSum[N int64 | float64](monotonic bool) aggregator[N] { return &precomputedDeltaSum[N]{ - precomputedMap: newPrecomputedMap[N](), - reported: make(map[attribute.Set]N), - monotonic: monotonic, - start: now(), + valueMap: newValueMap[N](), + reported: make(map[attribute.Set]N), + monotonic: monotonic, + start: now(), } } // precomputedDeltaSum summarizes a set of pre-computed sums recorded over all // aggregation cycles as the delta of these sums. type precomputedDeltaSum[N int64 | float64] struct { - *precomputedMap[N] + *valueMap[N] reported map[attribute.Set]N @@ -263,15 +206,14 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)), } for attr, value := range s.values { - v := value.measured + value.filtered - delta := v - s.reported[attr] + delta := value - s.reported[attr] out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ Attributes: attr, StartTime: s.start, Time: t, Value: delta, }) - newReported[attr] = v + newReported[attr] = value // Unused attribute sets do not report. delete(s.values, attr) } @@ -294,15 +236,15 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { // temporality. func newPrecomputedCumulativeSum[N int64 | float64](monotonic bool) aggregator[N] { return &precomputedCumulativeSum[N]{ - precomputedMap: newPrecomputedMap[N](), - monotonic: monotonic, - start: now(), + valueMap: newValueMap[N](), + monotonic: monotonic, + start: now(), } } // precomputedCumulativeSum directly records and reports a set of pre-computed sums. type precomputedCumulativeSum[N int64 | float64] struct { - *precomputedMap[N] + *valueMap[N] monotonic bool start time.Time @@ -337,7 +279,7 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation { Attributes: attr, StartTime: s.start, Time: t, - Value: value.measured + value.filtered, + Value: value, }) // Unused attribute sets do not report. delete(s.values, attr) diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index e128459b1d0..0843bcf8429 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -37,6 +37,7 @@ func testSum[N int64 | float64](t *testing.T) { MeasurementN: defaultMeasurements, CycleN: defaultCycles, } + totalMeasurements := defaultGoroutines * defaultMeasurements t.Run("Delta", func(t *testing.T) { incr, mono := monoIncr[N](), true @@ -60,21 +61,21 @@ func testSum[N int64 | float64](t *testing.T) { t.Run("PreComputedDelta", func(t *testing.T) { incr, mono := monoIncr[N](), true - eFunc := preDeltaExpecter[N](incr, mono) + eFunc := preDeltaExpecter[N](incr, mono, N(totalMeasurements)) t.Run("Monotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc)) incr, mono = nonMonoIncr[N](), false - eFunc = preDeltaExpecter[N](incr, mono) + eFunc = preDeltaExpecter[N](incr, mono, N(totalMeasurements)) t.Run("NonMonotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc)) }) t.Run("PreComputedCumulative", func(t *testing.T) { incr, mono := monoIncr[N](), true - eFunc := preCumuExpecter[N](incr, mono) + eFunc := preCumuExpecter[N](incr, mono, N(totalMeasurements)) t.Run("Monotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc)) incr, mono = nonMonoIncr[N](), false - eFunc = preCumuExpecter[N](incr, mono) + eFunc = preCumuExpecter[N](incr, mono, N(totalMeasurements)) t.Run("NonMonotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc)) }) } @@ -103,26 +104,26 @@ func cumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc { } } -func preDeltaExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc { +func preDeltaExpecter[N int64 | float64](incr setMap[N], mono bool, totalMeasurements N) expectFunc { sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono} last := make(map[attribute.Set]N) return func(int) metricdata.Aggregation { sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) for a, v := range incr { l := last[a] - sum.DataPoints = append(sum.DataPoints, point(a, N(v)-l)) + sum.DataPoints = append(sum.DataPoints, point(a, totalMeasurements*(N(v)-l))) last[a] = N(v) } return sum } } -func preCumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc { +func preCumuExpecter[N int64 | float64](incr setMap[N], mono bool, totalMeasurements N) expectFunc { sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono} return func(int) metricdata.Aggregation { sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) for a, v := range incr { - sum.DataPoints = append(sum.DataPoints, point(a, N(v))) + sum.DataPoints = append(sum.DataPoints, point(a, totalMeasurements*N(v))) } return sum } @@ -167,118 +168,65 @@ func TestDeltaSumReset(t *testing.T) { func TestPreComputedDeltaSum(t *testing.T) { var mono bool agg := newPrecomputedDeltaSum[int64](mono) - require.Implements(t, (*precomputeAggregator[int64])(nil), agg) + require.Implements(t, (*aggregator[int64])(nil), agg) attrs := attribute.NewSet(attribute.String("key", "val")) agg.Aggregate(1, attrs) - got := agg.Aggregation() want := metricdata.Sum[int64]{ IsMonotonic: mono, Temporality: metricdata.DeltaTemporality, DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, } opt := metricdatatest.IgnoreTimestamp() - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) - // No observation means no metric data - got = agg.Aggregation() - metricdatatest.AssertAggregationsEqual(t, nil, got, opt) + // No observation results in an empty aggregation, and causes previous + // observations to be forgotten. + metricdatatest.AssertAggregationsEqual(t, nil, agg.Aggregation(), opt) - agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) - got = agg.Aggregation() - // measured(+): 1, previous(-): 1, filtered(+): 1 + agg.Aggregate(1, attrs) + // measured(+): 1, previous(-): 0 want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - - // Filtered values should not persist. - got = agg.Aggregation() - // No observation means no metric data - metricdatatest.AssertAggregationsEqual(t, nil, got, opt) + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) - // Override set value. + // Duplicate observations add agg.Aggregate(2, attrs) agg.Aggregate(5, attrs) - // Filtered should add. - agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) - agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) - got = agg.Aggregation() - // measured(+): 5, previous(-): 0, filtered(+): 13 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - - // Filtered values should not persist. - agg.Aggregate(5, attrs) - got = agg.Aggregation() - // measured(+): 5, previous(-): 18, filtered(+): 0 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - - // Order should not affect measure. - // Filtered should add. - agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) - agg.Aggregate(7, attrs) - agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) - got = agg.Aggregation() - // measured(+): 7, previous(-): 5, filtered(+): 13 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 15)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - agg.Aggregate(7, attrs) - got = agg.Aggregation() - // measured(+): 7, previous(-): 20, filtered(+): 0 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + agg.Aggregate(3, attrs) + agg.Aggregate(10, attrs) + // measured(+): 20, previous(-): 1 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 19)} + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) } func TestPreComputedCumulativeSum(t *testing.T) { var mono bool agg := newPrecomputedCumulativeSum[int64](mono) - require.Implements(t, (*precomputeAggregator[int64])(nil), agg) + require.Implements(t, (*aggregator[int64])(nil), agg) attrs := attribute.NewSet(attribute.String("key", "val")) agg.Aggregate(1, attrs) - got := agg.Aggregation() want := metricdata.Sum[int64]{ IsMonotonic: mono, Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, } opt := metricdatatest.IgnoreTimestamp() - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) // Cumulative values should not persist. - got = agg.Aggregation() - metricdatatest.AssertAggregationsEqual(t, nil, got, opt) + metricdatatest.AssertAggregationsEqual(t, nil, agg.Aggregation(), opt) - agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) - got = agg.Aggregation() + agg.Aggregate(1, attrs) want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - - // Filtered values should not persist. - got = agg.Aggregation() - metricdatatest.AssertAggregationsEqual(t, nil, got, opt) + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) - // Override set value. + // Duplicate measurements add agg.Aggregate(5, attrs) - // Filtered should add. - agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) - agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) - got = agg.Aggregation() + agg.Aggregate(3, attrs) + agg.Aggregate(10, attrs) want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - - // Filtered values should not persist. - got = agg.Aggregation() - metricdatatest.AssertAggregationsEqual(t, nil, got, opt) - - // Order should not affect measure. - // Filtered should add. - agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) - agg.Aggregate(7, attrs) - agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) - got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) } func TestEmptySumNilAggregation(t *testing.T) { diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 185095e4f8d..8657ccc7135 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -960,9 +960,6 @@ func TestGlobalInstRegisterCallback(t *testing.T) { _, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr) assert.NoError(t, err) - _, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr) - assert.NoError(t, err) - got := metricdata.ResourceMetrics{} err = rdr.Collect(context.Background(), &got) assert.NoError(t, err) From 84b2e5467131a1f1faf41f788677e5ee66a9b8de Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 19 Jul 2023 09:59:07 -0700 Subject: [PATCH 7/8] Use inst ID for agg cache key (#4337) * Use inst ID for agg cache key Resolve #4201 The specification requires the duplicate instrument conflicts to be identified based on the instrument identifying fields: - name - instrument kind - unit - description - language-level features such as the number type (int64 and float64) Currently, the conflict detection and aggregation caching are done based on the stream IDs which include an aggregation name, monotonicity, and temporality instead of the instrument kind. This changes the conflict detection and aggregation caching to use the OpenTelemetry specified fields. This is effectively a no-op given there is a 1-to-1 mapping of aggregation-name/monotonicity/temporality to instrument kind (they are all resolved based on the instrument kind). Additionally, this adds a stringer representation of the `InstrumentKind`. This is needed for the logging of duplicate instrument conflicts. * Add changes to changelog --- CHANGELOG.md | 1 + sdk/metric/instrument.go | 18 +++++-------- sdk/metric/instrumentkind_string.go | 29 ++++++++++++++++++++ sdk/metric/meter.go | 2 +- sdk/metric/pipeline.go | 40 +++++++++++----------------- sdk/metric/pipeline_registry_test.go | 14 +++++----- sdk/metric/pipeline_test.go | 2 +- 7 files changed, 61 insertions(+), 45 deletions(-) create mode 100644 sdk/metric/instrumentkind_string.go diff --git a/CHANGELOG.md b/CHANGELOG.md index fd2e673fb42..d3f8170b023 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143) - Log an error for calls to `NewView` in `go.opentelemetry.io/otel/sdk/metric` that have empty criteria. (#4307) - Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317) +- Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337) ## [1.16.0/0.39.0] 2023-05-18 diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index 83652c6e97f..eff2f179a51 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate stringer -type=InstrumentKind -trimprefix=InstrumentKind + package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( @@ -25,7 +27,6 @@ import ( "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" - "go.opentelemetry.io/otel/sdk/metric/metricdata" ) var ( @@ -172,23 +173,16 @@ func (s Stream) attributeFilter() attribute.Filter { } } -// streamID are the identifying properties of a stream. -type streamID struct { +// instID are the identifying properties of a instrument. +type instID struct { // Name is the name of the stream. Name string // Description is the description of the stream. Description string + // Kind defines the functional group of the instrument. + Kind InstrumentKind // Unit is the unit of the stream. Unit string - // Aggregation is the aggregation data type of the stream. - Aggregation string - // Monotonic is the monotonicity of an instruments data type. This field is - // not used for all data types, so a zero value needs to be understood in the - // context of Aggregation. - Monotonic bool - // Temporality is the temporality of a stream's data type. This field is - // not used by some data types. - Temporality metricdata.Temporality // Number is the number type of the stream. Number string } diff --git a/sdk/metric/instrumentkind_string.go b/sdk/metric/instrumentkind_string.go new file mode 100644 index 00000000000..d5f9e982c2b --- /dev/null +++ b/sdk/metric/instrumentkind_string.go @@ -0,0 +1,29 @@ +// Code generated by "stringer -type=InstrumentKind -trimprefix=InstrumentKind"; DO NOT EDIT. + +package metric + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[instrumentKindUndefined-0] + _ = x[InstrumentKindCounter-1] + _ = x[InstrumentKindUpDownCounter-2] + _ = x[InstrumentKindHistogram-3] + _ = x[InstrumentKindObservableCounter-4] + _ = x[InstrumentKindObservableUpDownCounter-5] + _ = x[InstrumentKindObservableGauge-6] +} + +const _InstrumentKind_name = "instrumentKindUndefinedCounterUpDownCounterHistogramObservableCounterObservableUpDownCounterObservableGauge" + +var _InstrumentKind_index = [...]uint8{0, 23, 30, 43, 52, 69, 92, 107} + +func (i InstrumentKind) String() string { + if i >= InstrumentKind(len(_InstrumentKind_index)-1) { + return "InstrumentKind(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _InstrumentKind_name[_InstrumentKind_index[i]:_InstrumentKind_index[i+1]] +} diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index f76d5190413..caed7387c0a 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -49,7 +49,7 @@ type meter struct { func newMeter(s instrumentation.Scope, p pipelines) *meter { // viewCache ensures instrument conflicts, including number conflicts, this // meter is asked to create are logged to the user. - var viewCache cache[string, streamID] + var viewCache cache[string, instID] return &meter{ scope: s, diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 5989e0c9575..d6af04c6e27 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -187,24 +187,24 @@ type inserter[N int64 | float64] struct { // cache ensures no duplicate aggregate functions are inserted into the // reader pipeline and if a new request during an instrument creation asks // for the same aggregate function input the same instance is returned. - aggregators *cache[streamID, aggVal[N]] + aggregators *cache[instID, aggVal[N]] // views is a cache that holds instrument identifiers for all the // instruments a Meter has created, it is provided from the Meter that owns // this inserter. This cache ensures during the creation of instruments // with the same name but different options (e.g. description, unit) a // warning message is logged. - views *cache[string, streamID] + views *cache[string, instID] pipeline *pipeline } -func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *inserter[N] { +func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] { if vc == nil { - vc = &cache[string, streamID]{} + vc = &cache[string, instID]{} } return &inserter[N]{ - aggregators: &cache[streamID, aggVal[N]]{}, + aggregators: &cache[instID, aggVal[N]]{}, views: vc, pipeline: p, } @@ -320,12 +320,14 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum ) } - id := i.streamID(kind, stream) + id := i.instID(kind, stream) // If there is a conflict, the specification says the view should // still be applied and a warning should be logged. i.logConflict(id) cv := i.aggregators.Lookup(id, func() aggVal[N] { - b := aggregate.Builder[N]{Temporality: id.Temporality} + b := aggregate.Builder[N]{ + Temporality: i.pipeline.reader.temporality(kind), + } if len(stream.AllowAttributeKeys) > 0 { b.Filter = stream.attributeFilter() } @@ -350,8 +352,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum // logConflict validates if an instrument with the same name as id has already // been created. If that instrument conflicts with id, a warning is logged. -func (i *inserter[N]) logConflict(id streamID) { - existing := i.views.Lookup(id.Name, func() streamID { return id }) +func (i *inserter[N]) logConflict(id instID) { + existing := i.views.Lookup(id.Name, func() instID { return id }) if id == existing { return } @@ -360,31 +362,21 @@ func (i *inserter[N]) logConflict(id streamID) { "duplicate metric stream definitions", "names", fmt.Sprintf("%q, %q", existing.Name, id.Name), "descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description), + "kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind), "units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit), "numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number), - "aggregations", fmt.Sprintf("%s, %s", existing.Aggregation, id.Aggregation), - "monotonics", fmt.Sprintf("%t, %t", existing.Monotonic, id.Monotonic), - "temporalities", fmt.Sprintf("%s, %s", existing.Temporality.String(), id.Temporality.String()), ) } -func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID { +func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID { var zero N - id := streamID{ + return instID{ Name: stream.Name, Description: stream.Description, Unit: stream.Unit, - Aggregation: fmt.Sprintf("%T", stream.Aggregation), - Temporality: i.pipeline.reader.temporality(kind), + Kind: kind, Number: fmt.Sprintf("%T", zero), } - - switch kind { - case InstrumentKindObservableCounter, InstrumentKindCounter, InstrumentKindHistogram: - id.Monotonic = true - } - - return id } // aggregateFunc returns new aggregate functions matching agg, kind, and @@ -526,7 +518,7 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] { +func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { in[i] = newInserter[N](p[i], vc) diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index c10bbc36803..5969392c6cb 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -350,7 +350,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - var c cache[string, streamID] + var c cache[string, instID] p := newPipeline(nil, tt.reader, tt.views) i := newInserter[N](p, &c) input, err := i.Instrument(tt.inst) @@ -371,7 +371,7 @@ func TestCreateAggregators(t *testing.T) { } func testInvalidInstrumentShouldPanic[N int64 | float64]() { - var c cache[string, streamID] + var c cache[string, instID] i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c) inst := Instrument{ Name: "foo", @@ -391,7 +391,7 @@ func TestPipelinesAggregatorForEachReader(t *testing.T) { require.Len(t, pipes, 2, "created pipelines") inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} - var c cache[string, streamID] + var c cache[string, instID] r := newResolver[int64](pipes, &c) aggs, err := r.Aggregators(inst) require.NoError(t, err, "resolved Aggregators error") @@ -468,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} - var c cache[string, streamID] + var c cache[string, instID] r := newResolver[int64](p, &c) aggs, err := r.Aggregators(inst) assert.NoError(t, err) @@ -478,7 +478,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} - var c cache[string, streamID] + var c cache[string, instID] r := newResolver[float64](p, &c) aggs, err := r.Aggregators(inst) assert.NoError(t, err) @@ -505,7 +505,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { p := newPipelines(resource.Empty(), readers, views) inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge} - var vc cache[string, streamID] + var vc cache[string, instID] ri := newResolver[int64](p, &vc) intAggs, err := ri.Aggregators(inst) assert.Error(t, err) @@ -556,7 +556,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { p := newPipelines(resource.Empty(), readers, views) - var vc cache[string, streamID] + var vc cache[string, instID] ri := newResolver[int64](p, &vc) intAggs, err := ri.Aggregators(fooInst) assert.NoError(t, err) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index f9056275c47..dd30a35e065 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -137,7 +137,7 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - var c cache[string, streamID] + var c cache[string, instID] i := newInserter[N](test.pipe, &c) got, err := i.Instrument(inst) require.NoError(t, err) From e08359f30c881d6174bb8f750f3526b3774d4539 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 19 Jul 2023 10:17:38 -0700 Subject: [PATCH 8/8] Detect duplicate instruments for case-insensitive names (#4338) * Detect dup inst for case-insensitive names Resolve #3835 Detect duplicate instrument registrations for instruments that have the same case-insensitive names. Continue to return the instruments with different names, but log a warning. This is the solution proposed in https://github.com/open-telemetry/opentelemetry-specification/pull/3606. * Add changes to changelog * Reset global logger after test --- CHANGELOG.md | 1 + sdk/metric/go.mod | 2 +- sdk/metric/pipeline.go | 5 ++- sdk/metric/pipeline_test.go | 77 +++++++++++++++++++++++++++++++++++++ 4 files changed, 83 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3f8170b023..85f1ec63320 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Log an error for calls to `NewView` in `go.opentelemetry.io/otel/sdk/metric` that have empty criteria. (#4307) - Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317) - Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337) +- Detect duplicate instruments for case-insensitive names in `go.opentelemetry.io/otel/sdk/metric`. (#4338) ## [1.16.0/0.39.0] 2023-05-18 diff --git a/sdk/metric/go.mod b/sdk/metric/go.mod index ebec07c04d0..fcdfdfa5b74 100644 --- a/sdk/metric/go.mod +++ b/sdk/metric/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/go-logr/logr v1.2.4 + github.com/go-logr/stdr v1.2.2 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/metric v1.16.0 @@ -12,7 +13,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-logr/stdr v1.2.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect golang.org/x/sys v0.10.0 // indirect diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index d6af04c6e27..ad52aedfe68 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -353,7 +353,10 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum // logConflict validates if an instrument with the same name as id has already // been created. If that instrument conflicts with id, a warning is logged. func (i *inserter[N]) logConflict(id instID) { - existing := i.views.Lookup(id.Name, func() instID { return id }) + // The API specification defines names as case-insensitive. If there is a + // different casing of a name it needs to be a conflict. + name := strings.ToLower(id.Name) + existing := i.views.Lookup(name, func() instID { return id }) if id == existing { return } diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index dd30a35e065..58916079429 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -17,12 +17,19 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" "fmt" + "log" + "os" + "strings" "sync" "testing" + "github.com/go-logr/logr" + "github.com/go-logr/logr/funcr" + "github.com/go-logr/stdr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -166,3 +173,73 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { } } } + +func TestLogConflictName(t *testing.T) { + testcases := []struct { + existing, name string + conflict bool + }{ + { + existing: "requestCount", + name: "requestCount", + conflict: false, + }, + { + existing: "requestCount", + name: "requestDuration", + conflict: false, + }, + { + existing: "requestCount", + name: "requestcount", + conflict: true, + }, + { + existing: "requestCount", + name: "REQUESTCOUNT", + conflict: true, + }, + { + existing: "requestCount", + name: "rEqUeStCoUnT", + conflict: true, + }, + } + + var msg string + t.Cleanup(func(orig logr.Logger) func() { + otel.SetLogger(funcr.New(func(_, args string) { + msg = args + }, funcr.Options{Verbosity: 20})) + return func() { otel.SetLogger(orig) } + }(stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile)))) + + for _, tc := range testcases { + var vc cache[string, instID] + + name := strings.ToLower(tc.existing) + _ = vc.Lookup(name, func() instID { + return instID{Name: tc.existing} + }) + + i := newInserter[int64](newPipeline(nil, nil, nil), &vc) + i.logConflict(instID{Name: tc.name}) + + if tc.conflict { + assert.Containsf( + t, msg, "duplicate metric stream definitions", + "warning not logged for conflicting names: %s, %s", + tc.existing, tc.name, + ) + } else { + assert.Equalf( + t, msg, "", + "warning logged for non-conflicting names: %s, %s", + tc.existing, tc.name, + ) + } + + // Reset. + msg = "" + } +}