Skip to content

Commit

Permalink
Add diff and non_negative_diff to basicstats aggregator (influxdata#4435
Browse files Browse the repository at this point in the history
)
  • Loading branch information
m0 authored and idohalevi committed Sep 23, 2020
1 parent 7eb4087 commit 6b9b2bb
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 13 deletions.
12 changes: 7 additions & 5 deletions plugins/aggregators/basicstats/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# BasicStats Aggregator Plugin

The BasicStats aggregator plugin give us count,max,min,mean,sum,s2(variance), stdev for a set of values,
The BasicStats aggregator plugin give us count,diff,max,min,mean,non_negative_diff,sum,s2(variance), stdev for a set of values,
emitting the aggregate every `period` seconds.

### Configuration:
Expand All @@ -15,20 +15,22 @@ emitting the aggregate every `period` seconds.
drop_original = false

## Configures which basic stats to push as fields
# stats = ["count", "min", "max", "mean", "stdev", "s2", "sum"]
# stats = ["count","diff","min","max","mean","non_negative_diff","stdev","s2","sum"]
```

- stats
- If not specified, then `count`, `min`, `max`, `mean`, `stdev`, and `s2` are aggregated and pushed as fields. `sum` is not aggregated by default to maintain backwards compatibility.
- If not specified, then `count`, `min`, `max`, `mean`, `stdev`, and `s2` are aggregated and pushed as fields. `sum`, `diff` and `non_negative_diff` are not aggregated by default to maintain backwards compatibility.
- If empty array, no stats are aggregated

### Measurements & Fields:

- measurement1
- field1_count
- field1_diff (difference)
- field1_max
- field1_min
- field1_mean
- field1_non_negative_diff (non-negative difference)
- field1_sum
- field1_s2 (variance)
- field1_stdev (standard deviation)
Expand All @@ -43,8 +45,8 @@ No tags are applied by this aggregator.
$ telegraf --config telegraf.conf --quiet
system,host=tars load1=1 1475583980000000000
system,host=tars load1=1 1475583990000000000
system,host=tars load1_count=2,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0 1475584010000000000
system,host=tars load1_count=2,load1_diff=0,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0 1475584010000000000
system,host=tars load1=1 1475584020000000000
system,host=tars load1=3 1475584030000000000
system,host=tars load1_count=2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162 1475584010000000000
system,host=tars load1_count=2,load1_diff=2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162 1475584010000000000
```
38 changes: 30 additions & 8 deletions plugins/aggregators/basicstats/basicstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ type BasicStats struct {
}

type configuredStats struct {
count bool
min bool
max bool
mean bool
variance bool
stdev bool
sum bool
count bool
min bool
max bool
mean bool
variance bool
stdev bool
sum bool
diff bool
non_negative_diff bool
}

func NewBasicStats() *BasicStats {
Expand All @@ -43,7 +45,9 @@ type basicstats struct {
max float64
sum float64
mean float64
M2 float64 //intermedia value for variance/stdev
diff float64
M2 float64 //intermediate value for variance/stdev
LAST float64 //intermediate value for diff
}

var sampleConfig = `
Expand Down Expand Up @@ -82,7 +86,9 @@ func (m *BasicStats) Add(in telegraf.Metric) {
max: fv,
mean: fv,
sum: fv,
diff: 0.0,
M2: 0.0,
LAST: fv,
}
}
}
Expand All @@ -98,7 +104,9 @@ func (m *BasicStats) Add(in telegraf.Metric) {
max: fv,
mean: fv,
sum: fv,
diff: 0.0,
M2: 0.0,
LAST: fv,
}
continue
}
Expand Down Expand Up @@ -127,6 +135,8 @@ func (m *BasicStats) Add(in telegraf.Metric) {
}
//sum compute
tmp.sum += fv
//diff compute
tmp.diff = fv - tmp.LAST
//store final data
m.cache[id].fields[field.Key] = tmp
}
Expand Down Expand Up @@ -167,6 +177,13 @@ func (m *BasicStats) Push(acc telegraf.Accumulator) {
if config.stdev {
fields[k+"_stdev"] = math.Sqrt(variance)
}
if config.diff {
fields[k+"_diff"] = v.diff
}
if config.non_negative_diff && v.diff >= 0 {
fields[k+"_non_negative_diff"] = v.diff
}

}
//if count == 1 StdDev = infinite => so I won't send data
}
Expand Down Expand Up @@ -199,6 +216,10 @@ func parseStats(names []string) *configuredStats {
parsed.stdev = true
case "sum":
parsed.sum = true
case "diff":
parsed.diff = true
case "non_negative_diff":
parsed.non_negative_diff = true

default:
log.Printf("W! Unrecognized basic stat '%s', ignoring", name)
Expand All @@ -219,6 +240,7 @@ func defaultStats() *configuredStats {
defaults.variance = true
defaults.stdev = true
defaults.sum = false
defaults.non_negative_diff = false

return defaults
}
Expand Down
81 changes: 81 additions & 0 deletions plugins/aggregators/basicstats/basicstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var m1, _ = metric.New("m1",
"b": int64(1),
"c": float64(2),
"d": float64(2),
"g": int64(3),
},
time.Now(),
)
Expand All @@ -31,6 +32,7 @@ var m2, _ = metric.New("m1",
"f": uint64(200),
"ignoreme": "string",
"andme": true,
"g": int64(1),
},
time.Now(),
)
Expand Down Expand Up @@ -86,6 +88,12 @@ func TestBasicStatsWithPeriod(t *testing.T) {
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
"g_count": float64(2), //g
"g_max": float64(3),
"g_min": float64(1),
"g_mean": float64(2),
"g_s2": float64(2),
"g_stdev": math.Sqrt(2),
}
expectedTags := map[string]string{
"foo": "bar",
Expand Down Expand Up @@ -118,6 +126,10 @@ func TestBasicStatsDifferentPeriods(t *testing.T) {
"d_max": float64(2),
"d_min": float64(2),
"d_mean": float64(2),
"g_count": float64(1), //g
"g_max": float64(3),
"g_min": float64(3),
"g_mean": float64(3),
}
expectedTags := map[string]string{
"foo": "bar",
Expand Down Expand Up @@ -153,6 +165,10 @@ func TestBasicStatsDifferentPeriods(t *testing.T) {
"f_max": float64(200),
"f_min": float64(200),
"f_mean": float64(200),
"g_count": float64(1), //g
"g_max": float64(1),
"g_min": float64(1),
"g_mean": float64(1),
}
expectedTags = map[string]string{
"foo": "bar",
Expand All @@ -179,6 +195,7 @@ func TestBasicStatsWithOnlyCount(t *testing.T) {
"d_count": float64(2),
"e_count": float64(1),
"f_count": float64(1),
"g_count": float64(2),
}
expectedTags := map[string]string{
"foo": "bar",
Expand All @@ -205,6 +222,7 @@ func TestBasicStatsWithOnlyMin(t *testing.T) {
"d_min": float64(2),
"e_min": float64(200),
"f_min": float64(200),
"g_min": float64(1),
}
expectedTags := map[string]string{
"foo": "bar",
Expand All @@ -231,6 +249,7 @@ func TestBasicStatsWithOnlyMax(t *testing.T) {
"d_max": float64(6),
"e_max": float64(200),
"f_max": float64(200),
"g_max": float64(3),
}
expectedTags := map[string]string{
"foo": "bar",
Expand All @@ -257,6 +276,7 @@ func TestBasicStatsWithOnlyMean(t *testing.T) {
"d_mean": float64(4),
"e_mean": float64(200),
"f_mean": float64(200),
"g_mean": float64(2),
}
expectedTags := map[string]string{
"foo": "bar",
Expand All @@ -283,6 +303,7 @@ func TestBasicStatsWithOnlySum(t *testing.T) {
"d_sum": float64(8),
"e_sum": float64(200),
"f_sum": float64(200),
"g_sum": float64(4),
}
expectedTags := map[string]string{
"foo": "bar",
Expand Down Expand Up @@ -359,6 +380,7 @@ func TestBasicStatsWithOnlyVariance(t *testing.T) {
"b_s2": float64(2),
"c_s2": float64(2),
"d_s2": float64(8),
"g_s2": float64(2),
}
expectedTags := map[string]string{
"foo": "bar",
Expand All @@ -383,6 +405,7 @@ func TestBasicStatsWithOnlyStandardDeviation(t *testing.T) {
"b_stdev": math.Sqrt(2),
"c_stdev": math.Sqrt(2),
"d_stdev": math.Sqrt(8),
"g_stdev": math.Sqrt(2),
}
expectedTags := map[string]string{
"foo": "bar",
Expand Down Expand Up @@ -415,6 +438,57 @@ func TestBasicStatsWithMinAndMax(t *testing.T) {
"e_min": float64(200),
"f_max": float64(200), //f
"f_min": float64(200),
"g_max": float64(3), //g
"g_min": float64(1),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating diff
func TestBasicStatsWithDiff(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"diff"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_diff": float64(0),
"b_diff": float64(2),
"c_diff": float64(2),
"d_diff": float64(4),
"g_diff": float64(-2),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating non_negative_diff
func TestBasicStatsWithNonNegativeDiff(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"non_negative_diff"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_non_negative_diff": float64(0),
"b_non_negative_diff": float64(2),
"c_non_negative_diff": float64(2),
"d_non_negative_diff": float64(4),
}
expectedTags := map[string]string{
"foo": "bar",
Expand Down Expand Up @@ -471,6 +545,13 @@ func TestBasicStatsWithAllStats(t *testing.T) {
"f_min": float64(200),
"f_mean": float64(200),
"f_sum": float64(200),
"g_count": float64(2), //g
"g_max": float64(3),
"g_min": float64(1),
"g_mean": float64(2),
"g_s2": float64(2),
"g_stdev": math.Sqrt(2),
"g_sum": float64(4),
}
expectedTags := map[string]string{
"foo": "bar",
Expand Down

0 comments on commit 6b9b2bb

Please sign in to comment.