From 420bd8062760f58bc67ba0a54d4b3de1dba77b61 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 15 Feb 2023 09:42:57 -0800 Subject: [PATCH] Merge instrumentCache into inserter --- sdk/metric/cache.go | 56 ------------------------------------------ sdk/metric/pipeline.go | 56 ++++++++++++++++++++++++++---------------- 2 files changed, 35 insertions(+), 77 deletions(-) diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go index b75e7ea54020..110e49005771 100644 --- a/sdk/metric/cache.go +++ b/sdk/metric/cache.go @@ -16,8 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "sync" - - "go.opentelemetry.io/otel/sdk/metric/internal" ) // cache is a locking storage used to quickly return already computed values. @@ -54,57 +52,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V { c.data[key] = val return val } - -// instrumentCache is a cache of instruments. It is scoped at the Meter level -// along with a number type. Meaning all instruments it contains need to belong -// to the same instrumentation.Scope (implicitly) and number type (explicitly). -type instrumentCache[N int64 | float64] struct { - // aggregators is used to ensure duplicate creations of the same instrument - // return the same instance of that instrument's aggregator. - aggregators *cache[instrumentID, aggVal[N]] - // views is used to ensure if instruments with the same name are created, - // but do not have the same identifying properties, a warning is logged. - views *cache[string, instrumentID] -} - -// newInstrumentCache returns a new instrumentCache that uses ac as the -// underlying cache for aggregators and vc as the cache for views. If ac or vc -// are nil, a new empty cache will be used. -func newInstrumentCache[N int64 | float64](ac *cache[instrumentID, aggVal[N]], vc *cache[string, instrumentID]) instrumentCache[N] { - if ac == nil { - ac = &cache[instrumentID, aggVal[N]]{} - } - if vc == nil { - vc = &cache[string, instrumentID]{} - } - return instrumentCache[N]{aggregators: ac, views: vc} -} - -// LookupAggregator returns the Aggregator and error for a cached instrument if -// it exist in the cache. Otherwise, f is called and its returned value is set -// in the cache and returned. -// -// LookupAggregator is safe to call concurrently. -func (c instrumentCache[N]) LookupAggregator(id instrumentID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) { - v := c.aggregators.Lookup(id, func() aggVal[N] { - a, err := f() - return aggVal[N]{Aggregator: a, Err: err} - }) - return v.Aggregator, v.Err -} - -// aggVal is the cached value of an instrumentCache's aggregators cache. -type aggVal[N int64 | float64] struct { - Aggregator internal.Aggregator[N] - Err error -} - -// Unique returns if id is unique or a duplicate instrument. If an instrument -// with the same name has already been created, that instrumentID will be -// returned along with false. Otherwise, id is returned with true. -// -// Unique is safe to call concurrently. -func (c instrumentCache[N]) Unique(id instrumentID) (instrumentID, bool) { - got := c.views.Lookup(id.Name, func() instrumentID { return id }) - return got, id == got -} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 9023a33d9a38..dda082f201fb 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -179,25 +179,32 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err // inserter facilitates inserting of new instruments from a single scope into a // pipeline. type inserter[N int64 | float64] struct { - // cache holds the aggregators this inserter has created and the views the - // parent meter resolved. - // - // The aggregators part of the cache ensures that if this inserter has - // already created an aggregator and registered it with the underlying - // pipeline (and likewise its underlying reader), that aggregator is - // returned if a user asks the meter for an equivalent stream (directly, or - // via a view transform). - // - // The views part ensures that any instrument conflicts (i.e. same name but - // different description, unit, ...) are logged. This is done from the - // scope of the Meter by virtue of vc coming from the meter. - cache instrumentCache[N] + // aggregators is a cache that holds Aggregators inserted into the + // underlying reader pipeline. This cache ensures no duplicate Aggregators + // are inserted into the reader pipeline and if a new request during an + // instrument creation asks for the same Aggregator the same instance is + // returned. + aggregators *cache[instrumentID, aggCV[N]] + + // views is a cache that holds instrument identifiers for all the + // instruments a Meter has created, it is provided from the Meter that owns + // this inserter. This cache ensures during the creation of instruments + // with the same name but different options (e.g. description, unit) a + // warning message is logged. + views *cache[string, instrumentID] + pipeline *pipeline } func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instrumentID]) *inserter[N] { - c := newInstrumentCache[N](nil, vc) - return &inserter[N]{cache: c, pipeline: p} + if vc == nil { + vc = &cache[string, instrumentID]{} + } + return &inserter[N]{ + aggregators: &cache[instrumentID, aggCV[N]]{}, + views: vc, + pipeline: p, + } } // Instrument inserts the instrument inst with instUnit into a pipeline. All @@ -274,6 +281,12 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], err return aggs, errs.errorOrNil() } +// aggCV is the cached value in an aggregators cache. +type aggCV[N int64 | float64] struct { + Aggregator internal.Aggregator[N] + Err error +} + // cachedAggregator returns the appropriate Aggregator for an instrument // configuration. If the exact instrument has been created within the // inst.Scope, that Aggregator instance will be returned. Otherwise, a new @@ -305,13 +318,13 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum // If there is a conflict, the specification says the view should // still be applied and a warning should be logged. i.logConflict(id) - return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) { + cv := i.aggregators.Lookup(id, func() aggCV[N] { agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic) if err != nil { - return nil, err + return aggCV[N]{nil, err} } if agg == nil { // Drop aggregator. - return nil, nil + return aggCV[N]{nil, nil} } if stream.AttributeFilter != nil { agg = internal.NewFilter(agg, stream.AttributeFilter) @@ -323,15 +336,16 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum unit: stream.Unit, aggregator: agg, }) - return agg, err + return aggCV[N]{agg, err} }) + return cv.Aggregator, cv.Err } // logConflict validates if an instrument with the same name as id has already // been created. If that instrument conflicts with id, a warning is logged. func (i *inserter[N]) logConflict(id instrumentID) { - existing, unique := i.cache.Unique(id) - if unique { + existing := i.views.Lookup(id.Name, func() instrumentID { return id }) + if id == existing { return }