Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Delta & Cumulative temporality for LastValue aggregates #5305

Merged
merged 12 commits
May 10, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- De-duplicate map attributes added to a `Record` in `go.opentelemetry.io/otel/sdk/log`. (#5230)
- The `go.opentelemetry.io/otel/exporters/stdout/stdoutlog` exporter won't print `AttributeValueLengthLimit` and `AttributeCountLimit` fields now, instead it prints the `DroppedAttributes` field. (#5272)
- Improved performance in the `Stringer` implementation of `go.opentelemetry.io/otel/baggage.Member` by reducing the number of allocations. (#5286)
- Set the start time for last-value aggregates in `go.opentelemetry.io/otel/sdk/metric`. (#5305)
- The `Span` in `go.opentelemetry.io/otel/sdk/trace` will record links without span context if either non-empty `TraceState` or attributes are provided. (#5315)

### Fixed
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func BenchmarkInstrument(b *testing.B) {
build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64]

in, _ := build.LastValue()
in, _ := build.PrecomputedLastValue()
meas = append(meas, in)

build.Temporality = metricdata.CumulativeTemporality
Expand All @@ -50,7 +50,7 @@ func BenchmarkInstrument(b *testing.B) {
build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64]

in, _ := build.LastValue()
in, _ := build.PrecomputedLastValue()
meas = append(meas, in)

build.Temporality = metricdata.CumulativeTemporality
Expand Down
29 changes: 17 additions & 12 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,26 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
}

// LastValue returns a last-value aggregate function input and output.
//
// The returned output function reports delta temporality when
// b.Temporality is metricdata.DeltaTemporality and cumulative
// temporality otherwise.
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
	lv := newLastValue[N](b.AggregationLimit, b.resFunc())
	switch b.Temporality {
	case metricdata.DeltaTemporality:
		return b.filter(lv.measure), lv.delta
	default:
		return b.filter(lv.measure), lv.cumulative
	}
}

return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
// reuse of the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
lv.computeAggregation(&gData.DataPoints)
*dest = gData

return len(gData.DataPoints)
// PrecomputedLastValue returns a last-value aggregate function input and
// output. The aggregation returned from the returned ComputeAggregation
// function will always only return values from the previous collection cycle.
func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) {
	lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc())
	in := b.filter(lv.measure)
	if b.Temporality == metricdata.DeltaTemporality {
		return in, lv.delta
	}
	return in, lv.cumulative
}

Expand Down
88 changes: 85 additions & 3 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *la
newRes: r,
limit: newLimiter[datapoint[N]](limit),
values: make(map[attribute.Distinct]datapoint[N]),
start: now(),
}
}

Expand All @@ -36,6 +37,7 @@ type lastValue[N int64 | float64] struct {
newRes func() exemplar.Reservoir
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
}

func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
Expand All @@ -58,23 +60,103 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
s.values[attr.Equivalent()] = d
}

func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
// delta writes the current values into dest as a metricdata.Gauge and then
// forgets them, starting a fresh delta interval.
func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int {
	// If dest is not a metricdata.Gauge the assertion yields the zero value;
	// the only cost is a missed chance to reuse its DataPoints slice.
	gauge, _ := (*dest).(metricdata.Gauge[N])

	s.Lock()
	defer s.Unlock()

	n := s.copyDpts(&gauge.DataPoints)
	// Forget reported values so stale data is not re-exported.
	clear(s.values)
	// The next delta interval begins at this collection.
	s.start = now()

	*dest = gauge

	return n
}

// cumulative writes the current values into dest as a metricdata.Gauge.
// Values are retained across collections for cumulative temporality.
func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int {
	// If dest is not a metricdata.Gauge the assertion yields the zero value;
	// the only cost is a missed chance to reuse its DataPoints slice.
	gauge, _ := (*dest).(metricdata.Gauge[N])

	s.Lock()
	defer s.Unlock()

	n := s.copyDpts(&gauge.DataPoints)
	// TODO (#3006): This will use an unbounded amount of memory if there
	// are unbounded number of attribute sets being aggregated. Attribute
	// sets that become "stale" need to be forgotten so this will not
	// overload the system.
	*dest = gauge

	return n
}

// copyDpts copies the datapoints held by s into dest. The number of datapoints
// copied is returned.
//
// Callers must hold s's lock.
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N]) int {
	n := len(s.values)
	*dest = reset(*dest, n, n)

	var i int
	for _, v := range s.values {
		(*dest)[i].Attributes = v.attrs
		// StartTime marks the beginning of the aggregation interval; Time is
		// when the last measurement for this attribute set was made.
		(*dest)[i].StartTime = s.start
		(*dest)[i].Time = v.timestamp
		(*dest)[i].Value = v.value
		collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
		i++
	}
	return n
}

// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *precomputedLastValue[N] {
	underlying := newLastValue[N](limit, r)
	return &precomputedLastValue[N]{lastValue: underlying}
}

// precomputedLastValue summarizes a set of observations as the last one made.
// It embeds *lastValue for storage and measurement, overriding the delta and
// cumulative outputs so every collection clears the recorded values.
type precomputedLastValue[N int64 | float64] struct {
	*lastValue[N]
}

// delta writes the precomputed values into dest as a metricdata.Gauge, then
// forgets them and starts a fresh delta interval.
func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int {
	// If dest is not a metricdata.Gauge the assertion yields the zero value;
	// the only cost is a missed chance to reuse its DataPoints slice.
	gauge, _ := (*dest).(metricdata.Gauge[N])

	s.Lock()
	defer s.Unlock()

	n := s.copyDpts(&gauge.DataPoints)
	// Forget reported values so stale data is not re-exported.
	clear(s.values)
	// The next delta interval begins at this collection.
	s.start = now()

	*dest = gauge

	return n
}

// cumulative writes the precomputed values into dest as a metricdata.Gauge
// and then forgets them so stale observations are not reported again.
func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int {
	// If dest is not a metricdata.Gauge the assertion yields the zero value;
	// the only cost is a missed chance to reuse its DataPoints slice.
	gauge, _ := (*dest).(metricdata.Gauge[N])

	s.Lock()
	defer s.Unlock()

	n := s.copyDpts(&gauge.DataPoints)
	// Forget reported values so stale data is not re-exported.
	clear(s.values)

	*dest = gauge

	return n
}
Loading