Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove metric aggregator Subtract interface #2350

Merged
merged 9 commits into from
Nov 15, 2021
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Removed

- Remove the metric Processor's ability to convert cumulative to delta aggregation temporality. (#2350)

## [1.2.0] - 2021-11-12

### Changed
Expand Down
5 changes: 4 additions & 1 deletion sdk/export/metric/aggregation/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ var (
ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument")
ErrNaNInput = fmt.Errorf("NaN value is an invalid input")
ErrInconsistentType = fmt.Errorf("inconsistent aggregator types")
ErrNoSubtraction = fmt.Errorf("aggregator does not subtract")

// ErrNoCumulativeToDelta is returned when requesting delta
// export kind for a precomputed sum instrument.
ErrNoCumulativeToDelta = fmt.Errorf("cumulative to delta not implemented")

// ErrNoData is returned when (due to a race with collection)
// the Aggregator is check-pointed before the first value is set.
Expand Down
10 changes: 0 additions & 10 deletions sdk/export/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,6 @@ type Aggregator interface {
Merge(aggregator Aggregator, descriptor *sdkapi.Descriptor) error
}

// Subtractor is an optional interface implemented by some
// Aggregators. An Aggregator must support `Subtract()` in order to
// be configured for a Precomputed-Sum instrument (CounterObserver,
// UpDownCounterObserver) using a DeltaExporter.
type Subtractor interface {
	// Subtract subtracts the `operand` from this Aggregator and
	// outputs the value in `result`. The descriptor identifies the
	// instrument whose values are being subtracted (e.g. its number
	// kind). Implementations return an error when the operand or
	// result is of an incompatible concrete aggregator type.
	Subtract(operand, result Aggregator, descriptor *sdkapi.Descriptor) error
}

// Exporter handles presentation of the checkpoint of aggregate
// metrics. This is the final stage of a metrics export pipeline,
// where metric data are formatted for a specific system.
Expand Down
17 changes: 0 additions & 17 deletions sdk/metric/aggregator/sum/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type Aggregator struct {
}

// Compile-time checks that Aggregator satisfies the export and
// aggregation interfaces it is used through.
var _ export.Aggregator = &Aggregator{}
var _ export.Subtractor = &Aggregator{}
var _ aggregation.Sum = &Aggregator{}

// New returns a new counter aggregator implemented by atomic
Expand Down Expand Up @@ -88,19 +87,3 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error
c.value.AddNumber(desc.NumberKind(), o.value)
return nil
}

// Subtract stores c's sum minus the operand's sum into resAgg. Both
// opAgg and resAgg must be sum Aggregators of this concrete type;
// otherwise an inconsistent-aggregator error is returned.
func (c *Aggregator) Subtract(opAgg, resAgg export.Aggregator, descriptor *sdkapi.Descriptor) error {
	// Nil checks (rather than comma-ok alone) also reject a typed-nil
	// *Aggregator stored in the interface.
	operand, _ := opAgg.(*Aggregator)
	if operand == nil {
		return aggregator.NewInconsistentAggregatorError(c, opAgg)
	}

	result, _ := resAgg.(*Aggregator)
	if result == nil {
		return aggregator.NewInconsistentAggregatorError(c, resAgg)
	}

	// result = c + (-operand), i.e. c - operand.
	kind := descriptor.NumberKind()
	result.value = c.value
	result.value.AddNumber(kind, number.NewNumberSignChange(kind, operand.value))
	return nil
}
77 changes: 31 additions & 46 deletions sdk/metric/processor/basic/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ type (
// values in a single collection round.
current export.Aggregator

// delta, if non-nil, refers to an Aggregator owned by
// the processor used to compute deltas between
// precomputed sums.
delta export.Aggregator

// cumulative, if non-nil, refers to an Aggregator owned
// by the processor used to store the last cumulative
// value.
Expand All @@ -94,9 +89,6 @@ type (
sync.RWMutex
values map[stateKey]*stateValue

// Note: the timestamp logic currently assumes all
// exports are deltas.

processStart time.Time
intervalStart time.Time
intervalEnd time.Time
Expand Down Expand Up @@ -124,8 +116,8 @@ var ErrInvalidTemporality = fmt.Errorf("invalid aggregation temporality")
// New returns a basic Processor that is also a Checkpointer using the provided
// AggregatorSelector to select Aggregators. The TemporalitySelector
// is consulted to determine the kind(s) of exporter that will consume
// data, so that this Processor can prepare to compute Delta or
// Cumulative Aggregations as needed.
// data, so that this Processor can prepare to compute Cumulative Aggregations
// as needed.
func New(aselector export.AggregatorSelector, tselector aggregation.TemporalitySelector, opts ...Option) *Processor {
	checkpointer := NewFactory(aselector, tselector, opts...).NewCheckpointer()
	return checkpointer.(*Processor)
}
Expand Down Expand Up @@ -191,13 +183,17 @@ func (b *Processor) Process(accum export.Accumulation) error {
}
if stateful {
if desc.InstrumentKind().PrecomputedSum() {
// If we know we need to compute deltas, allocate two aggregators.
b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta)
} else {
// In this case we are certain not to need a delta, only allocate
// a cumulative aggregator.
b.AggregatorFor(desc, &newValue.cumulative)
// To convert precomputed sums to
// deltas requires two aggregators to
// be allocated, one for the prior
// value and one for the output delta.
// This functionality was removed from
// the basic processor in PR #2350.
return aggregation.ErrNoCumulativeToDelta
}
// In this case allocate one aggregator to
// save the current state.
b.AggregatorFor(desc, &newValue.cumulative)
}
b.state.values[key] = newValue
return nil
Expand Down Expand Up @@ -310,28 +306,15 @@ func (b *Processor) FinishCollection() error {
continue
}

// Update Aggregator state to support exporting either a
// delta or a cumulative aggregation.
var err error
if mkind.PrecomputedSum() {
if currentSubtractor, ok := value.current.(export.Subtractor); ok {
// This line is equivalent to:
// value.delta = currentSubtractor - value.cumulative
err = currentSubtractor.Subtract(value.cumulative, value.delta, key.descriptor)

if err == nil {
err = value.current.SynchronizedMove(value.cumulative, key.descriptor)
}
} else {
err = aggregation.ErrNoSubtraction
}
} else {
// The only kind of aggregators that are not stateless
// are the ones needing delta to cumulative
// conversion. Merge aggregator state in this case.
if !mkind.PrecomputedSum() {
// This line is equivalent to:
// value.cumulative = value.cumulative + value.delta
err = value.cumulative.Merge(value.current, key.descriptor)
}
if err != nil {
return err
// value.cumulative = value.cumulative + value.current
if err := value.cumulative.Merge(value.current, key.descriptor); err != nil {
return err
}
}
}
return nil
Expand All @@ -350,13 +333,8 @@ func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export.
var agg aggregation.Aggregation
var start time.Time

// If the processor does not have Config.Memory and it was not updated
// in the prior round, do not visit this value.
if !b.config.Memory && value.updated != (b.finishedCollection-1) {
continue
}

aggTemp := exporter.TemporalityFor(key.descriptor, value.current.Aggregation().Kind())

switch aggTemp {
case aggregation.CumulativeTemporality:
// If stateful, the sum has been computed. If stateless, the
Expand All @@ -372,16 +350,23 @@ func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export.
case aggregation.DeltaTemporality:
// Precomputed sums are a special case.
if mkind.PrecomputedSum() {
agg = value.delta.Aggregation()
} else {
agg = value.current.Aggregation()
// This functionality was removed from
// the basic processor in PR #2350.
return aggregation.ErrNoCumulativeToDelta
}
agg = value.current.Aggregation()
start = b.intervalStart

default:
return fmt.Errorf("%v: %w", aggTemp, ErrInvalidTemporality)
}

// If the processor does not have Config.Memory and it was not updated
// in the prior round, do not visit this value.
if !b.config.Memory && value.updated != (b.finishedCollection-1) {
continue
}

if err := f(export.NewRecord(
key.descriptor,
value.labels,
Expand Down
Loading