Skip to content

Commit

Permalink
Add tests for malformed selectors in readers (#4350)
Browse files Browse the repository at this point in the history
  • Loading branch information
MadVikingGod committed Jul 25, 2023
1 parent c1a644a commit d423bd4
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317)
- Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337)
- Detect duplicate instruments for case-insensitive names in `go.opentelemetry.io/otel/sdk/metric`. (#4338)
- The `ManualReader` will not panic if `AggregationSelector` returns `nil`. (#4350)
- If a Reader's AggregationSelector return nil or DefaultAggregation the pipeline will use the default aggregation. (#4350)
- Log a suggested view that fixes instrument conflicts in `go.opentelemetry.io/otel/sdk/metric`. (#4349)
- Fix possible panic, deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353)

Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ func WithAggregationSelector(selector AggregationSelector) ManualReaderOption {
// Deep copy and validate before using.
wrapped := func(ik InstrumentKind) aggregation.Aggregation {
a := selector(ik)
if a == nil {
return nil
}
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA = DefaultAggregationSelector(ik)
Expand Down
146 changes: 146 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
Expand Down Expand Up @@ -1811,3 +1812,148 @@ func BenchmarkInstrumentCreation(b *testing.B) {
sfHistogram, _ = meter.Float64Histogram("sync.float64.histogram")
}
}

func testNilAggregationSelector(InstrumentKind) aggregation.Aggregation {
return nil
}
func testDefaultAggregationSelector(InstrumentKind) aggregation.Aggregation {
return aggregation.Default{}
}
func testUndefinedTemporalitySelector(InstrumentKind) metricdata.Temporality {
return metricdata.Temporality(0)
}
func testInvalidTemporalitySelector(InstrumentKind) metricdata.Temporality {
return metricdata.Temporality(255)
}

type noErrorHandler struct {
t *testing.T
}

func (h noErrorHandler) Handle(err error) {
assert.NoError(h.t, err)
}

func TestMalformedSelectors(t *testing.T) {
type testCase struct {
name string
reader Reader
}
testCases := []testCase{
{
name: "nil aggregation selector",
reader: NewManualReader(WithAggregationSelector(testNilAggregationSelector)),
},
{
name: "nil aggregation selector periodic",
reader: NewPeriodicReader(&fnExporter{aggregationFunc: testNilAggregationSelector}),
},
{
name: "default aggregation selector",
reader: NewManualReader(WithAggregationSelector(testDefaultAggregationSelector)),
},
{
name: "default aggregation selector periodic",
reader: NewPeriodicReader(&fnExporter{aggregationFunc: testDefaultAggregationSelector}),
},
{
name: "undefined temporality selector",
reader: NewManualReader(WithTemporalitySelector(testUndefinedTemporalitySelector)),
},
{
name: "undefined temporality selector periodic",
reader: NewPeriodicReader(&fnExporter{temporalityFunc: testUndefinedTemporalitySelector}),
},
{
name: "invalid temporality selector",
reader: NewManualReader(WithTemporalitySelector(testInvalidTemporalitySelector)),
},
{
name: "invalid temporality selector periodic",
reader: NewPeriodicReader(&fnExporter{temporalityFunc: testInvalidTemporalitySelector}),
},
{
name: "both aggregation and temporality selector",
reader: NewManualReader(
WithAggregationSelector(testNilAggregationSelector),
WithTemporalitySelector(testUndefinedTemporalitySelector),
),
},
{
name: "both aggregation and temporality selector periodic",
reader: NewPeriodicReader(&fnExporter{
aggregationFunc: testNilAggregationSelector,
temporalityFunc: testUndefinedTemporalitySelector,
}),
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
origErrorHandler := global.GetErrorHandler()
defer global.SetErrorHandler(origErrorHandler)
global.SetErrorHandler(noErrorHandler{t})

defer func() {
_ = tt.reader.Shutdown(context.Background())
}()

meter := NewMeterProvider(WithReader(tt.reader)).Meter("TestNilAggregationSelector")

// Create All instruments, they should not error
aiCounter, err := meter.Int64ObservableCounter("observable.int64.counter")
require.NoError(t, err)
aiUpDownCounter, err := meter.Int64ObservableUpDownCounter("observable.int64.up.down.counter")
require.NoError(t, err)
aiGauge, err := meter.Int64ObservableGauge("observable.int64.gauge")
require.NoError(t, err)

afCounter, err := meter.Float64ObservableCounter("observable.float64.counter")
require.NoError(t, err)
afUpDownCounter, err := meter.Float64ObservableUpDownCounter("observable.float64.up.down.counter")
require.NoError(t, err)
afGauge, err := meter.Float64ObservableGauge("observable.float64.gauge")
require.NoError(t, err)

siCounter, err := meter.Int64Counter("sync.int64.counter")
require.NoError(t, err)
siUpDownCounter, err := meter.Int64UpDownCounter("sync.int64.up.down.counter")
require.NoError(t, err)
siHistogram, err := meter.Int64Histogram("sync.int64.histogram")
require.NoError(t, err)

sfCounter, err := meter.Float64Counter("sync.float64.counter")
require.NoError(t, err)
sfUpDownCounter, err := meter.Float64UpDownCounter("sync.float64.up.down.counter")
require.NoError(t, err)
sfHistogram, err := meter.Float64Histogram("sync.float64.histogram")
require.NoError(t, err)

callback := func(ctx context.Context, obs metric.Observer) error {
obs.ObserveInt64(aiCounter, 1)
obs.ObserveInt64(aiUpDownCounter, 1)
obs.ObserveInt64(aiGauge, 1)
obs.ObserveFloat64(afCounter, 1)
obs.ObserveFloat64(afUpDownCounter, 1)
obs.ObserveFloat64(afGauge, 1)
return nil
}
_, err = meter.RegisterCallback(callback, aiCounter, aiUpDownCounter, aiGauge, afCounter, afUpDownCounter, afGauge)
require.NoError(t, err)

siCounter.Add(context.Background(), 1)
siUpDownCounter.Add(context.Background(), 1)
siHistogram.Record(context.Background(), 1)
sfCounter.Add(context.Background(), 1)
sfUpDownCounter.Add(context.Background(), 1)
sfHistogram.Record(context.Background(), 1)

var rm metricdata.ResourceMetrics
err = tt.reader.Collect(context.Background(), &rm)
require.NoError(t, err)

require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 12)
})
}
}
5 changes: 5 additions & 0 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader.
stream.Aggregation = i.pipeline.reader.aggregation(kind)
switch stream.Aggregation.(type) {
case nil, aggregation.Default:
// If the reader returns default or nil use the default selector.
stream.Aggregation = DefaultAggregationSelector(kind)
}
}

if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func DefaultTemporalitySelector(InstrumentKind) metricdata.Temporality {

// AggregationSelector selects the aggregation and the parameters to use for
// that aggregation based on the InstrumentKind.
//
// If the Aggregation returned is nil or DefaultAggregation, the selection from
// DefaultAggregationSelector will be used.
type AggregationSelector func(InstrumentKind) aggregation.Aggregation

// DefaultAggregationSelector returns the default aggregation and parameters
Expand Down

0 comments on commit d423bd4

Please sign in to comment.