Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change metric.Producer to be an Option on Reader #4346

Merged
merged 3 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `PeriodicReader.Shutdown` and `PeriodicReader.ForceFlush` in `go.opentelemetry.io/otel/sdk/metric` now apply the periodic reader's timeout to the operation if the user provided context does not contain a deadline. (#4356, #4377)
- Upgrade all use of `go.opentelemetry.io/otel/semconv` to use `v1.21.0`. (#4408)
- Increase instrument name maximum length from 63 to 255 characters. (#4434)
- Add `go.opentelemetry.op/otel/sdk/metric.WithProducer` as an Option for metric.NewManualReader and metric.NewPeriodicReader, and remove `Reader.RegisterProducer()` (#4346)

### Removed

Expand Down
3 changes: 1 addition & 2 deletions example/opencensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ func tracing(otExporter sdktrace.SpanExporter) {
// registry or an OpenCensus view.
func monitoring(exporter metric.Exporter) error {
log.Println("Adding the OpenCensus metric Producer to an OpenTelemetry Reader to export OpenCensus metrics using the OpenTelemetry stdout exporter.")
reader := metric.NewPeriodicReader(exporter)
// Register the OpenCensus metric Producer to add metrics from OpenCensus to the output.
reader.RegisterProducer(opencensus.NewMetricProducer())
reader := metric.NewPeriodicReader(exporter, metric.WithProducer(opencensus.NewMetricProducer()))
metric.NewMeterProvider(metric.WithReader(reader))

log.Println("Registering a gauge metric using an OpenCensus registry.")
Expand Down
20 changes: 2 additions & 18 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewManualReader(opts ...ManualReaderOption) *ManualReader {
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
r.externalProducers.Store([]Producer{})
r.externalProducers.Store(cfg.producers)
return r
}

Expand All @@ -64,23 +64,6 @@ func (mr *ManualReader) register(p sdkProducer) {
}
}

// RegisterProducer stores the external Producer which enables the caller
// to read metrics on demand.
//
// This method is safe to call concurrently.
func (mr *ManualReader) RegisterProducer(p Producer) {
mr.mu.Lock()
defer mr.mu.Unlock()
if mr.isShutdown {
return
}
currentProducers := mr.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
mr.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality {
return mr.temporalitySelector(kind)
Expand Down Expand Up @@ -176,6 +159,7 @@ func (r *ManualReader) MarshalLog() interface{} {
type manualReaderConfig struct {
temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
producers []Producer
}

// newManualReaderConfig returns a manualReaderConfig configured with options.
Expand Down
8 changes: 7 additions & 1 deletion sdk/metric/manual_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ import (
)

func TestManualReader(t *testing.T) {
suite.Run(t, &readerTestSuite{Factory: func() Reader { return NewManualReader() }})
suite.Run(t, &readerTestSuite{Factory: func(opts ...ReaderOption) Reader {
var mopts []ManualReaderOption
for _, o := range opts {
mopts = append(mopts, o)
}
return NewManualReader(mopts...)
}})
}

func BenchmarkManualReader(b *testing.B) {
Expand Down
23 changes: 4 additions & 19 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ const (

// periodicReaderConfig contains configuration options for a PeriodicReader.
type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
interval time.Duration
timeout time.Duration
producers []Producer
}

// newPeriodicReaderConfig returns a periodicReaderConfig configured with
Expand Down Expand Up @@ -129,7 +130,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri
return &metricdata.ResourceMetrics{}
}},
}
r.externalProducers.Store([]Producer{})
r.externalProducers.Store(conf.producers)

go func() {
defer func() { close(r.done) }()
Expand Down Expand Up @@ -197,22 +198,6 @@ func (r *PeriodicReader) register(p sdkProducer) {
}
}

// RegisterProducer registers p as an external Producer of this reader.
//
// This method is safe to call concurrently.
func (r *PeriodicReader) RegisterProducer(p Producer) {
r.mu.Lock()
defer r.mu.Unlock()
if r.isShutdown {
return
}
currentProducers := r.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
r.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.exporter.Temporality(kind)
Expand Down
45 changes: 20 additions & 25 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,17 +203,16 @@ type periodicReaderTestSuite struct {
}

func (ts *periodicReaderTestSuite) SetupTest() {
ts.readerTestSuite.SetupTest()
ts.Reader = ts.Factory()

e := &fnExporter{
exportFunc: func(context.Context, *metricdata.ResourceMetrics) error { return assert.AnError },
flushFunc: func(context.Context) error { return assert.AnError },
shutdownFunc: func(context.Context) error { return assert.AnError },
}

ts.ErrReader = NewPeriodicReader(e)
ts.ErrReader = NewPeriodicReader(e, WithProducer(testExternalProducer{}))
ts.ErrReader.register(testSDKProducer{})
ts.ErrReader.RegisterProducer(testExternalProducer{})
}

func (ts *periodicReaderTestSuite) TearDownTest() {
Expand All @@ -233,8 +232,12 @@ func (ts *periodicReaderTestSuite) TestShutdownPropagated() {
func TestPeriodicReader(t *testing.T) {
suite.Run(t, &periodicReaderTestSuite{
readerTestSuite: &readerTestSuite{
Factory: func() Reader {
return NewPeriodicReader(new(fnExporter))
Factory: func(opts ...ReaderOption) Reader {
var popts []PeriodicReaderOption
for _, o := range opts {
popts = append(popts, o)
}
return NewPeriodicReader(new(fnExporter), popts...)
},
},
})
Expand Down Expand Up @@ -291,9 +294,8 @@ func TestPeriodicReaderRun(t *testing.T) {
},
}

r := NewPeriodicReader(exp)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
trigger <- time.Now()
assert.Equal(t, assert.AnError, <-eh.Err)

Expand All @@ -320,9 +322,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {

t.Run("ForceFlush", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")

Expand All @@ -333,7 +334,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("ForceFlush timeout on producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout))
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{}))
r.register(testSDKProducer{
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
select {
Expand All @@ -345,7 +346,6 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
}
return nil
}})
r.RegisterProducer(testExternalProducer{})
assert.ErrorIs(t, r.ForceFlush(context.Background()), context.DeadlineExceeded)
assert.False(t, *called, "exporter Export method called when it should have failed before export")

Expand All @@ -356,9 +356,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("ForceFlush timeout on external producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{
produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
select {
case <-time.After(timeout + time.Second):
Expand All @@ -368,7 +366,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
}
return []metricdata.ScopeMetrics{testScopeMetricsA}, nil
},
})
}))
r.register(testSDKProducer{})
assert.ErrorIs(t, r.ForceFlush(context.Background()), context.DeadlineExceeded)
assert.False(t, *called, "exporter Export method called when it should have failed before export")

Expand All @@ -378,17 +377,16 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {

t.Run("Shutdown", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})

t.Run("Shutdown timeout on producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout))
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{}))
r.register(testSDKProducer{
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
select {
Expand All @@ -400,17 +398,14 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
}
return nil
}})
r.RegisterProducer(testExternalProducer{})
assert.ErrorIs(t, r.Shutdown(context.Background()), context.DeadlineExceeded)
assert.False(t, *called, "exporter Export method called when it should have failed before export")
})

t.Run("Shutdown timeout on external producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
r := NewPeriodicReader(exp, WithTimeout(timeout))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{
r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{
produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
select {
case <-time.After(timeout + time.Second):
Expand All @@ -420,17 +415,17 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
}
return []metricdata.ScopeMetrics{testScopeMetricsA}, nil
},
})
}))
r.register(testSDKProducer{})
assert.ErrorIs(t, r.Shutdown(context.Background()), context.DeadlineExceeded)
assert.False(t, *called, "exporter Export method called when it should have failed before export")
})
}

func TestPeriodicReaderMultipleForceFlush(t *testing.T) {
ctx := context.Background()
r := NewPeriodicReader(new(fnExporter))
r := NewPeriodicReader(new(fnExporter), WithProducer(testExternalProducer{}))
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
require.NoError(t, r.ForceFlush(ctx))
require.NoError(t, r.ForceFlush(ctx))
}
Expand Down
36 changes: 29 additions & 7 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ type Reader interface {
// and send aggregated metric measurements.
register(sdkProducer)

// RegisterProducer registers a an external Producer with this Reader.
// The Producer is used as a source of aggregated metric data which is
// incorporated into metrics collected from the SDK.
//
// This method needs to be concurrent safe.
RegisterProducer(Producer)

// temporality reports the Temporality for the instrument kind provided.
//
// This method needs to be concurrent safe with itself and all the other
Expand Down Expand Up @@ -166,3 +159,32 @@ func DefaultAggregationSelector(ik InstrumentKind) aggregation.Aggregation {
}
panic("unknown instrument kind")
}

// ReaderOption is an option which can be applied to manual or Periodic
// readers.
type ReaderOption interface {
PeriodicReaderOption
ManualReaderOption
}

// WithProducers registers producers as an external Producer of metric data
// for this Reader.
func WithProducer(p Producer) ReaderOption {
return producerOption{p: p}
}

type producerOption struct {
p Producer
}

// applyManual returns a manualReaderConfig with option applied.
func (o producerOption) applyManual(c manualReaderConfig) manualReaderConfig {
c.producers = append(c.producers, o.p)
return c
}

// applyPeriodic returns a periodicReaderConfig with option applied.
func (o producerOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig {
c.producers = append(c.producers, o.p)
return c
}
Loading