From 38d22552f4cabd456cc13add1954844fac326e7b Mon Sep 17 00:00:00 2001 From: Kylian Serrania Date: Fri, 22 Oct 2021 11:27:01 +0200 Subject: [PATCH] [pkg/otlp] Fix cumulative histogram handling in distributions mode (#9623) Backport of https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/5867 to the `pkg/otlp` package. Fixes an issue in `mapHistogramMetrics` where the `delta` boolean wouldn't be passed to `getSketchBuckets` when the histograms mode is set to distributions. Fixes an issue in `getSketchBuckets` where buckets were not tagged by bounds when getting and setting values in the points cache (in the cumulative histogram case). This led to all buckets of a histogram sharing the same cache key, making the resulting sketch wrong. Fixes the `TestNaNMetrics` histogram case, which was missing its `ExplicitBounds` definition. Adds missing unit tests for the distributions mode. --- .../model/translator/metrics_translator.go | 15 ++- .../translator/metrics_translator_test.go | 120 ++++++++++++++++-- 2 files changed, 119 insertions(+), 16 deletions(-) diff --git a/pkg/otlp/model/translator/metrics_translator.go b/pkg/otlp/model/translator/metrics_translator.go index ac9318842643b..0eb049ed42e75 100644 --- a/pkg/otlp/model/translator/metrics_translator.go +++ b/pkg/otlp/model/translator/metrics_translator.go @@ -189,6 +189,17 @@ func (t *Translator) getSketchBuckets( as := &quantile.Agent{} for j := range p.BucketCounts() { lowerBound, upperBound := getBounds(p, j) + + // Compute temporary bucketTags to have unique keys in the t.prevPts cache for each bucket + // The bucketTags are computed from the bounds before the InsertInterpolate fix is done, + // otherwise in the case where p.ExplicitBounds() has a size of 1 (eg. [0]), the two buckets + // would have the same bucketTags (lower_bound:0 and upper_bound:0), resulting in a buggy behavior. 
+ bucketTags := []string{ + fmt.Sprintf("lower_bound:%s", formatFloat(lowerBound)), + fmt.Sprintf("upper_bound:%s", formatFloat(upperBound)), + } + bucketTags = append(bucketTags, tags...) + // InsertInterpolate doesn't work with an infinite bound; insert in to the bucket that contains the non-infinite bound // https://github.com/DataDog/datadog-agent/blob/7.31.0/pkg/aggregator/check_sampler.go#L107-L111 if math.IsInf(upperBound, 1) { @@ -200,7 +211,7 @@ func (t *Translator) getSketchBuckets( count := p.BucketCounts()[j] if delta { as.InsertInterpolate(lowerBound, upperBound, uint(count)) - } else if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, float64(count)); ok { + } else if dx, ok := t.prevPts.putAndGetDiff(name, bucketTags, ts, float64(count)); ok { as.InsertInterpolate(lowerBound, upperBound, uint(dx)) } @@ -293,7 +304,7 @@ func (t *Translator) mapHistogramMetrics( case HistogramModeCounters: t.getLegacyBuckets(ctx, consumer, name, p, delta, tags, host) case HistogramModeDistributions: - t.getSketchBuckets(ctx, consumer, name, ts, p, true, tags, host) + t.getSketchBuckets(ctx, consumer, name, ts, p, delta, tags, host) } } } diff --git a/pkg/otlp/model/translator/metrics_translator_test.go b/pkg/otlp/model/translator/metrics_translator_test.go index 8b59d8d3aa9af..e5077cede544a 100644 --- a/pkg/otlp/model/translator/metrics_translator_test.go +++ b/pkg/otlp/model/translator/metrics_translator_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/DataDog/datadog-agent/pkg/quantile" + "github.com/DataDog/datadog-agent/pkg/quantile/summary" gocache "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -127,6 +128,14 @@ type metric struct { host string } +type sketch struct { + name string + basic summary.Summary + timestamp uint64 + tags []string + host string +} + var _ TimeSeriesConsumer = (*mockTimeSeriesConsumer)(nil) type mockTimeSeriesConsumer struct { @@ -162,6 +171,10 @@ func newCount(name 
string, ts uint64, val float64, tags []string) metric { return metric{name: name, typ: Count, timestamp: ts, value: val, tags: tags} } +func newSketch(name string, ts uint64, s summary.Summary, tags []string) sketch { + return sketch{name: name, basic: s, timestamp: ts, tags: tags} +} + func TestMapIntMetrics(t *testing.T) { ts := pdata.NewTimestampFromTime(time.Now()) slice := pdata.NewNumberDataPointSlice() @@ -495,11 +508,22 @@ func TestMapDoubleMonotonicOutOfOrder(t *testing.T) { type mockFullConsumer struct { mockTimeSeriesConsumer - anySketch bool + sketches []sketch } -func (c *mockFullConsumer) ConsumeSketch(_ context.Context, _ string, _ uint64, _ *quantile.Sketch, _ []string, _ string) { - c.anySketch = true +func (c *mockFullConsumer) ConsumeSketch(_ context.Context, name string, ts uint64, sk *quantile.Sketch, tags []string, host string) { + if sk == nil { + return + } + c.sketches = append(c.sketches, + sketch{ + name: name, + basic: sk.Basic, + timestamp: ts, + tags: tags, + host: host, + }, + ) } func TestMapDeltaHistogramMetrics(t *testing.T) { @@ -530,13 +554,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { consumer := &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") assert.ElementsMatch(t, noBuckets, consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) tr.cfg.HistMode = HistogramModeCounters consumer = &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") assert.ElementsMatch(t, append(noBuckets, buckets...), consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) + + sketches := []sketch{ + newSketch("doubleHist.test", uint64(ts), summary.Summary{ + Min: 0, + Max: 0, + Sum: 0, + Avg: 0, + Cnt: 20, + }, + []string{}, + ), + } + + tr.cfg.HistMode = HistogramModeDistributions + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, 
"doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, noBuckets, consumer.metrics) + assert.ElementsMatch(t, sketches, consumer.sketches) // With attribute tags noBucketsAttributeTags := []metric{ @@ -553,13 +595,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { consumer = &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) tr.cfg.HistMode = HistogramModeCounters consumer = &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, append(noBucketsAttributeTags, bucketsAttributeTags...), consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) + + sketchesAttributeTags := []sketch{ + newSketch("doubleHist.test", uint64(ts), summary.Summary{ + Min: 0, + Max: 0, + Sum: 0, + Avg: 0, + Cnt: 20, + }, + []string{"attribute_tag:attribute_value"}, + ), + } + + tr.cfg.HistMode = HistogramModeDistributions + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics) + assert.ElementsMatch(t, sketchesAttributeTags, consumer.sketches) } func TestMapCumulativeHistogramMetrics(t *testing.T) { @@ -574,15 +634,18 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { point = slice.AppendEmpty() point.SetCount(20 + 30) point.SetSum(math.Pi + 20) - point.SetBucketCounts([]uint64{2 + 11, 18 + 2}) + point.SetBucketCounts([]uint64{2 + 11, 18 + 19}) point.SetExplicitBounds([]float64{0}) point.SetTimestamp(seconds(2)) - expected := []metric{ + expectedCounts := []metric{ newCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}), 
newCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}), + } + + expectedBuckets := []metric{ newCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}), - newCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}), + newCount("doubleHist.test.bucket", uint64(seconds(2)), 19, []string{"lower_bound:0", "upper_bound:inf"}), } ctx := context.Background() @@ -592,10 +655,37 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { tr.cfg.HistMode = HistogramModeCounters consumer := &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) assert.ElementsMatch(t, consumer.metrics, - expected, + append(expectedCounts, expectedBuckets...), + ) + + expectedSketches := []sketch{ + newSketch("doubleHist.test", uint64(seconds(2)), summary.Summary{ + Min: 0, + Max: 0, + Sum: 0, + Avg: 0, + Cnt: 30, + }, + []string{}, + ), + } + + tr = newTranslator(t, zap.NewNop()) + delta = false + + tr.cfg.HistMode = HistogramModeDistributions + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + expectedCounts, + ) + assert.ElementsMatch(t, + consumer.sketches, + expectedSketches, ) } @@ -935,7 +1025,6 @@ func TestMapMetrics(t *testing.T) { tr := newTranslator(t, testLogger) err := tr.MapMetrics(ctx, md, consumer) require.NoError(t, err) - assert.False(t, consumer.anySketch) assert.ElementsMatch(t, consumer.metrics, []metric{ testGauge("int.gauge", 1), @@ -953,6 +1042,7 @@ func TestMapMetrics(t *testing.T) { testCount("int.cumulative.monotonic.sum", 3, 2), testCount("double.cumulative.monotonic.sum", math.Pi, 2), }) + assert.Empty(t, consumer.sketches) // One metric type was unknown or unsupported assert.Equal(t, observed.FilterMessage("Unknown or 
unsupported metric type").Len(), 1) @@ -1010,6 +1100,7 @@ func createNaNMetrics() pdata.Metrics { dpDoubleHist.SetCount(20) dpDoubleHist.SetSum(math.NaN()) dpDoubleHist.SetBucketCounts([]uint64{2, 18}) + dpDoubleHist.SetExplicitBounds([]float64{0}) dpDoubleHist.SetTimestamp(seconds(0)) // Double Sum (cumulative) @@ -1059,7 +1150,6 @@ func TestNaNMetrics(t *testing.T) { tr := newTranslator(t, testLogger) consumer := &mockFullConsumer{} err := tr.MapMetrics(ctx, md, consumer) - assert.False(t, consumer.anySketch) require.NoError(t, err) assert.ElementsMatch(t, consumer.metrics, []metric{ @@ -1067,6 +1157,8 @@ func TestNaNMetrics(t *testing.T) { testCount("nan.summary.count", 100, 2), }) + assert.Empty(t, consumer.sketches) + // One metric type was unknown or unsupported assert.Equal(t, observed.FilterMessage("Unsupported metric value").Len(), 7) }