Commit

Merge branch 'main' into otlp-concurrent-retry
MadVikingGod authored Feb 21, 2023
2 parents c537386 + de94faf commit 001981d
Showing 17 changed files with 190 additions and 141 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -35,11 +35,14 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- [bridge/ot] Fall back to TextMap carrier when it's not ot.HttpHeaders. (#3679)
- The `Collect` method of the `"go.opentelemetry.io/otel/sdk/metric".Reader` interface is updated to accept the `metricdata.ResourceMetrics` value the collection will be made into. This change is made to enable memory reuse by SDK users. (#3732)

### Fixed

- Ensure `go.opentelemetry.io/otel` does not use generics. (#3723, #3725)
- Multi-reader `MeterProvider`s now export metrics for all readers, instead of just the first reader. (#3720, #3724)
- Remove use of deprecated `"math/rand".Seed` in `go.opentelemetry.io/otel/example/prometheus`. (#3733)
- Do not silently drop unknown schema data with `Parse` in `go.opentelemetry.io/otel/schema/v1.1`. (#3743)
- Data race issue in OTLP exporter retry mechanism. (#3756)

## [1.13.0/0.36.0] 2023-02-07
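The `Collect` signature change called out under Changed above moves allocation of the output to the caller. A minimal sketch of how an SDK user collects with the new signature, reusing one `metricdata.ResourceMetrics` value across calls (assuming a `ManualReader` and the instrument API as of this commit; the meter and instrument names are illustrative):

    package main

    import (
        "context"
        "fmt"

        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
        "go.opentelemetry.io/otel/sdk/metric/metricdata"
    )

    func main() {
        ctx := context.Background()
        reader := sdkmetric.NewManualReader()
        provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
        defer func() { _ = provider.Shutdown(ctx) }()

        // "example" and "requests" are illustrative names.
        counter, _ := provider.Meter("example").Int64Counter("requests")
        counter.Add(ctx, 1)

        // The caller owns the ResourceMetrics and can reuse it for every
        // collection; Collect now fills it in place instead of returning one.
        var rm metricdata.ResourceMetrics
        if err := reader.Collect(ctx, &rm); err != nil {
            fmt.Println("collect failed:", err)
            return
        }
        fmt.Println("scopes collected:", len(rm.ScopeMetrics))
    }

Passing the same &rm to later Collect calls lets the SDK reuse the slices it already allocated, which is the memory reuse the changelog entry describes.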
4 changes: 3 additions & 1 deletion exporters/prometheus/exporter.go
@@ -113,7 +113,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {

// Collect implements prometheus.Collector.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
metrics, err := c.reader.Collect(context.TODO())
// TODO (#3047): Use a sync.Pool instead of allocating metrics every Collect.
metrics := metricdata.ResourceMetrics{}
err := c.reader.Collect(context.TODO(), &metrics)
if err != nil {
otel.Handle(err)
if err == metric.ErrReaderNotRegistered {
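The TODO (#3047) in the hunk above suggests pooling the ResourceMetrics rather than allocating one per scrape. A hypothetical sketch of that idea — rmPool and collectPooled are illustrative names, not part of the exporter:

    package pooledcollect

    import (
        "context"
        "sync"

        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
        "go.opentelemetry.io/otel/sdk/metric/metricdata"
    )

    // rmPool holds reusable ResourceMetrics values (hypothetical; not exporter API).
    var rmPool = sync.Pool{
        New: func() any { return new(metricdata.ResourceMetrics) },
    }

    // collectPooled collects into a pooled ResourceMetrics and hands it to fn
    // before returning the value to the pool.
    func collectPooled(ctx context.Context, r sdkmetric.Reader, fn func(*metricdata.ResourceMetrics)) error {
        rm := rmPool.Get().(*metricdata.ResourceMetrics)
        defer func() {
            *rm = metricdata.ResourceMetrics{} // drop references before pooling
            rmPool.Put(rm)
        }()
        if err := r.Collect(ctx, rm); err != nil {
            return err
        }
        fn(rm)
        return nil
    }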
1 change: 1 addition & 0 deletions schema/v1.1/parser.go
@@ -43,6 +43,7 @@ func ParseFile(schemaFilePath string) (*ast.Schema, error) {
func Parse(schemaFileContent io.Reader) (*ast.Schema, error) {
var ts ast.Schema
d := yaml.NewDecoder(schemaFileContent)
d.SetStrict(true) // Do not silently drop unknown fields.
err := d.Decode(&ts)
if err != nil {
return nil, err
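SetStrict(true) is what turns unknown YAML keys into decode errors. A small standalone illustration of the difference, assuming the decoder here is gopkg.in/yaml.v2 (the package whose Decoder exposes SetStrict; the parser's import block is not shown in this hunk):

    package main

    import (
        "fmt"
        "strings"

        "gopkg.in/yaml.v2"
    )

    type doc struct {
        FileFormat string `yaml:"file_format"`
    }

    func main() {
        const in = "file_format: 1.1.0\nResources: oops\n" // "Resources" is not a field of doc

        // Default decoding: the unknown key is silently dropped.
        var lax doc
        _ = yaml.NewDecoder(strings.NewReader(in)).Decode(&lax)
        fmt.Println("lax:", lax.FileFormat)

        // Strict decoding: the unknown key becomes an error, which is what the
        // schema parser now relies on to reject malformed files.
        var strict doc
        d := yaml.NewDecoder(strings.NewReader(in))
        d.SetStrict(true)
        if err := d.Decode(&strict); err != nil {
            fmt.Println("strict:", err)
        }
    }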
10 changes: 8 additions & 2 deletions schema/v1.1/parser_test.go
@@ -167,8 +167,14 @@ func TestParseSchemaFile(t *testing.T) {
)
}

func TestFailParseSchemaFile(t *testing.T) {
func TestFailParseFileUnsupportedFileFormat(t *testing.T) {
ts, err := ParseFile("testdata/unsupported-file-format.yaml")
assert.Error(t, err)
assert.ErrorContains(t, err, "unsupported schema file format minor version number")
assert.Nil(t, ts)
}

func TestFailParseFileUnknownField(t *testing.T) {
ts, err := ParseFile("testdata/unknown-field.yaml")
assert.ErrorContains(t, err, "field Resources not found in type ast.VersionDef")
assert.Nil(t, ts)
}
15 changes: 15 additions & 0 deletions schema/v1.1/testdata/unknown-field.yaml
@@ -0,0 +1,15 @@
file_format: 1.1.0
schema_url: https://opentelemetry.io/schemas/1.1.0

versions:
1.1.0:
all: # Valid entry.
changes:
- rename_attributes:
k8s.cluster.name: kubernetes.cluster.name
Resources: # Invalid uppercase.
changes:
- rename_attributes:
attribute_map:
browser.user_agent: user_agent.original
1.0.0:
6 changes: 3 additions & 3 deletions sdk/metric/benchmark_test.go
@@ -93,7 +93,7 @@ func BenchmarkCounterCollectOneAttr(b *testing.B) {
for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.Int("K", 1))

_, _ = rdr.Collect(ctx)
_ = rdr.Collect(ctx, nil)
}
}

Expand All @@ -104,7 +104,7 @@ func BenchmarkCounterCollectTenAttrs(b *testing.B) {
for j := 0; j < 10; j++ {
cntr.Add(ctx, 1, attribute.Int("K", j))
}
_, _ = rdr.Collect(ctx)
_ = rdr.Collect(ctx, nil)
}
}

@@ -140,7 +140,7 @@ func benchCollectHistograms(count int) func(*testing.B) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
collectedMetrics, _ = r.Collect(ctx)
_ = r.Collect(ctx, &collectedMetrics)
if len(collectedMetrics.ScopeMetrics[0].Metrics) != count {
b.Fatalf("got %d metrics, want %d", len(collectedMetrics.ScopeMetrics[0].Metrics), count)
}
56 changes: 0 additions & 56 deletions sdk/metric/cache.go
@@ -16,8 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal"
)

// cache is a locking storage used to quickly return already computed values.
@@ -54,57 +52,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.data[key] = val
return val
}

// instrumentCache is a cache of instruments. It is scoped at the Meter level
// along with a number type. Meaning all instruments it contains need to belong
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
type instrumentCache[N int64 | float64] struct {
// aggregators is used to ensure duplicate creations of the same instrument
// return the same instance of that instrument's aggregator.
aggregators *cache[streamID, aggVal[N]]
// views is used to ensure if instruments with the same name are created,
// but do not have the same identifying properties, a warning is logged.
views *cache[string, streamID]
}

// newInstrumentCache returns a new instrumentCache that uses ac as the
// underlying cache for aggregators and vc as the cache for views. If ac or vc
// are nil, a new empty cache will be used.
func newInstrumentCache[N int64 | float64](ac *cache[streamID, aggVal[N]], vc *cache[string, streamID]) instrumentCache[N] {
if ac == nil {
ac = &cache[streamID, aggVal[N]]{}
}
if vc == nil {
vc = &cache[string, streamID]{}
}
return instrumentCache[N]{aggregators: ac, views: vc}
}

// LookupAggregator returns the Aggregator and error for a cached instrument if
// it exists in the cache. Otherwise, f is called and its returned value is set
// in the cache and returned.
//
// LookupAggregator is safe to call concurrently.
func (c instrumentCache[N]) LookupAggregator(id streamID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
v := c.aggregators.Lookup(id, func() aggVal[N] {
a, err := f()
return aggVal[N]{Aggregator: a, Err: err}
})
return v.Aggregator, v.Err
}

// aggVal is the cached value of an instrumentCache's aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// Unique returns if id is unique or a duplicate instrument. If an instrument
// with the same name has already been created, that streamID will be returned
// along with false. Otherwise, id is returned with true.
//
// Unique is safe to call concurrently.
func (c instrumentCache[N]) Unique(id streamID) (streamID, bool) {
got := c.views.Lookup(id.Name, func() streamID { return id })
return got, id == got
}
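With instrumentCache gone, the meter relies only on the generic cache kept above. A brief illustration of the Lookup contract it provides (the first computed value wins; later lookups for the same key reuse it), using a stand-in copy of the type since the SDK's cache is unexported:

    package main

    import (
        "fmt"
        "sync"
    )

    // cache mirrors the unexported sdk/metric cache shown above: a locked map
    // whose Lookup memoizes the first computed value per key.
    type cache[K comparable, V any] struct {
        sync.Mutex
        data map[K]V
    }

    func (c *cache[K, V]) Lookup(key K, f func() V) V {
        c.Lock()
        defer c.Unlock()
        if c.data == nil {
            c.data = map[K]V{}
        }
        if v, ok := c.data[key]; ok {
            return v
        }
        v := f()
        c.data[key] = v
        return v
    }

    func main() {
        var views cache[string, int]
        first := views.Lookup("requests", func() int { return 1 })
        second := views.Lookup("requests", func() int { return 2 }) // f is not called again
        fmt.Println(first, second)                                  // 1 1
    }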
6 changes: 3 additions & 3 deletions sdk/metric/config_test.go
@@ -32,7 +32,7 @@ type reader struct {
externalProducers []Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
collectFunc func(context.Context, *metricdata.ResourceMetrics) error
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
@@ -48,8 +48,8 @@ func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.e
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind)
}
func (r *reader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collectFunc(ctx)
func (r *reader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
return r.collectFunc(ctx, rm)
}
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }
24 changes: 16 additions & 8 deletions sdk/metric/manual_reader.go
@@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
@@ -112,11 +113,17 @@ func (mr *manualReader) Shutdown(context.Context) error {
}

// Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
// callbacks necessary and stores the result in rm.
//
// Collect will return an error if called after shutdown.
// Collect will return an error if rm is a nil ResourceMetrics.
func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("manual reader: *metricdata.ResourceMetrics is nil")
}
p := mr.sdkProducer.Load()
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
return ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
@@ -126,12 +133,13 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
// happen, return an error instead of panicking so a user's code does
// not halt in the process.
err := fmt.Errorf("manual reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
return err
}

rm, err := ph.produce(ctx)
// TODO (#3047): When produce is updated to accept output as param, pass rm.
rmTemp, err := ph.produce(ctx)
*rm = rmTemp
if err != nil {
return metricdata.ResourceMetrics{}, err
return err
}
var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
@@ -141,7 +149,7 @@
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
return unifyErrors(errs)
}

// manualReaderConfig contains configuration options for a ManualReader.
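The reworked ManualReader Collect now has two early error paths: a nil destination and a reader that was never registered with a MeterProvider. A short sketch of both, assuming the exported NewManualReader constructor and the ErrReaderNotRegistered sentinel:

    package main

    import (
        "context"
        "errors"
        "fmt"

        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
        "go.opentelemetry.io/otel/sdk/metric/metricdata"
    )

    func main() {
        ctx := context.Background()
        reader := sdkmetric.NewManualReader()

        // Nil destination is rejected up front by the new signature.
        fmt.Println(reader.Collect(ctx, nil))

        // Not yet registered with a MeterProvider: the sentinel error is returned.
        var rm metricdata.ResourceMetrics
        err := reader.Collect(ctx, &rm)
        fmt.Println(errors.Is(err, sdkmetric.ErrReaderNotRegistered))

        // Once registered, Collect fills rm in place.
        _ = sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
        fmt.Println(reader.Collect(ctx, &rm) == nil)
    }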
13 changes: 4 additions & 9 deletions sdk/metric/meter.go
@@ -45,16 +45,11 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
// meter is asked to create are logged to the user.
var viewCache cache[string, streamID]

// Passing nil as the ac parameter to newInstrumentCache will have each
// create its own aggregator cache.
ic := newInstrumentCache[int64](nil, &viewCache)
fc := newInstrumentCache[float64](nil, &viewCache)

return &meter{
scope: s,
pipes: p,
int64IP: newInstProvider(s, p, ic),
float64IP: newInstProvider(s, p, fc),
int64IP: newInstProvider[int64](s, p, &viewCache),
float64IP: newInstProvider[float64](s, p, &viewCache),
}
}

@@ -375,8 +370,8 @@ type instProvider[N int64 | float64] struct {
resolve resolver[N]
}

func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver(p, c)}
func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver[N](p, c)}
}

func (p *instProvider[N]) aggs(kind InstrumentKind, name, desc string, u unit.Unit) ([]internal.Aggregator[N], error) {
24 changes: 15 additions & 9 deletions sdk/metric/meter_test.go
@@ -472,7 +472,8 @@ func TestMeterCreatesInstruments(t *testing.T) {

tt.fn(t, m)

rm, err := rdr.Collect(context.Background())
rm := metricdata.ResourceMetrics{}
err := rdr.Collect(context.Background(), &rm)
assert.NoError(t, err)

require.Len(t, rm.ScopeMetrics, 1)
@@ -566,7 +567,7 @@ func TestCallbackObserverNonRegistered(t *testing.T) {

var got metricdata.ResourceMetrics
assert.NotPanics(t, func() {
got, err = rdr.Collect(context.Background())
err = rdr.Collect(context.Background(), &got)
})

assert.NoError(t, err)
@@ -660,7 +661,8 @@ func TestGlobalInstRegisterCallback(t *testing.T) {
_, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr)
assert.NoError(t, err)

got, err := rdr.Collect(context.Background())
got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
assert.NoError(t, err)
assert.Lenf(t, l.messages, 0, "Warnings and errors logged:\n%s", l)
metricdatatest.AssertEqual(t, metricdata.ResourceMetrics{
@@ -772,7 +774,8 @@ func TestMetersProvideScope(t *testing.T) {
},
}

got, err := rdr.Collect(context.Background())
got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
assert.NoError(t, err)
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
}
@@ -816,14 +819,14 @@ func TestUnregisterUnregisters(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
_, err = r.Collect(ctx)
err = r.Collect(ctx, &metricdata.ResourceMetrics{})
require.NoError(t, err)
assert.True(t, called, "callback not called for registered callback")

called = false
require.NoError(t, reg.Unregister(), "unregister")

_, err = r.Collect(ctx)
err = r.Collect(ctx, &metricdata.ResourceMetrics{})
require.NoError(t, err)
assert.False(t, called, "callback called for unregistered callback")
}
@@ -869,7 +872,8 @@ func TestRegisterCallbackDropAggregations(t *testing.T) {
)
require.NoError(t, err)

data, err := r.Collect(context.Background())
data := metricdata.ResourceMetrics{}
err = r.Collect(context.Background(), &data)
require.NoError(t, err)

assert.False(t, called, "callback called for all drop instruments")
@@ -1238,7 +1242,8 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))

m, err := rdr.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := rdr.Collect(context.Background(), &m)
assert.NoError(t, err)

require.Len(t, m.ScopeMetrics, 1)
@@ -1331,7 +1336,8 @@ func TestAsynchronousExample(t *testing.T) {

collect := func(t *testing.T) {
t.Helper()
got, err := reader.Collect(context.Background())
got := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())
25 changes: 17 additions & 8 deletions sdk/metric/periodic_reader.go
@@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
@@ -206,21 +207,29 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio
// collectAndExport gather all metric data related to the periodicReader r from
// the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error {
m, err := r.Collect(ctx)
// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
rm := metricdata.ResourceMetrics{}
err := r.Collect(ctx, &rm)
if err == nil {
err = r.export(ctx, m)
err = r.export(ctx, rm)
}
return err
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK and other Producers. The returned metric data is not exported
// to the configured exporter, it is left to the caller to handle that if
// desired.
// the SDK and other Producers and stores the result in rm. The returned metric
// data is not exported to the configured exporter, it is left to the caller to
// handle that if desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collect(ctx, r.sdkProducer.Load())
// An error is returned if this is called after Shutdown. An error is returned if rm is nil.
func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
}
// TODO (#3047): When collect is updated to accept output as param, pass rm.
rmTemp, err := r.collect(ctx, r.sdkProducer.Load())
*rm = rmTemp
return err
}

// collect unwraps p as a produceHolder and returns its produce results.
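As the updated doc comment notes, Collect on a periodic reader only snapshots into rm; exporting stays tied to the interval (or ForceFlush). A sketch of taking such an on-demand snapshot, assuming the stdoutmetric exporter as a concrete Exporter:

    package main

    import (
        "context"
        "log"

        "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
        "go.opentelemetry.io/otel/sdk/metric/metricdata"
    )

    func main() {
        ctx := context.Background()
        exp, err := stdoutmetric.New()
        if err != nil {
            log.Fatal(err)
        }
        reader := sdkmetric.NewPeriodicReader(exp)
        provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
        defer func() { _ = provider.Shutdown(ctx) }()

        // Collect fills rm with a snapshot but does not export it; exporting on
        // the interval (or via ForceFlush) remains the reader's job.
        var rm metricdata.ResourceMetrics
        if err := reader.Collect(ctx, &rm); err != nil {
            log.Fatal(err)
        }
        log.Printf("snapshot has %d scope(s)", len(rm.ScopeMetrics))
    }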