Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #995 from bloomberg/keepLastValue1
Browse files Browse the repository at this point in the history
Add keepLastValue function
  • Loading branch information
Dieterbe authored Aug 15, 2018
2 parents 3afbcd6 + 3ce3c8f commit 7841e3c
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 1 deletion.
2 changes: 1 addition & 1 deletion docs/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ See also:
| interpolate | | No |
| invert | | No |
| isNonNull(seriesList) seriesList | | Stable |
| keepLastValue | | No |
| keepLastValue(seriesList, limit) seriesList | | Stable |
| legendValue | | No |
| limit | | No |
| linearRegression | | No |
Expand Down
83 changes: 83 additions & 0 deletions expr/func_keeplastvalue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package expr

import (
"fmt"
"math"

"github.com/grafana/metrictank/api/models"
schema "gopkg.in/raintank/schema.v1"
)

type FuncKeepLastValue struct {
in GraphiteFunc
limit int64
}

func NewKeepLastValue() GraphiteFunc {
return &FuncKeepLastValue{limit: math.MaxInt64}
}

func (s *FuncKeepLastValue) Signature() ([]Arg, []Arg) {
var stub string
return []Arg{
ArgSeriesList{val: &s.in},
ArgIn{key: "limit",
opt: true,
args: []Arg{
ArgInt{val: &s.limit},
// Treats any string as infinity. This matches Graphite's behavior
// (although intended bevahior is to let user specify "INF" as the limit)
ArgString{val: &stub},
},
},
},
[]Arg{ArgSeriesList{}}
}

func (s *FuncKeepLastValue) Context(context Context) Context {
return context
}

func (s *FuncKeepLastValue) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
series, err := s.in.Exec(cache)
if err != nil {
return nil, err
}
limit := int(s.limit)
outSeries := make([]models.Series, len(series))
for i, serie := range series {
serie.Target = fmt.Sprintf("keepLastValue(%s)", serie.Target)
serie.QueryPatt = serie.Target

out := pointSlicePool.Get().([]schema.Point)

var consecutiveNaNs int
lastVal := math.NaN()

for i, p := range serie.Datapoints {
out = append(out, p)
if math.IsNaN(p.Val) {
consecutiveNaNs++
continue
}
if 0 < consecutiveNaNs && consecutiveNaNs <= limit && !math.IsNaN(lastVal) {
for j := i - consecutiveNaNs; j < i; j++ {
out[j].Val = lastVal
}
}
consecutiveNaNs = 0
lastVal = p.Val
}

if 0 < consecutiveNaNs && consecutiveNaNs <= limit && !math.IsNaN(lastVal) {
for i := len(out) - consecutiveNaNs; i < len(out); i++ {
out[i].Val = lastVal
}
}

serie.Datapoints = out
outSeries[i] = serie
}
cache[Req{}] = append(cache[Req{}], outSeries...)
return outSeries, nil
}
199 changes: 199 additions & 0 deletions expr/func_keeplastvalue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package expr

import (
"math"
"strconv"
"testing"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/test"
"gopkg.in/raintank/schema.v1"
)

func TestKeepLastValueAll(t *testing.T) {
out := []schema.Point{
{Val: 0, Ts: 10},
{Val: 0, Ts: 20},
{Val: 5.5, Ts: 30},
{Val: 5.5, Ts: 40},
{Val: 5.5, Ts: 50},
{Val: 1234567890, Ts: 60},
}

testKeepLastValue(
"keepAll",
math.MaxInt64,
[]models.Series{
{
Interval: 10,
Target: "a",
Datapoints: getCopy(a),
},
},
[]models.Series{
{
Interval: 10,
Target: "keepLastValue(a)",
Datapoints: out,
},
},
t,
)
}

func TestKeepLastValueNone(t *testing.T) {

testKeepLastValue(
"keepNone",
0,
[]models.Series{
{
Interval: 10,
Target: "sum4a2b",
Datapoints: getCopy(sum4a2b),
},
},
[]models.Series{
{
Interval: 10,
Target: "keepLastValue(sum4a2b)",
Datapoints: getCopy(sum4a2b),
},
},
t,
)
}

func TestKeepLastValueOne(t *testing.T) {
out := []schema.Point{
{Val: 0, Ts: 10},
{Val: math.MaxFloat64, Ts: 20},
{Val: math.MaxFloat64 - 20, Ts: 30},
{Val: math.MaxFloat64 - 20, Ts: 40},
{Val: 1234567890, Ts: 50},
{Val: 1234567890, Ts: 60},
}

testKeepLastValue(
"keepOne",
1,
[]models.Series{
{
Interval: 10,
Target: "b",
Datapoints: getCopy(b),
},
{
Interval: 10,
Target: "a",
Datapoints: getCopy(a),
},
},
[]models.Series{
{
Interval: 10,
Target: "keepLastValue(b)",
Datapoints: out,
},
{
Interval: 10,
Target: "keepLastValue(a)",
Datapoints: getCopy(a),
},
},
t,
)
}

func testKeepLastValue(name string, limit int64, in []models.Series, out []models.Series, t *testing.T) {
f := NewKeepLastValue()
f.(*FuncKeepLastValue).in = NewMock(in)
f.(*FuncKeepLastValue).limit = limit
gots, err := f.Exec(make(map[Req][]models.Series))
if err != nil {
t.Fatalf("case %q (%d): err should be nil. got %q", name, limit, err)
}
if len(gots) != len(out) {
t.Fatalf("case %q (%d): isNonNull len output expected %d, got %d", name, limit, len(out), len(gots))
}
for i, g := range gots {
exp := out[i]
if g.Target != exp.Target {
t.Fatalf("case %q (%d): expected target %q, got %q", name, limit, exp.Target, g.Target)
}
if len(g.Datapoints) != len(exp.Datapoints) {
t.Fatalf("case %q (%d) len output expected %d, got %d", name, limit, len(exp.Datapoints), len(g.Datapoints))
}
for j, p := range g.Datapoints {
bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val)
if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts {
continue
}
t.Fatalf("case %q (%d): output point %d - expected %v got %v", name, limit, j, exp.Datapoints[j], p)
}
}
}

func BenchmarkKeepLastValue10k_1NoNulls(b *testing.B) {
benchmarkKeepLastValue(b, 1, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkKeepLastValue10k_10NoNulls(b *testing.B) {
benchmarkKeepLastValue(b, 10, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkKeepLastValue10k_100NoNulls(b *testing.B) {
benchmarkKeepLastValue(b, 100, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkKeepLastValue10k_1000NoNulls(b *testing.B) {
benchmarkKeepLastValue(b, 1000, test.RandFloats10k, test.RandFloats10k)
}

func BenchmarkKeepLastValue10k_1SomeSeriesHalfNulls(b *testing.B) {
benchmarkKeepLastValue(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkKeepLastValue10k_10SomeSeriesHalfNulls(b *testing.B) {
benchmarkKeepLastValue(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkKeepLastValue10k_100SomeSeriesHalfNulls(b *testing.B) {
benchmarkKeepLastValue(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkKeepLastValue10k_1000SomeSeriesHalfNulls(b *testing.B) {
benchmarkKeepLastValue(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)
}

func BenchmarkKeepLastValue10k_1AllSeriesHalfNulls(b *testing.B) {
benchmarkKeepLastValue(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkKeepLastValue10k_10AllSeriesHalfNulls(b *testing.B) {
benchmarkKeepLastValue(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkKeepLastValue10k_100AllSeriesHalfNulls(b *testing.B) {
benchmarkKeepLastValue(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkKeepLastValue10k_1000AllSeriesHalfNulls(b *testing.B) {
benchmarkKeepLastValue(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}

func benchmarkKeepLastValue(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
var input []models.Series
for i := 0; i < numSeries; i++ {
series := models.Series{
QueryPatt: strconv.Itoa(i),
}
if i%2 == 0 {
series.Datapoints = fn0()
} else {
series.Datapoints = fn1()
}
input = append(input, series)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
f := NewKeepLastValue()
f.(*FuncKeepLastValue).in = NewMock(input)
got, err := f.Exec(make(map[Req][]models.Series))
if err != nil {
b.Fatalf("%s", err)
}
results = got
}
}
1 change: 1 addition & 0 deletions expr/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func init() {
"highestCurrent": {NewHighestLowestConstructor("current", true), true},
"highestMax": {NewHighestLowestConstructor("max", true), true},
"isNonNull": {NewIsNonNull, true},
"keepLastValue": {NewKeepLastValue, true},
"lowest": {NewHighestLowestConstructor("", false), true},
"lowestAverage": {NewHighestLowestConstructor("average", false), true},
"lowestCurrent": {NewHighestLowestConstructor("current", false), true},
Expand Down

0 comments on commit 7841e3c

Please sign in to comment.