Skip to content

Commit

Permalink
Simplify solution
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Feb 14, 2023
1 parent 060a20c commit 71a7bfb
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 222 deletions.
37 changes: 18 additions & 19 deletions sdk/metric/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,57 +55,56 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
return val
}

// streamCache is a cache of stream identifiers and the Aggregators they use.
// It is scoped at the Meter level along with a number type. Meaning all
// streams it contains need to belong to the same instrumentation.Scope
// (implicitly) and number type (explicitly).
type streamCache[N int64 | float64] struct {
// instrumentCache is a cache of instruments. It is scoped at the Meter level
// along with a number type. Meaning all instruments it contains need to belong
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
type instrumentCache[N int64 | float64] struct {
// aggregators is used to ensure duplicate creations of the same instrument
// return the same instance of that instrument's aggregator.
aggregators *cache[streamID, aggVal[N]]
aggregators *cache[instrumentID, aggVal[N]]
// views is used to ensure if instruments with the same name are created,
// but do not have the same identifying properties, a warning is logged.
views *cache[string, streamID]
views *cache[string, instrumentID]
}

// newStreamCache returns a new streamCache that uses ac as the underlying
// cache for aggregators and vc as the cache for views. If ac or vc are nil, a
// new empty cache will be used.
func newStreamCache[N int64 | float64](ac *cache[streamID, aggVal[N]], vc *cache[string, streamID]) streamCache[N] {
// newInstrumentCache returns a new instrumentCache that uses ac as the
// underlying cache for aggregators and vc as the cache for views. If ac or vc
// are nil, a new empty cache will be used.
func newInstrumentCache[N int64 | float64](ac *cache[instrumentID, aggVal[N]], vc *cache[string, instrumentID]) instrumentCache[N] {
if ac == nil {
ac = &cache[streamID, aggVal[N]]{}
ac = &cache[instrumentID, aggVal[N]]{}
}
if vc == nil {
vc = &cache[string, streamID]{}
vc = &cache[string, instrumentID]{}
}
return streamCache[N]{aggregators: ac, views: vc}
return instrumentCache[N]{aggregators: ac, views: vc}
}

// LookupAggregator returns the Aggregator and error for a cached instrument if
// it exist in the cache. Otherwise, f is called and its returned value is set
// in the cache and returned.
//
// LookupAggregator is safe to call concurrently.
func (c streamCache[N]) LookupAggregator(id streamID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
func (c instrumentCache[N]) LookupAggregator(id instrumentID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
v := c.aggregators.Lookup(id, func() aggVal[N] {
a, err := f()
return aggVal[N]{Aggregator: a, Err: err}
})
return v.Aggregator, v.Err
}

// aggVal is the cached value of a streamCache's aggregators cache.
// aggVal is the cached value of an instrumentCache's aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// Unique returns if id is unique or a duplicate instrument. If an instrument
// with the same name has already been created, that streamID will be
// with the same name has already been created, that instrumentID will be
// returned along with false. Otherwise, id is returned with true.
//
// Unique is safe to call concurrently.
func (c streamCache[N]) Unique(id streamID) (streamID, bool) {
got := c.views.Lookup(id.Name, func() streamID { return id })
func (c instrumentCache[N]) Unique(id instrumentID) (instrumentID, bool) {
got := c.views.Lookup(id.Name, func() instrumentID { return id })
return got, id == got
}
45 changes: 11 additions & 34 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,29 +136,6 @@ func (i Instrument) matchesScope(other Instrument) bool {
(i.Scope.SchemaURL == "" || i.Scope.SchemaURL == other.Scope.SchemaURL)
}

// instID are the identifying properties of a metric instrument.
type instID struct {
// Name is the name of the instrument.
Name string
// Description is the description of the instrument.
Description string
// Unit is the unit of the instrument.
Unit unit.Unit
// Kind is the kind of the instrument.
Kind InstrumentKind
}

// Instrument returns the fully scoped instrument i identifies.
func (i instID) Instrument(s instrumentation.Scope) Instrument {
return Instrument{
Name: i.Name,
Description: i.Description,
Unit: i.Unit,
Kind: i.Kind,
Scope: s,
}
}

// Stream describes the stream of data an instrument produces.
type Stream struct {
// Name is the human-readable identifier of the stream.
Expand All @@ -173,24 +150,24 @@ type Stream struct {
AttributeFilter attribute.Filter
}

// streamID are the identifying properties of a metric data stream.
type streamID struct {
// Name is the name of the data stream.
// instrumentID are the identifying properties of an instrument.
type instrumentID struct {
// Name is the name of the instrument.
Name string
// Description is the description of the data stream.
// Description is the description of the instrument.
Description string
// Unit is the unit of the data stream.
// Unit is the unit of the instrument.
Unit unit.Unit
// Aggregation is the aggregation data type of the data stream.
// Aggregation is the aggregation data type of the instrument.
Aggregation string
// Monotonic is the monotonicity of an data stream type. This field is not
// used for all types, so a zero value needs to be understood in the
// Monotonic is the monotonicity of an instruments data type. This field is
// not used for all data types, so a zero value needs to be understood in the
// context of Aggregation.
Monotonic bool
// Temporality is the temporality of an data stream's type. This field is
// not used by some types.
// Temporality is the temporality of an instrument's data type. This field
// is not used by some data types.
Temporality metricdata.Temporality
// Number is the number type of the data stream.
// Number is the number type of the instrument.
Number string
}

Expand Down
105 changes: 60 additions & 45 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/internal"
)

// meter handles the creation and coordination of all metric instruments. A
Expand All @@ -35,25 +36,25 @@ type meter struct {
scope instrumentation.Scope
pipes pipelines

int64Resolver resolver[int64]
float64Resolver resolver[float64]
int64IP *instProvider[int64]
float64IP *instProvider[float64]
}

func newMeter(s instrumentation.Scope, p pipelines) *meter {
// viewCache ensures instrument conflicts, including number conflicts, this
// meter is asked to create are logged to the user.
var viewCache cache[string, streamID]
var viewCache cache[string, instrumentID]

// Passing nil as the ac parameter to newInstrumentCache will have each
// create its own aggregator cache.
ic := newStreamCache[int64](nil, &viewCache)
fc := newStreamCache[float64](nil, &viewCache)
ic := newInstrumentCache[int64](nil, &viewCache)
fc := newInstrumentCache[float64](nil, &viewCache)

return &meter{
scope: s,
pipes: p,
int64Resolver: newResolver(s, p, ic),
float64Resolver: newResolver(s, p, fc),
scope: s,
pipes: p,
int64IP: newInstProvider(s, p, ic),
float64IP: newInstProvider(s, p, fc),
}
}

Expand All @@ -64,40 +65,36 @@ var _ metric.Meter = (*meter)(nil)
// options. The instrument is used to synchronously record increasing int64
// measurements during a computational operation.
func (m *meter) Int64Counter(name string, options ...instrument.Int64Option) (instrument.Int64Counter, error) {
global.Debug("Int64Counter Create", "name", name)
cfg := instrument.NewInt64Config(options...)
id := instID{name, cfg.Description(), cfg.Unit(), InstrumentKindCounter}
return m.int64Resolver.Instrument(id)
const kind = InstrumentKindCounter
return m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
}

// Int64UpDownCounter returns a new instrument identified by name and
// configured with options. The instrument is used to synchronously record
// int64 measurements during a computational operation.
func (m *meter) Int64UpDownCounter(name string, options ...instrument.Int64Option) (instrument.Int64UpDownCounter, error) {
global.Debug("Int64UpDownCounter Create", "name", name)
cfg := instrument.NewInt64Config(options...)
id := instID{name, cfg.Description(), cfg.Unit(), InstrumentKindUpDownCounter}
return m.int64Resolver.Instrument(id)
const kind = InstrumentKindUpDownCounter
return m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
}

// Int64Histogram returns a new instrument identified by name and configured
// with options. The instrument is used to synchronously record the
// distribution of int64 measurements during a computational operation.
func (m *meter) Int64Histogram(name string, options ...instrument.Int64Option) (instrument.Int64Histogram, error) {
global.Debug("Int64Histogram Create", "name", name)
cfg := instrument.NewInt64Config(options...)
id := instID{name, cfg.Description(), cfg.Unit(), InstrumentKindHistogram}
return m.int64Resolver.Instrument(id)
const kind = InstrumentKindHistogram
return m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
}

// Int64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing int64 measurements once per a measurement collection cycle.
func (m *meter) Int64ObservableCounter(name string, options ...instrument.Int64ObserverOption) (instrument.Int64ObservableCounter, error) {
global.Debug("Int64ObservableCounter Create", "name", name)
cfg := instrument.NewInt64ObserverConfig(options...)
const kind = InstrumentKindObservableCounter
p := int64ObservProvider{m.int64Resolver}
p := int64ObservProvider{m.int64IP}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -110,10 +107,9 @@ func (m *meter) Int64ObservableCounter(name string, options ...instrument.Int64O
// configured with options. The instrument is used to asynchronously record
// int64 measurements once per a measurement collection cycle.
func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.Int64ObserverOption) (instrument.Int64ObservableUpDownCounter, error) {
global.Debug("Int64ObservableUpDownCounter Create", "name", name)
cfg := instrument.NewInt64ObserverConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := int64ObservProvider{m.int64Resolver}
p := int64ObservProvider{m.int64IP}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -126,10 +122,9 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...instrument.
// configured with options. The instrument is used to asynchronously record
// instantaneous int64 measurements once per a measurement collection cycle.
func (m *meter) Int64ObservableGauge(name string, options ...instrument.Int64ObserverOption) (instrument.Int64ObservableGauge, error) {
global.Debug("Int64ObservableGauge Create", "name", name)
cfg := instrument.NewInt64ObserverConfig(options...)
const kind = InstrumentKindObservableGauge
p := int64ObservProvider{m.int64Resolver}
p := int64ObservProvider{m.int64IP}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -142,40 +137,36 @@ func (m *meter) Int64ObservableGauge(name string, options ...instrument.Int64Obs
// with options. The instrument is used to synchronously record increasing
// float64 measurements during a computational operation.
func (m *meter) Float64Counter(name string, options ...instrument.Float64Option) (instrument.Float64Counter, error) {
global.Debug("Float64Counter Create", "name", name)
cfg := instrument.NewFloat64Config(options...)
id := instID{name, cfg.Description(), cfg.Unit(), InstrumentKindCounter}
return m.float64Resolver.Instrument(id)
const kind = InstrumentKindCounter
return m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
}

// Float64UpDownCounter returns a new instrument identified by name and
// configured with options. The instrument is used to synchronously record
// float64 measurements during a computational operation.
func (m *meter) Float64UpDownCounter(name string, options ...instrument.Float64Option) (instrument.Float64UpDownCounter, error) {
global.Debug("Float64UpDownCounter Create", "name", name)
cfg := instrument.NewFloat64Config(options...)
id := instID{name, cfg.Description(), cfg.Unit(), InstrumentKindUpDownCounter}
return m.float64Resolver.Instrument(id)
const kind = InstrumentKindUpDownCounter
return m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
}

// Float64Histogram returns a new instrument identified by name and configured
// with options. The instrument is used to synchronously record the
// distribution of float64 measurements during a computational operation.
func (m *meter) Float64Histogram(name string, options ...instrument.Float64Option) (instrument.Float64Histogram, error) {
global.Debug("Float64Histogram Create", "name", name)
cfg := instrument.NewFloat64Config(options...)
id := instID{name, cfg.Description(), cfg.Unit(), InstrumentKindHistogram}
return m.float64Resolver.Instrument(id)
const kind = InstrumentKindHistogram
return m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
}

// Float64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing float64 measurements once per a measurement collection cycle.
func (m *meter) Float64ObservableCounter(name string, options ...instrument.Float64ObserverOption) (instrument.Float64ObservableCounter, error) {
global.Debug("Float64ObservableCounter Create", "name", name)
cfg := instrument.NewFloat64ObserverConfig(options...)
const kind = InstrumentKindObservableCounter
p := float64ObservProvider{m.float64Resolver}
p := float64ObservProvider{m.float64IP}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -188,10 +179,9 @@ func (m *meter) Float64ObservableCounter(name string, options ...instrument.Floa
// and configured with options. The instrument is used to asynchronously record
// float64 measurements once per a measurement collection cycle.
func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrument.Float64ObserverOption) (instrument.Float64ObservableUpDownCounter, error) {
global.Debug("Float64ObservableUpDownCounter Create", "name", name)
cfg := instrument.NewFloat64ObserverConfig(options...)
const kind = InstrumentKindObservableUpDownCounter
p := float64ObservProvider{m.float64Resolver}
p := float64ObservProvider{m.float64IP}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand All @@ -204,10 +194,9 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...instrumen
// configured with options. The instrument is used to asynchronously record
// instantaneous float64 measurements once per a measurement collection cycle.
func (m *meter) Float64ObservableGauge(name string, options ...instrument.Float64ObserverOption) (instrument.Float64ObservableGauge, error) {
global.Debug("Float64ObservableGauge Create", "name", name)
cfg := instrument.NewFloat64ObserverConfig(options...)
const kind = InstrumentKindObservableGauge
p := float64ObservProvider{m.float64Resolver}
p := float64ObservProvider{m.float64IP}
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
if err != nil {
return nil, err
Expand Down Expand Up @@ -379,11 +368,38 @@ func (noopRegister) Unregister() error {
return nil
}

type int64ObservProvider struct{ resolver[int64] }
// instProvider provides all OpenTelemetry instruments.
type instProvider[N int64 | float64] struct {
scope instrumentation.Scope
pipes pipelines
resolve resolver[N]
}

func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver(p, c)}
}

func (p *instProvider[N]) aggs(kind InstrumentKind, name, desc string, u unit.Unit) ([]internal.Aggregator[N], error) {
inst := Instrument{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
Scope: p.scope,
}
return p.resolve.Aggregators(inst)
}

// lookup returns the resolved instrumentImpl.
func (p *instProvider[N]) lookup(kind InstrumentKind, name, desc string, u unit.Unit) (*instrumentImpl[N], error) {
aggs, err := p.aggs(kind, name, desc, u)
return &instrumentImpl[N]{aggregators: aggs}, err
}

type int64ObservProvider struct{ *instProvider[int64] }

func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc string, u unit.Unit) (int64Observable, error) {
id := instID{name, desc, u, kind}
aggs, err := p.Aggregators(id)
aggs, err := p.aggs(kind, name, desc, u)
return newInt64Observable(p.scope, kind, name, desc, u, aggs), err
}

Expand Down Expand Up @@ -411,11 +427,10 @@ func (o int64Observer) Observe(val int64, attrs ...attribute.KeyValue) {
o.observe(val, attrs)
}

type float64ObservProvider struct{ resolver[float64] }
type float64ObservProvider struct{ *instProvider[float64] }

func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc string, u unit.Unit) (float64Observable, error) {
id := instID{name, desc, u, kind}
aggs, err := p.Aggregators(id)
aggs, err := p.aggs(kind, name, desc, u)
return newFloat64Observable(p.scope, kind, name, desc, u, aggs), err
}

Expand Down
7 changes: 0 additions & 7 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,6 @@ type periodicReader struct {
shutdownOnce sync.Once
}

func (r *periodicReader) MarshalLog() interface{} {
if s, ok := r.exporter.(fmt.Stringer); ok {
return fmt.Sprintf("Periodic(%s)", s.String())
}
return fmt.Sprintf("Periodic(%#v)", r.exporter)
}

// Compile time check the periodicReader implements Reader and is comparable.
var _ = map[Reader]struct{}{&periodicReader{}: {}}

Expand Down
Loading

0 comments on commit 71a7bfb

Please sign in to comment.