diff --git a/CHANGELOG.md b/CHANGELOG.md index 82c4288d7f8..7ffccf17b8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning. (#4289) - Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332) - Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333) +- `PeriodicReader.Shutdown` in `go.opentelemetry.io/otel/sdk/metric` now applies the periodic reader's timeout by default. (#4356) ### Fixed diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 3ea2c2f0fb2..48bbdffd82f 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -67,7 +67,8 @@ func (o periodicReaderOptionFunc) applyPeriodic(conf periodicReaderConfig) perio } // WithTimeout configures the time a PeriodicReader waits for an export to -// complete before canceling it. +// complete before canceling it. This includes an export which occurs as part +// of Shutdown. // // This option overrides any value set for the // OTEL_METRIC_EXPORT_TIMEOUT environment variable. @@ -323,6 +324,8 @@ func (r *PeriodicReader) ForceFlush(ctx context.Context) error { func (r *PeriodicReader) Shutdown(ctx context.Context) error { err := ErrReaderShutdown r.shutdownOnce.Do(func() { + ctx, cancel := context.WithTimeout(ctx, r.timeout) + defer cancel() // Stop the run loop. r.cancel() <-r.done diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index e5d8fd2ef30..ad8b5598756 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -337,6 +337,46 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") }) + + t.Run("Shutdown timeout on producer", func(t *testing.T) { + exp, called := expFunc(t) + timeout := time.Millisecond + r := NewPeriodicReader(exp, WithTimeout(timeout)) + r.register(testSDKProducer{ + produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { + select { + case <-time.After(timeout + time.Second): + *rm = testResourceMetricsA + case <-ctx.Done(): + // we timed out before we could collect metrics + return ctx.Err() + } + return nil + }}) + r.RegisterProducer(testExternalProducer{}) + assert.Equal(t, context.DeadlineExceeded, r.Shutdown(context.Background()), "timeout error not returned") + assert.False(t, *called, "exporter Export method called when it should have failed before export") + }) + + t.Run("Shutdown timeout on external producer", func(t *testing.T) { + exp, called := expFunc(t) + timeout := time.Millisecond + r := NewPeriodicReader(exp, WithTimeout(timeout)) + r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{ + produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + select { + case <-time.After(timeout + time.Second): + case <-ctx.Done(): + // we timed out before we could collect metrics + return nil, ctx.Err() + } + return []metricdata.ScopeMetrics{testScopeMetricsA}, nil + }, + }) + assert.Equal(t, context.DeadlineExceeded, r.Shutdown(context.Background()), "timeout error not returned") + assert.False(t, *called, "exporter Export method called when it should have failed before export") + }) } func BenchmarkPeriodicReader(b *testing.B) {