From 56a5ef9ead8532308cc3a30f2788d1725b504d8b Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 20 Jul 2023 15:45:42 +0000 Subject: [PATCH] metric.Producer can be passed as an argument to Reader using WithProducer --- CHANGELOG.md | 1 + example/opencensus/main.go | 3 +- sdk/metric/manual_reader.go | 26 ++++------------- sdk/metric/manual_reader_test.go | 8 +++++- sdk/metric/periodic_reader.go | 41 +++++++++------------------ sdk/metric/periodic_reader_test.go | 45 +++++++++++++----------------- sdk/metric/reader.go | 36 +++++++++++++++++++----- sdk/metric/reader_test.go | 36 +++++++++++------------- 8 files changed, 93 insertions(+), 103 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ebfa8d5a9a51..8f801e722f24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `PeriodicReader.Shutdown` and `PeriodicReader.ForceFlush` in `go.opentelemetry.io/otel/sdk/metric` now apply the periodic reader's timeout to the operation if the user provided context does not contain a deadline. (#4356, #4377) - Upgrade all use of `go.opentelemetry.io/otel/semconv` to use `v1.21.0`. (#4408) - Increase instrument name maximum length from 63 to 255 characters. (#4434) +- Add `go.opentelemetry.op/otel/sdk/metric.WithProducer` as an Option for metric.NewManualReader and metric.NewPeriodicReader, and remove `Reader.RegisterProducer()` (#4346) ### Removed diff --git a/example/opencensus/main.go b/example/opencensus/main.go index 8afbd9e5b9bf..c9709bad37a7 100644 --- a/example/opencensus/main.go +++ b/example/opencensus/main.go @@ -103,9 +103,8 @@ func tracing(otExporter sdktrace.SpanExporter) { // registry or an OpenCensus view. func monitoring(exporter metric.Exporter) error { log.Println("Adding the OpenCensus metric Producer to an OpenTelemetry Reader to export OpenCensus metrics using the OpenTelemetry stdout exporter.") - reader := metric.NewPeriodicReader(exporter) // Register the OpenCensus metric Producer to add metrics from OpenCensus to the output. - reader.RegisterProducer(opencensus.NewMetricProducer()) + reader := metric.NewPeriodicReader(exporter, metric.WithProducer(opencensus.NewMetricProducer())) metric.NewMeterProvider(metric.WithReader(reader)) log.Println("Registering a gauge metric using an OpenCensus registry.") diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index a7715f5b34f9..d4a1f7110f19 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -34,7 +34,7 @@ type ManualReader struct { mu sync.Mutex isShutdown bool - externalProducers atomic.Value + externalProducers []Producer temporalitySelector TemporalitySelector aggregationSelector AggregationSelector @@ -49,8 +49,8 @@ func NewManualReader(opts ...ManualReaderOption) *ManualReader { r := &ManualReader{ temporalitySelector: cfg.temporalitySelector, aggregationSelector: cfg.aggregationSelector, + externalProducers: cfg.producers, } - r.externalProducers.Store([]Producer{}) return r } @@ -64,23 +64,6 @@ func (mr *ManualReader) register(p sdkProducer) { } } -// RegisterProducer stores the external Producer which enables the caller -// to read metrics on demand. -// -// This method is safe to call concurrently. -func (mr *ManualReader) RegisterProducer(p Producer) { - mr.mu.Lock() - defer mr.mu.Unlock() - if mr.isShutdown { - return - } - currentProducers := mr.externalProducers.Load().([]Producer) - newProducers := []Producer{} - newProducers = append(newProducers, currentProducers...) - newProducers = append(newProducers, p) - mr.externalProducers.Store(newProducers) -} - // temporality reports the Temporality for the instrument kind provided. func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality { return mr.temporalitySelector(kind) @@ -105,7 +88,7 @@ func (mr *ManualReader) Shutdown(context.Context) error { defer mr.mu.Unlock() mr.isShutdown = true // release references to Producer(s) - mr.externalProducers.Store([]Producer{}) + mr.externalProducers = nil err = nil }) return err @@ -143,7 +126,7 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr return err } var errs []error - for _, producer := range mr.externalProducers.Load().([]Producer) { + for _, producer := range mr.externalProducers { externalMetrics, err := producer.Produce(ctx) if err != nil { errs = append(errs, err) @@ -176,6 +159,7 @@ func (r *ManualReader) MarshalLog() interface{} { type manualReaderConfig struct { temporalitySelector TemporalitySelector aggregationSelector AggregationSelector + producers []Producer } // newManualReaderConfig returns a manualReaderConfig configured with options. diff --git a/sdk/metric/manual_reader_test.go b/sdk/metric/manual_reader_test.go index 1c2f63cf5040..210a66c1bbef 100644 --- a/sdk/metric/manual_reader_test.go +++ b/sdk/metric/manual_reader_test.go @@ -27,7 +27,13 @@ import ( ) func TestManualReader(t *testing.T) { - suite.Run(t, &readerTestSuite{Factory: func() Reader { return NewManualReader() }}) + suite.Run(t, &readerTestSuite{Factory: func(opts ...ReaderOption) Reader { + var mopts []ManualReaderOption + for _, o := range opts { + mopts = append(mopts, o) + } + return NewManualReader(mopts...) + }}) } func BenchmarkManualReader(b *testing.B) { diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index f62a2ae41e32..3f7fcb41b505 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -36,8 +36,9 @@ const ( // periodicReaderConfig contains configuration options for a PeriodicReader. type periodicReaderConfig struct { - interval time.Duration - timeout time.Duration + interval time.Duration + timeout time.Duration + producers []Producer } // newPeriodicReaderConfig returns a periodicReaderConfig configured with @@ -118,18 +119,18 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri conf := newPeriodicReaderConfig(options) ctx, cancel := context.WithCancel(context.Background()) r := &PeriodicReader{ - interval: conf.interval, - timeout: conf.timeout, - exporter: exporter, - flushCh: make(chan chan error), - cancel: cancel, - done: make(chan struct{}), + interval: conf.interval, + timeout: conf.timeout, + exporter: exporter, + flushCh: make(chan chan error), + cancel: cancel, + done: make(chan struct{}), + externalProducers: conf.producers, rmPool: sync.Pool{ New: func() interface{} { return &metricdata.ResourceMetrics{} }}, } - r.externalProducers.Store([]Producer{}) go func() { defer func() { close(r.done) }() @@ -146,7 +147,7 @@ type PeriodicReader struct { mu sync.Mutex isShutdown bool - externalProducers atomic.Value + externalProducers []Producer interval time.Duration timeout time.Duration @@ -197,22 +198,6 @@ func (r *PeriodicReader) register(p sdkProducer) { } } -// RegisterProducer registers p as an external Producer of this reader. -// -// This method is safe to call concurrently. -func (r *PeriodicReader) RegisterProducer(p Producer) { - r.mu.Lock() - defer r.mu.Unlock() - if r.isShutdown { - return - } - currentProducers := r.externalProducers.Load().([]Producer) - newProducers := []Producer{} - newProducers = append(newProducers, currentProducers...) - newProducers = append(newProducers, p) - r.externalProducers.Store(newProducers) -} - // temporality reports the Temporality for the instrument kind provided. func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality { return r.exporter.Temporality(kind) @@ -278,7 +263,7 @@ func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricd return err } var errs []error - for _, producer := range r.externalProducers.Load().([]Producer) { + for _, producer := range r.externalProducers { externalMetrics, err := producer.Produce(ctx) if err != nil { errs = append(errs, err) @@ -368,7 +353,7 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error { defer r.mu.Unlock() r.isShutdown = true // release references to Producer(s) - r.externalProducers.Store([]Producer{}) + r.externalProducers = nil }) return err } diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 0e9d66944c21..0a5490423134 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -203,7 +203,7 @@ type periodicReaderTestSuite struct { } func (ts *periodicReaderTestSuite) SetupTest() { - ts.readerTestSuite.SetupTest() + ts.Reader = ts.Factory() e := &fnExporter{ exportFunc: func(context.Context, *metricdata.ResourceMetrics) error { return assert.AnError }, @@ -211,9 +211,8 @@ func (ts *periodicReaderTestSuite) SetupTest() { shutdownFunc: func(context.Context) error { return assert.AnError }, } - ts.ErrReader = NewPeriodicReader(e) + ts.ErrReader = NewPeriodicReader(e, WithProducer(testExternalProducer{})) ts.ErrReader.register(testSDKProducer{}) - ts.ErrReader.RegisterProducer(testExternalProducer{}) } func (ts *periodicReaderTestSuite) TearDownTest() { @@ -233,8 +232,12 @@ func (ts *periodicReaderTestSuite) TestShutdownPropagated() { func TestPeriodicReader(t *testing.T) { suite.Run(t, &periodicReaderTestSuite{ readerTestSuite: &readerTestSuite{ - Factory: func() Reader { - return NewPeriodicReader(new(fnExporter)) + Factory: func(opts ...ReaderOption) Reader { + var popts []PeriodicReaderOption + for _, o := range opts { + popts = append(popts, o) + } + return NewPeriodicReader(new(fnExporter), popts...) }, }, }) @@ -291,9 +294,8 @@ func TestPeriodicReaderRun(t *testing.T) { }, } - r := NewPeriodicReader(exp) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{}) trigger <- time.Now() assert.Equal(t, assert.AnError, <-eh.Err) @@ -320,9 +322,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush", func(t *testing.T) { exp, called := expFunc(t) - r := NewPeriodicReader(exp) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") @@ -333,7 +334,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush timeout on producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout)) + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{})) r.register(testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { select { @@ -345,7 +346,6 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { } return nil }}) - r.RegisterProducer(testExternalProducer{}) assert.ErrorIs(t, r.ForceFlush(context.Background()), context.DeadlineExceeded) assert.False(t, *called, "exporter Export method called when it should have failed before export") @@ -356,9 +356,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush timeout on external producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout)) - r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{ + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { select { case <-time.After(timeout + time.Second): @@ -368,7 +366,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { } return []metricdata.ScopeMetrics{testScopeMetricsA}, nil }, - }) + })) + r.register(testSDKProducer{}) assert.ErrorIs(t, r.ForceFlush(context.Background()), context.DeadlineExceeded) assert.False(t, *called, "exporter Export method called when it should have failed before export") @@ -378,9 +377,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown", func(t *testing.T) { exp, called := expFunc(t) - r := NewPeriodicReader(exp) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") }) @@ -388,7 +386,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown timeout on producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout)) + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{})) r.register(testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { select { @@ -400,7 +398,6 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { } return nil }}) - r.RegisterProducer(testExternalProducer{}) assert.ErrorIs(t, r.Shutdown(context.Background()), context.DeadlineExceeded) assert.False(t, *called, "exporter Export method called when it should have failed before export") }) @@ -408,9 +405,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown timeout on external producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout)) - r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{ + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { select { case <-time.After(timeout + time.Second): @@ -420,7 +415,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { } return []metricdata.ScopeMetrics{testScopeMetricsA}, nil }, - }) + })) + r.register(testSDKProducer{}) assert.ErrorIs(t, r.Shutdown(context.Background()), context.DeadlineExceeded) assert.False(t, *called, "exporter Export method called when it should have failed before export") }) @@ -428,9 +424,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { func TestPeriodicReaderMultipleForceFlush(t *testing.T) { ctx := context.Background() - r := NewPeriodicReader(new(fnExporter)) + r := NewPeriodicReader(new(fnExporter), WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) - r.RegisterProducer(testExternalProducer{}) require.NoError(t, r.ForceFlush(ctx)) require.NoError(t, r.ForceFlush(ctx)) } diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index 44dee654bdf8..a4d05944d610 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -57,13 +57,6 @@ type Reader interface { // and send aggregated metric measurements. register(sdkProducer) - // RegisterProducer registers a an external Producer with this Reader. - // The Producer is used as a source of aggregated metric data which is - // incorporated into metrics collected from the SDK. - // - // This method needs to be concurrent safe. - RegisterProducer(Producer) - // temporality reports the Temporality for the instrument kind provided. // // This method needs to be concurrent safe with itself and all the other @@ -166,3 +159,32 @@ func DefaultAggregationSelector(ik InstrumentKind) aggregation.Aggregation { } panic("unknown instrument kind") } + +// ReaderOption is an option which can be applied to manual or Periodic +// readers. +type ReaderOption interface { + PeriodicReaderOption + ManualReaderOption +} + +// WithProducers registers producers as an external Producer of metric data +// for this Reader. +func WithProducer(p Producer) ReaderOption { + return producerOption{p: p} +} + +type producerOption struct { + p Producer +} + +// applyManual returns a manualReaderConfig with option applied. +func (o producerOption) applyManual(c manualReaderConfig) manualReaderConfig { + c.producers = append(c.producers, o.p) + return c +} + +// applyPeriodic returns a periodicReaderConfig with option applied. +func (o producerOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig { + c.producers = append(c.producers, o.p) + return c +} diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index f2a20c0da271..887d5d363711 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -34,7 +34,7 @@ import ( type readerTestSuite struct { suite.Suite - Factory func() Reader + Factory func(...ReaderOption) Reader Reader Reader } @@ -42,21 +42,19 @@ func (ts *readerTestSuite) SetupSuite() { otel.SetLogger(testr.New(ts.T())) } -func (ts *readerTestSuite) SetupTest() { - ts.Reader = ts.Factory() -} - func (ts *readerTestSuite) TearDownTest() { // Ensure Reader is allowed attempt to clean up. _ = ts.Reader.Shutdown(context.Background()) } func (ts *readerTestSuite) TestErrorForNotRegistered() { + ts.Reader = ts.Factory() err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{}) ts.ErrorIs(err, ErrReaderNotRegistered) } func (ts *readerTestSuite) TestSDKProducer() { + ts.Reader = ts.Factory() ts.Reader.register(testSDKProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) @@ -65,8 +63,8 @@ func (ts *readerTestSuite) TestSDKProducer() { } func (ts *readerTestSuite) TestExternalProducer() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) ts.NoError(err) @@ -74,9 +72,9 @@ func (ts *readerTestSuite) TestExternalProducer() { } func (ts *readerTestSuite) TestCollectAfterShutdown() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) m := metricdata.ResourceMetrics{} @@ -86,14 +84,15 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() { } func (ts *readerTestSuite) TestShutdownTwice() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown) } func (ts *readerTestSuite) TestMultipleRegister() { + ts.Reader = ts.Factory() p0 := testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { // Differentiate this producer from the second by returning an @@ -113,21 +112,19 @@ func (ts *readerTestSuite) TestMultipleRegister() { } func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { - ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer( - testExternalProducer{ + ts.Reader = ts.Factory( + WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { return []metricdata.ScopeMetrics{}, assert.AnError }, - }, - ) - ts.Reader.RegisterProducer( - testExternalProducer{ + }), + WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { return []metricdata.ScopeMetrics{testScopeMetricsB}, nil }, - }, + }), ) + ts.Reader.register(testSDKProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) @@ -136,12 +133,12 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { } func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ts.Reader.register(testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { *rm = metricdata.ResourceMetrics{} return assert.AnError }}) - ts.Reader.RegisterProducer(testExternalProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(context.Background(), &m) @@ -150,11 +147,11 @@ func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { } func (ts *readerTestSuite) TestMethodConcurrentSafe() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) // Requires the race-detector (a default test option for the project). // All reader methods should be concurrent-safe. ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) ctx := context.Background() var wg sync.WaitGroup @@ -196,11 +193,11 @@ func (ts *readerTestSuite) TestMethodConcurrentSafe() { } func (ts *readerTestSuite) TestShutdownBeforeRegister() { + ts.Reader = ts.Factory(WithProducer(testExternalProducer{})) ctx := context.Background() ts.Require().NoError(ts.Reader.Shutdown(ctx)) // Registering after shutdown should not revert the shutdown. ts.Reader.register(testSDKProducer{}) - ts.Reader.RegisterProducer(testExternalProducer{}) m := metricdata.ResourceMetrics{} err := ts.Reader.Collect(ctx, &m) @@ -209,6 +206,7 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { } func (ts *readerTestSuite) TestCollectNilResourceMetricError() { + ts.Reader = ts.Factory() ctx := context.Background() ts.Assert().Error(ts.Reader.Collect(ctx, nil)) }