Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Add isNonNull function #959

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ divideSeriesLists(dividends, divisors) seriesList | | Stable
exclude(seriesList, pattern) seriesList | | Stable
grep(seriesList, pattern) seriesList | | Stable
groupByTags(seriesList, func, tagList) seriesList | | Stable
isNonNull(seriesList) seriesList | | Stable
maxSeries(seriesList) series | max | Stable
minSeries(seriesList) series | min | Stable
multiplySeries(seriesList) series | | Stable
Expand Down
62 changes: 62 additions & 0 deletions expr/func_isnonnull.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package expr

import (
"fmt"
"math"

"github.com/grafana/metrictank/api/models"
schema "gopkg.in/raintank/schema.v1"
)

// FuncIsNonNull implements the graphite isNonNull() function: it maps every
// datapoint of each input series to 1 when the value is non-null and to 0
// when the value is null (represented as NaN).
type FuncIsNonNull struct {
	in GraphiteFunc // producer of the input series list
}

// NewIsNonNull instantiates a new, empty isNonNull function.
func NewIsNonNull() GraphiteFunc {
	f := FuncIsNonNull{}
	return &f
}

// Signature declares that isNonNull takes exactly one series list argument
// and returns a series list.
func (s *FuncIsNonNull) Signature() ([]Arg, []Arg) {
	args := []Arg{ArgSeriesList{val: &s.in}}
	ret := []Arg{ArgSeriesList{}}
	return args, ret
}

// Context returns the request context unchanged: isNonNull does not modify
// the time range or consolidation of its input.
func (s *FuncIsNonNull) Context(ctx Context) Context {
	return ctx
}

func (s *FuncIsNonNull) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
series, err := s.in.Exec(cache)
if err != nil {
return nil, err
}

out := make([]models.Series, 0, len(series))
for _, serie := range series {
transformed := models.Series{
Copy link
Contributor

@replay replay Jul 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the out slice is already allocated with the correct size. wouldn't it be better to just update its properties instead of instantiating a new models.Series and then copying it into out?
f.e.:

transformed := &out[i]
transformed.Target = fmt.Sprintf("isNonNull(%s)", serie.Target)

(i is the iterator of the loop)

Target: fmt.Sprintf("isNonNull(%s)", serie.Target),
QueryPatt: fmt.Sprintf("isNonNull(%s)", serie.QueryPatt),
Tags: make(map[string]string, len(serie.Tags)+1),
Datapoints: pointSlicePool.Get().([]schema.Point),
Interval: serie.Interval,
Consolidator: serie.Consolidator,
QueryCons: serie.QueryCons,
}
for k, v := range serie.Tags {
transformed.Tags[k] = v
}
transformed.Tags["isNonNull"] = "1"
for _, p := range serie.Datapoints {
if math.IsNaN(p.Val) {
p.Val = 0
} else {
p.Val = 1
}
transformed.Datapoints = append(transformed.Datapoints, p)
}
out = append(out, transformed)
cache[Req{}] = append(cache[Req{}], transformed)
}

return out, nil
}
217 changes: 217 additions & 0 deletions expr/func_isnonnull_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package expr

import (
"strconv"
"testing"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/test"
"gopkg.in/raintank/schema.v1"
)

// Expected outputs for the isNonNull tests: each input datapoint maps to 1
// when its value is a number and to 0 when it is null (NaN).
var (
	// expected output for input series `a` (has two null points)
	aIsNonNull = []schema.Point{
		{Val: 1, Ts: 10},
		{Val: 1, Ts: 20},
		{Val: 1, Ts: 30},
		{Val: 0, Ts: 40},
		{Val: 0, Ts: 50},
		{Val: 1, Ts: 60},
	}

	// expected output for input series `b` (has two null points)
	bIsNonNull = []schema.Point{
		{Val: 1, Ts: 10},
		{Val: 1, Ts: 20},
		{Val: 1, Ts: 30},
		{Val: 0, Ts: 40},
		{Val: 1, Ts: 50},
		{Val: 0, Ts: 60},
	}

	// expected output for input series `c` and `d` (no null points)
	cdIsNonNull = []schema.Point{
		{Val: 1, Ts: 10},
		{Val: 1, Ts: 20},
		{Val: 1, Ts: 30},
		{Val: 1, Ts: 40},
		{Val: 1, Ts: 50},
		{Val: 1, Ts: 60},
	}
)

func TestIsNonNullSingle(t *testing.T) {
testIsNonNull(
"identity",
[]models.Series{
{
Interval: 10,
QueryPatt: "a",
Datapoints: getCopy(a),
},
},
[]models.Series{
{
Interval: 10,
QueryPatt: "isNonNull(a)",
Datapoints: getCopy(aIsNonNull),
},
},
t,
)
}

func TestIsNonNullSingleAllNonNull(t *testing.T) {
testIsNonNull(
"identity-counter8bit",
[]models.Series{
{
Interval: 10,
QueryPatt: "counter8bit",
Datapoints: getCopy(d),
},
},
[]models.Series{
{
Interval: 10,
QueryPatt: "isNonNull(counter8bit)",
Datapoints: getCopy(cdIsNonNull),
},
},
t,
)
}

func TestIsNonNullMulti(t *testing.T) {
testIsNonNull(
"multiple-series",
[]models.Series{
{
Interval: 10,
QueryPatt: "a",
Datapoints: getCopy(a),
},
{
Interval: 10,
QueryPatt: "b.*",
Datapoints: getCopy(b),
},
{
Interval: 10,
QueryPatt: "c.foo{bar,baz}",
Datapoints: getCopy(c),
},
{
Interval: 10,
QueryPatt: "movingAverage(bar, '1min')",
Datapoints: getCopy(d),
},
},
[]models.Series{
{
QueryPatt: "isNonNull(a)",
Datapoints: getCopy(aIsNonNull),
},
{
QueryPatt: "isNonNull(b.*)",
Datapoints: getCopy(bIsNonNull),
},
{
QueryPatt: "isNonNull(c.foo{bar,baz})",
Datapoints: getCopy(cdIsNonNull),
},
{
QueryPatt: "isNonNull(movingAverage(bar, '1min'))",
Datapoints: getCopy(cdIsNonNull),
},
},
t,
)
}

// testIsNonNull runs isNonNull over `in` and asserts that the result matches
// `out`: same number of series, same query patterns, and identical
// datapoint values and timestamps.
func testIsNonNull(name string, in []models.Series, out []models.Series, t *testing.T) {
	f := NewIsNonNull()
	f.(*FuncIsNonNull).in = NewMock(in)
	got, err := f.Exec(make(map[Req][]models.Series))
	if err != nil {
		t.Fatalf("case %q: err should be nil. got %q", name, err)
	}
	if len(got) != len(out) {
		t.Fatalf("case %q: isNonNull len output expected %d, got %d", name, len(out), len(got))
	}
	for i := range got {
		g, exp := got[i], out[i]
		if g.QueryPatt != exp.QueryPatt {
			t.Fatalf("case %q: expected target %q, got %q", name, exp.QueryPatt, g.QueryPatt)
		}
		if len(g.Datapoints) != len(exp.Datapoints) {
			t.Fatalf("case %q: len output expected %d, got %d", name, len(exp.Datapoints), len(g.Datapoints))
		}
		for j, p := range g.Datapoints {
			if p.Val != exp.Datapoints[j].Val || p.Ts != exp.Datapoints[j].Ts {
				t.Fatalf("case %q: output point %d - expected %v got %v", name, j, exp.Datapoints[j], p)
			}
		}
	}
}

// The benchmarks below run isNonNull over 1/10/100/1000 series of 10k points
// each. The two generator arguments feed even- and odd-numbered series
// respectively, so the three groups cover: no nulls at all, half of the
// series containing ~50% nulls, and all series containing ~50% nulls.

func BenchmarkIsNonNull10k_1NoNulls(b *testing.B) {
	benchmarkIsNonNull(b, 1, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkIsNonNull10k_10NoNulls(b *testing.B) {
	benchmarkIsNonNull(b, 10, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkIsNonNull10k_100NoNulls(b *testing.B) {
	benchmarkIsNonNull(b, 100, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkIsNonNull10k_1000NoNulls(b *testing.B) {
	benchmarkIsNonNull(b, 1000, test.RandFloats10k, test.RandFloats10k)
}

func BenchmarkIsNonNull10k_1SomeSeriesHalfNulls(b *testing.B) {
	benchmarkIsNonNull(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkIsNonNull10k_10SomeSeriesHalfNulls(b *testing.B) {
	benchmarkIsNonNull(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkIsNonNull10k_100SomeSeriesHalfNulls(b *testing.B) {
	benchmarkIsNonNull(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkIsNonNull10k_1000SomeSeriesHalfNulls(b *testing.B) {
	benchmarkIsNonNull(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)
}

func BenchmarkIsNonNull10k_1AllSeriesHalfNulls(b *testing.B) {
	benchmarkIsNonNull(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkIsNonNull10k_10AllSeriesHalfNulls(b *testing.B) {
	benchmarkIsNonNull(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkIsNonNull10k_100AllSeriesHalfNulls(b *testing.B) {
	benchmarkIsNonNull(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkIsNonNull10k_1000AllSeriesHalfNulls(b *testing.B) {
	benchmarkIsNonNull(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}

// benchmarkIsNonNull executes isNonNull over numSeries input series;
// even-numbered series take their datapoints from fn0, odd-numbered ones
// from fn1. The result is assigned to the package-level sink to prevent
// the compiler from eliminating the call.
func benchmarkIsNonNull(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
	var in []models.Series
	for i := 0; i < numSeries; i++ {
		gen := fn0
		if i%2 != 0 {
			gen = fn1
		}
		in = append(in, models.Series{
			QueryPatt:  strconv.Itoa(i),
			Datapoints: gen(),
		})
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		f := NewIsNonNull()
		f.(*FuncIsNonNull).in = NewMock(in)
		got, err := f.Exec(make(map[Req][]models.Series))
		if err != nil {
			b.Fatalf("%s", err)
		}
		results = got
	}
}
1 change: 1 addition & 0 deletions expr/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func init() {
"exclude": {NewExclude, true},
"grep": {NewGrep, true},
"groupByTags": {NewGroupByTags, true},
"isNonNull": {NewIsNonNull, true},
"max": {NewAggregateConstructor("max", crossSeriesMax), true},
"maxSeries": {NewAggregateConstructor("max", crossSeriesMax), true},
"min": {NewAggregateConstructor("min", crossSeriesMin), true},
Expand Down