diff --git a/expr/expr.go b/expr/expr.go index 14dcdfe691..b418bfe234 100644 --- a/expr/expr.go +++ b/expr/expr.go @@ -2,6 +2,7 @@ package expr import ( "fmt" + "regexp" "strings" ) @@ -126,6 +127,20 @@ func (e expr) consumeBasicArg(pos int, exp Arg) (int, error) { } } *v.val = got.str + case ArgRegex: + if got.etype != etString { + return 0, ErrBadArgumentStr{"string (regex)", string(got.etype)} + } + for _, va := range v.validator { + if err := va(got); err != nil { + return 0, fmt.Errorf("%s: %s", v.key, err.Error()) + } + } + re, err := regexp.Compile(got.str) + if err != nil { + return 0, err + } + *v.val = re case ArgBool: if got.etype != etBool { return 0, ErrBadArgumentStr{"string", string(got.etype)} diff --git a/expr/func_aliassub.go b/expr/func_aliassub.go new file mode 100644 index 0000000000..5ac3ea6ca3 --- /dev/null +++ b/expr/func_aliassub.go @@ -0,0 +1,45 @@ +package expr + +import ( + "regexp" + + "github.com/raintank/metrictank/api/models" +) + +var groupPython = regexp.MustCompile(`\\(\d+)`) + +type FuncAliasSub struct { + in GraphiteFunc + search *regexp.Regexp + replace string +} + +func NewAliasSub() GraphiteFunc { + return &FuncAliasSub{} +} + +func (s *FuncAliasSub) Signature() ([]Arg, []Arg) { + return []Arg{ + ArgSeriesList{val: &s.in}, + ArgRegex{key: "search", val: &s.search}, + ArgString{key: "replace", val: &s.replace}, + }, []Arg{ArgSeries{}} +} + +func (s *FuncAliasSub) NeedRange(from, to uint32) (uint32, uint32) { + return from, to +} + +func (s *FuncAliasSub) Exec(cache map[Req][]models.Series) ([]models.Series, error) { + // support native graphite (python) groups like \3 by turning them into ${3} + replace := groupPython.ReplaceAllString(s.replace, "$${$1}") + series, err := s.in.Exec(cache) + if err != nil { + return nil, err + } + for i := range series { + metric := extractMetric(series[i].Target) + series[i].Target = s.search.ReplaceAllString(metric, replace) + } + return series, err +} diff --git a/expr/func_aliassub_test.go b/expr/func_aliassub_test.go new file mode 100644 index 0000000000..6f21be8674 --- /dev/null +++ b/expr/func_aliassub_test.go @@ -0,0 +1,99 @@ +package expr + +import ( + "fmt" + "regexp" + "testing" + + "github.com/raintank/metrictank/api/models" +) + +func TestAliasSub(t *testing.T) { + cases := []struct { + search string + replace string + in []string + out []string + }{ + { + "this", + "that", + []string{"series.name.this.ok"}, + []string{"series.name.that.ok"}, + }, + { + `^.*TCP(\d+)`, + `\1`, + []string{"ip-foobar-TCP25", "ip-foobar-TCPfoo"}, + []string{"25", "ip-foobar-TCPfoo"}, + }, + { + ".*\\.([^\\.]+)\\.metrics_received.*", + "\\1 in", + []string{"metrictank.stats.env.instance.input.pluginname.metrics_received.counter32"}, + []string{"pluginname in"}, + }, + } + for i, c := range cases { + f := NewAliasSub() + alias := f.(*FuncAliasSub) + alias.search = regexp.MustCompile(c.search) + alias.replace = c.replace + var in []models.Series + for _, name := range c.in { + in = append(in, models.Series{ + Target: name, + }) + } + alias.in = NewMock(in) + got, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + t.Fatalf("case %d: err should be nil. got %q", i, err) + } + if len(got) != len(in) { + t.Fatalf("case %d: alias output should be same amount of series as input: %d, not %d", i, len(in), len(got)) + } + for i, o := range c.out { + g := got[i] + if o != g.Target { + t.Fatalf("case %d: expected target %q, got %q", i, o, g.Target) + } + } + } +} + +func BenchmarkAliasSub_1(b *testing.B) { + benchmarkAliasSub(b, 1) +} +func BenchmarkAliasSub_10(b *testing.B) { + benchmarkAliasSub(b, 10) +} +func BenchmarkAliasSub_100(b *testing.B) { + benchmarkAliasSub(b, 100) +} +func BenchmarkAliasSub_1000(b *testing.B) { + benchmarkAliasSub(b, 1000) +} + +func benchmarkAliasSub(b *testing.B, numSeries int) { + var input []models.Series + for i := 0; i < numSeries; i++ { + series := models.Series{ + Target: fmt.Sprintf("metrictank.stats.env.instance.input.plugin%d.metrics_received.counter32", i), + } + input = append(input, series) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + f := NewAliasSub() + alias := f.(*FuncAliasSub) + alias.search = regexp.MustCompile(".*\\.([^\\.]+)\\.metrics_received.*") + alias.replace = "\\1 in" + alias.in = NewMock(input) + got, err := f.Exec(make(map[Req][]models.Series)) + if err != nil { + b.Fatalf("%s", err) + } + results = got + } +} diff --git a/expr/func_divideseries.go b/expr/func_divideseries.go new file mode 100644 index 0000000000..b247160144 --- /dev/null +++ b/expr/func_divideseries.go @@ -0,0 +1,69 @@ +package expr + +import ( + "errors" + "fmt" + "math" + + "github.com/raintank/metrictank/api/models" + "gopkg.in/raintank/schema.v1" +) + +type FuncDivideSeries struct { + dividend GraphiteFunc + divisor GraphiteFunc +} + +func NewDivideSeries() GraphiteFunc { + return &FuncDivideSeries{} +} + +func (s *FuncDivideSeries) Signature() ([]Arg, []Arg) { + return []Arg{ + ArgSeriesList{val: &s.dividend}, + ArgSeries{val: &s.divisor}, + }, []Arg{ArgSeries{}} +} + +func (s *FuncDivideSeries) NeedRange(from, to uint32) (uint32, uint32) { + return from, to +} + +func (s *FuncDivideSeries) Exec(cache map[Req][]models.Series) ([]models.Series, error) { + dividends, err := s.dividend.Exec(cache) + if err != nil { + return nil, err + } + divisors, err := s.divisor.Exec(cache) + if err != nil { + return nil, err + } + if len(divisors) != 1 { + return nil, errors.New(fmt.Sprintf("need 1 divisor series, not %d", len(divisors))) + } + divisor := divisors[0] + + var series []models.Series + for _, dividend := range dividends { + out := pointSlicePool.Get().([]schema.Point) + for i := 0; i < len(dividend.Datapoints); i++ { + p := schema.Point{ + Ts: dividend.Datapoints[i].Ts, + } + if divisor.Datapoints[i].Val == 0 { + p.Val = math.NaN() + } else { + p.Val = dividend.Datapoints[i].Val / divisor.Datapoints[i].Val + } + out = append(out, p) + } + output := models.Series{ + Target: fmt.Sprintf("divideSeries(%s,%s)", dividend.Target, divisor.Target), + Datapoints: out, + Interval: divisor.Interval, + } + cache[Req{}] = append(cache[Req{}], output) + series = append(series, output) + } + return series, nil +} diff --git a/expr/func_scale.go b/expr/func_scale.go new file mode 100644 index 0000000000..84562721a5 --- /dev/null +++ b/expr/func_scale.go @@ -0,0 +1,52 @@ +package expr + +import ( + "fmt" + + "github.com/raintank/metrictank/api/models" + "gopkg.in/raintank/schema.v1" +) + +type FuncScale struct { + in GraphiteFunc + factor float64 +} + +func NewScale() GraphiteFunc { + return &FuncScale{} +} + +func (s *FuncScale) Signature() ([]Arg, []Arg) { + return []Arg{ + ArgSeriesList{val: &s.in}, + ArgFloat{key: "factor", val: &s.factor}, + }, []Arg{ + ArgSeriesList{}, + } +} + +func (s *FuncScale) NeedRange(from, to uint32) (uint32, uint32) { + return from, to +} + +func (s *FuncScale) Exec(cache map[Req][]models.Series) ([]models.Series, error) { + series, err := s.in.Exec(cache) + if err != nil { + return nil, err + } + var outputs []models.Series + for _, serie := range series { + out := pointSlicePool.Get().([]schema.Point) + for _, v := range serie.Datapoints { + out = append(out, schema.Point{Val: v.Val * s.factor, Ts: v.Ts}) + } + s := models.Series{ + Target: fmt.Sprintf("scale(%s,%f)", serie.Target, s.factor), + Datapoints: out, + Interval: serie.Interval, + } + outputs = append(outputs, s) + cache[Req{}] = append(cache[Req{}], s) + } + return outputs, nil +} diff --git a/expr/funcs.go b/expr/funcs.go index 344b3636de..d6ce2eb40f 100644 --- a/expr/funcs.go +++ b/expr/funcs.go @@ -37,11 +37,14 @@ func init() { funcs = map[string]funcDef{ "alias": {NewAlias, true}, "aliasByNode": {NewAliasByNode, true}, + "aliasSub": {NewAliasSub, true}, "avg": {NewAvgSeries, true}, "averageSeries": {NewAvgSeries, true}, "consolidateBy": {NewConsolidateBy, true}, + "divideSeries": {NewDivideSeries, true}, "movingAverage": {NewMovingAverage, false}, "perSecond": {NewPerSecond, true}, + "scale": {NewScale, true}, "smartSummarize": {NewSmartSummarize, false}, "sum": {NewSumSeries, true}, "sumSeries": {NewSumSeries, true}, diff --git a/expr/types.go b/expr/types.go index 378f0956d9..7553f034fc 100644 --- a/expr/types.go +++ b/expr/types.go @@ -1,6 +1,8 @@ // argument types. to let functions describe their inputs and outputs package expr +import "regexp" + // Arg is an argument to a GraphiteFunc // note how every implementation has a val property. // this property should point to value accessible to the function. @@ -89,6 +91,17 @@ type ArgString struct { func (a ArgString) Key() string { return a.key } func (a ArgString) Optional() bool { return a.opt } +// like string, but should result in a regex +type ArgRegex struct { + key string + opt bool + validator []Validator + val **regexp.Regexp +} + +func (a ArgRegex) Key() string { return a.key } +func (a ArgRegex) Optional() bool { return a.opt } + // True or False type ArgBool struct { key string