From 9d826b74382071963c10f19cac15e72b3dd6ceb5 Mon Sep 17 00:00:00 2001 From: klei22 Date: Thu, 8 Feb 2018 11:13:11 -0500 Subject: [PATCH 1/4] Remove agg panics If 0 length is passed in, we should return math.NaN like python code Also follow SEP (Somebody else's problem) --- batch/aggregator.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/batch/aggregator.go b/batch/aggregator.go index 178cd71582..015d22e63f 100644 --- a/batch/aggregator.go +++ b/batch/aggregator.go @@ -11,9 +11,6 @@ import ( type AggFunc func(in []schema.Point) float64 func Avg(in []schema.Point) float64 { - if len(in) == 0 { - panic("avg() called in aggregator with 0 terms") - } valid := float64(0) sum := float64(0) for _, term := range in { @@ -42,9 +39,6 @@ func Cnt(in []schema.Point) float64 { } func Lst(in []schema.Point) float64 { - if len(in) == 0 { - panic("last() called in aggregator with 0 terms") - } lst := math.NaN() for _, v := range in { if !math.IsNaN(v.Val) { @@ -55,9 +49,6 @@ func Lst(in []schema.Point) float64 { } func Min(in []schema.Point) float64 { - if len(in) == 0 { - panic("min() called in aggregator with 0 terms") - } valid := false min := math.Inf(1) for _, v := range in { @@ -75,9 +66,6 @@ func Min(in []schema.Point) float64 { } func Max(in []schema.Point) float64 { - if len(in) == 0 { - panic("max() called in aggregator with 0 terms") - } valid := false max := math.Inf(-1) for _, v := range in { From 14de4705843047530df9850ad240ee576555467b Mon Sep 17 00:00:00 2001 From: klei22 Date: Thu, 8 Feb 2018 11:08:36 -0500 Subject: [PATCH 2/4] Added missing batch/agg functions Basically ported over https://github.com/grafana/metrictank/blob/master/expr/seriesaggregators.go ``` median def safeMedian(values): safeValues = [v for v in values if v is not None] if safeValues: sortedVals = sorted(safeValues) mid = len(sortedVals) // 2 if len(sortedVals) % 2 == 0: return float(sortedVals[mid-1] + sortedVals[mid]) / 2 else: return sortedVals[mid] diff def safeDiff(values): safeValues = [v for v in values if v is not None] if safeValues: values = list(map(lambda x: x*-1, safeValues[1:])) values.insert(0, safeValues[0]) return sum(values) stddev def safeStdDev(a): sm = safeSum(a) ln = safeLen(a) avg = safeDiv(sm,ln) if avg is None: return None sum = 0 safeValues = [v for v in a if v is not None] for val in safeValues: sum = sum + (val - avg) * (val - avg) return math.sqrt(sum/ln) range 'range': lambda row: safeSubtract(safeMax(row), safeMin(row)), multiply 'multiply': lambda row: safeMul(*row), def safeMul(*factors): if None in factors: return None factors = [float(x) for x in factors] product = reduce(lambda x,y: x*y, factors) return product ``` --- batch/aggregator.go | 80 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/batch/aggregator.go b/batch/aggregator.go index 015d22e63f..6492f7b4ae 100644 --- a/batch/aggregator.go +++ b/batch/aggregator.go @@ -6,6 +6,7 @@ package batch import ( "gopkg.in/raintank/schema.v1" "math" + "sort" ) type AggFunc func(in []schema.Point) float64 @@ -82,6 +83,85 @@ func Max(in []schema.Point) float64 { return max } +func Mult(in []schema.Point) float64 { + valid := false + mult := float64(1) + for _, term := range in { + if math.IsNaN(term.Val) { + // NaN * anything equals NaN() + mult = math.NaN() + break + } + valid = true + mult *= term.Val + } + if !valid { + mult = math.NaN() + } + return mult +} + +func Med(in []schema.Point) float64 { + med := math.NaN() + vals := make([]float64, 0, len(in)) + for i := 0; i < len(in); i++ { + p := in[i].Val + if !math.IsNaN(p) { + vals = append(vals, p) + } + } + if len(vals) != 0 { + sort.Float64s(vals) + mid := len(vals) / 2 + if len(vals)%2 == 0 { + med = (vals[mid-1] + vals[mid]) / 2 + } else { + med = vals[mid] + } + } + return med +} + +func Diff(in []schema.Point) float64 { + diff := math.NaN() + for i := 0; i < len(in); i++ { + p := in[i].Val + if !math.IsNaN(p) { + if math.IsNaN(diff) { + diff = p + } else { + diff -= p + } + } + } + return diff +} + +func StdDev(in []schema.Point) float64 { + avg := Avg(in) + if !math.IsNaN(avg) { + num := float64(0) + totalDeviationSquared := float64(0) + for i := 0; i < len(in); i++ { + p := in[i].Val + if !math.IsNaN(p) { + num++ + deviation := p - avg + totalDeviationSquared += deviation * deviation + } + } + std := math.Sqrt(totalDeviationSquared / num) + return std + } + return math.NaN() +} + +func Range(in []schema.Point) float64 { + min := Min(in) + max := Max(in) + return max - min +} + func Sum(in []schema.Point) float64 { valid := false sum := float64(0) From 51361600fd500efafe60c1ea7a6ca984a1c0cbb0 Mon Sep 17 00:00:00 2001 From: klei22 Date: Thu, 8 Feb 2018 14:53:52 -0500 Subject: [PATCH 3/4] Add batch/aggfuncs to consolidation Note that this fixes bug --> unable to get cnt aggfunc Also ignores archive which seems to be different --- consolidation/consolidation.go | 47 +++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/consolidation/consolidation.go b/consolidation/consolidation.go index 9b00a2e7ed..f3e5afceb5 100644 --- a/consolidation/consolidation.go +++ b/consolidation/consolidation.go @@ -23,6 +23,11 @@ const ( Max Min Cnt // not available through http api + Mult + Med + Diff + StdDev + Range ) // String provides human friendly names @@ -40,6 +45,16 @@ func (c Consolidator) String() string { return "MinimumConsolidator" case Max: return "MaximumConsolidator" + case Mult: + return "MultiplyConsolidator" + case Med: + return "MedianConsolidator" + case Diff: + return "DifferenceConsolidator" + case StdDev: + return "StdDevConsolidator" + case Range: + return "RangeConsolidator" case Sum: return "SumConsolidator" } @@ -96,6 +111,16 @@ func FromConsolidateBy(c string) Consolidator { return Min case "max": return Max + case "mult", "multiply": + return Mult + case "med", "median": + return Med + case "diff": + return Diff + case "stddev": + return StdDev + case "range": + return Range case "sum": return Sum } @@ -116,6 +141,16 @@ func GetAggFunc(consolidator Consolidator) batch.AggFunc { consFunc = batch.Min case Max: consFunc = batch.Max + case Mult: + consFunc = batch.Mult + case Med: + consFunc = batch.Med + case Diff: + consFunc = batch.Diff + case StdDev: + consFunc = batch.StdDev + case Range: + consFunc = batch.Range case Sum: consFunc = batch.Sum } @@ -123,7 +158,17 @@ func GetAggFunc(consolidator Consolidator) batch.AggFunc { } func Validate(fn string) error { - if fn == "avg" || fn == "average" || fn == "last" || fn == "min" || fn == "max" || fn == "sum" { + if fn == "avg" || + fn == "average" || + fn == "count" || fn == "last" || // bonus + fn == "min" || + fn == "max" || + fn == "mult" || fn == "multiply" || + fn == "med" || fn == "median" || + fn == "diff" || + fn == "stddev" || + fn == "range" || + fn == "sum" { return nil } return errUnknownConsolidationFunction From d57d220e6f7502eecaf8aca05cb80311dbad780f Mon Sep 17 00:00:00 2001 From: klei22 Date: Mon, 26 Feb 2018 09:54:59 -0500 Subject: [PATCH 4/4] Address comments --- batch/aggregator.go | 62 +++++++++++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/batch/aggregator.go b/batch/aggregator.go index 6492f7b4ae..f24b47e825 100644 --- a/batch/aggregator.go +++ b/batch/aggregator.go @@ -84,19 +84,16 @@ func Max(in []schema.Point) float64 { } func Mult(in []schema.Point) float64 { - valid := false + if len(in) == 0 { + return math.NaN() + } mult := float64(1) - for _, term := range in { - if math.IsNaN(term.Val) { + for _, fact := range in { + if math.IsNaN(fact.Val) { // NaN * anything equals NaN() - mult = math.NaN() - break + return math.NaN() } - valid = true - mult *= term.Val - } - if !valid { - mult = math.NaN() + mult *= fact.Val } return mult } @@ -139,26 +136,41 @@ func Diff(in []schema.Point) float64 { func StdDev(in []schema.Point) float64 { avg := Avg(in) - if !math.IsNaN(avg) { - num := float64(0) - totalDeviationSquared := float64(0) - for i := 0; i < len(in); i++ { - p := in[i].Val - if !math.IsNaN(p) { - num++ - deviation := p - avg - totalDeviationSquared += deviation * deviation - } + if math.IsNaN(avg) { + return avg + } + num := float64(0) + sumDeviationsSquared := float64(0) + for i := 0; i < len(in); i++ { + p := in[i].Val + if !math.IsNaN(p) { + num++ + deviation := p - avg + sumDeviationsSquared += deviation * deviation } - std := math.Sqrt(totalDeviationSquared / num) - return std } - return math.NaN() + std := math.Sqrt(sumDeviationsSquared / num) + return std } func Range(in []schema.Point) float64 { - min := Min(in) - max := Max(in) + valid := false + min := math.Inf(1) + max := math.Inf(-1) + for _, v := range in { + if !math.IsNaN(v.Val) { + valid = true + if v.Val < min { + min = v.Val + } + if v.Val > max { + max = v.Val + } + } + } + if !valid { + return math.NaN() + } return max - min }