Skip to content

Commit

Permalink
Restrict Meters to only register and collect instruments it created (#…
Browse files Browse the repository at this point in the history
…4333)

* Add acceptance test

* Update Meter Register and collect only inst from itself

* Add change to changelog

* Fix spelling error

* Update changelog entry wording

* Simplify the partial success code path
  • Loading branch information
MrAlias committed Jul 19, 2023
1 parent f194fb0 commit f2a9f2f
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 69 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)
- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290)
- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332)
- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333)

### Fixed

Expand Down
20 changes: 11 additions & 9 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ var _ metric.Float64ObservableCounter = float64Observable{}
var _ metric.Float64ObservableUpDownCounter = float64Observable{}
var _ metric.Float64ObservableGauge = float64Observable{}

func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
return float64Observable{
observable: newObservable(scope, kind, name, desc, u, meas),
observable: newObservable(m, kind, name, desc, u, meas),
}
}

Expand All @@ -296,28 +296,30 @@ var _ metric.Int64ObservableCounter = int64Observable{}
var _ metric.Int64ObservableUpDownCounter = int64Observable{}
var _ metric.Int64ObservableGauge = int64Observable{}

func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
return int64Observable{
observable: newObservable(scope, kind, name, desc, u, meas),
observable: newObservable(m, kind, name, desc, u, meas),
}
}

type observable[N int64 | float64] struct {
metric.Observable
observablID[N]

meter *meter
measures []aggregate.Measure[N]
}

func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
return &observable[N]{
observablID: observablID[N]{
name: name,
description: desc,
kind: kind,
unit: u,
scope: scope,
scope: m.scope,
},
meter: m,
measures: meas,
}
}
Expand All @@ -335,16 +337,16 @@ var errEmptyAgg = errors.New("no aggregators for observable instrument")
// and nil if it should. An errEmptyAgg error is returned if o is effectively a
// no-op because it does not have any aggregators. Also, an error is returned
// if scope defines a Meter other than the one o was created by.
func (o *observable[N]) registerable(scope instrumentation.Scope) error {
func (o *observable[N]) registerable(m *meter) error {
if len(o.measures) == 0 {
return errEmptyAgg
}
if scope != o.scope {
if m != o.meter {
return fmt.Errorf(
"invalid registration: observable %q from Meter %q, registered with Meter %q",
o.name,
o.scope.Name,
scope.Name,
m.scope.Name,
)
}
return nil
Expand Down
106 changes: 46 additions & 60 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type meter struct {
scope instrumentation.Scope
pipes pipelines

int64IP *int64InstProvider
float64IP *float64InstProvider
int64Resolver resolver[int64]
float64Resolver resolver[float64]
}

func newMeter(s instrumentation.Scope, p pipelines) *meter {
Expand All @@ -52,10 +52,10 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
var viewCache cache[string, streamID]

return &meter{
scope: s,
pipes: p,
int64IP: newInt64InstProvider(s, p, &viewCache),
float64IP: newFloat64InstProvider(s, p, &viewCache),
scope: s,
pipes: p,
int64Resolver: newResolver[int64](p, &viewCache),
float64Resolver: newResolver[float64](p, &viewCache),
}
}

Expand All @@ -68,7 +68,8 @@ var _ metric.Meter = (*meter)(nil)
func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
cfg := metric.NewInt64CounterConfig(options...)
const kind = InstrumentKindCounter
i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -82,7 +83,8 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption)
func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
cfg := metric.NewInt64UpDownCounterConfig(options...)
const kind = InstrumentKindUpDownCounter
i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -96,7 +98,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
cfg := metric.NewInt64HistogramConfig(options...)
const kind = InstrumentKindHistogram
i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -111,7 +114,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
cfg := metric.NewInt64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
p := int64ObservProvider{m.int64IP}
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -127,7 +130,7 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := int64ObservProvider{m.int64IP}
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -143,7 +146,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
cfg := metric.NewInt64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
p := int64ObservProvider{m.int64IP}
p := int64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -158,7 +161,8 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa
func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
cfg := metric.NewFloat64CounterConfig(options...)
const kind = InstrumentKindCounter
i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -172,7 +176,8 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti
func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
cfg := metric.NewFloat64UpDownCounterConfig(options...)
const kind = InstrumentKindUpDownCounter
i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -186,7 +191,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
cfg := metric.NewFloat64HistogramConfig(options...)
const kind = InstrumentKindHistogram
i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return i, err
}
Expand All @@ -201,7 +207,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
cfg := metric.NewFloat64ObservableCounterConfig(options...)
const kind = InstrumentKindObservableCounter
p := float64ObservProvider{m.float64IP}
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -217,7 +223,7 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := float64ObservProvider{m.float64IP}
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -233,7 +239,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
const kind = InstrumentKindObservableGauge
p := float64ObservProvider{m.float64IP}
p := float64ObservProvider{m}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand Down Expand Up @@ -301,15 +307,15 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)

switch o := inst.(type) {
case int64Observable:
if err := o.registerable(m.scope); err != nil {
if err := o.registerable(m); err != nil {
if !errors.Is(err, errEmptyAgg) {
errs.append(err)
}
continue
}
reg.registerInt64(o.observablID)
case float64Observable:
if err := o.registerable(m.scope); err != nil {
if err := o.registerable(m); err != nil {
if !errors.Is(err, errEmptyAgg) {
errs.append(err)
}
Expand All @@ -322,19 +328,15 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}
}

if err := errs.errorOrNil(); err != nil {
return nil, err
}

err := errs.errorOrNil()
if reg.len() == 0 {
// All insts use drop aggregation.
return noopRegister{}, nil
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
}

cback := func(ctx context.Context) error {
return f(ctx, reg)
}
return m.pipes.registerMultiCallback(cback), nil
// Some or all instruments were valid.
cback := func(ctx context.Context) error { return f(ctx, reg) }
return m.pipes.registerMultiCallback(cback), err
}

type observer struct {
Expand Down Expand Up @@ -441,66 +443,50 @@ func (noopRegister) Unregister() error {
}

// int64InstProvider provides int64 OpenTelemetry instruments.
type int64InstProvider struct {
scope instrumentation.Scope
pipes pipelines
resolve resolver[int64]
}
type int64InstProvider struct{ *meter }

func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *int64InstProvider {
return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)}
}

func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
inst := Instrument{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
}
return p.resolve.Aggregators(inst)
return p.int64Resolver.Aggregators(inst)
}

// lookup returns the resolved instrumentImpl.
func (p *int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
}

// float64InstProvider provides float64 OpenTelemetry instruments.
type float64InstProvider struct {
scope instrumentation.Scope
pipes pipelines
resolve resolver[float64]
}

func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *float64InstProvider {
return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)}
}
type float64InstProvider struct{ *meter }

func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
inst := Instrument{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
}
return p.resolve.Aggregators(inst)
return p.float64Resolver.Aggregators(inst)
}

// lookup returns the resolved instrumentImpl.
func (p *float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
}

type int64ObservProvider struct{ *int64InstProvider }
type int64ObservProvider struct{ *meter }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
aggs, err := p.aggs(kind, name, desc, u)
return newInt64Observable(p.scope, kind, name, desc, u, aggs), err
aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u)
return newInt64Observable(p.meter, kind, name, desc, u, aggs), err
}

func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) {
Expand Down Expand Up @@ -529,11 +515,11 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
o.observe(val, c.Attributes())
}

type float64ObservProvider struct{ *float64InstProvider }
type float64ObservProvider struct{ *meter }

func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
aggs, err := p.aggs(kind, name, desc, u)
return newFloat64Observable(p.scope, kind, name, desc, u, aggs), err
aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u)
return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err
}

func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) {
Expand Down
Loading

0 comments on commit f2a9f2f

Please sign in to comment.