diff --git a/pkg/otlp/model/translator/metrics_translator.go b/pkg/otlp/model/translator/metrics_translator.go index ac9318842643b..0eb049ed42e75 100644 --- a/pkg/otlp/model/translator/metrics_translator.go +++ b/pkg/otlp/model/translator/metrics_translator.go @@ -189,6 +189,17 @@ func (t *Translator) getSketchBuckets( as := &quantile.Agent{} for j := range p.BucketCounts() { lowerBound, upperBound := getBounds(p, j) + + // Compute temporary bucketTags to have unique keys in the t.prevPts cache for each bucket + // The bucketTags are computed from the bounds before the InsertInterpolate fix is done, + // otherwise in the case where p.ExplicitBounds() has a size of 1 (eg. [0]), the two buckets + // would have the same bucketTags (lower_bound:0 and upper_bound:0), resulting in a buggy behavior. + bucketTags := []string{ + fmt.Sprintf("lower_bound:%s", formatFloat(lowerBound)), + fmt.Sprintf("upper_bound:%s", formatFloat(upperBound)), + } + bucketTags = append(bucketTags, tags...) + // InsertInterpolate doesn't work with an infinite bound; insert in to the bucket that contains the non-infinite bound // https://github.com/DataDog/datadog-agent/blob/7.31.0/pkg/aggregator/check_sampler.go#L107-L111 if math.IsInf(upperBound, 1) { @@ -200,7 +211,7 @@ func (t *Translator) getSketchBuckets( count := p.BucketCounts()[j] if delta { as.InsertInterpolate(lowerBound, upperBound, uint(count)) - } else if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, float64(count)); ok { + } else if dx, ok := t.prevPts.putAndGetDiff(name, bucketTags, ts, float64(count)); ok { as.InsertInterpolate(lowerBound, upperBound, uint(dx)) } @@ -293,7 +304,7 @@ func (t *Translator) mapHistogramMetrics( case HistogramModeCounters: t.getLegacyBuckets(ctx, consumer, name, p, delta, tags, host) case HistogramModeDistributions: - t.getSketchBuckets(ctx, consumer, name, ts, p, true, tags, host) + t.getSketchBuckets(ctx, consumer, name, ts, p, delta, tags, host) } } } diff --git a/pkg/otlp/model/translator/metrics_translator_test.go b/pkg/otlp/model/translator/metrics_translator_test.go index 8b59d8d3aa9af..e5077cede544a 100644 --- a/pkg/otlp/model/translator/metrics_translator_test.go +++ b/pkg/otlp/model/translator/metrics_translator_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/DataDog/datadog-agent/pkg/quantile" + "github.com/DataDog/datadog-agent/pkg/quantile/summary" gocache "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -127,6 +128,14 @@ type metric struct { host string } +type sketch struct { + name string + basic summary.Summary + timestamp uint64 + tags []string + host string +} + var _ TimeSeriesConsumer = (*mockTimeSeriesConsumer)(nil) type mockTimeSeriesConsumer struct { @@ -162,6 +171,10 @@ func newCount(name string, ts uint64, val float64, tags []string) metric { return metric{name: name, typ: Count, timestamp: ts, value: val, tags: tags} } +func newSketch(name string, ts uint64, s summary.Summary, tags []string) sketch { + return sketch{name: name, basic: s, timestamp: ts, tags: tags} +} + func TestMapIntMetrics(t *testing.T) { ts := pdata.NewTimestampFromTime(time.Now()) slice := pdata.NewNumberDataPointSlice() @@ -495,11 +508,22 @@ func TestMapDoubleMonotonicOutOfOrder(t *testing.T) { type mockFullConsumer struct { mockTimeSeriesConsumer - anySketch bool + sketches []sketch } -func (c *mockFullConsumer) ConsumeSketch(_ context.Context, _ string, _ uint64, _ *quantile.Sketch, _ []string, _ string) { - c.anySketch = true +func (c *mockFullConsumer) ConsumeSketch(_ context.Context, name string, ts uint64, sk *quantile.Sketch, tags []string, host string) { + if sk == nil { + return + } + c.sketches = append(c.sketches, + sketch{ + name: name, + basic: sk.Basic, + timestamp: ts, + tags: tags, + host: host, + }, + ) } func TestMapDeltaHistogramMetrics(t *testing.T) { @@ -530,13 +554,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { consumer := &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") assert.ElementsMatch(t, noBuckets, consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) tr.cfg.HistMode = HistogramModeCounters consumer = &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") assert.ElementsMatch(t, append(noBuckets, buckets...), consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) + + sketches := []sketch{ + newSketch("doubleHist.test", uint64(ts), summary.Summary{ + Min: 0, + Max: 0, + Sum: 0, + Avg: 0, + Cnt: 20, + }, + []string{}, + ), + } + + tr.cfg.HistMode = HistogramModeDistributions + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, noBuckets, consumer.metrics) + assert.ElementsMatch(t, sketches, consumer.sketches) // With attribute tags noBucketsAttributeTags := []metric{ @@ -553,13 +595,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { consumer = &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) tr.cfg.HistMode = HistogramModeCounters consumer = &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, append(noBucketsAttributeTags, bucketsAttributeTags...), consumer.metrics) - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) + + sketchesAttributeTags := []sketch{ + newSketch("doubleHist.test", uint64(ts), summary.Summary{ + Min: 0, + Max: 0, + Sum: 0, + Avg: 0, + Cnt: 20, + }, + []string{"attribute_tag:attribute_value"}, + ), + } + + tr.cfg.HistMode = HistogramModeDistributions + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics) + assert.ElementsMatch(t, sketchesAttributeTags, consumer.sketches) } func TestMapCumulativeHistogramMetrics(t *testing.T) { @@ -574,15 +634,18 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { point = slice.AppendEmpty() point.SetCount(20 + 30) point.SetSum(math.Pi + 20) - point.SetBucketCounts([]uint64{2 + 11, 18 + 2}) + point.SetBucketCounts([]uint64{2 + 11, 18 + 19}) point.SetExplicitBounds([]float64{0}) point.SetTimestamp(seconds(2)) - expected := []metric{ + expectedCounts := []metric{ newCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}), newCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}), + } + + expectedBuckets := []metric{ newCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}), - newCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}), + newCount("doubleHist.test.bucket", uint64(seconds(2)), 19, []string{"lower_bound:0", "upper_bound:inf"}), } ctx := context.Background() @@ -592,10 +655,37 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { tr.cfg.HistMode = HistogramModeCounters consumer := &mockFullConsumer{} tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") - assert.False(t, consumer.anySketch) + assert.Empty(t, consumer.sketches) assert.ElementsMatch(t, consumer.metrics, - expected, + append(expectedCounts, expectedBuckets...), + ) + + expectedSketches := []sketch{ + newSketch("doubleHist.test", uint64(seconds(2)), summary.Summary{ + Min: 0, + Max: 0, + Sum: 0, + Avg: 0, + Cnt: 30, + }, + []string{}, + ), + } + + tr = newTranslator(t, zap.NewNop()) + delta = false + + tr.cfg.HistMode = HistogramModeDistributions + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, + consumer.metrics, + expectedCounts, + ) + assert.ElementsMatch(t, + consumer.sketches, + expectedSketches, ) } @@ -935,7 +1025,6 @@ func TestMapMetrics(t *testing.T) { tr := newTranslator(t, testLogger) err := tr.MapMetrics(ctx, md, consumer) require.NoError(t, err) - assert.False(t, consumer.anySketch) assert.ElementsMatch(t, consumer.metrics, []metric{ testGauge("int.gauge", 1), @@ -953,6 +1042,7 @@ func TestMapMetrics(t *testing.T) { testCount("int.cumulative.monotonic.sum", 3, 2), testCount("double.cumulative.monotonic.sum", math.Pi, 2), }) + assert.Empty(t, consumer.sketches) // One metric type was unknown or unsupported assert.Equal(t, observed.FilterMessage("Unknown or unsupported metric type").Len(), 1) @@ -1010,6 +1100,7 @@ func createNaNMetrics() pdata.Metrics { dpDoubleHist.SetCount(20) dpDoubleHist.SetSum(math.NaN()) dpDoubleHist.SetBucketCounts([]uint64{2, 18}) + dpDoubleHist.SetExplicitBounds([]float64{0}) dpDoubleHist.SetTimestamp(seconds(0)) // Double Sum (cumulative) @@ -1059,7 +1150,6 @@ func TestNaNMetrics(t *testing.T) { tr := newTranslator(t, testLogger) consumer := &mockFullConsumer{} err := tr.MapMetrics(ctx, md, consumer) - assert.False(t, consumer.anySketch) require.NoError(t, err) assert.ElementsMatch(t, consumer.metrics, []metric{ @@ -1067,6 +1157,8 @@ func TestNaNMetrics(t *testing.T) { testCount("nan.summary.count", 100, 2), }) + assert.Empty(t, consumer.sketches) + // One metric type was unknown or unsupported assert.Equal(t, observed.FilterMessage("Unsupported metric value").Len(), 7) }