
Commit

[datadogexporter] Fix cumulative histogram handling in distributions mode (#5867)

* [datadogexporter] Fix cumulative histogram handling in distributions mode

* Remove unused testSketch

* Cosmetic fixes
KSerrania authored Oct 22, 2021
1 parent 892e540 commit 412efc2
Showing 2 changed files with 119 additions and 16 deletions.
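
For context on the change itself: in distributions mode the translator previously treated every histogram point as delta when building sketch buckets; this commit threads the real delta flag through to getSketchBuckets and, for cumulative points, diffs each bucket count against the previous point via the t.prevPts cache, keyed per bucket. The following is a minimal, self-contained sketch of that cumulative-to-delta idea only; prevCounts and cumulativeBucketToDelta are hypothetical names, not the exporter's actual API.

package main

import "fmt"

// prevCounts stands in for the exporter's t.prevPts cache; the real cache is
// keyed, stored, and expired differently. This map exists only for illustration.
var prevCounts = map[string]float64{}

// cumulativeBucketToDelta returns how much a cumulative bucket count grew since
// the previous point. The key must be unique per metric and per bucket, which is
// why the commit tags each cache entry with the bucket's lower and upper bounds.
func cumulativeBucketToDelta(name, bucketKey string, count float64) (float64, bool) {
	key := name + "|" + bucketKey
	prev, seen := prevCounts[key]
	prevCounts[key] = count
	if !seen || count < prev {
		// First point for this bucket (or a reset): nothing to report yet.
		return 0, false
	}
	return count - prev, true
}

func main() {
	// Cumulative counts for the (0, +inf) bucket on two consecutive points.
	if _, ok := cumulativeBucketToDelta("doubleHist.test", "lower_bound:0|upper_bound:inf", 18); !ok {
		fmt.Println("first point: no delta to insert into the sketch")
	}
	dx, _ := cumulativeBucketToDelta("doubleHist.test", "lower_bound:0|upper_bound:inf", 37)
	fmt.Println("delta inserted into the sketch:", dx) // 19
}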
15 changes: 13 additions & 2 deletions exporter/datadogexporter/internal/translator/metrics_translator.go
@@ -188,6 +188,17 @@ func (t *Translator) getSketchBuckets(
as := &quantile.Agent{}
for j := range p.BucketCounts() {
lowerBound, upperBound := getBounds(p, j)

// Compute temporary bucketTags to get unique keys in the t.prevPts cache for each bucket.
// The bucketTags are computed from the bounds before the InsertInterpolate fix is applied;
// otherwise, when p.ExplicitBounds() has a size of 1 (e.g. [0]), the two buckets
// would share the same bucketTags (lower_bound:0 and upper_bound:0), resulting in buggy behavior.
bucketTags := []string{
fmt.Sprintf("lower_bound:%s", formatFloat(lowerBound)),
fmt.Sprintf("upper_bound:%s", formatFloat(upperBound)),
}
bucketTags = append(bucketTags, tags...)

// InsertInterpolate doesn't work with an infinite bound; insert into the bucket that contains the non-infinite bound
// https://github.com/DataDog/datadog-agent/blob/7.31.0/pkg/aggregator/check_sampler.go#L107-L111
if math.IsInf(upperBound, 1) {
@@ -199,7 +210,7 @@ func (t *Translator) getSketchBuckets(
count := p.BucketCounts()[j]
if delta {
as.InsertInterpolate(lowerBound, upperBound, uint(count))
} else if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, float64(count)); ok {
} else if dx, ok := t.prevPts.putAndGetDiff(name, bucketTags, ts, float64(count)); ok {
as.InsertInterpolate(lowerBound, upperBound, uint(dx))
}

@@ -292,7 +303,7 @@ func (t *Translator) mapHistogramMetrics(
case HistogramModeCounters:
t.getLegacyBuckets(ctx, consumer, name, p, delta, tags, host)
case HistogramModeDistributions:
t.getSketchBuckets(ctx, consumer, name, ts, p, true, tags, host)
t.getSketchBuckets(ctx, consumer, name, ts, p, delta, tags, host)
}
}
}
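Why the per-bucket cache keys have to be built from the raw bounds: with ExplicitBounds() of size 1 (e.g. [0]), the histogram has two buckets, (-inf, 0] and (0, +inf). The infinite-bound workaround collapses each infinite bound onto the finite one, so keys derived from the adjusted bounds would be identical for both buckets and their cumulative counts would overwrite each other in t.prevPts. Below is a small illustration of that collision under those assumptions; keyFor is a hypothetical helper, not code from this repository.

package main

import (
	"fmt"
	"math"
)

// keyFor is a hypothetical helper that builds a cache key from bucket bounds.
func keyFor(lower, upper float64) string {
	return fmt.Sprintf("lower_bound:%g|upper_bound:%g", lower, upper)
}

func main() {
	bounds := []float64{0} // ExplicitBounds() with a single bound

	// Keys built from the raw bounds are distinct for the two buckets.
	fmt.Println(keyFor(math.Inf(-1), bounds[0])) // lower_bound:-Inf|upper_bound:0
	fmt.Println(keyFor(bounds[0], math.Inf(1)))  // lower_bound:0|upper_bound:+Inf

	// After clamping the infinite bounds to the finite one (as InsertInterpolate
	// requires), both buckets would produce the same key and overwrite each
	// other's entry in the cache.
	fmt.Println(keyFor(0, 0)) // lower_bound:0|upper_bound:0, for both buckets
}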
120 changes: 106 additions & 14 deletions exporter/datadogexporter/internal/translator/metrics_translator_test.go
@@ -21,6 +21,7 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/quantile"
"github.com/DataDog/datadog-agent/pkg/quantile/summary"
gocache "github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -127,6 +128,14 @@ type metric struct {
host string
}

type sketch struct {
name string
basic summary.Summary
timestamp uint64
tags []string
host string
}

var _ TimeSeriesConsumer = (*mockTimeSeriesConsumer)(nil)

type mockTimeSeriesConsumer struct {
@@ -162,6 +171,10 @@ func newCount(name string, ts uint64, val float64, tags []string) metric {
return metric{name: name, typ: Count, timestamp: ts, value: val, tags: tags}
}

func newSketch(name string, ts uint64, s summary.Summary, tags []string) sketch {
return sketch{name: name, basic: s, timestamp: ts, tags: tags}
}

func TestMapIntMetrics(t *testing.T) {
ts := pdata.NewTimestampFromTime(time.Now())
slice := pdata.NewNumberDataPointSlice()
@@ -495,11 +508,22 @@ func TestMapDoubleMonotonicOutOfOrder(t *testing.T) {

type mockFullConsumer struct {
mockTimeSeriesConsumer
anySketch bool
sketches []sketch
}

func (c *mockFullConsumer) ConsumeSketch(_ context.Context, _ string, _ uint64, _ *quantile.Sketch, _ []string, _ string) {
c.anySketch = true
func (c *mockFullConsumer) ConsumeSketch(_ context.Context, name string, ts uint64, sk *quantile.Sketch, tags []string, host string) {
if sk == nil {
return
}
c.sketches = append(c.sketches,
sketch{
name: name,
basic: sk.Basic,
timestamp: ts,
tags: tags,
host: host,
},
)
}

func TestMapDeltaHistogramMetrics(t *testing.T) {
@@ -530,13 +554,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) {
consumer := &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t, noBuckets, consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

tr.cfg.HistMode = HistogramModeCounters
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t, append(noBuckets, buckets...), consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

sketches := []sketch{
newSketch("doubleHist.test", uint64(ts), summary.Summary{
Min: 0,
Max: 0,
Sum: 0,
Avg: 0,
Cnt: 20,
},
[]string{},
),
}

tr.cfg.HistMode = HistogramModeDistributions
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t, noBuckets, consumer.metrics)
assert.ElementsMatch(t, sketches, consumer.sketches)

// With attribute tags
noBucketsAttributeTags := []metric{
@@ -553,13 +595,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) {
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "")
assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

tr.cfg.HistMode = HistogramModeCounters
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "")
assert.ElementsMatch(t, append(noBucketsAttributeTags, bucketsAttributeTags...), consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

sketchesAttributeTags := []sketch{
newSketch("doubleHist.test", uint64(ts), summary.Summary{
Min: 0,
Max: 0,
Sum: 0,
Avg: 0,
Cnt: 20,
},
[]string{"attribute_tag:attribute_value"},
),
}

tr.cfg.HistMode = HistogramModeDistributions
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "")
assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics)
assert.ElementsMatch(t, sketchesAttributeTags, consumer.sketches)
}

func TestMapCumulativeHistogramMetrics(t *testing.T) {
@@ -574,15 +634,18 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) {
point = slice.AppendEmpty()
point.SetCount(20 + 30)
point.SetSum(math.Pi + 20)
point.SetBucketCounts([]uint64{2 + 11, 18 + 2})
point.SetBucketCounts([]uint64{2 + 11, 18 + 19})
point.SetExplicitBounds([]float64{0})
point.SetTimestamp(seconds(2))

expected := []metric{
expectedCounts := []metric{
newCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}),
newCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}),
}

expectedBuckets := []metric{
newCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}),
newCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}),
newCount("doubleHist.test.bucket", uint64(seconds(2)), 19, []string{"lower_bound:0", "upper_bound:inf"}),
}

ctx := context.Background()
@@ -592,10 +655,37 @@
tr.cfg.HistMode = HistogramModeCounters
consumer := &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)
assert.ElementsMatch(t,
consumer.metrics,
expected,
append(expectedCounts, expectedBuckets...),
)

expectedSketches := []sketch{
newSketch("doubleHist.test", uint64(seconds(2)), summary.Summary{
Min: 0,
Max: 0,
Sum: 0,
Avg: 0,
Cnt: 30,
},
[]string{},
),
}

tr = newTranslator(t, zap.NewNop())
delta = false

tr.cfg.HistMode = HistogramModeDistributions
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t,
consumer.metrics,
expectedCounts,
)
assert.ElementsMatch(t,
consumer.sketches,
expectedSketches,
)
}
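
As a quick check on the expectations in this test: the cumulative bucket counts move from [2, 18] on the first point to [2+11, 18+19] = [13, 37] on the second, so the per-bucket deltas are 11 and 19 (the expectedBuckets values) and the sketch built from those deltas carries a total count of 30 (the Cnt: 30 in expectedSketches). A tiny, illustrative Go snippet of that arithmetic:

package main

import "fmt"

func main() {
	prev := []uint64{2, 18}  // cumulative bucket counts on the first point
	curr := []uint64{13, 37} // cumulative bucket counts on the second point

	var total uint64
	for i := range curr {
		dx := curr[i] - prev[i]                    // per-bucket delta fed into the sketch
		fmt.Printf("bucket %d delta: %d\n", i, dx) // 11, then 19
		total += dx
	}
	fmt.Println("sketch count:", total) // 30
}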

@@ -935,7 +1025,6 @@ func TestMapMetrics(t *testing.T) {
tr := newTranslator(t, testLogger)
err := tr.MapMetrics(ctx, md, consumer)
require.NoError(t, err)
assert.False(t, consumer.anySketch)

assert.ElementsMatch(t, consumer.metrics, []metric{
testGauge("int.gauge", 1),
@@ -953,6 +1042,7 @@
testCount("int.cumulative.monotonic.sum", 3, 2),
testCount("double.cumulative.monotonic.sum", math.Pi, 2),
})
assert.Empty(t, consumer.sketches)

// One metric type was unknown or unsupported
assert.Equal(t, observed.FilterMessage("Unknown or unsupported metric type").Len(), 1)
@@ -1010,6 +1100,7 @@ func createNaNMetrics() pdata.Metrics {
dpDoubleHist.SetCount(20)
dpDoubleHist.SetSum(math.NaN())
dpDoubleHist.SetBucketCounts([]uint64{2, 18})
dpDoubleHist.SetExplicitBounds([]float64{0})
dpDoubleHist.SetTimestamp(seconds(0))

// Double Sum (cumulative)
@@ -1059,14 +1150,15 @@ func TestNaNMetrics(t *testing.T) {
tr := newTranslator(t, testLogger)
consumer := &mockFullConsumer{}
err := tr.MapMetrics(ctx, md, consumer)
assert.False(t, consumer.anySketch)
require.NoError(t, err)

assert.ElementsMatch(t, consumer.metrics, []metric{
testCount("nan.histogram.count", 20, 0),
testCount("nan.summary.count", 100, 2),
})

assert.Empty(t, consumer.sketches)

// One metric type was unknown or unsupported
assert.Equal(t, observed.FilterMessage("Unsupported metric value").Len(), 7)
}
