Skip to content

Commit

Permalink
Add Aggregation to the metric pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Aug 10, 2023
1 parent 36b3633 commit f759f17
Show file tree
Hide file tree
Showing 16 changed files with 397 additions and 117 deletions.
201 changes: 201 additions & 0 deletions sdk/metric/aggregation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"errors"
"fmt"
)

// errAgg is wrapped by misconfigured aggregations.
var errAgg = errors.New("aggregation")

// Aggregation is the aggregation used to summarize recorded measurements.
type Aggregation interface {
// copy returns a deep copy of the Aggregation.
copy() Aggregation

// err returns an error for any misconfigured Aggregation.
err() error
}

// AggregationDrop is an Aggregation that drops all recorded data.
type AggregationDrop struct{} // Drop has no parameters.

var _ Aggregation = AggregationDrop{}

// copy returns a deep copy of d.
func (d AggregationDrop) copy() Aggregation { return d }

// err returns an error for any misconfiguration. A Drop aggregation has no
// parameters and cannot be misconfigured, therefore this always returns nil.
func (AggregationDrop) err() error { return nil }

// AggregationDefault is an Aggregation that uses the default instrument kind selection
// mapping to select another Aggregation. A metric reader can be configured to
// make an aggregation selection based on instrument kind that differs from
// the default. This Aggregation ensures the default is used.
//
// See the "go.opentelemetry.io/otel/sdk/metric".DefaultAggregationSelector
// for information about the default instrument kind selection mapping.
type AggregationDefault struct{} // Default has no parameters.

var _ Aggregation = AggregationDefault{}

// copy returns a deep copy of d.
func (d AggregationDefault) copy() Aggregation { return d }

// err returns an error for any misconfiguration. A Default aggregation has no
// parameters and cannot be misconfigured, therefore this always returns nil.
func (AggregationDefault) err() error { return nil }

// Sum is an Aggregation that summarizes a set of measurements as their
// arithmetic sum.
type Sum struct{} // Sum has no parameters.

var _ Aggregation = Sum{}

// copy returns a deep copy of s.
func (s Sum) copy() Aggregation { return s }

// err returns an error for any misconfiguration. A Sum aggregation has no
// parameters and cannot be misconfigured, therefore this always returns nil.
func (Sum) err() error { return nil }

// AggregationLastValue is an Aggregation that summarizes a set of measurements as the
// last one made.
type AggregationLastValue struct{} // LastValue has no parameters.

var _ Aggregation = AggregationLastValue{}

// copy returns a deep copy of l.
func (l AggregationLastValue) copy() Aggregation { return l }

// err returns an error for any misconfiguration. A LastValue aggregation has
// no parameters and cannot be misconfigured, therefore this always returns
// nil.
func (AggregationLastValue) err() error { return nil }

// AggregationExplicitBucketHistogram is an Aggregation that summarizes a set of
// measurements as an histogram with explicitly defined buckets.
type AggregationExplicitBucketHistogram struct {
// Boundaries are the increasing bucket boundary values. Boundary values
// define bucket upper bounds. Buckets are exclusive of their lower
// boundary and inclusive of their upper bound (except at positive
// infinity). A measurement is defined to fall into the greatest-numbered
// bucket with a boundary that is greater than or equal to the
// measurement. As an example, boundaries defined as:
//
// []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}
//
// Will define these buckets:
//
// (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0],
// (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
// (500.0, 1000.0], (1000.0, +∞)
Boundaries []float64
// NoMinMax indicates whether to not record the min and max of the
// distribution. By default, these extrema are recorded.
//
// Recording these extrema for cumulative data is expected to have little
// value, they will represent the entire life of the instrument instead of
// just the current collection cycle. It is recommended to set this to true
// for that type of data to avoid computing the low-value extrema.
NoMinMax bool
}

var _ Aggregation = AggregationExplicitBucketHistogram{}

// errHist is returned by misconfigured ExplicitBucketHistograms.
var errHist = fmt.Errorf("%w: explicit bucket histogram", errAgg)

// err returns an error for any misconfiguration.
func (h AggregationExplicitBucketHistogram) err() error {
if len(h.Boundaries) <= 1 {
return nil
}

// Check boundaries are monotonic.
i := h.Boundaries[0]
for _, j := range h.Boundaries[1:] {
if i >= j {
return fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, h.Boundaries)
}
i = j
}

return nil
}

// copy returns a deep copy of h.
func (h AggregationExplicitBucketHistogram) copy() Aggregation {
b := make([]float64, len(h.Boundaries))
copy(b, h.Boundaries)
return AggregationExplicitBucketHistogram{
Boundaries: b,
NoMinMax: h.NoMinMax,
}
}

// AggregationBase2ExponentialHistogram is an Aggregation that summarizes a set of
// measurements as an histogram with bucket widths that grow exponentially.
type AggregationBase2ExponentialHistogram struct {
// MaxSize is the maximum number of buckets to use for the histogram.
MaxSize int32
// MaxScale is the maximum resolution scale to use for the histogram.
//
// MaxScale has a maximum value of 20. Using a value of 20 means the
// maximum number of buckets that can fit within the range of a
// signed 32-bit integer index could be used.
//
// MaxScale has a minimum value of -10. Using a value of -10 means only
// two buckets will be use.
MaxScale int32

// NoMinMax indicates whether to not record the min and max of the
// distribution. By default, these extrema are recorded.
//
// Recording these extrema for cumulative data is expected to have little
// value, they will represent the entire life of the instrument instead of
// just the current collection cycle. It is recommended to set this to true
// for that type of data to avoid computing the low-value extrema.
NoMinMax bool
}

var _ Aggregation = AggregationBase2ExponentialHistogram{}

// copy returns a deep copy of the Aggregation.
func (e AggregationBase2ExponentialHistogram) copy() Aggregation {
return e
}

const (
expoMaxScale = 20
expoMinScale = -10
)

// errExpoHist is returned by misconfigured Base2ExponentialBucketHistograms.
var errExpoHist = fmt.Errorf("%w: exponential histogram", errAgg)

// err returns an error for any misconfigured Aggregation.
func (e AggregationBase2ExponentialHistogram) err() error {
if e.MaxScale > expoMaxScale {
return fmt.Errorf("%w: max size %d is greater than maximum scale %d", errExpoHist, e.MaxSize, expoMaxScale)
}
if e.MaxSize <= 0 {
return fmt.Errorf("%w: max size %d is less than or equal to zero", errExpoHist, e.MaxSize)
}
return nil
}
95 changes: 95 additions & 0 deletions sdk/metric/aggregation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestAggregationErr(t *testing.T) {
t.Run("DropOperation", func(t *testing.T) {
assert.NoError(t, AggregationDrop{}.err())
})

t.Run("SumOperation", func(t *testing.T) {
assert.NoError(t, Sum{}.err())
})

t.Run("LastValueOperation", func(t *testing.T) {
assert.NoError(t, AggregationLastValue{}.err())
})

t.Run("ExplicitBucketHistogramOperation", func(t *testing.T) {
assert.NoError(t, AggregationExplicitBucketHistogram{}.err())

assert.NoError(t, AggregationExplicitBucketHistogram{
Boundaries: []float64{0},
NoMinMax: true,
}.err())

assert.NoError(t, AggregationExplicitBucketHistogram{
Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000},
}.err())
})

t.Run("NonmonotonicHistogramBoundaries", func(t *testing.T) {
assert.ErrorIs(t, AggregationExplicitBucketHistogram{
Boundaries: []float64{2, 1},
}.err(), errAgg)

assert.ErrorIs(t, AggregationExplicitBucketHistogram{
Boundaries: []float64{0, 1, 2, 1, 3, 4},
}.err(), errAgg)
})

t.Run("ExponentialHistogramOperation", func(t *testing.T) {
assert.NoError(t, AggregationBase2ExponentialHistogram{
MaxSize: 160,
MaxScale: 20,
}.err())

assert.NoError(t, AggregationBase2ExponentialHistogram{
MaxSize: 1,
NoMinMax: true,
}.err())

assert.NoError(t, AggregationBase2ExponentialHistogram{
MaxSize: 1024,
MaxScale: -3,
}.err())
})

t.Run("InvalidExponentialHistogramOperation", func(t *testing.T) {
// MazSize must be greater than 0
assert.ErrorIs(t, AggregationBase2ExponentialHistogram{}.err(), errAgg)

// MaxScale Must be <=20
assert.ErrorIs(t, AggregationBase2ExponentialHistogram{
MaxSize: 1,
MaxScale: 30,
}.err(), errAgg)
})
}

func TestExplicitBucketHistogramDeepCopy(t *testing.T) {
const orig = 0.0
b := []float64{orig}
h := AggregationExplicitBucketHistogram{Boundaries: b}
cpH := h.copy().(AggregationExplicitBucketHistogram)
b[0] = orig + 1
assert.Equal(t, orig, cpH.Boundaries[0], "changing the underlying slice data should not affect the copy")
}
3 changes: 1 addition & 2 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -36,7 +35,7 @@ var viewBenchmarks = []struct {
"DropView",
[]View{NewView(
Instrument{Name: "*"},
Stream{Aggregation: aggregation.Drop{}},
Stream{Aggregation: AggregationDrop{}},
)},
},
{
Expand Down
3 changes: 1 addition & 2 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
)
Expand All @@ -39,7 +38,7 @@ type reader struct {

var _ Reader = (*reader)(nil)

func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
func (r *reader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type.
return r.aggregationFunc(kind)
}

Expand Down
3 changes: 1 addition & 2 deletions sdk/metric/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"

"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -39,7 +38,7 @@ type Exporter interface {
//
// This method needs to be concurrent safe with itself and all the other
// Exporter methods.
Aggregation(InstrumentKind) aggregation.Aggregation
Aggregation(InstrumentKind) Aggregation

// Export serializes and transmits metric data to a receiver.
//
Expand Down
3 changes: 1 addition & 2 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

Expand Down Expand Up @@ -145,7 +144,7 @@ type Stream struct {
// Unit is the unit of measurement recorded.
Unit string
// Aggregation the stream uses for an instrument.
Aggregation aggregation.Aggregation
Aggregation Aggregation
// AllowAttributeKeys are an allow-list of attribute keys that will be
// preserved for the stream. Any attribute recorded for the stream with a
// key not in this slice will be dropped.
Expand Down
9 changes: 4 additions & 5 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync/atomic"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -87,7 +86,7 @@ func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality
}

// aggregation returns what Aggregation to use for kind.
func (mr *ManualReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
func (mr *ManualReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type.
return mr.aggregationSelector(kind)
}

Expand Down Expand Up @@ -225,13 +224,13 @@ func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualRea
// or the aggregation explicitly passed for a view matching an instrument.
func WithAggregationSelector(selector AggregationSelector) ManualReaderOption {
// Deep copy and validate before using.
wrapped := func(ik InstrumentKind) aggregation.Aggregation {
wrapped := func(ik InstrumentKind) Aggregation {
a := selector(ik)
if a == nil {
return nil
}
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA := a.copy()
if err := cpA.err(); err != nil {
cpA = DefaultAggregationSelector(ik)
global.Error(
err, "using default aggregation instead",
Expand Down
Loading

0 comments on commit f759f17

Please sign in to comment.