Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge instrument cache to inserter #3724

Merged
merged 8 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- Ensure `go.opentelemetry.io/otel` does not use generics. (#3723, #3725)
- Multi-reader `MeterProvider`s now export metrics for all readers, instead of just the first reader. (#3720, #3724)
- Remove use of deprecated `"math/rand".Seed` in `go.opentelemetry.io/otel/example/prometheus`. (#3733)

## [1.13.0/0.36.0] 2023-02-07
Expand Down
56 changes: 0 additions & 56 deletions sdk/metric/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal"
)

// cache is a locking storage used to quickly return already computed values.
Expand Down Expand Up @@ -54,57 +52,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.data[key] = val
return val
}

// instrumentCache is a cache of instruments. It is scoped at the Meter level
// along with a number type. Meaning all instruments it contains need to belong
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
type instrumentCache[N int64 | float64] struct {
// aggregators is used to ensure duplicate creations of the same instrument
// return the same instance of that instrument's aggregator.
aggregators *cache[streamID, aggVal[N]]
// views is used to ensure if instruments with the same name are created,
// but do not have the same identifying properties, a warning is logged.
views *cache[string, streamID]
}

// newInstrumentCache returns a new instrumentCache that uses ac as the
// underlying cache for aggregators and vc as the cache for views. If ac or vc
// are nil, a new empty cache will be used.
func newInstrumentCache[N int64 | float64](ac *cache[streamID, aggVal[N]], vc *cache[string, streamID]) instrumentCache[N] {
if ac == nil {
ac = &cache[streamID, aggVal[N]]{}
}
if vc == nil {
vc = &cache[string, streamID]{}
}
return instrumentCache[N]{aggregators: ac, views: vc}
}

// LookupAggregator returns the Aggregator and error for a cached instrument if
// it exist in the cache. Otherwise, f is called and its returned value is set
// in the cache and returned.
//
// LookupAggregator is safe to call concurrently.
func (c instrumentCache[N]) LookupAggregator(id streamID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
v := c.aggregators.Lookup(id, func() aggVal[N] {
a, err := f()
return aggVal[N]{Aggregator: a, Err: err}
})
return v.Aggregator, v.Err
}

// aggVal is the cached value of an instrumentCache's aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// Unique returns if id is unique or a duplicate instrument. If an instrument
// with the same name has already been created, that streamID will be returned
// along with false. Otherwise, id is returned with true.
//
// Unique is safe to call concurrently.
func (c instrumentCache[N]) Unique(id streamID) (streamID, bool) {
got := c.views.Lookup(id.Name, func() streamID { return id })
return got, id == got
}
13 changes: 4 additions & 9 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,11 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
// meter is asked to create are logged to the user.
var viewCache cache[string, streamID]

// Passing nil as the ac parameter to newInstrumentCache will have each
// create its own aggregator cache.
ic := newInstrumentCache[int64](nil, &viewCache)
fc := newInstrumentCache[float64](nil, &viewCache)

return &meter{
scope: s,
pipes: p,
int64IP: newInstProvider(s, p, ic),
float64IP: newInstProvider(s, p, fc),
int64IP: newInstProvider[int64](s, p, &viewCache),
float64IP: newInstProvider[float64](s, p, &viewCache),
}
}

Expand Down Expand Up @@ -375,8 +370,8 @@ type instProvider[N int64 | float64] struct {
resolve resolver[N]
}

func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver(p, c)}
func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver[N](p, c)}
}

func (p *instProvider[N]) aggs(kind InstrumentKind, name, desc string, u unit.Unit) ([]internal.Aggregator[N], error) {
Expand Down
49 changes: 38 additions & 11 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,32 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
// inserter facilitates inserting of new instruments from a single scope into a
// pipeline.
type inserter[N int64 | float64] struct {
cache instrumentCache[N]
// aggregators is a cache that holds Aggregators inserted into the
// underlying reader pipeline. This cache ensures no duplicate Aggregators
// are inserted into the reader pipeline and if a new request during an
// instrument creation asks for the same Aggregator the same instance is
// returned.
aggregators *cache[streamID, aggVal[N]]

// views is a cache that holds instrument identifiers for all the
// instruments a Meter has created, it is provided from the Meter that owns
// this inserter. This cache ensures during the creation of instruments
// with the same name but different options (e.g. description, unit) a
// warning message is logged.
views *cache[string, streamID]

pipeline *pipeline
}

func newInserter[N int64 | float64](p *pipeline, c instrumentCache[N]) *inserter[N] {
return &inserter[N]{cache: c, pipeline: p}
func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *inserter[N] {
if vc == nil {
vc = &cache[string, streamID]{}
}
return &inserter[N]{
aggregators: &cache[streamID, aggVal[N]]{},
views: vc,
pipeline: p,
}
}

// Instrument inserts the instrument inst with instUnit into a pipeline. All
Expand Down Expand Up @@ -261,6 +281,12 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], err
return aggs, errs.errorOrNil()
}

// aggVal is the cached value in an aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// cachedAggregator returns the appropriate Aggregator for an instrument
// configuration. If the exact instrument has been created within the
// inst.Scope, that Aggregator instance will be returned. Otherwise, a new
Expand Down Expand Up @@ -292,13 +318,13 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
// If there is a conflict, the specification says the view should
// still be applied and a warning should be logged.
i.logConflict(id)
return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
cv := i.aggregators.Lookup(id, func() aggVal[N] {
agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic)
if err != nil {
return nil, err
return aggVal[N]{nil, err}
}
if agg == nil { // Drop aggregator.
return nil, nil
return aggVal[N]{nil, nil}
}
if stream.AttributeFilter != nil {
agg = internal.NewFilter(agg, stream.AttributeFilter)
Expand All @@ -310,15 +336,16 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
unit: stream.Unit,
aggregator: agg,
})
return agg, err
return aggVal[N]{agg, err}
})
return cv.Aggregator, cv.Err
}

// logConflict validates if an instrument with the same name as id has already
// been created. If that instrument conflicts with id, a warning is logged.
func (i *inserter[N]) logConflict(id streamID) {
existing, unique := i.cache.Unique(id)
if unique {
existing := i.views.Lookup(id.Name, func() streamID { return id })
if id == existing {
return
}

Expand Down Expand Up @@ -491,10 +518,10 @@ type resolver[N int64 | float64] struct {
inserters []*inserter[N]
}

func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) resolver[N] {
func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] {
in := make([]*inserter[N], len(p))
for i := range in {
in[i] = newInserter(p[i], c)
in[i] = newInserter[N](p[i], vc)
}
return resolver[N]{in}
}
Expand Down
55 changes: 38 additions & 17 deletions sdk/metric/pipeline_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
c := newInstrumentCache[N](nil, nil)
i := newInserter(newPipeline(nil, tt.reader, tt.views), c)
var c cache[string, streamID]
i := newInserter[N](newPipeline(nil, tt.reader, tt.views), &c)
got, err := i.Instrument(tt.inst)
assert.ErrorIs(t, err, tt.wantErr)
require.Len(t, got, tt.wantLen)
Expand All @@ -227,9 +227,14 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
}
}

func TestCreateAggregators(t *testing.T) {
t.Run("Int64", testCreateAggregators[int64])
t.Run("Float64", testCreateAggregators[float64])
}

func testInvalidInstrumentShouldPanic[N int64 | float64]() {
c := newInstrumentCache[N](nil, nil)
i := newInserter(newPipeline(nil, NewManualReader(), []View{defaultView}), c)
var c cache[string, streamID]
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c)
inst := Instrument{
Name: "foo",
Kind: InstrumentKind(255),
Expand All @@ -242,9 +247,25 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) {
assert.Panics(t, testInvalidInstrumentShouldPanic[float64])
}

func TestCreateAggregators(t *testing.T) {
t.Run("Int64", testCreateAggregators[int64])
t.Run("Float64", testCreateAggregators[float64])
func TestPipelinesAggregatorForEachReader(t *testing.T) {
r0, r1 := NewManualReader(), NewManualReader()
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil)
require.Len(t, pipes, 2, "created pipelines")

inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
r := newResolver[int64](pipes, &c)
aggs, err := r.Aggregators(inst)
require.NoError(t, err, "resolved Aggregators error")
require.Len(t, aggs, 2, "instrument aggregators")

for i, p := range pipes {
var aggN int
for _, is := range p.aggregations {
aggN += len(is)
}
assert.Equalf(t, 1, aggN, "pipeline %d: number of instrumentSync", i)
}
}

func TestPipelineRegistryCreateAggregators(t *testing.T) {
Expand Down Expand Up @@ -309,8 +330,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
c := newInstrumentCache[int64](nil, nil)
r := newResolver(p, c)
var c cache[string, streamID]
r := newResolver[int64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)

Expand All @@ -319,8 +340,8 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo

func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
c := newInstrumentCache[float64](nil, nil)
r := newResolver(p, c)
var c cache[string, streamID]
r := newResolver[float64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)

Expand All @@ -346,13 +367,13 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
p := newPipelines(resource.Empty(), readers, views)
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}

vc := cache[string, streamID]{}
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
var vc cache[string, streamID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(inst)
assert.Error(t, err)
assert.Len(t, intAggs, 0)

rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
rf := newResolver[float64](p, &vc)
floatAggs, err := rf.Aggregators(inst)
assert.Error(t, err)
assert.Len(t, floatAggs, 0)
Expand Down Expand Up @@ -397,8 +418,8 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {

p := newPipelines(resource.Empty(), readers, views)

vc := cache[string, streamID]{}
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
var vc cache[string, streamID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(fooInst)
assert.NoError(t, err)
assert.Equal(t, 0, l.InfoN(), "no info logging should happen")
Expand All @@ -413,7 +434,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {

// Creating a float foo instrument should log a warning because there is an
// int foo instrument.
rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
rf := newResolver[float64](p, &vc)
floatAggs, err := rf.Aggregators(fooInst)
assert.NoError(t, err)
assert.Equal(t, 1, l.InfoN(), "instrument conflict not logged")
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := newInstrumentCache[N](nil, nil)
i := newInserter(test.pipe, c)
var c cache[string, streamID]
i := newInserter[N](test.pipe, &c)
got, err := i.Instrument(inst)
require.NoError(t, err)
assert.Len(t, got, 1, "default view not applied")
Expand Down