Skip to content

Commit

Permalink
[chore] Use errors.Join to unify errors (open-telemetry#5907)
Browse files Browse the repository at this point in the history
Resolve open-telemetry#3544

Replace all custom implementations for multi-error unification with the
`errors.Join` function of the standard library.
  • Loading branch information
MrAlias authored Oct 22, 2024
1 parent 1a964cc commit 92ccad7
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 131 deletions.
22 changes: 5 additions & 17 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"errors"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -44,25 +44,13 @@ func (c config) readerSignals() (forceFlush, shutdown func(context.Context) erro
// value.
func unify(funcs []func(context.Context) error) func(context.Context) error {
return func(ctx context.Context) error {
var errs []error
var err error
for _, f := range funcs {
if err := f(ctx); err != nil {
errs = append(errs, err)
if e := f(ctx); e != nil {
err = errors.Join(err, e)
}
}
return unifyErrors(errs)
}
}

// unifyErrors combines multiple errors into a single error.
func unifyErrors(errs []error) error {
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
return err
}
}

Expand Down
20 changes: 14 additions & 6 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package metric

import (
"context"
"fmt"
"errors"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -108,11 +108,19 @@ func TestConfigReaderSignalsForwardedErrors(t *testing.T) {
}

func TestUnifyMultiError(t *testing.T) {
f := func(context.Context) error { return assert.AnError }
funcs := []func(context.Context) error{f, f, f}
errs := []error{assert.AnError, assert.AnError, assert.AnError}
target := fmt.Errorf("%v", errs)
assert.Equal(t, unify(funcs)(context.Background()), target)
var (
e0 = errors.New("0")
e1 = errors.New("1")
e2 = errors.New("2")
)
err := unify([]func(context.Context) error{
func(ctx context.Context) error { return e0 },
func(ctx context.Context) error { return e1 },
func(ctx context.Context) error { return e2 },
})(context.Background())
assert.ErrorIs(t, err, e0)
assert.ErrorIs(t, err, e1)
assert.ErrorIs(t, err, e2)
}

func mergeResource(t *testing.T, r1, r2 *resource.Resource) *resource.Resource {
Expand Down
9 changes: 4 additions & 5 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,17 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr
if err != nil {
return err
}
var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
externalMetrics, e := producer.Produce(ctx)
if e != nil {
err = errors.Join(err, e)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}

global.Debug("ManualReader collection", "Data", rm)

return unifyErrors(errs)
return err
}

// MarshalLog returns logging data about the ManualReader.
Expand Down
15 changes: 7 additions & 8 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,21 +442,21 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}

reg := newObserver()
var errs multierror
var err error
for _, inst := range insts {
switch o := inst.(type) {
case int64Observable:
if err := o.registerable(m); err != nil {
if !errors.Is(err, errEmptyAgg) {
errs.append(err)
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
reg.registerInt64(o.observableID)
case float64Observable:
if err := o.registerable(m); err != nil {
if !errors.Is(err, errEmptyAgg) {
errs.append(err)
if e := o.registerable(m); e != nil {
if !errors.Is(e, errEmptyAgg) {
err = errors.Join(err, e)
}
continue
}
Expand All @@ -467,7 +467,6 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
}
}

err := errs.errorOrNil()
if reg.len() == 0 {
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
Expand Down
9 changes: 4 additions & 5 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,17 @@ func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricd
if err != nil {
return err
}
var errs []error
for _, producer := range r.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
externalMetrics, e := producer.Produce(ctx)
if e != nil {
err = errors.Join(err, e)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}

global.Debug("PeriodicReader collection", "Data", rm)

return unifyErrors(errs)
return err
}

// export exports metric data m using r's exporter.
Expand Down
77 changes: 32 additions & 45 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -108,11 +107,11 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
p.Lock()
defer p.Unlock()

var errs multierror
var err error
for _, c := range p.callbacks {
// TODO make the callbacks parallel. ( #3034 )
if err := c(ctx); err != nil {
errs.append(err)
if e := c(ctx); e != nil {
err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
rm.Resource = nil
Expand All @@ -123,8 +122,8 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
// TODO make the callbacks parallel. ( #3034 )
f := e.Value.(multiCallback)
if err := f(ctx); err != nil {
errs.append(err)
if e := f(ctx); e != nil {
err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
Expand Down Expand Up @@ -160,7 +159,7 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)

rm.ScopeMetrics = rm.ScopeMetrics[:i]

return errs.errorOrNil()
return err
}

// inserter facilitates inserting of new instruments from a single scope into a
Expand Down Expand Up @@ -222,17 +221,17 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
measures []aggregate.Measure[N]
)

errs := &multierror{wrapped: errCreatingAggregators}
var err error
seen := make(map[uint64]struct{})
for _, v := range i.pipeline.views {
stream, match := v(inst)
if !match {
continue
}
matched = true
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
in, id, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if e != nil {
err = errors.Join(err, e)
}
if in == nil { // Drop aggregation.
continue
Expand All @@ -245,8 +244,12 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
measures = append(measures, in)
}

if err != nil {
err = errors.Join(errCreatingAggregators, err)
}

if matched {
return measures, errs.errorOrNil()
return measures, err
}

// Apply implicit default view if no explicit matched.
Expand All @@ -255,15 +258,18 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
Description: inst.Description,
Unit: inst.Unit,
}
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
in, _, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if e != nil {
if err == nil {
err = errCreatingAggregators
}
err = errors.Join(err, e)
}
if in != nil {
// Ensured to have not seen given matched was false.
measures = append(measures, in)
}
return measures, errs.errorOrNil()
return measures, err
}

// addCallback registers a single instrument callback to be run when
Expand Down Expand Up @@ -609,15 +615,15 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) reso
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]

errs := &multierror{}
var err error
for _, i := range r.inserters {
in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
if err != nil {
errs.append(err)
in, e := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
if e != nil {
err = errors.Join(err, e)
}
measures = append(measures, in...)
}
return measures, errs.errorOrNil()
return measures, err
}

// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
Expand All @@ -626,37 +632,18 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]

errs := &multierror{}
var err error
for _, i := range r.inserters {
agg := i.readerDefaultAggregation(id.Kind)
if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
histAgg.Boundaries = boundaries
agg = histAgg
}
in, err := i.Instrument(id, agg)
if err != nil {
errs.append(err)
in, e := i.Instrument(id, agg)
if e != nil {
err = errors.Join(err, e)
}
measures = append(measures, in...)
}
return measures, errs.errorOrNil()
}

type multierror struct {
wrapped error
errors []string
}

func (m *multierror) errorOrNil() error {
if len(m.errors) == 0 {
return nil
}
if m.wrapped == nil {
return errors.New(strings.Join(m.errors, "; "))
}
return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; "))
}

func (m *multierror) append(err error) {
m.errors = append(m.errors, err.Error())
return measures, err
}
2 changes: 1 addition & 1 deletion sdk/metric/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() {

m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.Equal(assert.AnError, err)
ts.ErrorIs(err, assert.AnError)
ts.Equal(testResourceMetricsAB, m)
}

Expand Down
Loading

0 comments on commit 92ccad7

Please sign in to comment.