Skip to content

Commit

Permalink
Merge instrumentCache into inserter
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Feb 15, 2023
1 parent 4b65721 commit 420bd80
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 77 deletions.
56 changes: 0 additions & 56 deletions sdk/metric/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal"
)

// cache is a locking storage used to quickly return already computed values.
Expand Down Expand Up @@ -54,57 +52,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.data[key] = val
return val
}

// instrumentCache is a cache of instruments. It is scoped at the Meter level
// along with a number type. Meaning all instruments it contains need to belong
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
type instrumentCache[N int64 | float64] struct {
// aggregators is used to ensure duplicate creations of the same instrument
// return the same instance of that instrument's aggregator.
aggregators *cache[instrumentID, aggVal[N]]
// views is used to ensure if instruments with the same name are created,
// but do not have the same identifying properties, a warning is logged.
views *cache[string, instrumentID]
}

// newInstrumentCache returns a new instrumentCache that uses ac as the
// underlying cache for aggregators and vc as the cache for views. If ac or vc
// are nil, a new empty cache will be used.
func newInstrumentCache[N int64 | float64](ac *cache[instrumentID, aggVal[N]], vc *cache[string, instrumentID]) instrumentCache[N] {
if ac == nil {
ac = &cache[instrumentID, aggVal[N]]{}
}
if vc == nil {
vc = &cache[string, instrumentID]{}
}
return instrumentCache[N]{aggregators: ac, views: vc}
}

// LookupAggregator returns the Aggregator and error for a cached instrument if
// it exist in the cache. Otherwise, f is called and its returned value is set
// in the cache and returned.
//
// LookupAggregator is safe to call concurrently.
func (c instrumentCache[N]) LookupAggregator(id instrumentID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
v := c.aggregators.Lookup(id, func() aggVal[N] {
a, err := f()
return aggVal[N]{Aggregator: a, Err: err}
})
return v.Aggregator, v.Err
}

// aggVal is the cached value of an instrumentCache's aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// Unique returns if id is unique or a duplicate instrument. If an instrument
// with the same name has already been created, that instrumentID will be
// returned along with false. Otherwise, id is returned with true.
//
// Unique is safe to call concurrently.
func (c instrumentCache[N]) Unique(id instrumentID) (instrumentID, bool) {
got := c.views.Lookup(id.Name, func() instrumentID { return id })
return got, id == got
}
56 changes: 35 additions & 21 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,25 +179,32 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
// inserter facilitates inserting of new instruments from a single scope into a
// pipeline.
type inserter[N int64 | float64] struct {
// cache holds the aggregators this inserter has created and the views the
// parent meter resolved.
//
// The aggregators part of the cache ensures that if this inserter has
// already created an aggregator and registered it with the underlying
// pipeline (and likewise its underlying reader), that aggregator is
// returned if a user asks the meter for an equivalent stream (directly, or
// via a view transform).
//
// The views part ensures that any instrument conflicts (i.e. same name but
// different description, unit, ...) are logged. This is done from the
// scope of the Meter by virtue of vc coming from the meter.
cache instrumentCache[N]
// aggregators is a cache that holds Aggregators inserted into the
// underlying reader pipeline. This cache ensures no duplicate Aggregators
// are inserted into the reader pipeline and if a new request during an
// instrument creation asks for the same Aggregator the same instance is
// returned.
aggregators *cache[instrumentID, aggCV[N]]

// views is a cache that holds instrument identifiers for all the
// instruments a Meter has created, it is provided from the Meter that owns
// this inserter. This cache ensures during the creation of instruments
// with the same name but different options (e.g. description, unit) a
// warning message is logged.
views *cache[string, instrumentID]

pipeline *pipeline
}

func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instrumentID]) *inserter[N] {
c := newInstrumentCache[N](nil, vc)
return &inserter[N]{cache: c, pipeline: p}
if vc == nil {
vc = &cache[string, instrumentID]{}
}
return &inserter[N]{
aggregators: &cache[instrumentID, aggCV[N]]{},
views: vc,
pipeline: p,
}
}

// Instrument inserts the instrument inst with instUnit into a pipeline. All
Expand Down Expand Up @@ -274,6 +281,12 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], err
return aggs, errs.errorOrNil()
}

// aggCV is the cached value in an aggregators cache.
type aggCV[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// cachedAggregator returns the appropriate Aggregator for an instrument
// configuration. If the exact instrument has been created within the
// inst.Scope, that Aggregator instance will be returned. Otherwise, a new
Expand Down Expand Up @@ -305,13 +318,13 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
// If there is a conflict, the specification says the view should
// still be applied and a warning should be logged.
i.logConflict(id)
return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
cv := i.aggregators.Lookup(id, func() aggCV[N] {
agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic)
if err != nil {
return nil, err
return aggCV[N]{nil, err}
}
if agg == nil { // Drop aggregator.
return nil, nil
return aggCV[N]{nil, nil}
}
if stream.AttributeFilter != nil {
agg = internal.NewFilter(agg, stream.AttributeFilter)
Expand All @@ -323,15 +336,16 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
unit: stream.Unit,
aggregator: agg,
})
return agg, err
return aggCV[N]{agg, err}
})
return cv.Aggregator, cv.Err
}

// logConflict validates if an instrument with the same name as id has already
// been created. If that instrument conflicts with id, a warning is logged.
func (i *inserter[N]) logConflict(id instrumentID) {
existing, unique := i.cache.Unique(id)
if unique {
existing := i.views.Lookup(id.Name, func() instrumentID { return id })
if id == existing {
return
}

Expand Down

0 comments on commit 420bd80

Please sign in to comment.