From cb03df80c2f3e74f1d0ec18f86f954ac2f8f627c Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Mon, 30 Jul 2018 16:19:32 -0400 Subject: [PATCH 1/6] wip --- docs/graphite.md | 4 +- expr/func_keeplastvalue.go | 72 +++++++++++++++++++++++++++++++++ expr/func_keeplastvalue_test.go | 1 + expr/funcs.go | 1 + 4 files changed, 76 insertions(+), 2 deletions(-) create mode 100644 expr/func_keeplastvalue.go create mode 100644 expr/func_keeplastvalue_test.go diff --git a/docs/graphite.md b/docs/graphite.md index 45c1ced0b2..5a99238ba6 100644 --- a/docs/graphite.md +++ b/docs/graphite.md @@ -35,7 +35,7 @@ See also: | Function name and signature | Alias | Metrictank | -| -------------------------------------------------------------- | ----------- | ---------- | +| -------------------------------------------------------------- | ------------ | ---------- | | absolute | | No | | aggregate | | No | | aggregateLine | | No | @@ -96,7 +96,7 @@ See also: | interpolate | | No | | invert | | No | | isNonNull(seriesList) seriesList | | Stable | -| keepLastValue | | No | +| keepLastValue(seriesList, limit) seriesList | | Stable | | legendValue | | No | | limit | | No | | linearRegression | | No | diff --git a/expr/func_keeplastvalue.go b/expr/func_keeplastvalue.go new file mode 100644 index 0000000000..c5440f5b83 --- /dev/null +++ b/expr/func_keeplastvalue.go @@ -0,0 +1,72 @@ +package expr + +import ( + "fmt" + "math" + + "github.com/grafana/metrictank/api/models" + schema "gopkg.in/raintank/schema.v1" +) + +type FuncKeepLastValue struct { + in GraphiteFunc + limit int64 +} + +func NewKeepLastValue() GraphiteFunc { + return &FuncKeepLastValue{limit: math.MaxInt64} +} + +func (s *FuncKeepLastValue) Signature() ([]Arg, []Arg) { + return []Arg{ + ArgSeriesList{val: &s.in}, ArgInt{key: "limit", val: &s.limit, opt: true}}, []Arg{ArgSeriesList{}} +} + +func (s *FuncKeepLastValue) Context(context Context) Context { + return context +} + +func (s *FuncKeepLastValue) Exec(cache map[Req][]models.Series) ([]models.Series, error) { + series, err := s.in.Exec(cache) + if err != nil { + return nil, err + } + + limit := int(s.limit) + outSeries := make([]models.Series, len(series)) + for i, serie := range series { + serie.Target = fmt.Sprintf("keepLastValue(%s)", serie.Target) + serie.QueryPatt = serie.Target + + out := pointSlicePool.Get().([]schema.Point) + + var consecutiveNaNs int + lastVal := math.NaN() + + for i, p := range serie.Datapoints { + out = append(out, p) + if math.IsNaN(p.Val) { + consecutiveNaNs++ + continue + } + if 0 < consecutiveNaNs && consecutiveNaNs <= limit && !math.IsNaN(lastVal) { + for j := i - consecutiveNaNs; j < i; j++ { + out[j].Val = lastVal + } + } + consecutiveNaNs = 0 + lastVal = p.Val + } + + if 0 < consecutiveNaNs && consecutiveNaNs <= limit && !math.IsNaN(lastVal) { + for i := len(out) - consecutiveNaNs; i < len(out); i++ { + out[i].Val = lastVal + } + } + + serie.Datapoints = out + outSeries[i] = serie + } + cache[Req{}] = append(cache[Req{}], outSeries...) + return outSeries, nil +} diff --git a/expr/func_keeplastvalue_test.go b/expr/func_keeplastvalue_test.go new file mode 100644 index 0000000000..3bbc46885c --- /dev/null +++ b/expr/func_keeplastvalue_test.go @@ -0,0 +1 @@ +package expr diff --git a/expr/funcs.go b/expr/funcs.go index 5313a1f327..3bef8b502f 100644 --- a/expr/funcs.go +++ b/expr/funcs.go @@ -70,6 +70,7 @@ func init() { "highestCurrent": {NewHighestLowestConstructor("current", true), true}, "highestMax": {NewHighestLowestConstructor("max", true), true}, "isNonNull": {NewIsNonNull, true}, + "keepLastValue": {NewKeepLastValue, true}, "lowest": {NewHighestLowestConstructor("", false), true}, "lowestAverage": {NewHighestLowestConstructor("average", false), true}, "lowestCurrent": {NewHighestLowestConstructor("current", false), true}, From 106ee1f069b46cc106dad36ba8d40c51caddba07 Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Mon, 30 Jul 2018 17:47:43 -0400 Subject: [PATCH 2/6] added support for INF --- expr/func_keeplastvalue.go | 24 ++++++++++++++++++++---- expr/validator.go | 6 ++++++ 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/expr/func_keeplastvalue.go b/expr/func_keeplastvalue.go index c5440f5b83..4988b884ad 100644 --- a/expr/func_keeplastvalue.go +++ b/expr/func_keeplastvalue.go @@ -3,14 +3,16 @@ package expr import ( "fmt" "math" + "strconv" "github.com/grafana/metrictank/api/models" schema "gopkg.in/raintank/schema.v1" ) type FuncKeepLastValue struct { - in GraphiteFunc - limit int64 + in GraphiteFunc + limit int64 + slimit string } func NewKeepLastValue() GraphiteFunc { @@ -19,7 +21,16 @@ func NewKeepLastValue() GraphiteFunc { func (s *FuncKeepLastValue) Signature() ([]Arg, []Arg) { return []Arg{ - ArgSeriesList{val: &s.in}, ArgInt{key: "limit", val: &s.limit, opt: true}}, []Arg{ArgSeriesList{}} + ArgSeriesList{val: &s.in}, + ArgIn{key: "limit", + opt: true, + args: []Arg{ + ArgInt{val: &s.limit}, + ArgString{val: &s.slimit, validator: []Validator{IsNumberString}}, + }, + }, + }, + []Arg{ArgSeriesList{}} } func (s *FuncKeepLastValue) Context(context Context) Context { @@ -31,8 +42,13 @@ func (s *FuncKeepLastValue) Exec(cache map[Req][]models.Series) ([]models.Series if err != nil { return nil, err } - limit := int(s.limit) + if s.slimit != "" { + limitf, _ := strconv.ParseFloat(s.slimit, 64) + if !math.IsInf(limitf, 0) { + limit = int(limitf) + } + } outSeries := make([]models.Series, len(series)) for i, serie := range series { serie.Target = fmt.Sprintf("keepLastValue(%s)", serie.Target) diff --git a/expr/validator.go b/expr/validator.go index 07f89e13e3..2674b9540c 100644 --- a/expr/validator.go +++ b/expr/validator.go @@ -2,6 +2,7 @@ package expr import ( "errors" + "strconv" "github.com/grafana/metrictank/consolidation" "github.com/raintank/dur" @@ -44,3 +45,8 @@ func IsOperator(e *expr) error { } return errors.New("Unsupported operator: " + e.str) } + +func IsNumberString(e *expr) error { + _, err := strconv.ParseFloat("INF", 64) + return err +} From 940e62db9a2606746f51db140996eb045af971d1 Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Wed, 1 Aug 2018 16:23:29 -0400 Subject: [PATCH 3/6] added tests, removed string validation --- expr/func_keeplastvalue.go | 14 +-- expr/func_keeplastvalue_test.go | 198 ++++++++++++++++++++++++++++++++ expr/validator.go | 6 - 3 files changed, 201 insertions(+), 17 deletions(-) diff --git a/expr/func_keeplastvalue.go b/expr/func_keeplastvalue.go index 4988b884ad..5e1b838f87 100644 --- a/expr/func_keeplastvalue.go +++ b/expr/func_keeplastvalue.go @@ -3,16 +3,14 @@ package expr import ( "fmt" "math" - "strconv" "github.com/grafana/metrictank/api/models" schema "gopkg.in/raintank/schema.v1" ) type FuncKeepLastValue struct { - in GraphiteFunc - limit int64 - slimit string + in GraphiteFunc + limit int64 } func NewKeepLastValue() GraphiteFunc { @@ -26,7 +24,7 @@ func (s *FuncKeepLastValue) Signature() ([]Arg, []Arg) { opt: true, args: []Arg{ ArgInt{val: &s.limit}, - ArgString{val: &s.slimit, validator: []Validator{IsNumberString}}, + ArgString{}, // Allow user to specify 'INF' as value. if so, will fall back to maxInt }, }, }, @@ -43,12 +41,6 @@ func (s *FuncKeepLastValue) Exec(cache map[Req][]models.Series) ([]models.Series return nil, err } limit := int(s.limit) - if s.slimit != "" { - limitf, _ := strconv.ParseFloat(s.slimit, 64) - if !math.IsInf(limitf, 0) { - limit = int(limitf) - } - } outSeries := make([]models.Series, len(series)) for i, serie := range series { serie.Target = fmt.Sprintf("keepLastValue(%s)", serie.Target) diff --git a/expr/func_keeplastvalue_test.go b/expr/func_keeplastvalue_test.go index 3bbc46885c..452e19c747 100644 --- a/expr/func_keeplastvalue_test.go +++ b/expr/func_keeplastvalue_test.go @@ -1 +1,199 @@ package expr + +import ( + "math" + "strconv" + "testing" + + "github.com/grafana/metrictank/api/models" + "github.com/grafana/metrictank/test" + "gopkg.in/raintank/schema.v1" +) + +func TestKeepLastValueAll(t *testing.T) { + out := []schema.Point{ + {Val: 0, Ts: 10}, + {Val: 0, Ts: 20}, + {Val: 5.5, Ts: 30}, + {Val: 5.5, Ts: 40}, + {Val: 5.5, Ts: 50}, + {Val: 1234567890, Ts: 60}, + } + + testKeepLastValue( + "keepAll", + math.MaxInt64, + []models.Series{ + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "keepLastValue(a)", + Datapoints: out, + }, + }, + t, + ) +} + +func TestKeepLastValueNone(t *testing.T) { + + testKeepLastValue( + "keepNone", + 0, + []models.Series{ + { + Interval: 10, + Target: "sum4a2b", + Datapoints: getCopy(sum4a2b), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "keepLastValue(sum4a2b)", + Datapoints: getCopy(sum4a2b), + }, + }, + t, + ) +} + +func TestKeepLastValueOne(t *testing.T) { + out := []schema.Point{ + {Val: 0, Ts: 10}, + {Val: math.MaxFloat64, Ts: 20}, + {Val: math.MaxFloat64 - 20, Ts: 30}, + {Val: math.MaxFloat64 - 20, Ts: 40}, + {Val: 1234567890, Ts: 50}, + {Val: 1234567890, Ts: 60}, + } + + testKeepLastValue( + "keepOne", + 1, + []models.Series{ + { + Interval: 10, + Target: "b", + Datapoints: getCopy(b), + }, + { + Interval: 10, + Target: "a", + Datapoints: getCopy(a), + }, + }, + []models.Series{ + { + Interval: 10, + Target: "keepLastValue(b)", + Datapoints: out, + }, + { + Interval: 10, + Target: "keepLastValue(a)", + Datapoints: getCopy(a), + }, + }, + t, + ) +} + +func testKeepLastValue(name string, limit int64, in []models.Series, out []models.Series, t *testing.T) { + f := NewKeepLastValue() + f.(*FuncKeepLastValue).in = NewMock(in) + f.(*FuncKeepLastValue).limit = limit + gots, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + t.Fatalf("case %q (%d): err should be nil. got %q", name, limit, err) + } + if len(gots) != len(out) { + t.Fatalf("case %q (%d): isNonNull len output expected %d, got %d", name, limit, len(out), len(gots)) + } + for i, g := range gots { + exp := out[i] + if g.Target != exp.Target { + t.Fatalf("case %q (%d): expected target %q, got %q", name, limit, exp.Target, g.Target) + } + if len(g.Datapoints) != len(exp.Datapoints) { + t.Fatalf("case %q (%d) len output expected %d, got %d", name, limit, len(exp.Datapoints), len(g.Datapoints)) + } + for j, p := range g.Datapoints { + bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val) + if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts { + continue + } + t.Fatalf("case %q (%d): output point %d - expected %v got %v", name, limit, j, exp.Datapoints[j], p) + } + } +} + +func BenchmarkKeepLastValue10k_1NoNulls(b *testing.B) { + benchmarkKeepLastValue(b, 1, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkKeepLastValue10k_10NoNulls(b *testing.B) { + benchmarkKeepLastValue(b, 10, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkKeepLastValue10k_100NoNulls(b *testing.B) { + benchmarkKeepLastValue(b, 100, test.RandFloats10k, test.RandFloats10k) +} +func BenchmarkKeepLastValue10k_1000NoNulls(b *testing.B) { + benchmarkKeepLastValue(b, 1000, test.RandFloats10k, test.RandFloats10k) +} + +func BenchmarkKeepLastValue10k_1SomeSeriesHalfNulls(b *testing.B) { + benchmarkKeepLastValue(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkKeepLastValue10k_10SomeSeriesHalfNulls(b *testing.B) { + benchmarkKeepLastValue(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkKeepLastValue10k_100SomeSeriesHalfNulls(b *testing.B) { + benchmarkKeepLastValue(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k) +} +func BenchmarkKeepLastValue10k_1000SomeSeriesHalfNulls(b *testing.B) { + benchmarkKeepLastValue(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k) +} + +func BenchmarkKeepLastValue10k_1AllSeriesHalfNulls(b *testing.B) { + benchmarkKeepLastValue(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkKeepLastValue10k_10AllSeriesHalfNulls(b *testing.B) { + benchmarkKeepLastValue(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkKeepLastValue10k_100AllSeriesHalfNulls(b *testing.B) { + benchmarkKeepLastValue(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} +func BenchmarkKeepLastValue10k_1000AllSeriesHalfNulls(b *testing.B) { + benchmarkKeepLastValue(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k) +} + +func benchmarkKeepLastValue(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) { + var input []models.Series + for i := 0; i < numSeries; i++ { + series := models.Series{ + QueryPatt: strconv.Itoa(i), + } + if i%2 == 0 { + series.Datapoints = fn0() + } else { + series.Datapoints = fn1() + } + input = append(input, series) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + f := NewKeepLastValue() + f.(*FuncKeepLastValue).in = NewMock(input) + got, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + b.Fatalf("%s", err) + } + results = got + } +} diff --git a/expr/validator.go b/expr/validator.go index 2674b9540c..07f89e13e3 100644 --- a/expr/validator.go +++ b/expr/validator.go @@ -2,7 +2,6 @@ package expr import ( "errors" - "strconv" "github.com/grafana/metrictank/consolidation" "github.com/raintank/dur" @@ -45,8 +44,3 @@ func IsOperator(e *expr) error { } return errors.New("Unsupported operator: " + e.str) } - -func IsNumberString(e *expr) error { - _, err := strconv.ParseFloat("INF", 64) - return err -} From 1a464593253899e93cbeb198796be063092594da Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Wed, 1 Aug 2018 16:28:34 -0400 Subject: [PATCH 4/6] added stub for string param --- expr/func_keeplastvalue.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/expr/func_keeplastvalue.go b/expr/func_keeplastvalue.go index 5e1b838f87..6b570b99ff 100644 --- a/expr/func_keeplastvalue.go +++ b/expr/func_keeplastvalue.go @@ -18,13 +18,14 @@ func NewKeepLastValue() GraphiteFunc { } func (s *FuncKeepLastValue) Signature() ([]Arg, []Arg) { + var stub string return []Arg{ ArgSeriesList{val: &s.in}, ArgIn{key: "limit", opt: true, args: []Arg{ ArgInt{val: &s.limit}, - ArgString{}, // Allow user to specify 'INF' as value. if so, will fall back to maxInt + ArgString{val: &stub}, // Allow user to specify 'INF' as value. if so, will fall back to maxInt }, }, }, From c969661324ccc6918c89cf9302faa8850432b6e4 Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Tue, 14 Aug 2018 11:49:25 -0400 Subject: [PATCH 5/6] make function stable --- expr/func_keeplastvalue.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/expr/func_keeplastvalue.go b/expr/func_keeplastvalue.go index 6b570b99ff..8a4e7ea75e 100644 --- a/expr/func_keeplastvalue.go +++ b/expr/func_keeplastvalue.go @@ -25,7 +25,9 @@ func (s *FuncKeepLastValue) Signature() ([]Arg, []Arg) { opt: true, args: []Arg{ ArgInt{val: &s.limit}, - ArgString{val: &stub}, // Allow user to specify 'INF' as value. if so, will fall back to maxInt + // Allow user to specify 'INF' as value. if so, will fall back to maxInt + // Ignores any other strings (just like Graphite) + ArgString{val: &stub}, }, }, }, From 3ce3c8f857583a5c9760e632f9e9efbb2eb34e79 Mon Sep 17 00:00:00 2001 From: Stiven Deleur Date: Wed, 15 Aug 2018 10:50:16 -0400 Subject: [PATCH 6/6] changed comment --- expr/func_keeplastvalue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/expr/func_keeplastvalue.go b/expr/func_keeplastvalue.go index 8a4e7ea75e..f3a92e0bc8 100644 --- a/expr/func_keeplastvalue.go +++ b/expr/func_keeplastvalue.go @@ -25,8 +25,8 @@ func (s *FuncKeepLastValue) Signature() ([]Arg, []Arg) { opt: true, args: []Arg{ ArgInt{val: &s.limit}, - // Allow user to specify 'INF' as value. if so, will fall back to maxInt - // Ignores any other strings (just like Graphite) + // Treats any string as infinity. This matches Graphite's behavior + // (although intended bevahior is to let user specify "INF" as the limit) ArgString{val: &stub}, }, },