From ddf4a46be0a31b9838f7648d60f148754558956f Mon Sep 17 00:00:00 2001
From: Sean Hanson
Date: Tue, 3 Oct 2017 17:33:25 -0400
Subject: [PATCH] Add maxSeries function

---
 docs/graphite.md            |   1 +
 expr/data_test.go           |  18 ++++
 expr/func_maxseries.go      |  76 ++++++++++++++
 expr/func_maxseries_test.go | 200 ++++++++++++++++++++++++++++++++++++
 expr/funcs.go               |   2 +
 5 files changed, 297 insertions(+)
 create mode 100644 expr/func_maxseries.go
 create mode 100644 expr/func_maxseries_test.go

diff --git a/docs/graphite.md b/docs/graphite.md
index e947d09bf1..9786f30a4f 100644
--- a/docs/graphite.md
+++ b/docs/graphite.md
@@ -35,6 +35,7 @@ aliasSub(seriesList, pattern, replacement) seriesList | | Stable
 averageSeries(seriesLists) series | avg | Stable
 consolidateBy(seriesList, func) seriesList | | Stable
 divideSeries(seriesList, dividend, divisor) seriesList| | Stable
+maxSeries(seriesList) series | max | Stable
 movingAverage(seriesLists, windowSize) seriesList | | Unstable
 perSecond(seriesLists) seriesList | | Stable
 scale(seriesLists, num) series | sum | Stable
diff --git a/expr/data_test.go b/expr/data_test.go
index 4da41cc5bc..fe2bce8faf 100644
--- a/expr/data_test.go
+++ b/expr/data_test.go
@@ -88,6 +88,24 @@ var avgabc = []schema.Point{
 	{Val: float64(1234567894) / 2, Ts: 60},
 }
 
+var maxab = []schema.Point{
+	{Val: 0, Ts: 10},
+	{Val: math.MaxFloat64, Ts: 20},
+	{Val: math.MaxFloat64 - 20, Ts: 30},
+	{Val: math.NaN(), Ts: 40},
+	{Val: 1234567890, Ts: 50}, // in accordance with graphite, max(5,null) = 5
+	{Val: 1234567890, Ts: 60},
+}
+
+var maxabc = []schema.Point{
+	{Val: 0, Ts: 10},
+	{Val: math.MaxFloat64, Ts: 20},
+	{Val: math.MaxFloat64 - 20, Ts: 30},
+	{Val: 2, Ts: 40},
+	{Val: 1234567890, Ts: 50},
+	{Val: 1234567890, Ts: 60},
+}
+
 // make sure we test with the correct data, don't mask if processing accidentally modifies our input data
 func getCopy(in []schema.Point) []schema.Point {
 	out := make([]schema.Point, len(in))
diff --git a/expr/func_maxseries.go b/expr/func_maxseries.go
new file mode 100644
index 0000000000..c54e5d0f1c
--- /dev/null
+++ b/expr/func_maxseries.go
@@ -0,0 +1,76 @@
+package expr
+
+import (
+	"fmt"
+	"math"
+	"strings"
+
+	"github.com/grafana/metrictank/api/models"
+	"gopkg.in/raintank/schema.v1"
+)
+
+// FuncMaxSeries implements the graphite maxSeries/max function:
+// the pointwise maximum across all input series, ignoring nulls.
+type FuncMaxSeries struct {
+	in []GraphiteFunc
+}
+
+func NewMaxSeries() GraphiteFunc {
+	return &FuncMaxSeries{}
+}
+
+func (s *FuncMaxSeries) Signature() ([]Arg, []Arg) {
+	return []Arg{
+		ArgSeriesLists{val: &s.in},
+	}, []Arg{ArgSeries{}}
+}
+
+func (s *FuncMaxSeries) Context(context Context) Context {
+	return context
+}
+
+func (s *FuncMaxSeries) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
+	series, queryPatts, err := consumeFuncs(cache, s.in)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(series) == 0 {
+		return series, nil
+	}
+
+	// single input: just rename, no pointwise work needed
+	if len(series) == 1 {
+		name := fmt.Sprintf("maxSeries(%s)", series[0].QueryPatt)
+		series[0].Target = name
+		series[0].QueryPatt = name
+		return series, nil
+	}
+	out := pointSlicePool.Get().([]schema.Point)
+	for i := 0; i < len(series[0].Datapoints); i++ {
+		nan := true
+		point := schema.Point{
+			Ts:  series[0].Datapoints[i].Ts,
+			Val: math.Inf(-1), // identity for max; starting at 0 would wrongly clamp all-negative columns to 0
+		}
+		for j := 0; j < len(series); j++ {
+			if !math.IsNaN(series[j].Datapoints[i].Val) {
+				point.Val = math.Max(point.Val, series[j].Datapoints[i].Val)
+				nan = false
+			}
+		}
+		if nan {
+			point.Val = math.NaN()
+		}
+		out = append(out, point)
+	}
+	name := fmt.Sprintf("maxSeries(%s)", strings.Join(queryPatts, ","))
+	cons, queryCons := summarizeCons(series)
+	output := models.Series{
+		Target:       name,
+		QueryPatt:    name,
+		Datapoints:   out,
+		Interval:     series[0].Interval,
+		Consolidator: cons,
+		QueryCons:    queryCons,
+	}
+	cache[Req{}] = append(cache[Req{}], output)
+	return []models.Series{output}, nil
+}
diff --git a/expr/func_maxseries_test.go b/expr/func_maxseries_test.go
new file mode 100644
index 0000000000..1cf1775a70
--- /dev/null
+++ b/expr/func_maxseries_test.go
@@ -0,0 +1,200 @@
+package expr
+
+import (
+	"math"
+	"strconv"
+	"testing"
+
+	"github.com/grafana/metrictank/api/models"
+	"github.com/grafana/metrictank/test"
+	"gopkg.in/raintank/schema.v1"
+)
+
+func TestMaxSeriesIdentity(t *testing.T) {
+	testMaxSeries(
+		"identity",
+		[][]models.Series{
+			{
+				{
+					QueryPatt:  "single",
+					Target:     "single",
+					Datapoints: getCopy(a),
+				},
+			},
+		},
+		models.Series{
+			QueryPatt:  "maxSeries(single)",
+			Datapoints: getCopy(a),
+		},
+		t,
+	)
+}
+func TestMaxSeriesQueryToSingle(t *testing.T) {
+	testMaxSeries(
+		"query-to-single",
+		[][]models.Series{
+			{
+				{
+					QueryPatt:  "foo.*",
+					Target:     "foo",
+					Datapoints: getCopy(a),
+				},
+			},
+		},
+		models.Series{
+			QueryPatt:  "maxSeries(foo.*)",
+			Datapoints: getCopy(a),
+		},
+		t,
+	)
+}
+func TestMaxSeriesMultipleSameQuery(t *testing.T) {
+	testMaxSeries(
+		"max-multiple-series",
+		[][]models.Series{
+			{
+				{
+					QueryPatt:  "foo.*",
+					Target:     "foo.a",
+					Datapoints: getCopy(a),
+				},
+				{
+					QueryPatt:  "foo.*",
+					Target:     "foo.b",
+					Datapoints: getCopy(b),
+				},
+			},
+		},
+		models.Series{
+			QueryPatt:  "maxSeries(foo.*)",
+			Datapoints: getCopy(maxab),
+		},
+		t,
+	)
+}
+func TestMaxSeriesMultipleDiffQuery(t *testing.T) {
+	testMaxSeries(
+		"max-multiple-serieslists",
+		[][]models.Series{
+			{
+				{
+					QueryPatt:  "foo.*",
+					Target:     "foo.a",
+					Datapoints: getCopy(a),
+				},
+				{
+					QueryPatt:  "foo.*",
+					Target:     "foo.b",
+					Datapoints: getCopy(b),
+				},
+			},
+			{
+				{
+					QueryPatt:  "movingAverage(bar, '1min')",
+					Target:     "movingAverage(bar, '1min')",
+					Datapoints: getCopy(c),
+				},
+			},
+		},
+		models.Series{
+			QueryPatt:  "maxSeries(foo.*,movingAverage(bar, '1min'))",
+			Datapoints: getCopy(maxabc),
+		},
+		t,
+	)
+}
+
+func testMaxSeries(name string, in [][]models.Series, out models.Series, t *testing.T) {
+	f := NewMaxSeries()
+	max := f.(*FuncMaxSeries)
+	for _, i := range in {
+		max.in = append(max.in, NewMock(i))
+	}
+	got, err := f.Exec(make(map[Req][]models.Series))
+	if err != nil {
+		t.Fatalf("case %q: err should be nil. got %q", name, err)
+	}
+	if len(got) != 1 {
+		t.Fatalf("case %q: maxSeries output should be only 1 thing (a series) not %d", name, len(got))
+	}
+	g := got[0]
+	if g.QueryPatt != out.QueryPatt {
+		t.Fatalf("case %q: expected target %q, got %q", name, out.QueryPatt, g.QueryPatt)
+	}
+	if len(g.Datapoints) != len(out.Datapoints) {
+		t.Fatalf("case %q: len output expected %d, got %d", name, len(out.Datapoints), len(g.Datapoints))
+	}
+	for j, p := range g.Datapoints {
+		bothNaN := math.IsNaN(p.Val) && math.IsNaN(out.Datapoints[j].Val)
+		if (bothNaN || p.Val == out.Datapoints[j].Val) && p.Ts == out.Datapoints[j].Ts {
+			continue
+		}
+		t.Fatalf("case %q: output point %d - expected %v got %v", name, j, out.Datapoints[j], p)
+	}
+}
+
+func BenchmarkMaxSeries10k_1NoNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 1, test.RandFloats10k, test.RandFloats10k)
+}
+func BenchmarkMaxSeries10k_10NoNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 10, test.RandFloats10k, test.RandFloats10k)
+}
+func BenchmarkMaxSeries10k_100NoNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 100, test.RandFloats10k, test.RandFloats10k)
+}
+func BenchmarkMaxSeries10k_1000NoNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 1000, test.RandFloats10k, test.RandFloats10k)
+}
+
+func BenchmarkMaxSeries10k_1SomeSeriesHalfNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkMaxSeries10k_10SomeSeriesHalfNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkMaxSeries10k_100SomeSeriesHalfNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkMaxSeries10k_1000SomeSeriesHalfNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)
+}
+
+func BenchmarkMaxSeries10k_1AllSeriesHalfNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkMaxSeries10k_10AllSeriesHalfNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkMaxSeries10k_100AllSeriesHalfNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
+}
+func BenchmarkMaxSeries10k_1000AllSeriesHalfNulls(b *testing.B) {
+	benchmarkMaxSeries(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
+}
+
+func benchmarkMaxSeries(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
+	var input []models.Series
+	for i := 0; i < numSeries; i++ {
+		series := models.Series{
+			QueryPatt: strconv.Itoa(i),
+		}
+		if i%2 == 0 { // alternate generators; i%1 == 0 was always true, so fn1 was never exercised
+			series.Datapoints = fn0()
+		} else {
+			series.Datapoints = fn1()
+		}
+		input = append(input, series)
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		f := NewMaxSeries()
+		max := f.(*FuncMaxSeries)
+		max.in = append(max.in, NewMock(input))
+		got, err := f.Exec(make(map[Req][]models.Series))
+		if err != nil {
+			b.Fatalf("%s", err)
+		}
+		results = got
+	}
+	b.SetBytes(int64(numSeries * len(input[0].Datapoints) * 12))
+}
diff --git a/expr/funcs.go b/expr/funcs.go
index 619f3a67f7..419c08c33d 100644
--- a/expr/funcs.go
+++ b/expr/funcs.go
@@ -53,6 +53,8 @@ func init() {
 	"averageSeries": {NewAvgSeries, true},
 	"consolidateBy": {NewConsolidateBy, true},
 	"divideSeries":  {NewDivideSeries, true},
+	"max":           {NewMaxSeries, true},
+	"maxSeries":     {NewMaxSeries, true},
 	"movingAverage": {NewMovingAverage, false},
 	"perSecond":     {NewPerSecond, true},
 	"scale":         {NewScale, true},