Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[datadogexporter] Fix cumulative histogram handling in distributions mode #5867

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,17 @@ func (t *Translator) getSketchBuckets(
as := &quantile.Agent{}
for j := range p.BucketCounts() {
lowerBound, upperBound := getBounds(p, j)

// Compute temporary bucketTags to have unique keys in the t.prevPts cache for each bucket
// The bucketTags are computed from the bounds before the InsertInterpolate fix is done,
// otherwise in the case where p.ExplicitBounds() has a size of 1 (eg. [0]), the two buckets
// would have the same bucketTags (lower_bound:0 and upper_bound:0), resulting in a buggy behavior.
bucketTags := []string{
fmt.Sprintf("lower_bound:%s", formatFloat(lowerBound)),
fmt.Sprintf("upper_bound:%s", formatFloat(upperBound)),
}
bucketTags = append(bucketTags, tags...)

// InsertInterpolate doesn't work with an infinite bound; insert in to the bucket that contains the non-infinite bound
// https://github.com/DataDog/datadog-agent/blob/7.31.0/pkg/aggregator/check_sampler.go#L107-L111
if math.IsInf(upperBound, 1) {
Expand All @@ -199,7 +210,7 @@ func (t *Translator) getSketchBuckets(
count := p.BucketCounts()[j]
if delta {
as.InsertInterpolate(lowerBound, upperBound, uint(count))
} else if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, float64(count)); ok {
} else if dx, ok := t.prevPts.putAndGetDiff(name, bucketTags, ts, float64(count)); ok {
as.InsertInterpolate(lowerBound, upperBound, uint(dx))
}

Expand Down Expand Up @@ -292,7 +303,7 @@ func (t *Translator) mapHistogramMetrics(
case HistogramModeCounters:
t.getLegacyBuckets(ctx, consumer, name, p, delta, tags, host)
case HistogramModeDistributions:
t.getSketchBuckets(ctx, consumer, name, ts, p, true, tags, host)
t.getSketchBuckets(ctx, consumer, name, ts, p, delta, tags, host)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/quantile"
"github.com/DataDog/datadog-agent/pkg/quantile/summary"
gocache "github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -127,6 +128,14 @@ type metric struct {
host string
}

type sketch struct {
name string
basic summary.Summary
timestamp uint64
tags []string
host string
}

var _ TimeSeriesConsumer = (*mockTimeSeriesConsumer)(nil)

type mockTimeSeriesConsumer struct {
Expand Down Expand Up @@ -162,6 +171,10 @@ func newCount(name string, ts uint64, val float64, tags []string) metric {
return metric{name: name, typ: Count, timestamp: ts, value: val, tags: tags}
}

func newSketch(name string, ts uint64, s summary.Summary, tags []string) sketch {
return sketch{name: name, basic: s, timestamp: ts, tags: tags}
}

func TestMapIntMetrics(t *testing.T) {
ts := pdata.NewTimestampFromTime(time.Now())
slice := pdata.NewNumberDataPointSlice()
Expand Down Expand Up @@ -495,11 +508,22 @@ func TestMapDoubleMonotonicOutOfOrder(t *testing.T) {

type mockFullConsumer struct {
mockTimeSeriesConsumer
anySketch bool
sketches []sketch
}

func (c *mockFullConsumer) ConsumeSketch(_ context.Context, _ string, _ uint64, _ *quantile.Sketch, _ []string, _ string) {
c.anySketch = true
func (c *mockFullConsumer) ConsumeSketch(_ context.Context, name string, ts uint64, sk *quantile.Sketch, tags []string, host string) {
if sk == nil {
return
}
c.sketches = append(c.sketches,
sketch{
name: name,
basic: sk.Basic,
timestamp: ts,
tags: tags,
host: host,
},
)
}

func TestMapDeltaHistogramMetrics(t *testing.T) {
Expand Down Expand Up @@ -530,13 +554,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) {
consumer := &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t, noBuckets, consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

tr.cfg.HistMode = HistogramModeCounters
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t, append(noBuckets, buckets...), consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

sketches := []sketch{
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
newSketch("doubleHist.test", uint64(ts), summary.Summary{
Min: 0,
Max: 0,
Sum: 0,
Avg: 0,
Cnt: 20,
},
[]string{},
),
}

tr.cfg.HistMode = HistogramModeDistributions
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t, noBuckets, consumer.metrics)
assert.ElementsMatch(t, sketches, consumer.sketches)

// With attribute tags
noBucketsAttributeTags := []metric{
Expand All @@ -553,13 +595,31 @@ func TestMapDeltaHistogramMetrics(t *testing.T) {
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "")
assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

tr.cfg.HistMode = HistogramModeCounters
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "")
assert.ElementsMatch(t, append(noBucketsAttributeTags, bucketsAttributeTags...), consumer.metrics)
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)

sketchesAttributeTags := []sketch{
mx-psi marked this conversation as resolved.
Show resolved Hide resolved
newSketch("doubleHist.test", uint64(ts), summary.Summary{
Min: 0,
Max: 0,
Sum: 0,
Avg: 0,
Cnt: 20,
},
[]string{"attribute_tag:attribute_value"},
),
}

tr.cfg.HistMode = HistogramModeDistributions
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "")
assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics)
assert.ElementsMatch(t, sketchesAttributeTags, consumer.sketches)
}

func TestMapCumulativeHistogramMetrics(t *testing.T) {
Expand All @@ -574,15 +634,18 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) {
point = slice.AppendEmpty()
point.SetCount(20 + 30)
point.SetSum(math.Pi + 20)
point.SetBucketCounts([]uint64{2 + 11, 18 + 2})
point.SetBucketCounts([]uint64{2 + 11, 18 + 19})
point.SetExplicitBounds([]float64{0})
point.SetTimestamp(seconds(2))

expected := []metric{
expectedCounts := []metric{
newCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}),
newCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}),
}

expectedBuckets := []metric{
newCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}),
newCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}),
newCount("doubleHist.test.bucket", uint64(seconds(2)), 19, []string{"lower_bound:0", "upper_bound:inf"}),
}

ctx := context.Background()
Expand All @@ -592,10 +655,37 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) {
tr.cfg.HistMode = HistogramModeCounters
consumer := &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.False(t, consumer.anySketch)
assert.Empty(t, consumer.sketches)
assert.ElementsMatch(t,
consumer.metrics,
expected,
append(expectedCounts, expectedBuckets...),
)

expectedSketches := []sketch{
newSketch("doubleHist.test", uint64(seconds(2)), summary.Summary{
Min: 0,
Max: 0,
Sum: 0,
Avg: 0,
Cnt: 30,
},
[]string{},
),
}

tr = newTranslator(t, zap.NewNop())
delta = false

tr.cfg.HistMode = HistogramModeDistributions
consumer = &mockFullConsumer{}
tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "")
assert.ElementsMatch(t,
consumer.metrics,
expectedCounts,
)
assert.ElementsMatch(t,
consumer.sketches,
expectedSketches,
)
}

Expand Down Expand Up @@ -935,7 +1025,6 @@ func TestMapMetrics(t *testing.T) {
tr := newTranslator(t, testLogger)
err := tr.MapMetrics(ctx, md, consumer)
require.NoError(t, err)
assert.False(t, consumer.anySketch)

assert.ElementsMatch(t, consumer.metrics, []metric{
testGauge("int.gauge", 1),
Expand All @@ -953,6 +1042,7 @@ func TestMapMetrics(t *testing.T) {
testCount("int.cumulative.monotonic.sum", 3, 2),
testCount("double.cumulative.monotonic.sum", math.Pi, 2),
})
assert.Empty(t, consumer.sketches)

// One metric type was unknown or unsupported
assert.Equal(t, observed.FilterMessage("Unknown or unsupported metric type").Len(), 1)
Expand Down Expand Up @@ -1010,6 +1100,7 @@ func createNaNMetrics() pdata.Metrics {
dpDoubleHist.SetCount(20)
dpDoubleHist.SetSum(math.NaN())
dpDoubleHist.SetBucketCounts([]uint64{2, 18})
dpDoubleHist.SetExplicitBounds([]float64{0})
dpDoubleHist.SetTimestamp(seconds(0))

// Double Sum (cumulative)
Expand Down Expand Up @@ -1059,14 +1150,15 @@ func TestNaNMetrics(t *testing.T) {
tr := newTranslator(t, testLogger)
consumer := &mockFullConsumer{}
err := tr.MapMetrics(ctx, md, consumer)
assert.False(t, consumer.anySketch)
require.NoError(t, err)

assert.ElementsMatch(t, consumer.metrics, []metric{
testCount("nan.histogram.count", 20, 0),
testCount("nan.summary.count", 100, 2),
})

assert.Empty(t, consumer.sketches)

// One metric type was unknown or unsupported
assert.Equal(t, observed.FilterMessage("Unsupported metric value").Len(), 7)
}