diff --git a/src/query/graphite/common/percentiles.go b/src/query/graphite/common/percentiles.go index 64b656c75f..0aa276a26f 100644 --- a/src/query/graphite/common/percentiles.go +++ b/src/query/graphite/common/percentiles.go @@ -66,7 +66,6 @@ func SafeSort(input []float64) int { nans++ } } - sort.Float64s(input) return nans } @@ -79,12 +78,19 @@ func SafeSum(input []float64) (float64, int) { if !math.IsNaN(v) { sum += v } else { - nans += 1 + nans++ } } return sum, nans } +// SafeAverage returns the average of the input slice the number of NaNs in the input. +func SafeAverage(input []float64) (float64, int) { + sum, nans := SafeSum(input) + count := len(input) - nans + return sum / float64(count), nans +} + // SafeMax returns the maximum value of the input slice the number of NaNs in the input. func SafeMax(input []float64) (float64, int) { nans := 0 diff --git a/src/query/graphite/common/test_util.go b/src/query/graphite/common/test_util.go index 39d67d5fe4..5dfadf2f99 100644 --- a/src/query/graphite/common/test_util.go +++ b/src/query/graphite/common/test_util.go @@ -123,6 +123,8 @@ type MovingFunctionStorage struct { StepMillis int Bootstrap []float64 Values []float64 + OriginalIDs []string + BootstrapIDs []string BootstrapStart time.Time } @@ -155,12 +157,21 @@ func (s *MovingFunctionStorage) fetchByIDs( var values []float64 if opts.StartTime.Equal(s.BootstrapStart) { values = s.Bootstrap + if s.BootstrapIDs != nil { + ids = s.BootstrapIDs + } } else { values = s.Values + if s.OriginalIDs != nil { + ids = s.OriginalIDs + } + } + + for _, id := range ids { + series := ts.NewSeries(ctx, id, opts.StartTime, + NewTestSeriesValues(ctx, s.StepMillis, values)) + seriesList = append(seriesList, series) } - series := ts.NewSeries(ctx, ids[0], opts.StartTime, - NewTestSeriesValues(ctx, s.StepMillis, values)) - seriesList = append(seriesList, series) } return storage.NewFetchResult(ctx, seriesList, block.NewResultMetadata()), nil diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index f6368d4d86..ccd286673a 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -1728,8 +1728,8 @@ func substr(_ *common.Context, seriesList singlePathSpec, start, stop int) (ts.S // combineBootstrapWithOriginal combines the bootstrapped the series with the original series. func combineBootstrapWithOriginal( ctx *common.Context, - startTime time.Time, - endTime time.Time, + bootstrapStartTime, bootstrapEndTime time.Time, + _, originalEndTime time.Time, bootstrapped ts.SeriesList, seriesList singlePathSpec, ) (ts.SeriesList, error) { @@ -1742,15 +1742,20 @@ func combineBootstrapWithOriginal( for _, series := range seriesList.Values { bs, found := nameToSeries[series.Name()] if !found { - numSteps := ts.NumSteps(startTime, endTime, series.MillisPerStep()) + numSteps := ts.NumSteps(bootstrapStartTime, bootstrapEndTime, series.MillisPerStep()) vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps) - bs = ts.NewSeries(ctx, series.Name(), startTime, vals) + bs = ts.NewSeries(ctx, series.Name(), bootstrapStartTime, vals) + } else { + // Delete from the lookup so we can fill in + // the bootstrapped time series with NaNs if + // the original series list is missing the series. + delete(nameToSeries, series.Name()) } bootstrapList = append(bootstrapList, bs) } var err error - newSeriesList := make([]*ts.Series, len(seriesList.Values)) + newSeriesList := make([]*ts.Series, 0, len(seriesList.Values)+len(nameToSeries)) for i, bootstrap := range bootstrapList { original := seriesList.Values[i] if bootstrap.MillisPerStep() < original.MillisPerStep() { @@ -1759,8 +1764,8 @@ func combineBootstrapWithOriginal( return ts.NewSeriesList(), err } } - bootstrapEndStep := endTime.Truncate(original.Resolution()) - if bootstrapEndStep.Before(endTime) { + bootstrapEndStep := bootstrapEndTime.Truncate(original.Resolution()) + if bootstrapEndStep.Before(bootstrapEndTime) { bootstrapEndStep = bootstrapEndStep.Add(original.Resolution()) } // NB(braskin): using bootstrap.Len() is incorrect as it will include all @@ -1778,9 +1783,32 @@ func combineBootstrapWithOriginal( for j := numBootstrapValues; j < numCombinedValues; j++ { values.SetValueAt(j, original.ValueAt(j-numBootstrapValues)) } - newSeries := ts.NewSeries(ctx, original.Name(), startTime, values) + newSeries := ts.NewSeries(ctx, original.Name(), bootstrapStartTime, values) newSeries.Specification = original.Specification - newSeriesList[i] = newSeries + newSeriesList = append(newSeriesList, newSeries) + } + // Now add any series in bootstrap list but not original series, + // need to iterate the bootstrapped.Values to retain order + // but can check if they are in the nameToSeries map still and if so + // then there was no matching original series. + for _, series := range bootstrapped.Values { + bs, found := nameToSeries[series.Name()] + if !found { + // Processed already. + continue + } + // Extend the bootstrap series to include steps covered by original + // time range since the original series is missing from fetch. + needSteps := bs.StepAtTime(originalEndTime) + if currSteps := bs.Len(); needSteps > currSteps { + // Need to resize. + vals := ts.NewValues(ctx, bs.MillisPerStep(), needSteps) + for i := 0; i < currSteps; i++ { + vals.SetValueAt(i, bs.ValueAt(i)) + } + bs = bs.DerivedSeries(bs.StartTime(), vals) + } + newSeriesList = append(newSeriesList, bs) } r := ts.SeriesList(seriesList) @@ -2231,6 +2259,15 @@ func movingSumHelper(window []float64, vals ts.MutableValues, windowPoints int, } } +// movingAverageHelper given a slice of floats, calculates the average and assigns it into vals as index i +func movingAverageHelper(window []float64, vals ts.MutableValues, windowPoints int, i int, xFilesFactor float64) { + avg, nans := common.SafeAverage(window) + + if nans < windowPoints && effectiveXFF(windowPoints, nans, xFilesFactor) { + vals.SetValueAt(i, avg) + } +} + // movingMaxHelper given a slice of floats, finds the max and assigns it into vals as index i func movingMaxHelper(window []float64, vals ts.MutableValues, windowPoints int, i int, xFilesFactor float64) { max, nans := common.SafeMax(window) @@ -2278,12 +2315,14 @@ func newMovingBinaryTransform( return childCtx } - bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime + originalStart, originalEnd := ctx.StartTime, ctx.EndTime + bootstrapStartTime, bootstrapEndTime := originalStart.Add(-interval), originalStart return &binaryContextShifter{ ContextShiftFunc: contextShiftingFn, BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) { bootstrapList, err := combineBootstrapWithOriginal(ctx, bootstrapStartTime, bootstrapEndTime, + ctx.StartTime, ctx.EndTime, bootstrapped, singlePathSpec(original)) if err != nil { return ts.NewSeriesList(), err @@ -2292,7 +2331,15 @@ func newMovingBinaryTransform( results := make([]*ts.Series, 0, original.Len()) maxWindowPoints := 0 for i := range bootstrapList.Values { - series := original.Values[i] + var series *ts.Series + if i < original.Len() { + // Existing series exists, prefer that resolution. + series = original.Values[i] + } else { + // No existing series use resolution from bootstrapped. + series = bootstrapList.Values[i] + } + windowPoints := windowPointsLength(series, interval) if windowPoints <= 0 { err := xerrors.NewInvalidParamsError(fmt.Errorf( @@ -2307,7 +2354,20 @@ func newMovingBinaryTransform( windowPoints := make([]float64, maxWindowPoints) for i, bootstrap := range bootstrapList.Values { - series := original.Values[i] + var series *ts.Series + if i < original.Len() { + // Existing series exists, prefer that. + series = original.Values[i] + } else { + // No existing series, use an intersected + // version of the bootstrapped series as a + // reference for values to compute. + series, err = bootstrap.IntersectAndResize(originalStart, + originalEnd, bootstrap.MillisPerStep(), bootstrap.ConsolidationFunc()) + if err != nil { + return ts.NewSeriesList(), err + } + } currWindowPoints := windowPointsLength(series, interval) window := windowPoints[:currWindowPoints] util.Memset(window, math.NaN()) @@ -2350,25 +2410,56 @@ func newMovingBinaryTransform( } // movingMedian calculates the moving median of a metric (or metrics) over a time interval. -func movingMedian(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) { +func movingMedian( + ctx *common.Context, + input singlePathSpec, + windowSize genericInterface, + xFilesFactor float64, +) (*binaryContextShifter, error) { return newMovingBinaryTransform(ctx, input, windowSize, "movingMedian", xFilesFactor, movingImplementationFn(movingMedianHelper)) } // movingSum calculates the moving sum of a metric (or metrics) over a time interval. -func movingSum(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) { +func movingSum( + ctx *common.Context, + input singlePathSpec, + windowSize genericInterface, + xFilesFactor float64, +) (*binaryContextShifter, error) { return newMovingBinaryTransform(ctx, input, windowSize, "movingSum", xFilesFactor, movingImplementationFn(movingSumHelper)) } +// movingAverage calculates the moving average of a metric (or metrics) over a time interval. +func movingAverage( + ctx *common.Context, + input singlePathSpec, + windowSize genericInterface, + xFilesFactor float64, +) (*binaryContextShifter, error) { + return newMovingBinaryTransform(ctx, input, windowSize, "movingAverage", xFilesFactor, + movingImplementationFn(movingAverageHelper)) +} + // movingMax calculates the moving maximum of a metric (or metrics) over a time interval. -func movingMax(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) { +func movingMax( + ctx *common.Context, + input singlePathSpec, + windowSize genericInterface, + xFilesFactor float64, +) (*binaryContextShifter, error) { return newMovingBinaryTransform(ctx, input, windowSize, "movingMax", xFilesFactor, movingImplementationFn(movingMaxHelper)) } // movingMin calculates the moving minimum of a metric (or metrics) over a time interval. -func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) { +func movingMin( + ctx *common.Context, + input singlePathSpec, + windowSize genericInterface, + xFilesFactor float64, +) (*binaryContextShifter, error) { return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", xFilesFactor, movingImplementationFn(movingMinHelper)) } diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index 77eb4a2b4f..13e8d38ac0 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -925,7 +925,8 @@ func TestCombineBootstrapWithOriginal(t *testing.T) { defer func() { _ = ctx.Close() }() - output, err := combineBootstrapWithOriginal(ctx, bootstrapStartTime, bootstrapEndTime, bootstrappedSeriesList, originalSeriesList) + output, err := combineBootstrapWithOriginal(ctx, bootstrapStartTime, bootstrapEndTime, + contextStart, contextEnd, bootstrappedSeriesList, originalSeriesList) assert.Equal(t, output.Values[0], expectedSeries) assert.Nil(t, err) } @@ -1032,6 +1033,57 @@ func TestMovingSumError(t *testing.T) { testMovingFunctionError(t, "movingSum(foo.bar.baz, 0)") } +// TestMovingSumOriginalIDsMissingFromBootstrapIDs tests the case for the +// "moving" function families where the bootstrap of the time range that +// expands back returns timeseries not present from the original series list +// which can happen when using a temporal index (i.e. latest time window +// does not include the same timeseries as the time window from the bootstrap +// list, say using movingSum 1h but the query is only for the last few minutes +// and in the last few minutes data for a series from an hour ago exists +// but is not present in the last few minutes; for this case the results +// from the preceding hour should still be evaluated as part of the movingSum +// calculation). +func TestMovingSumOriginalIDsMissingFromBootstrapIDs(t *testing.T) { + ctx := common.NewTestContext() + defer func() { _ = ctx.Close() }() + + end := time.Now().Truncate(time.Minute) + start := end.Add(-3 * time.Minute) + bootstrapStart := start.Add(-10 * time.Minute) + + engine := NewEngine(&common.MovingFunctionStorage{ + StepMillis: 60000, + Bootstrap: []float64{1, 1, 1, 1, 1, 2, 2, 2, 2, 2}, + BootstrapStart: bootstrapStart, + Values: []float64{3, 3, 3}, + OriginalIDs: []string{"foo.bar"}, + BootstrapIDs: []string{"foo.bar", "foo.baz"}, + }, CompileOptions{}) + phonyContext := common.NewContext(common.ContextOptions{ + Start: start, + End: end, + Engine: engine, + }) + + target := "movingSum(foo.*, '10min')" + expr, err := phonyContext.Engine.(*Engine).Compile(target) + require.NoError(t, err) + res, err := expr.Execute(phonyContext) + require.NoError(t, err) + expected := []common.TestSeries{ + { + Name: "movingSum(foo.bar,\"10min\")", + Data: []float64{15, 17, 19}, + }, + { + Name: "movingSum(foo.baz,\"10min\")", + Data: []float64{15, 14, 13}, + }, + } + common.CompareOutputsAndExpected(t, 60000, start, + expected, res.Values) +} + func TestMovingMaxSuccess(t *testing.T) { values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0} bootstrap := []float64{3.0, 4.0, 5.0}