Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sdk/metric: Reader factories return structs #4244

Merged
merged 3 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Starting from `v1.21.0` of semantic conventions, `go.opentelemetry.io/otel/semconv/{version}/httpconv` and `go.opentelemetry.io/otel/semconv/{version}/netconv` packages will no longer be published. (#4145)
- Log duplicate instrument conflict at a warning level instead of info in `go.opentelemetry.io/otel/sdk/metric`. (#4202)
- Return an error on the creation of new instruments if their name doesn't pass regexp validation. (#4210)
- `NewManualReader` in `go.opentelemetry.io/otel/sdk/metric` returns `*ManualReader` instead of `Reader`. (#4244)
- `NewPeriodicReader` in `go.opentelemetry.io/otel/sdk/metric` returns `*PeriodicReader` instead of `Reader`. (#4244)
pellared marked this conversation as resolved.
Show resolved Hide resolved

## [1.16.0/0.39.0] 2023-05-18

Expand Down
24 changes: 12 additions & 12 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// manualReader is a simple Reader that allows an application to
// ManualReader is a simple Reader that allows an application to
// read metrics on demand.
type manualReader struct {
type ManualReader struct {
sdkProducer atomic.Value
shutdownOnce sync.Once

Expand All @@ -41,12 +41,12 @@ type manualReader struct {
}

// Compile time check the manualReader implements Reader and is comparable.
var _ = map[Reader]struct{}{&manualReader{}: {}}
var _ = map[Reader]struct{}{&ManualReader{}: {}}

// NewManualReader returns a Reader which is directly called to collect metrics.
func NewManualReader(opts ...ManualReaderOption) Reader {
func NewManualReader(opts ...ManualReaderOption) *ManualReader {
cfg := newManualReaderConfig(opts)
r := &manualReader{
r := &ManualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
Expand All @@ -56,7 +56,7 @@ func NewManualReader(opts ...ManualReaderOption) Reader {

// register stores the sdkProducer which enables the caller
// to read metrics from the SDK on demand.
func (mr *manualReader) register(p sdkProducer) {
func (mr *ManualReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register manual reader"
Expand All @@ -66,7 +66,7 @@ func (mr *manualReader) register(p sdkProducer) {

// RegisterProducer stores the external Producer which enables the caller
// to read metrics on demand.
func (mr *manualReader) RegisterProducer(p Producer) {
func (mr *ManualReader) RegisterProducer(p Producer) {
mr.mu.Lock()
defer mr.mu.Unlock()
if mr.isShutdown {
Expand All @@ -80,22 +80,22 @@ func (mr *manualReader) RegisterProducer(p Producer) {
}

// temporality reports the Temporality for the instrument kind provided.
func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality {
func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality {
return mr.temporalitySelector(kind)
}

// aggregation returns what Aggregation to use for kind.
func (mr *manualReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
func (mr *ManualReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
return mr.aggregationSelector(kind)
}

// ForceFlush is a no-op, it always returns nil.
func (mr *manualReader) ForceFlush(context.Context) error {
func (mr *ManualReader) ForceFlush(context.Context) error {
return nil
}

// Shutdown closes any connections and frees any resources used by the reader.
func (mr *manualReader) Shutdown(context.Context) error {
func (mr *ManualReader) Shutdown(context.Context) error {
err := ErrReaderShutdown
mr.shutdownOnce.Do(func() {
// Any future call to Collect will now return ErrReaderShutdown.
Expand All @@ -117,7 +117,7 @@ func (mr *manualReader) Shutdown(context.Context) error {
//
// Collect will return an error if called after shutdown.
// Collect will return an error if rm is a nil ResourceMetrics.
func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("manual reader: *metricdata.ResourceMetrics is nil")
}
Expand Down
32 changes: 16 additions & 16 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ func WithInterval(d time.Duration) PeriodicReaderOption {
// The Collect method of the returned Reader continues to gather and return
// metric data to the user. It will not automatically send that data to the
// exporter. That is left to the user to accomplish.
func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reader {
func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader {
conf := newPeriodicReaderConfig(options)
ctx, cancel := context.WithCancel(context.Background())
r := &periodicReader{
r := &PeriodicReader{
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
Expand All @@ -135,9 +135,9 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
return r
}

// periodicReader is a Reader that continuously collects and exports metric
// PeriodicReader is a Reader that continuously collects and exports metric
// data at a set interval.
type periodicReader struct {
type PeriodicReader struct {
sdkProducer atomic.Value

mu sync.Mutex
Expand All @@ -156,14 +156,14 @@ type periodicReader struct {
}

// Compile time check the periodicReader implements Reader and is comparable.
var _ = map[Reader]struct{}{&periodicReader{}: {}}
var _ = map[Reader]struct{}{&PeriodicReader{}: {}}

// newTicker allows testing override.
var newTicker = time.NewTicker

// run continuously collects and exports metric data at the specified
// interval. This will run until ctx is canceled or times out.
func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
func (r *PeriodicReader) run(ctx context.Context, interval time.Duration) {
ticker := newTicker(interval)
defer ticker.Stop()

Expand All @@ -184,7 +184,7 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
}

// register registers p as the producer of this reader.
func (r *periodicReader) register(p sdkProducer) {
func (r *PeriodicReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register periodic reader"
Expand All @@ -193,7 +193,7 @@ func (r *periodicReader) register(p sdkProducer) {
}

// RegisterProducer registers p as an external Producer of this reader.
func (r *periodicReader) RegisterProducer(p Producer) {
func (r *PeriodicReader) RegisterProducer(p Producer) {
r.mu.Lock()
defer r.mu.Unlock()
if r.isShutdown {
Expand All @@ -207,18 +207,18 @@ func (r *periodicReader) RegisterProducer(p Producer) {
}

// temporality reports the Temporality for the instrument kind provided.
func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.exporter.Temporality(kind)
}

// aggregation returns what Aggregation to use for kind.
func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
func (r *PeriodicReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
return r.exporter.Aggregation(kind)
}

// collectAndExport gather all metric data related to the periodicReader r from
// the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error {
func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
err := r.Collect(ctx, rm)
Expand All @@ -235,7 +235,7 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
// handle that if desired.
//
// An error is returned if this is called after Shutdown. An error is return if rm is nil.
func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
}
Expand All @@ -244,7 +244,7 @@ func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet
}

// collect unwraps p as a produceHolder and returns its produce results.
func (r *periodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
if p == nil {
return ErrReaderNotRegistered
}
Expand Down Expand Up @@ -275,14 +275,14 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}, rm *metricd
}

// export exports metric data m using r's exporter.
func (r *periodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
c, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel()
return r.exporter.Export(c, m)
}

// ForceFlush flushes pending telemetry.
func (r *periodicReader) ForceFlush(ctx context.Context) error {
func (r *PeriodicReader) ForceFlush(ctx context.Context) error {
errCh := make(chan error, 1)
select {
case r.flushCh <- errCh:
Expand All @@ -304,7 +304,7 @@ func (r *periodicReader) ForceFlush(ctx context.Context) error {
}

// Shutdown flushes pending telemetry and then stops the export pipeline.
func (r *periodicReader) Shutdown(ctx context.Context) error {
func (r *PeriodicReader) Shutdown(ctx context.Context) error {
err := ErrReaderShutdown
r.shutdownOnce.Do(func() {
// Stop the run loop.
Expand Down