From a9e366dfcd3a6d7665143c8708bb025efb068766 Mon Sep 17 00:00:00 2001 From: Kylian Serrania Date: Thu, 21 Oct 2021 16:02:31 +0200 Subject: [PATCH 1/3] [datadogexporter] Fix cumulative histogram handling in distributions mode --- .../internal/translator/metrics_translator.go | 16 ++- .../translator/metrics_translator_test.go | 126 ++++++++++++++++-- 2 files changed, 126 insertions(+), 16 deletions(-) diff --git a/exporter/datadogexporter/internal/translator/metrics_translator.go b/exporter/datadogexporter/internal/translator/metrics_translator.go index 5797132f9926..40804e8c7fb7 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator.go @@ -166,6 +166,7 @@ func getBounds(p pdata.HistogramDataPoint, idx int) (lowerBound float64, upperBo // See https://github.com/open-telemetry/opentelemetry-proto/blob/v0.10.0/opentelemetry/proto/metrics/v1/metrics.proto#L427-L439 lowerBound = math.Inf(-1) upperBound = math.Inf(1) + if idx > 0 { lowerBound = p.ExplicitBounds()[idx-1] } @@ -188,6 +189,17 @@ func (t *Translator) getSketchBuckets( as := &quantile.Agent{} for j := range p.BucketCounts() { lowerBound, upperBound := getBounds(p, j) + + // Compute temporary bucketTags to have unique keys in the t.prevPts cache for each bucket + // The bucketTags are computed from the bounds before the InsertInterpolate fix is done, + // otherwise in the case where p.ExplicitBounds() has a size of 1 (eg. [0]), the two buckets + // would have the same bucketTags (lower_bound:0 and upper_bound:0), resulting in a buggy behavior. + bucketTags := []string{ + fmt.Sprintf("lower_bound:%s", formatFloat(lowerBound)), + fmt.Sprintf("upper_bound:%s", formatFloat(upperBound)), + } + bucketTags = append(bucketTags, tags...) + // InsertInterpolate doesn't work with an infinite bound; insert in to the bucket that contains the non-infinite bound // https://github.com/DataDog/datadog-agent/blob/7.31.0/pkg/aggregator/check_sampler.go#L107-L111 if math.IsInf(upperBound, 1) { @@ -199,7 +211,7 @@ func (t *Translator) getSketchBuckets( count := p.BucketCounts()[j] if delta { as.InsertInterpolate(lowerBound, upperBound, uint(count)) - } else if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, float64(count)); ok { + } else if dx, ok := t.prevPts.putAndGetDiff(name, bucketTags, ts, float64(count)); ok { as.InsertInterpolate(lowerBound, upperBound, uint(dx)) } @@ -292,7 +304,7 @@ func (t *Translator) mapHistogramMetrics( case HistogramModeCounters: t.getLegacyBuckets(ctx, consumer, name, p, delta, tags, host) case HistogramModeDistributions: - t.getSketchBuckets(ctx, consumer, name, ts, p, true, tags, host) + t.getSketchBuckets(ctx, consumer, name, ts, p, delta, tags, host) } } } diff --git a/exporter/datadogexporter/internal/translator/metrics_translator_test.go b/exporter/datadogexporter/internal/translator/metrics_translator_test.go index d2c687b2a049..4cd372134fca 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator_test.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/DataDog/datadog-agent/pkg/quantile" + "github.com/DataDog/datadog-agent/pkg/quantile/summary" gocache "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -162,6 +163,10 @@ func newCount(name string, ts uint64, val float64, tags []string) metric { return metric{name: name, typ: Count, timestamp: ts, value: val, tags: tags} } +func newSketch(name string, ts uint64, s summary.Summary, tags []string) sketch { + return sketch{name: name, basic: s, timestamp: ts, tags: tags} +} + func TestMapIntMetrics(t *testing.T) { ts := pdata.NewTimestampFromTime(time.Now()) slice := pdata.NewNumberDataPointSlice() @@ -493,13 +498,32 @@ func TestMapDoubleMonotonicOutOfOrder(t *testing.T) { ) } +type sketch struct { + name string + basic summary.Summary + timestamp uint64 + tags []string + host string +} + type mockFullConsumer struct { mockTimeSeriesConsumer - anySketch bool + sketches []sketch } -func (c *mockFullConsumer) ConsumeSketch(_ context.Context, _ string, _ uint64, _ *quantile.Sketch, _ []string, _ string) { - c.anySketch = true +func (c *mockFullConsumer) ConsumeSketch(_ context.Context, name string, ts uint64, sk *quantile.Sketch, tags []string, host string) { + if sk == nil { + return + } + c.sketches = append(c.sketches, + sketch{ + name: name, + basic: sk.Basic, + timestamp: ts, + tags: tags, + host: host, + }, + ) } func TestMapDeltaHistogramMetrics(t *testing.T) { @@ -530,13 +554,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { consumer := &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") assert.ElementsMatch(t, noBuckets, consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) tr.cfg.HistMode = HistogramModeCounters consumer = &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") assert.ElementsMatch(t, append(noBuckets, buckets...), consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) + + sketches := []sketch{ + newSketch("doubleHist.test", uint64(ts), summary.Summary{ + Min: 0, + Max: 0, + Sum: 0, + Avg: 0, + Cnt: 20, + }, + []string{}, + ), + } + + tr.cfg.HistMode = HistogramModeDistributions + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, noBuckets, consumer.metrics) + assert.ElementsMatch(t, sketches, consumer.sketches) // With attribute tags noBucketsAttributeTags := []metric{ @@ -553,13 +595,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { consumer = &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) tr.cfg.HistMode = HistogramModeCounters consumer = &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, append(noBucketsAttributeTags, bucketsAttributeTags...), consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) + + sketchesAttributeTags := []sketch{ + newSketch("doubleHist.test", uint64(ts), summary.Summary{ + Min: 0, + Max: 0, + Sum: 0, + Avg: 0, + Cnt: 20, + }, + []string{"attribute_tag:attribute_value"}, + ), + } + + tr.cfg.HistMode = HistogramModeDistributions + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics) + assert.ElementsMatch(t, sketchesAttributeTags, consumer.sketches) } func TestMapCumulativeHistogramMetrics(t *testing.T) { @@ -574,15 +634,18 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { point = slice.AppendEmpty() point.SetCount(20 + 30) point.SetSum(math.Pi + 20) - point.SetBucketCounts([]uint64{2 + 11, 18 + 2}) + point.SetBucketCounts([]uint64{2 + 11, 18 + 19}) point.SetExplicitBounds([]float64{0}) point.SetTimestamp(seconds(2)) - expected := []metric{ + expectedCounts := []metric{ newCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}), newCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}), + } + + expectedBuckets := []metric{ newCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}), - newCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}), + newCount("doubleHist.test.bucket", uint64(seconds(2)), 19, []string{"lower_bound:0", "upper_bound:inf"}), } ctx := context.Background() @@ -592,10 +655,37 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { tr.cfg.HistMode = HistogramModeCounters consumer := &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) + assert.ElementsMatch(t, + consumer.metrics, + append(expectedCounts, expectedBuckets...), + ) + + expectedSketches := []sketch{ + newSketch("doubleHist.test", uint64(seconds(2)), summary.Summary{ + Min: 0, + Max: 0, + Sum: 0, + Avg: 0, + Cnt: 30, + }, + []string{}, + ), + } + + tr = newTranslator(t, zap.NewNop()) + delta = false + + tr.cfg.HistMode = HistogramModeDistributions + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") assert.ElementsMatch(t, consumer.metrics, - expected, + expectedCounts, + ) + assert.ElementsMatch(t, + consumer.sketches, + expectedSketches, ) } @@ -925,6 +1015,12 @@ func testCount(name string, val float64, seconds uint64) metric { return m } +func testSketch(name string, summary summary.Summary, seconds uint64) sketch { + s := newSketch(name, seconds, summary, []string{}) + s.host = testHostname + return s +} + func TestMapMetrics(t *testing.T) { md := createTestMetrics() @@ -935,7 +1031,6 @@ func TestMapMetrics(t *testing.T) { tr := newTranslator(t, testLogger) err := tr.MapMetrics(ctx, md, consumer) require.NoError(t, err) - assert.False(t, consumer.anySketch) assert.ElementsMatch(t, consumer.metrics, []metric{ testGauge("int.gauge", 1), @@ -953,6 +1048,7 @@ func TestMapMetrics(t *testing.T) { testCount("int.cumulative.monotonic.sum", 3, 2), testCount("double.cumulative.monotonic.sum", math.Pi, 2), }) + assert.Empty(t, consumer.sketches) // One metric type was unknown or unsupported assert.Equal(t, observed.FilterMessage("Unknown or unsupported metric type").Len(), 1) @@ -1010,6 +1106,7 @@ func createNaNMetrics() pdata.Metrics { dpDoubleHist.SetCount(20) dpDoubleHist.SetSum(math.NaN()) dpDoubleHist.SetBucketCounts([]uint64{2, 18}) + dpDoubleHist.SetExplicitBounds([]float64{0}) dpDoubleHist.SetTimestamp(seconds(0)) // Double Sum (cumulative) @@ -1059,7 +1156,6 @@ func TestNaNMetrics(t *testing.T) { tr := newTranslator(t, testLogger) consumer := &mockFullConsumer{} err := tr.MapMetrics(ctx, md, consumer) - assert.False(t, consumer.anySketch) require.NoError(t, err) assert.ElementsMatch(t, consumer.metrics, []metric{ @@ -1067,6 +1163,8 @@ func TestNaNMetrics(t *testing.T) { testCount("nan.summary.count", 100, 2), }) + assert.Empty(t, consumer.sketches) + // One metric type was unknown or unsupported assert.Equal(t, observed.FilterMessage("Unsupported metric value").Len(), 7) } From f8cf8eec4d4569f08e5a5801d089c4ff9fbba84f Mon Sep 17 00:00:00 2001 From: Kylian Serrania Date: Thu, 21 Oct 2021 22:45:33 +0200 Subject: [PATCH 2/3] Remove unused testSketch --- .../internal/translator/metrics_translator_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/exporter/datadogexporter/internal/translator/metrics_translator_test.go b/exporter/datadogexporter/internal/translator/metrics_translator_test.go index 4cd372134fca..ccf8c639933e 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator_test.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator_test.go @@ -1015,12 +1015,6 @@ func testCount(name string, val float64, seconds uint64) metric { return m } -func testSketch(name string, summary summary.Summary, seconds uint64) sketch { - s := newSketch(name, seconds, summary, []string{}) - s.host = testHostname - return s -} - func TestMapMetrics(t *testing.T) { md := createTestMetrics() From 9e9155eb321841103383aa0123c75b7ca163feb4 Mon Sep 17 00:00:00 2001 From: Kylian Serrania Date: Fri, 22 Oct 2021 10:19:43 +0200 Subject: [PATCH 3/3] Cosmetic fixes --- .../internal/translator/metrics_translator.go | 1 - .../translator/metrics_translator_test.go | 16 ++++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/exporter/datadogexporter/internal/translator/metrics_translator.go b/exporter/datadogexporter/internal/translator/metrics_translator.go index 40804e8c7fb7..87f06232d8b1 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator.go @@ -166,7 +166,6 @@ func getBounds(p pdata.HistogramDataPoint, idx int) (lowerBound float64, upperBo // See https://github.com/open-telemetry/opentelemetry-proto/blob/v0.10.0/opentelemetry/proto/metrics/v1/metrics.proto#L427-L439 lowerBound = math.Inf(-1) upperBound = math.Inf(1) - if idx > 0 { lowerBound = p.ExplicitBounds()[idx-1] } diff --git a/exporter/datadogexporter/internal/translator/metrics_translator_test.go b/exporter/datadogexporter/internal/translator/metrics_translator_test.go index ccf8c639933e..70ee01e2ca6a 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator_test.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator_test.go @@ -128,6 +128,14 @@ type metric struct { host string } +type sketch struct { + name string + basic summary.Summary + timestamp uint64 + tags []string + host string +} + var _ TimeSeriesConsumer = (*mockTimeSeriesConsumer)(nil) type mockTimeSeriesConsumer struct { @@ -498,14 +506,6 @@ func TestMapDoubleMonotonicOutOfOrder(t *testing.T) { ) } -type sketch struct { - name string - basic summary.Summary - timestamp uint64 - tags []string - host string -} - type mockFullConsumer struct { mockTimeSeriesConsumer sketches []sketch