Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use inst ID for agg cache key #4337

Merged
merged 5 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143)
- Log an error for calls to `NewView` in `go.opentelemetry.io/otel/sdk/metric` that have empty criteria. (#4307)
- Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317)
- Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337)

## [1.16.0/0.39.0] 2023-05-18

Expand Down
18 changes: 6 additions & 12 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate stringer -type=InstrumentKind -trimprefix=InstrumentKind

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
Expand All @@ -25,7 +27,6 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

var (
Expand Down Expand Up @@ -172,23 +173,16 @@ func (s Stream) attributeFilter() attribute.Filter {
}
}

// streamID are the identifying properties of a stream.
type streamID struct {
// instID are the identifying properties of a instrument.
type instID struct {
// Name is the name of the stream.
Name string
// Description is the description of the stream.
Description string
// Kind defines the functional group of the instrument.
Kind InstrumentKind
// Unit is the unit of the stream.
Unit string
// Aggregation is the aggregation data type of the stream.
Aggregation string
// Monotonic is the monotonicity of an instruments data type. This field is
// not used for all data types, so a zero value needs to be understood in the
// context of Aggregation.
Monotonic bool
// Temporality is the temporality of a stream's data type. This field is
// not used by some data types.
Temporality metricdata.Temporality
// Number is the number type of the stream.
Number string
}
Expand Down
29 changes: 29 additions & 0 deletions sdk/metric/instrumentkind_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type meter struct {
func newMeter(s instrumentation.Scope, p pipelines) *meter {
// viewCache ensures instrument conflicts, including number conflicts, this
// meter is asked to create are logged to the user.
var viewCache cache[string, streamID]
var viewCache cache[string, instID]

return &meter{
scope: s,
Expand Down
40 changes: 16 additions & 24 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,24 +187,24 @@
// cache ensures no duplicate aggregate functions are inserted into the
// reader pipeline and if a new request during an instrument creation asks
// for the same aggregate function input the same instance is returned.
aggregators *cache[streamID, aggVal[N]]
aggregators *cache[instID, aggVal[N]]

// views is a cache that holds instrument identifiers for all the
// instruments a Meter has created, it is provided from the Meter that owns
// this inserter. This cache ensures during the creation of instruments
// with the same name but different options (e.g. description, unit) a
// warning message is logged.
views *cache[string, streamID]
views *cache[string, instID]

pipeline *pipeline
}

func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *inserter[N] {
func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] {
if vc == nil {
vc = &cache[string, streamID]{}
vc = &cache[string, instID]{}

Check warning on line 204 in sdk/metric/pipeline.go

View check run for this annotation

Codecov / codecov/patch

sdk/metric/pipeline.go#L204

Added line #L204 was not covered by tests
}
return &inserter[N]{
aggregators: &cache[streamID, aggVal[N]]{},
aggregators: &cache[instID, aggVal[N]]{},
views: vc,
pipeline: p,
}
Expand Down Expand Up @@ -320,12 +320,14 @@
)
}

id := i.streamID(kind, stream)
id := i.instID(kind, stream)
// If there is a conflict, the specification says the view should
// still be applied and a warning should be logged.
i.logConflict(id)
cv := i.aggregators.Lookup(id, func() aggVal[N] {
b := aggregate.Builder[N]{Temporality: id.Temporality}
b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind),
}
if len(stream.AllowAttributeKeys) > 0 {
b.Filter = stream.attributeFilter()
}
Expand All @@ -350,8 +352,8 @@

// logConflict validates if an instrument with the same name as id has already
// been created. If that instrument conflicts with id, a warning is logged.
func (i *inserter[N]) logConflict(id streamID) {
existing := i.views.Lookup(id.Name, func() streamID { return id })
func (i *inserter[N]) logConflict(id instID) {
existing := i.views.Lookup(id.Name, func() instID { return id })
if id == existing {
return
}
Expand All @@ -360,31 +362,21 @@
"duplicate metric stream definitions",
"names", fmt.Sprintf("%q, %q", existing.Name, id.Name),
"descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description),
"kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind),
"units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit),
"numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number),
"aggregations", fmt.Sprintf("%s, %s", existing.Aggregation, id.Aggregation),
"monotonics", fmt.Sprintf("%t, %t", existing.Monotonic, id.Monotonic),
"temporalities", fmt.Sprintf("%s, %s", existing.Temporality.String(), id.Temporality.String()),
)
}

func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID {
func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
var zero N
id := streamID{
return instID{
Name: stream.Name,
Description: stream.Description,
Unit: stream.Unit,
Aggregation: fmt.Sprintf("%T", stream.Aggregation),
Temporality: i.pipeline.reader.temporality(kind),
Kind: kind,
Number: fmt.Sprintf("%T", zero),
}

switch kind {
case InstrumentKindObservableCounter, InstrumentKindCounter, InstrumentKindHistogram:
id.Monotonic = true
}

return id
}

// aggregateFunc returns new aggregate functions matching agg, kind, and
Expand Down Expand Up @@ -526,7 +518,7 @@
inserters []*inserter[N]
}

func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] {
func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) resolver[N] {
in := make([]*inserter[N], len(p))
for i := range in {
in[i] = newInserter[N](p[i], vc)
Expand Down
14 changes: 7 additions & 7 deletions sdk/metric/pipeline_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var c cache[string, streamID]
var c cache[string, instID]
p := newPipeline(nil, tt.reader, tt.views)
i := newInserter[N](p, &c)
input, err := i.Instrument(tt.inst)
Expand All @@ -371,7 +371,7 @@ func TestCreateAggregators(t *testing.T) {
}

func testInvalidInstrumentShouldPanic[N int64 | float64]() {
var c cache[string, streamID]
var c cache[string, instID]
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c)
inst := Instrument{
Name: "foo",
Expand All @@ -391,7 +391,7 @@ func TestPipelinesAggregatorForEachReader(t *testing.T) {
require.Len(t, pipes, 2, "created pipelines")

inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[int64](pipes, &c)
aggs, err := r.Aggregators(inst)
require.NoError(t, err, "resolved Aggregators error")
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[int64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)
Expand All @@ -478,7 +478,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo

func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[float64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)
Expand All @@ -505,7 +505,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
p := newPipelines(resource.Empty(), readers, views)
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}

var vc cache[string, streamID]
var vc cache[string, instID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(inst)
assert.Error(t, err)
Expand Down Expand Up @@ -556,7 +556,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {

p := newPipelines(resource.Empty(), readers, views)

var vc cache[string, streamID]
var vc cache[string, instID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(fooInst)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var c cache[string, streamID]
var c cache[string, instID]
i := newInserter[N](test.pipe, &c)
got, err := i.Instrument(inst)
require.NoError(t, err)
Expand Down