Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into bridge-add-is-sampled
Browse files Browse the repository at this point in the history
  • Loading branch information
Pijus Navickas committed Feb 23, 2023
2 parents 6ae0f96 + 99ec432 commit b450b22
Show file tree
Hide file tree
Showing 28 changed files with 397 additions and 283 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,20 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `OtelStatusDescription` -> `OTelStatusDescription`
- Add `bridgetSpanContext.IsSampled` to `go.opentelemetry.io/otel/bridget/opentracing` to expose whether span is sampled or not. (#3570)
- The `WithInstrumentationAttributes` option to `go.opentelemetry.io/otel/metric`. (#3738)
- The `WithInstrumentationAttributes` option to `go.opentelemetry.io/otel/trace`. (#3739)

### Changed

- [bridge/ot] Fall-back to TextMap carrier when it's not ot.HttpHeaders. (#3679)
- The `Collect` method of the `"go.opentelemetry.io/otel/sdk/metric".Reader` interface is updated to accept the `metricdata.ResourceMetrics` value the collection will be made into. This change is made to enable memory reuse by SDK users. (#3732)

### Fixed

- Ensure `go.opentelemetry.io/otel` does not use generics. (#3723, #3725)
- Multi-reader `MeterProvider`s now export metrics for all readers, instead of just the first reader. (#3720, #3724)
- Remove use of deprecated `"math/rand".Seed` in `go.opentelemetry.io/otel/example/prometheus`. (#3733)
- Do not silently drop unknown schema data with `Parse` in `go.opentelemetry.io/otel/schema/v1.1`. (#3743)
- Data race issue in OTLP exporter retry mechanism. (#3756)

## [1.13.0/0.36.0] 2023-02-07

Expand Down
28 changes: 14 additions & 14 deletions exporters/otlp/internal/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,21 @@ func (c Config) RequestFunc(evaluate EvaluateFunc) RequestFunc {
}
}

// Do not use NewExponentialBackOff since it calls Reset and the code here
// must call Reset after changing the InitialInterval (this saves an
// unnecessary call to Now).
b := &backoff.ExponentialBackOff{
InitialInterval: c.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: c.MaxInterval,
MaxElapsedTime: c.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
b.Reset()

return func(ctx context.Context, fn func(context.Context) error) error {
// Do not use NewExponentialBackOff since it calls Reset and the code here
// must call Reset after changing the InitialInterval (this saves an
// unnecessary call to Now).
b := &backoff.ExponentialBackOff{
InitialInterval: c.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: c.MaxInterval,
MaxElapsedTime: c.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
b.Reset()

for {
err := fn(ctx)
if err == nil {
Expand Down
31 changes: 31 additions & 0 deletions exporters/otlp/internal/retry/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"math"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -225,3 +226,33 @@ func TestRetryNotEnabled(t *testing.T) {
return assert.AnError
}), assert.AnError)
}

func TestConcurrentRetry(t *testing.T) {
ev := func(error) (bool, time.Duration) { return true, 0 }
reqFunc := Config{
Enabled: true,
}.RequestFunc(ev)

var wg sync.WaitGroup
ctx := context.Background()

for i := 1; i < 5; i++ {
wg.Add(1)

go func() {
defer wg.Done()

var done bool
assert.NoError(t, reqFunc(ctx, func(context.Context) error {
if !done {
done = true
return assert.AnError
}

return nil
}))
}()
}

wg.Wait()
}
4 changes: 3 additions & 1 deletion exporters/prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {

// Collect implements prometheus.Collector.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
metrics, err := c.reader.Collect(context.TODO())
// TODO (#3047): Use a sync.Pool instead of allocating metrics every Collect.
metrics := metricdata.ResourceMetrics{}
err := c.reader.Collect(context.TODO(), &metrics)
if err != nil {
otel.Handle(err)
if err == metric.ErrReaderNotRegistered {
Expand Down
1 change: 1 addition & 0 deletions schema/v1.1/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func ParseFile(schemaFilePath string) (*ast.Schema, error) {
func Parse(schemaFileContent io.Reader) (*ast.Schema, error) {
var ts ast.Schema
d := yaml.NewDecoder(schemaFileContent)
d.SetStrict(true) // Do not silently drop unknown fields.
err := d.Decode(&ts)
if err != nil {
return nil, err
Expand Down
10 changes: 8 additions & 2 deletions schema/v1.1/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,14 @@ func TestParseSchemaFile(t *testing.T) {
)
}

func TestFailParseSchemaFile(t *testing.T) {
func TestFailParseFileUnsupportedFileFormat(t *testing.T) {
ts, err := ParseFile("testdata/unsupported-file-format.yaml")
assert.Error(t, err)
assert.ErrorContains(t, err, "unsupported schema file format minor version number")
assert.Nil(t, ts)
}

func TestFailParseFileUnknownField(t *testing.T) {
ts, err := ParseFile("testdata/unknown-field.yaml")
assert.ErrorContains(t, err, "field Resources not found in type ast.VersionDef")
assert.Nil(t, ts)
}
15 changes: 15 additions & 0 deletions schema/v1.1/testdata/unknown-field.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
file_format: 1.1.0
schema_url: https://opentelemetry.io/schemas/1.1.0

versions:
1.1.0:
all: # Valid entry.
changes:
- rename_attributes:
k8s.cluster.name: kubernetes.cluster.name
Resources: # Invalid uppercase.
changes:
- rename_attributes:
attribute_map:
browser.user_agent: user_agent.original
1.0.0:
6 changes: 3 additions & 3 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func BenchmarkCounterCollectOneAttr(b *testing.B) {
for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.Int("K", 1))

_, _ = rdr.Collect(ctx)
_ = rdr.Collect(ctx, nil)
}
}

Expand All @@ -104,7 +104,7 @@ func BenchmarkCounterCollectTenAttrs(b *testing.B) {
for j := 0; j < 10; j++ {
cntr.Add(ctx, 1, attribute.Int("K", j))
}
_, _ = rdr.Collect(ctx)
_ = rdr.Collect(ctx, nil)
}
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func benchCollectHistograms(count int) func(*testing.B) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
collectedMetrics, _ = r.Collect(ctx)
_ = r.Collect(ctx, &collectedMetrics)
if len(collectedMetrics.ScopeMetrics[0].Metrics) != count {
b.Fatalf("got %d metrics, want %d", len(collectedMetrics.ScopeMetrics[0].Metrics), count)
}
Expand Down
56 changes: 0 additions & 56 deletions sdk/metric/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal"
)

// cache is a locking storage used to quickly return already computed values.
Expand Down Expand Up @@ -54,57 +52,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.data[key] = val
return val
}

// instrumentCache is a cache of instruments. It is scoped at the Meter level
// along with a number type. Meaning all instruments it contains need to belong
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
type instrumentCache[N int64 | float64] struct {
// aggregators is used to ensure duplicate creations of the same instrument
// return the same instance of that instrument's aggregator.
aggregators *cache[streamID, aggVal[N]]
// views is used to ensure if instruments with the same name are created,
// but do not have the same identifying properties, a warning is logged.
views *cache[string, streamID]
}

// newInstrumentCache returns a new instrumentCache that uses ac as the
// underlying cache for aggregators and vc as the cache for views. If ac or vc
// are nil, a new empty cache will be used.
func newInstrumentCache[N int64 | float64](ac *cache[streamID, aggVal[N]], vc *cache[string, streamID]) instrumentCache[N] {
if ac == nil {
ac = &cache[streamID, aggVal[N]]{}
}
if vc == nil {
vc = &cache[string, streamID]{}
}
return instrumentCache[N]{aggregators: ac, views: vc}
}

// LookupAggregator returns the Aggregator and error for a cached instrument if
// it exist in the cache. Otherwise, f is called and its returned value is set
// in the cache and returned.
//
// LookupAggregator is safe to call concurrently.
func (c instrumentCache[N]) LookupAggregator(id streamID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
v := c.aggregators.Lookup(id, func() aggVal[N] {
a, err := f()
return aggVal[N]{Aggregator: a, Err: err}
})
return v.Aggregator, v.Err
}

// aggVal is the cached value of an instrumentCache's aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// Unique returns if id is unique or a duplicate instrument. If an instrument
// with the same name has already been created, that streamID will be returned
// along with false. Otherwise, id is returned with true.
//
// Unique is safe to call concurrently.
func (c instrumentCache[N]) Unique(id streamID) (streamID, bool) {
got := c.views.Lookup(id.Name, func() streamID { return id })
return got, id == got
}
6 changes: 3 additions & 3 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type reader struct {
externalProducers []Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
collectFunc func(context.Context, *metricdata.ResourceMetrics) error
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
Expand All @@ -48,8 +48,8 @@ func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.e
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind)
}
func (r *reader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collectFunc(ctx)
func (r *reader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
return r.collectFunc(ctx, rm)
}
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }
Expand Down
24 changes: 16 additions & 8 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -112,11 +113,17 @@ func (mr *manualReader) Shutdown(context.Context) error {
}

// Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
// callbacks necessary and stores the result in rm.
//
// Collect will return an error if called after shutdown.
// Collect will return an error if rm is a nil ResourceMetrics.
func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("manual reader: *metricdata.ResourceMetrics is nil")
}
p := mr.sdkProducer.Load()
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
return ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
Expand All @@ -126,12 +133,13 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("manual reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
return err
}

rm, err := ph.produce(ctx)
// TODO (#3047): When produce is updated to accept output as param, pass rm.
rmTemp, err := ph.produce(ctx)
*rm = rmTemp
if err != nil {
return metricdata.ResourceMetrics{}, err
return err
}
var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
Expand All @@ -141,7 +149,7 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
return unifyErrors(errs)
}

// manualReaderConfig contains configuration options for a ManualReader.
Expand Down
13 changes: 4 additions & 9 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,11 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
// meter is asked to create are logged to the user.
var viewCache cache[string, streamID]

// Passing nil as the ac parameter to newInstrumentCache will have each
// create its own aggregator cache.
ic := newInstrumentCache[int64](nil, &viewCache)
fc := newInstrumentCache[float64](nil, &viewCache)

return &meter{
scope: s,
pipes: p,
int64IP: newInstProvider(s, p, ic),
float64IP: newInstProvider(s, p, fc),
int64IP: newInstProvider[int64](s, p, &viewCache),
float64IP: newInstProvider[float64](s, p, &viewCache),
}
}

Expand Down Expand Up @@ -375,8 +370,8 @@ type instProvider[N int64 | float64] struct {
resolve resolver[N]
}

func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver(p, c)}
func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver[N](p, c)}
}

func (p *instProvider[N]) aggs(kind InstrumentKind, name, desc string, u unit.Unit) ([]internal.Aggregator[N], error) {
Expand Down
Loading

0 comments on commit b450b22

Please sign in to comment.