Skip to content

Commit

Permalink
Change the Reader.Collect Signature. (#3732)
Browse files Browse the repository at this point in the history
* Changes the signature of Collect().

This DOES NOT make the SDK reuse memory, but it does enable it to be added.
  • Loading branch information
MadVikingGod authored Feb 21, 2023
1 parent ecf0838 commit cc8bdaa
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- [bridge/ot] Fall-back to TextMap carrier when it's not ot.HttpHeaders. (#3679)
- The `Collect` method of the `"go.opentelemetry.io/otel/sdk/metric".Reader` interface is updated to accept the `metricdata.ResourceMetrics` value the collection will be made into. This change is made to enable memory reuse by SDK users. (#3732)

### Fixed

Expand Down
4 changes: 3 additions & 1 deletion exporters/prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {

// Collect implements prometheus.Collector.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
metrics, err := c.reader.Collect(context.TODO())
// TODO (#3047): Use a sync.Pool instead of allocating metrics every Collect.
metrics := metricdata.ResourceMetrics{}
err := c.reader.Collect(context.TODO(), &metrics)
if err != nil {
otel.Handle(err)
if err == metric.ErrReaderNotRegistered {
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func BenchmarkCounterCollectOneAttr(b *testing.B) {
for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.Int("K", 1))

_, _ = rdr.Collect(ctx)
_ = rdr.Collect(ctx, nil)
}
}

Expand All @@ -104,7 +104,7 @@ func BenchmarkCounterCollectTenAttrs(b *testing.B) {
for j := 0; j < 10; j++ {
cntr.Add(ctx, 1, attribute.Int("K", j))
}
_, _ = rdr.Collect(ctx)
_ = rdr.Collect(ctx, nil)
}
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func benchCollectHistograms(count int) func(*testing.B) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
collectedMetrics, _ = r.Collect(ctx)
_ = r.Collect(ctx, &collectedMetrics)
if len(collectedMetrics.ScopeMetrics[0].Metrics) != count {
b.Fatalf("got %d metrics, want %d", len(collectedMetrics.ScopeMetrics[0].Metrics), count)
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type reader struct {
externalProducers []Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
collectFunc func(context.Context, *metricdata.ResourceMetrics) error
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
Expand All @@ -48,8 +48,8 @@ func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.e
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind)
}
func (r *reader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collectFunc(ctx)
func (r *reader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
return r.collectFunc(ctx, rm)
}
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }
Expand Down
24 changes: 16 additions & 8 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -112,11 +113,17 @@ func (mr *manualReader) Shutdown(context.Context) error {
}

// Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
// callbacks necessary and stores the result in rm.
//
// Collect will return an error if called after shutdown.
// Collect will return an error if rm is a nil ResourceMetrics.
func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("manual reader: *metricdata.ResourceMetrics is nil")
}
p := mr.sdkProducer.Load()
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
return ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
Expand All @@ -126,12 +133,13 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("manual reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
return err
}

rm, err := ph.produce(ctx)
// TODO (#3047): When produce is updated to accept output as param, pass rm.
rmTemp, err := ph.produce(ctx)
*rm = rmTemp
if err != nil {
return metricdata.ResourceMetrics{}, err
return err
}
var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
Expand All @@ -141,7 +149,7 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
return unifyErrors(errs)
}

// manualReaderConfig contains configuration options for a ManualReader.
Expand Down
24 changes: 15 additions & 9 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,8 @@ func TestMeterCreatesInstruments(t *testing.T) {

tt.fn(t, m)

rm, err := rdr.Collect(context.Background())
rm := metricdata.ResourceMetrics{}
err := rdr.Collect(context.Background(), &rm)
assert.NoError(t, err)

require.Len(t, rm.ScopeMetrics, 1)
Expand Down Expand Up @@ -566,7 +567,7 @@ func TestCallbackObserverNonRegistered(t *testing.T) {

var got metricdata.ResourceMetrics
assert.NotPanics(t, func() {
got, err = rdr.Collect(context.Background())
err = rdr.Collect(context.Background(), &got)
})

assert.NoError(t, err)
Expand Down Expand Up @@ -660,7 +661,8 @@ func TestGlobalInstRegisterCallback(t *testing.T) {
_, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr)
assert.NoError(t, err)

got, err := rdr.Collect(context.Background())
got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
assert.NoError(t, err)
assert.Lenf(t, l.messages, 0, "Warnings and errors logged:\n%s", l)
metricdatatest.AssertEqual(t, metricdata.ResourceMetrics{
Expand Down Expand Up @@ -772,7 +774,8 @@ func TestMetersProvideScope(t *testing.T) {
},
}

got, err := rdr.Collect(context.Background())
got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
assert.NoError(t, err)
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
}
Expand Down Expand Up @@ -816,14 +819,14 @@ func TestUnregisterUnregisters(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()
_, err = r.Collect(ctx)
err = r.Collect(ctx, &metricdata.ResourceMetrics{})
require.NoError(t, err)
assert.True(t, called, "callback not called for registered callback")

called = false
require.NoError(t, reg.Unregister(), "unregister")

_, err = r.Collect(ctx)
err = r.Collect(ctx, &metricdata.ResourceMetrics{})
require.NoError(t, err)
assert.False(t, called, "callback called for unregistered callback")
}
Expand Down Expand Up @@ -869,7 +872,8 @@ func TestRegisterCallbackDropAggregations(t *testing.T) {
)
require.NoError(t, err)

data, err := r.Collect(context.Background())
data := metricdata.ResourceMetrics{}
err = r.Collect(context.Background(), &data)
require.NoError(t, err)

assert.False(t, called, "callback called for all drop instruments")
Expand Down Expand Up @@ -1238,7 +1242,8 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))

m, err := rdr.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := rdr.Collect(context.Background(), &m)
assert.NoError(t, err)

require.Len(t, m.ScopeMetrics, 1)
Expand Down Expand Up @@ -1331,7 +1336,8 @@ func TestAsynchronousExample(t *testing.T) {

collect := func(t *testing.T) {
t.Helper()
got, err := reader.Collect(context.Background())
got := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())
Expand Down
25 changes: 17 additions & 8 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -206,21 +207,29 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio
// collectAndExport gather all metric data related to the periodicReader r from
// the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error {
m, err := r.Collect(ctx)
// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
rm := metricdata.ResourceMetrics{}
err := r.Collect(ctx, &rm)
if err == nil {
err = r.export(ctx, m)
err = r.export(ctx, rm)
}
return err
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK and other Producers. The returned metric data is not exported
// to the configured exporter, it is left to the caller to handle that if
// desired.
// the SDK and other Producers and stores the result in rm. The returned metric
// data is not exported to the configured exporter, it is left to the caller to
// handle that if desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collect(ctx, r.sdkProducer.Load())
// An error is returned if this is called after Shutdown. An error is return if rm is nil.
func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
}
// TODO (#3047): When collect is updated to accept output as param, pass rm.
rmTemp, err := r.collect(ctx, r.sdkProducer.Load())
*rm = rmTemp
return err
}

// collect unwraps p as a produceHolder and returns its produce results.
Expand Down
5 changes: 3 additions & 2 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ type Reader interface {
aggregation(InstrumentKind) aggregation.Aggregation // nolint:revive // import-shadow for method scoped by type.

// Collect gathers and returns all metric data related to the Reader from
// the SDK. An error is returned if this is called after Shutdown.
Collect(context.Context) (metricdata.ResourceMetrics, error)
// the SDK and stores it in out. An error is returned if this is called
// after Shutdown or if out is nil.
Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error

// ForceFlush flushes all metric measurements held in an export pipeline.
//
Expand Down
31 changes: 21 additions & 10 deletions sdk/metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,23 @@ func (ts *readerTestSuite) TearDownTest() {
}

func (ts *readerTestSuite) TestErrorForNotRegistered() {
_, err := ts.Reader.Collect(context.Background())
err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{})
ts.ErrorIs(err, ErrReaderNotRegistered)
}

func (ts *readerTestSuite) TestSDKProducer() {
ts.Reader.register(testSDKProducer{})
m, err := ts.Reader.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.NoError(err)
ts.Equal(testResourceMetricsA, m)
}

func (ts *readerTestSuite) TestExternalProducer() {
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})
m, err := ts.Reader.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.NoError(err)
ts.Equal(testResourceMetricsAB, m)
}
Expand All @@ -78,7 +80,8 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() {
ts.Reader.RegisterProducer(testExternalProducer{})
ts.Require().NoError(ts.Reader.Shutdown(ctx))

m, err := ts.Reader.Collect(ctx)
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(ctx, &m)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(metricdata.ResourceMetrics{}, m)
}
Expand Down Expand Up @@ -113,7 +116,7 @@ func (ts *readerTestSuite) TestMultipleRegister() {
// This should be ignored.
ts.Reader.register(p1)

_, err := ts.Reader.Collect(context.Background())
err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{})
ts.Equal(assert.AnError, err)
}

Expand All @@ -134,7 +137,8 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() {
},
)

m, err := ts.Reader.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.Equal(assert.AnError, err)
ts.Equal(testResourceMetricsAB, m)
}
Expand All @@ -146,7 +150,8 @@ func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() {
}})
ts.Reader.RegisterProducer(testExternalProducer{})

m, err := ts.Reader.Collect(context.Background())
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.Equal(assert.AnError, err)
ts.Equal(metricdata.ResourceMetrics{}, m)
}
Expand All @@ -165,7 +170,7 @@ func (ts *readerTestSuite) TestMethodConcurrency() {
wg.Add(1)
go func() {
defer wg.Done()
_, _ = ts.Reader.Collect(ctx)
_ = ts.Reader.Collect(ctx, nil)
}()

wg.Add(1)
Expand All @@ -190,11 +195,17 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() {
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})

m, err := ts.Reader.Collect(ctx)
m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(ctx, &m)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(metricdata.ResourceMetrics{}, m)
}

func (ts *readerTestSuite) TestCollectNilResourceMetricError() {
ctx := context.Background()
ts.Assert().Error(ts.Reader.Collect(ctx, nil))
}

var testScopeMetricsA = metricdata.ScopeMetrics{
Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"},
Metrics: []metricdata.Metrics{{
Expand Down Expand Up @@ -279,7 +290,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
collectedMetrics, err = r.Collect(ctx)
err = r.Collect(ctx, &collectedMetrics)
assert.Equalf(b, testResourceMetricsA, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err)
}
}
Expand Down

0 comments on commit cc8bdaa

Please sign in to comment.