Skip to content

Commit

Permalink
[pkg/otlp] Fix cumulative histogram handling in distributions mode (#…
Browse files Browse the repository at this point in the history
…9623)

Backport of open-telemetry/opentelemetry-collector-contrib#5867 to the `pkg/otlp` package.

Fixes an issue in `mapHistogramMetrics` where the `delta` boolean wouldn't be passed to `getSketchBuckets` when the histograms mode is set to distributions.
Fixes an issue in `getSketchBuckets` where buckets were not tagged by bounds when getting and setting values in the points cache (in the cumulative histogram case). This led to all buckets of an histogram sharing the same cache key, making the resulting sketch wrong.
Fixes the `TestNaNMetrics` histogram case, which was missing its `ExplicitBounds` definition.
Adds missing unit tests for the distributions mode.
  • Loading branch information
KSerrania committed Oct 22, 2021
1 parent e378de3 commit 38d2255
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 16 deletions.
15 changes: 13 additions & 2 deletions pkg/otlp/model/translator/metrics_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ func (t *Translator) getSketchBuckets(
as := &quantile.Agent{}
for j := range p.BucketCounts() {
lowerBound, upperBound := getBounds(p, j)

// Compute temporary bucketTags to have unique keys in the t.prevPts cache for each bucket
// The bucketTags are computed from the bounds before the InsertInterpolate fix is done,
// otherwise in the case where p.ExplicitBounds() has a size of 1 (eg. [0]), the two buckets
// would have the same bucketTags (lower_bound:0 and upper_bound:0), resulting in a buggy behavior.
bucketTags := []string{
fmt.Sprintf("lower_bound:%s", formatFloat(lowerBound)),
fmt.Sprintf("upper_bound:%s", formatFloat(upperBound)),
}
bucketTags = append(bucketTags, tags...)

// InsertInterpolate doesn't work with an infinite bound; insert in to the bucket that contains the non-infinite bound
// https://github.com/DataDog/datadog-agent/blob/7.31.0/pkg/aggregator/check_sampler.go#L107-L111
if math.IsInf(upperBound, 1) {
Expand All @@ -200,7 +211,7 @@ func (t *Translator) getSketchBuckets(
count := p.BucketCounts()[j]
if delta {
as.InsertInterpolate(lowerBound, upperBound, uint(count))
} else if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, float64(count)); ok {
} else if dx, ok := t.prevPts.putAndGetDiff(name, bucketTags, ts, float64(count)); ok {
as.InsertInterpolate(lowerBound, upperBound, uint(dx))
}

Expand Down Expand Up @@ -293,7 +304,7 @@ func (t *Translator) mapHistogramMetrics(
case HistogramModeCounters:
t.getLegacyBuckets(ctx, consumer, name, p, delta, tags, host)
case HistogramModeDistributions:
t.getSketchBuckets(ctx, consumer, name, ts, p, true, tags, host)
t.getSketchBuckets(ctx, consumer, name, ts, p, delta, tags, host)
}
}
}
Expand Down
120 changes: 106 additions & 14 deletions pkg/otlp/model/translator/metrics_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/quantile"
"github.com/DataDog/datadog-agent/pkg/quantile/summary"
gocache "github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -127,6 +128,14 @@ type metric struct {
host string
}

type sketch struct {
name string
basic summary.Summary
timestamp uint64
tags []string
host string
}

var _ TimeSeriesConsumer = (*mockTimeSeriesConsumer)(nil)

type mockTimeSeriesConsumer struct {
Expand Down Expand Up @@ -162,6 +171,10 @@ func newCount(name string, ts uint64, val float64, tags []string) metric {
return metric{name: name, typ: Count, timestamp: ts, value: val, tags: tags}
}

func newSketch(name string, ts uint64, s summary.Summary, tags []string) sketch {
return sketch{name: name, basic: s, timestamp: ts, tags: tags}
}

func TestMapIntMetrics(t *testing.T) {
ts := pdata.NewTimestampFromTime(time.Now())
slice := pdata.NewNumberDataPointSlice()
Expand Down Expand Up @@ -495,11 +508,22 @@ func TestMapDoubleMonotonicOutOfOrder(t *testing.T) {

type mockFullConsumer struct {
mockTimeSeriesConsumer
anySketch bool
sketches []sketch
}

func (c *mockFullConsumer) ConsumeSketch(_ context.Context, _ string, _ uint64, _ *quantile.Sketch, _ []string, _ string) {
c.anySketch = true
func (c *mockFullConsumer) ConsumeSketch(_ context.Context, name string, ts uint64, sk *quantile.Sketch, tags []string, host string) {
if sk == nil {
return
}
c.sketches = append(c.sketches,
sketch{
name: name,
basic: sk.Basic,
timestamp: ts,
tags: tags,
host: host,
},
)
}

func TestMapDeltaHistogramMetrics(t *testing.T) {
Expand Down Expand Up @@ -530,13 +554,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) {
consumer := &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t, noBuckets, consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

tr.cfg.HistMode = HistogramModeCounters
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t, append(noBuckets, buckets...), consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

sketches := []sketch{
newSketch("doubleHist.test", uint64(ts), summary.Summary{
Min: 0,
Max: 0,
Sum: 0,
Avg: 0,
Cnt: 20,
},
[]string{},
),
}

tr.cfg.HistMode = HistogramModeDistributions
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t, noBuckets, consumer.metrics)
assert.ElementsMatch(t, sketches, consumer.sketches)

// With attribute tags
noBucketsAttributeTags := []metric{
Expand All @@ -553,13 +595,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) {
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "")
assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

tr.cfg.HistMode = HistogramModeCounters
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "")
assert.ElementsMatch(t, append(noBucketsAttributeTags, bucketsAttributeTags...), consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

sketchesAttributeTags := []sketch{
newSketch("doubleHist.test", uint64(ts), summary.Summary{
Min: 0,
Max: 0,
Sum: 0,
Avg: 0,
Cnt: 20,
},
[]string{"attribute_tag:attribute_value"},
),
}

tr.cfg.HistMode = HistogramModeDistributions
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "")
assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics)
assert.ElementsMatch(t, sketchesAttributeTags, consumer.sketches)
}

func TestMapCumulativeHistogramMetrics(t *testing.T) {
Expand All @@ -574,15 +634,18 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) {
point = slice.AppendEmpty()
point.SetCount(20 + 30)
point.SetSum(math.Pi + 20)
point.SetBucketCounts([]uint64{2 + 11, 18 + 2})
point.SetBucketCounts([]uint64{2 + 11, 18 + 19})
point.SetExplicitBounds([]float64{0})
point.SetTimestamp(seconds(2))

expected := []metric{
expectedCounts := []metric{
newCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}),
newCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}),
}

expectedBuckets := []metric{
newCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}),
newCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}),
newCount("doubleHist.test.bucket", uint64(seconds(2)), 19, []string{"lower_bound:0", "upper_bound:inf"}),
}

ctx := context.Background()
Expand All @@ -592,10 +655,37 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) {
tr.cfg.HistMode = HistogramModeCounters
consumer := &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)
assert.ElementsMatch(t,
consumer.metrics,
expected,
append(expectedCounts, expectedBuckets...),
)

expectedSketches := []sketch{
newSketch("doubleHist.test", uint64(seconds(2)), summary.Summary{
Min: 0,
Max: 0,
Sum: 0,
Avg: 0,
Cnt: 30,
},
[]string{},
),
}

tr = newTranslator(t, zap.NewNop())
delta = false

tr.cfg.HistMode = HistogramModeDistributions
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t,
consumer.metrics,
expectedCounts,
)
assert.ElementsMatch(t,
consumer.sketches,
expectedSketches,
)
}

Expand Down Expand Up @@ -935,7 +1025,6 @@ func TestMapMetrics(t *testing.T) {
tr := newTranslator(t, testLogger)
err := tr.MapMetrics(ctx, md, consumer)
require.NoError(t, err)
assert.False(t, consumer.anySketch)

assert.ElementsMatch(t, consumer.metrics, []metric{
testGauge("int.gauge", 1),
Expand All @@ -953,6 +1042,7 @@ func TestMapMetrics(t *testing.T) {
testCount("int.cumulative.monotonic.sum", 3, 2),
testCount("double.cumulative.monotonic.sum", math.Pi, 2),
})
assert.Empty(t, consumer.sketches)

// One metric type was unknown or unsupported
assert.Equal(t, observed.FilterMessage("Unknown or unsupported metric type").Len(), 1)
Expand Down Expand Up @@ -1010,6 +1100,7 @@ func createNaNMetrics() pdata.Metrics {
dpDoubleHist.SetCount(20)
dpDoubleHist.SetSum(math.NaN())
dpDoubleHist.SetBucketCounts([]uint64{2, 18})
dpDoubleHist.SetExplicitBounds([]float64{0})
dpDoubleHist.SetTimestamp(seconds(0))

// Double Sum (cumulative)
Expand Down Expand Up @@ -1059,14 +1150,15 @@ func TestNaNMetrics(t *testing.T) {
tr := newTranslator(t, testLogger)
consumer := &mockFullConsumer{}
err := tr.MapMetrics(ctx, md, consumer)
assert.False(t, consumer.anySketch)
require.NoError(t, err)

assert.ElementsMatch(t, consumer.metrics, []metric{
testCount("nan.histogram.count", 20, 0),
testCount("nan.summary.count", 100, 2),
})

assert.Empty(t, consumer.sketches)

// One metric type was unknown or unsupported
assert.Equal(t, observed.FilterMessage("Unsupported metric value").Len(), 7)
}

0 comments on commit 38d2255

Please sign in to comment.