Skip to content

Commit

Permalink
Move to recording dropped at Offer
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Jan 17, 2024
1 parent 623ed1b commit 6a94b22
Show file tree
Hide file tree
Showing 18 changed files with 93 additions and 212 deletions.
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,18 @@ func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
return exemplar.Drop[N]
}

type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, origAttr, fltrAttr attribute.Set)
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)

// filter wraps f in a Measure that applies b.Filter to each measurement's
// attributes. When a filter is configured, f receives the filtered attribute
// set and the slice of dropped attributes; otherwise f receives the original
// attributes and a nil dropped slice.
func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
	if b.Filter != nil {
		fltr := b.Filter // Copy to make it immutable after assignment.
		return func(ctx context.Context, n N, a attribute.Set) {
			fAttr, dropped := a.Filter(fltr)
			f(ctx, n, fAttr, dropped)
		}
	}
	return func(ctx context.Context, n N, a attribute.Set) {
		f(ctx, n, a, nil)
	}
}

Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,21 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
t.Helper()

value, attr := N(1), alice
run := func(b Builder[N], wantO, wantF attribute.Set) func(*testing.T) {
run := func(b Builder[N], wantF attribute.Set, wantD []attribute.KeyValue) func(*testing.T) {
return func(t *testing.T) {
t.Helper()

meas := b.filter(func(_ context.Context, v N, o, f attribute.Set) {
meas := b.filter(func(_ context.Context, v N, f attribute.Set, d []attribute.KeyValue) {
assert.Equal(t, value, v, "measured incorrect value")
assert.Equal(t, wantO, o, "measured incorrect original attributes")
assert.Equal(t, wantF, f, "measured incorrect filtered attributes")
assert.ElementsMatch(t, wantD, d, "measured incorrect dropped attributes")
})
meas(context.Background(), value, attr)
}
}

t.Run("NoFilter", run(Builder[N]{}, attr, attr))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, alice, fltrAlice))
t.Run("NoFilter", run(Builder[N]{}, attr, nil))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
}
}

Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ type expoHistogram[N int64 | float64] struct {
start time.Time
}

func (e *expoHistogram[N]) measure(ctx context.Context, value N, origAttr, fltrAttr attribute.Set) {
func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
// Ignore NaN and infinity.
if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
return
Expand All @@ -342,7 +342,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, origAttr, fltrA
e.values[attr] = v
}
v.record(value)
v.res.Offer(ctx, t, value, origAttr)
v.res.Offer(ctx, t, value, droppedAttr)
}

func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
Expand Down Expand Up @@ -384,7 +384,7 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(b.max)
}

b.res.Flush(&hDPts[i].Exemplars, a)
b.res.Flush(&hDPts[i].Exemplars)

delete(e.values, a)
i++
Expand Down Expand Up @@ -434,7 +434,7 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(b.max)
}

b.res.Collect(&hDPts[i].Exemplars, a)
b.res.Collect(&hDPts[i].Exemplars)

i++
// TODO (#3006): This will use an unbounded amount of memory if there
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/exponential_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) {

h := newExponentialHistogram[int64](4, 20, false, false, 0, dropExemplars[int64])
for _, v := range tt.values {
h.measure(context.Background(), v, alice, alice)
h.measure(context.Background(), v, alice, nil)
}
dp := h.values[alice]

Expand Down Expand Up @@ -227,7 +227,7 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) {

h := newExponentialHistogram[float64](4, 20, false, false, 0, dropExemplars[float64])
for _, v := range tt.values {
h.measure(context.Background(), v, alice, alice)
h.measure(context.Background(), v, alice, nil)
}
dp := h.values[alice]

Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r

// Aggregate records the measurement value, scoped by attr, and aggregates it
// into a histogram.
func (s *histValues[N]) measure(ctx context.Context, value N, origAttr, fltrAttr attribute.Set) {
func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
// This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the buckets in that the length of buckets
Expand Down Expand Up @@ -116,7 +116,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, origAttr, fltrAttr
if !s.noSum {
b.sum(value)
}
b.res.Offer(ctx, t, value, origAttr)
b.res.Offer(ctx, t, value, droppedAttr)
}

// newHistogram returns an Aggregator that summarizes a set of measurements as
Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(b.max)
}

b.res.Flush(&hDPts[i].Exemplars, a)
b.res.Flush(&hDPts[i].Exemplars)

// Unused attribute sets do not report.
delete(s.values, a)
Expand Down Expand Up @@ -233,7 +233,7 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(b.max)
}

b.res.Collect(&hDPts[i].Exemplars, a)
b.res.Collect(&hDPts[i].Exemplars)

i++
// TODO (#3006): This will use an unbounded amount of memory if there
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func TestHistogramImmutableBounds(t *testing.T) {
b[0] = 10
assert.Equal(t, cpB, h.bounds, "modifying the bounds argument should not change the bounds")

h.measure(context.Background(), 5, alice, alice)
h.measure(context.Background(), 5, alice, nil)

var data metricdata.Aggregation = metricdata.Histogram[int64]{}
h.cumulative(&data)
Expand All @@ -330,7 +330,7 @@ func TestHistogramImmutableBounds(t *testing.T) {

func TestCumulativeHistogramImutableCounts(t *testing.T) {
h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64])
h.measure(context.Background(), 5, alice, alice)
h.measure(context.Background(), 5, alice, nil)

var data metricdata.Aggregation = metricdata.Histogram[int64]{}
h.cumulative(&data)
Expand All @@ -353,7 +353,7 @@ func TestDeltaHistogramReset(t *testing.T) {
require.Equal(t, 0, h.delta(&data))
require.Len(t, data.(metricdata.Histogram[int64]).DataPoints, 0)

h.measure(context.Background(), 1, alice, alice)
h.measure(context.Background(), 1, alice, nil)

expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality}
expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1)}
Expand All @@ -366,7 +366,7 @@ func TestDeltaHistogramReset(t *testing.T) {
assert.Len(t, data.(metricdata.Histogram[int64]).DataPoints, 0)

// Aggregating another set should not affect the original (alice).
h.measure(context.Background(), 1, bob, bob)
h.measure(context.Background(), 1, bob, nil)
expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1)}
h.delta(&data)
metricdatatest.AssertAggregationsEqual(t, expect, data)
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type lastValue[N int64 | float64] struct {
values map[attribute.Set]datapoint[N]
}

func (s *lastValue[N]) measure(ctx context.Context, value N, origAttr, fltrAttr attribute.Set) {
func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
t := now()

s.Lock()
Expand All @@ -62,7 +62,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, origAttr, fltrAttr

d.timestamp = t
d.value = value
d.res.Offer(ctx, t, value, origAttr)
d.res.Offer(ctx, t, value, droppedAttr)

s.values[attr] = d
}
Expand All @@ -81,7 +81,7 @@ func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
// ignored.
(*dest)[i].Time = v.timestamp
(*dest)[i].Value = v.value
v.res.Flush(&(*dest)[i].Exemplars, a)
v.res.Flush(&(*dest)[i].Exemplars)
// Do not report stale values.
delete(s.values, a)
i++
Expand Down
12 changes: 6 additions & 6 deletions sdk/metric/internal/aggregate/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *
}
}

func (s *valueMap[N]) measure(ctx context.Context, value N, origAttr, fltrAttr attribute.Set) {
func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
t := now()

s.Lock()
Expand All @@ -58,7 +58,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, origAttr, fltrAttr a
}

v.n += value
v.res.Offer(ctx, t, value, origAttr)
v.res.Offer(ctx, t, value, droppedAttr)

s.values[attr] = v
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func (s *sum[N]) delta(dest *metricdata.Aggregation) int {
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = val.n
val.res.Flush(&dPts[i].Exemplars, attr)
val.res.Flush(&dPts[i].Exemplars)
// Do not report stale values.
delete(s.values, attr)
i++
Expand Down Expand Up @@ -138,7 +138,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = value.n
value.res.Collect(&dPts[i].Exemplars, attr)
value.res.Collect(&dPts[i].Exemplars)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
Expand Down Expand Up @@ -197,7 +197,7 @@ func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = delta
value.res.Flush(&dPts[i].Exemplars, attr)
value.res.Flush(&dPts[i].Exemplars)

newReported[attr] = value.n
// Unused attribute sets do not report.
Expand Down Expand Up @@ -236,7 +236,7 @@ func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int {
dPts[i].StartTime = s.start
dPts[i].Time = t
dPts[i].Value = val.n
val.res.Collect(&dPts[i].Exemplars, attr)
val.res.Collect(&dPts[i].Exemplars)

// Unused attribute sets do not report.
delete(s.values, attr)
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/exemplar/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ func Drop[N int64 | float64]() Reservoir[N] { return &dropRes[N]{} }

type dropRes[N int64 | float64] struct{}

func (r *dropRes[N]) Offer(context.Context, time.Time, N, attribute.Set) {}
func (r *dropRes[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) {}

func (r *dropRes[N]) Collect(dest *[]metricdata.Exemplar[N], _ attribute.Set) {
func (r *dropRes[N]) Collect(dest *[]metricdata.Exemplar[N]) {
*dest = (*dest)[:0]
}

func (r *dropRes[N]) Flush(dest *[]metricdata.Exemplar[N], _ attribute.Set) {
func (r *dropRes[N]) Flush(dest *[]metricdata.Exemplar[N]) {
*dest = (*dest)[:0]
}
12 changes: 6 additions & 6 deletions sdk/metric/internal/exemplar/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,24 @@ import (
// return, the measurement will be considered for sampling.
//
// See [Filtered] for how to create a [Reservoir] that uses a Filter.
type Filter[N int64 | float64] func(context.Context, N, attribute.Set) bool
type Filter[N int64 | float64] func(context.Context) bool

// AlwaysSample is a Filter that always signals measurements should be
// considered for sampling by a [Reservoir].
func AlwaysSample[N int64 | float64](context.Context, N, attribute.Set) bool {
func AlwaysSample[N int64 | float64](context.Context) bool {
return true
}

// NeverSample is a Filter that always signals measurements should not be
// considered for sampling by a [Reservoir].
func NeverSample[N int64 | float64](context.Context, N, attribute.Set) bool {
func NeverSample[N int64 | float64](context.Context) bool {
return false
}

// TraceBasedSample is a Filter that signals measurements should be considered
// for sampling by a [Reservoir] if the ctx contains a
// [go.opentelemetry.io/otel/trace.SpanContext] that is sampled.
func TraceBasedSample[N int64 | float64](ctx context.Context, _ N, _ attribute.Set) bool {
func TraceBasedSample[N int64 | float64](ctx context.Context) bool {
return trace.SpanContextFromContext(ctx).IsSampled()
}

Expand All @@ -59,8 +59,8 @@ type filtered[N int64 | float64] struct {
Filter Filter[N]
}

func (f filtered[N]) Offer(ctx context.Context, t time.Time, n N, a attribute.Set) {
if f.Filter(ctx, n, a) {
func (f filtered[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) {
if f.Filter(ctx) {
f.Reservoir.Offer(ctx, t, n, a)
}
}
Loading

0 comments on commit 6a94b22

Please sign in to comment.