diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 34f72dbeeb..bdc1262e30 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -42,7 +42,7 @@ func wrapPathExpr(wrapper string, series ts.SeriesList) string { // sumSeries adds metrics together and returns the sum at each datapoint. // If the time series have different intervals, the coarsest interval will be used. func sumSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr("sumSeries", ts.SeriesList(series)), ts.Sum) + return combineSeries(ctx, series, wrapPathExpr(sumSeriesFnName, ts.SeriesList(series)), ts.Sum) } // diffSeries subtracts all but the first series from the first series. @@ -67,31 +67,98 @@ func diffSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, e } } - return combineSeries(ctx, transformedSeries, wrapPathExpr("diffSeries", ts.SeriesList(series)), ts.Sum) + return combineSeries(ctx, transformedSeries, wrapPathExpr(diffSeriesFnName, ts.SeriesList(series)), ts.Sum) } // multiplySeries multiplies metrics together and returns the product at each datapoint. // If the time series have different intervals, the coarsest interval will be used. func multiplySeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr("multiplySeries", ts.SeriesList(series)), ts.Mul) + return combineSeries(ctx, series, wrapPathExpr(multiplySeriesFnName, ts.SeriesList(series)), ts.Mul) } // averageSeries takes a list of series and returns a new series containing the // average of all values at each datapoint. 
func averageSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr("averageSeries", ts.SeriesList(series)), ts.Avg) + return combineSeries(ctx, series, wrapPathExpr(averageSeriesFnName, ts.SeriesList(series)), ts.Avg) } // minSeries takes a list of series and returns a new series containing the // minimum value across the series at each datapoint func minSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr("minSeries", ts.SeriesList(series)), ts.Min) + return combineSeries(ctx, series, wrapPathExpr(minSeriesFnName, ts.SeriesList(series)), ts.Min) } // maxSeries takes a list of series and returns a new series containing the // maximum value across the series at each datapoint func maxSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr("maxSeries", ts.SeriesList(series)), ts.Max) + return combineSeries(ctx, series, wrapPathExpr(maxSeriesFnName, ts.SeriesList(series)), ts.Max) +} + +// lastSeries takes a list of series and returns a new series containing the +// last value at each datapoint +func lastSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { + return combineSeries(ctx, series, joinPathExpr(ts.SeriesList(series)), ts.Last) +} + +// standardDeviationHelper returns the population standard deviation of the non-NaN values in a []float64 (NaN if there are none) +func standardDeviationHelper(values []float64) float64 { + var count, sum float64 + + for _, value := range values { + if !math.IsNaN(value) { + sum += value + count++ + } + } + if count == 0 { + return math.NaN() + } + avg := sum / count + + m2 := float64(0) + for _, value := range values { + if !math.IsNaN(value) { + diff := value - avg + m2 += diff * diff + } + } + + variance := m2 / count + + return math.Sqrt(variance) +} + +// stddevSeries takes a list of series and returns a new series containing 
the +// standard deviation at each datapoint +// At step n, stddevSeries will make a list of every series' nth value, +// and calculate the standard deviation of that list. +// The output is a seriesList containing 1 series +func stddevSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesList, error) { + if len(seriesList.Values) == 0 { + return ts.NewSeriesList(), nil + } + + firstSeries := seriesList.Values[0] + numSteps := firstSeries.Len() + values := ts.NewValues(ctx, firstSeries.MillisPerStep(), numSteps) + valuesAtTime := make([]float64, 0, len(seriesList.Values)) + for i := 0; i < numSteps; i++ { + valuesAtTime = valuesAtTime[:0] + for _, series := range seriesList.Values { + if l := series.Len(); l != numSteps { + return ts.NewSeriesList(), fmt.Errorf("mismatched series length, expected %d, got %d", numSteps, l) + } + valuesAtTime = append(valuesAtTime, series.ValueAt(i)) + } + values.SetValueAt(i, standardDeviationHelper(valuesAtTime)) + } + + name := wrapPathExpr(stddevSeriesFnName, ts.SeriesList(seriesList)) + output := ts.NewSeries(ctx, name, firstSeries.StartTime(), values) + return ts.SeriesList{ + Values: []*ts.Series{output}, + Metadata: seriesList.Metadata, + }, nil } func divideSeriesHelper(ctx *common.Context, dividendSeries, divisorSeries *ts.Series, metadata block.ResultMetadata) (*ts.Series, error) { @@ -177,6 +244,40 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis return r, nil } +// aggregate takes a list of series and returns a new series containing the +// value aggregated across the series at each datapoint using the specified function. +// This function can be used with aggregation functions: average (or avg), sum (or total), +// min, max, diff, stddev, count, range (or rangeOf), multiply & last (or current). +// Graphite's median and avg_zero are not supported yet and yield an invalid-params error. 
+func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.SeriesList, error) { + switch fname { + case emptyFnName, sumFnName, sumSeriesFnName, totalFnName: + return sumSeries(ctx, multiplePathSpecs(series)) + case minFnName, minSeriesFnName: + return minSeries(ctx, multiplePathSpecs(series)) + case maxFnName, maxSeriesFnName: + return maxSeries(ctx, multiplePathSpecs(series)) + case avgFnName, averageFnName, averageSeriesFnName: + return averageSeries(ctx, multiplePathSpecs(series)) + case multiplyFnName, multiplySeriesFnName: + return multiplySeries(ctx, multiplePathSpecs(series)) + case diffFnName, diffSeriesFnName: + return diffSeries(ctx, multiplePathSpecs(series)) + case countFnName, countSeriesFnName: + return countSeries(ctx, multiplePathSpecs(series)) + case rangeFnName, rangeOfFnName, rangeOfSeriesFnName: + return rangeOfSeries(ctx, series) + case lastFnName, lastSeriesFnName, currentFnName: + return lastSeries(ctx, multiplePathSpecs(series)) + case stddevFnName, stdevFnName, stddevSeriesFnName: + return stddevSeries(ctx, multiplePathSpecs(series)) + default: + // Median: the already-implemented movingMedian() returns a result that is not series-compatible, so skip support for now. + // avg_zero is not implemented; skip support for now unless an actual use case is identified later. 
+ return ts.NewSeriesList(), errors.NewInvalidParamsError(fmt.Errorf("invalid func %s", fname)) + } +} + // averageSeriesWithWildcards splits the given set of series into sub-groupings // based on wildcard matches in the hierarchy, then averages the values in each // grouping @@ -498,7 +599,7 @@ func groupByNodes(ctx *common.Context, series singlePathSpec, fname string, node func applyFnToMetaSeries(ctx *common.Context, series singlePathSpec, metaSeries map[string][]*ts.Series, fname string) (ts.SeriesList, error) { if fname == "" { - fname = "sum" + fname = sumFnName } f, fexists := summarizeFuncs[fname] @@ -667,7 +768,7 @@ func weightedAverage( // countSeries draws a horizontal line representing the number of nodes found in the seriesList. func countSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesList, error) { count, err := common.Count(ctx, ts.SeriesList(seriesList), func(series ts.SeriesList) string { - return wrapPathExpr("countSeries", series) + return wrapPathExpr(countSeriesFnName, series) }) if err != nil { return ts.NewSeriesList(), err diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index 2dae34f374..c2c76df758 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -136,6 +136,46 @@ func TestSumSeries(t *testing.T) { }, 15.0, 28.0, 30.0, 17.0, "invalid sum value for step %d") } +func TestStdDevSeries(t *testing.T) { + var ( + ctrl = xgomock.NewController(t) + store = storage.NewMockStorage(ctrl) + engine = NewEngine(store) + start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT") + end, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT") + ctx = common.NewContext(common.ContextOptions{Start: start, End: end, Engine: engine}) + millisPerStep = 60000 + inputs = []*ts.Series{ + ts.NewSeries(ctx, "servers.s2", start, + common.NewTestSeriesValues(ctx, 
millisPerStep, []float64{10, 20, 30})), + ts.NewSeries(ctx, "servers.s1", start, + common.NewTestSeriesValues(ctx, millisPerStep, []float64{90, 80, 70})), + } + ) + + expectedResults := []common.TestSeries{ + { + Name: "stddevSeries(servers.s2,servers.s1)", + Data: []float64{40, 30, 20}, + }, + } + result, err := stddevSeries(ctx, multiplePathSpecs{ + Values: inputs, + }) + require.NoError(t, err) + common.CompareOutputsAndExpected(t, 60000, start, expectedResults, result.Values) +} + +func TestAggregate(t *testing.T) { + testAggregatedSeries(t, func(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { + return aggregate(ctx, singlePathSpec(series), "sum") + }, 15.0, 28.0, 30.0, 17.0, "invalid sum value for step %d") + + testAggregatedSeries(t, func(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { + return aggregate(ctx, singlePathSpec(series), "maxSeries") + }, 15.0, 15.0, 17.0, 17.0, "invalid max value for step %d") +} + type mockEngine struct { fn func( ctx context.Context, diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index 2a57bdd121..22117f5c3a 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -2255,6 +2255,7 @@ func threshold(ctx *common.Context, value float64, label string, color string) ( func init() { // functions - in alpha ordering MustRegisterFunction(absolute) + MustRegisterFunction(aggregate) MustRegisterFunction(aggregateLine).WithDefaultParams(map[uint8]interface{}{ 2: "avg", // f }) @@ -2368,6 +2369,7 @@ func init() { MustRegisterFunction(stdev).WithDefaultParams(map[uint8]interface{}{ 3: 0.1, // windowTolerance }) + MustRegisterFunction(stddevSeries) MustRegisterFunction(substr).WithDefaultParams(map[uint8]interface{}{ 2: 0, // start 3: 0, // stop diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index 
1aab65b565..7b9fe7c0fc 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -3322,6 +3322,7 @@ func TestFunctionsRegistered(t *testing.T) { fnames := []string{ "abs", "absolute", + "aggregate", "aggregateLine", "alias", "aliasByMetric", @@ -3409,6 +3410,7 @@ func TestFunctionsRegistered(t *testing.T) { "sortByTotal", "squareRoot", "stdev", + "stddevSeries", "substr", "sum", "sumSeries", diff --git a/src/query/graphite/native/functions.go b/src/query/graphite/native/functions.go index 7b2098ad14..76c08c5a08 100644 --- a/src/query/graphite/native/functions.go +++ b/src/query/graphite/native/functions.go @@ -40,6 +40,36 @@ var ( functions = map[string]*Function{} ) +// list of graphite function name strings. (not whole list, update on-demand) +const ( + averageFnName = "average" + averageSeriesFnName = "averageSeries" + avgFnName = "avg" + countFnName = "count" + countSeriesFnName = "countSeries" + currentFnName = "current" + diffFnName = "diff" + diffSeriesFnName = "diffSeries" + emptyFnName = "" + lastFnName = "last" + lastSeriesFnName = "lastSeries" + maxFnName = "max" + maxSeriesFnName = "maxSeries" + minFnName = "min" + minSeriesFnName = "minSeries" + multiplyFnName = "multiply" + multiplySeriesFnName = "multiplySeries" + rangeFnName = "range" + rangeOfFnName = "rangeOf" + rangeOfSeriesFnName = "rangeOfSeries" + stdevFnName = "stdev" + stddevFnName = "stddev" + stddevSeriesFnName = "stddevSeries" + sumFnName = "sum" + sumSeriesFnName = "sumSeries" + totalFnName = "total" +) + // registerFunction is used to register a function under a specific name func registerFunction(f interface{}) (*Function, error) { fn, err := buildFunction(f)