Skip to content

Commit

Permalink
Merge branch 'master' into r/fix-as-percent
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington committed Mar 18, 2021
2 parents b657a51 + 8df132f commit eb8cc84
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 22 deletions.
10 changes: 8 additions & 2 deletions src/query/graphite/common/percentiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func SafeSort(input []float64) int {
nans++
}
}

sort.Float64s(input)
return nans
}
Expand All @@ -79,12 +78,19 @@ func SafeSum(input []float64) (float64, int) {
if !math.IsNaN(v) {
sum += v
} else {
nans += 1
nans++
}
}
return sum, nans
}

// SafeAverage returns the average of the input slice the number of NaNs in the input.
func SafeAverage(input []float64) (float64, int) {
sum, nans := SafeSum(input)
count := len(input) - nans
return sum / float64(count), nans
}

// SafeMax returns the maximum value of the input slice the number of NaNs in the input.
func SafeMax(input []float64) (float64, int) {
nans := 0
Expand Down
17 changes: 14 additions & 3 deletions src/query/graphite/common/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ type MovingFunctionStorage struct {
StepMillis int
Bootstrap []float64
Values []float64
OriginalIDs []string
BootstrapIDs []string
BootstrapStart time.Time
}

Expand Down Expand Up @@ -155,12 +157,21 @@ func (s *MovingFunctionStorage) fetchByIDs(
var values []float64
if opts.StartTime.Equal(s.BootstrapStart) {
values = s.Bootstrap
if s.BootstrapIDs != nil {
ids = s.BootstrapIDs
}
} else {
values = s.Values
if s.OriginalIDs != nil {
ids = s.OriginalIDs
}
}

for _, id := range ids {
series := ts.NewSeries(ctx, id, opts.StartTime,
NewTestSeriesValues(ctx, s.StepMillis, values))
seriesList = append(seriesList, series)
}
series := ts.NewSeries(ctx, ids[0], opts.StartTime,
NewTestSeriesValues(ctx, s.StepMillis, values))
seriesList = append(seriesList, series)
}

return storage.NewFetchResult(ctx, seriesList, block.NewResultMetadata()), nil
Expand Down
123 changes: 107 additions & 16 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1728,8 +1728,8 @@ func substr(_ *common.Context, seriesList singlePathSpec, start, stop int) (ts.S
// combineBootstrapWithOriginal combines the bootstrapped the series with the original series.
func combineBootstrapWithOriginal(
ctx *common.Context,
startTime time.Time,
endTime time.Time,
bootstrapStartTime, bootstrapEndTime time.Time,
_, originalEndTime time.Time,
bootstrapped ts.SeriesList,
seriesList singlePathSpec,
) (ts.SeriesList, error) {
Expand All @@ -1742,15 +1742,20 @@ func combineBootstrapWithOriginal(
for _, series := range seriesList.Values {
bs, found := nameToSeries[series.Name()]
if !found {
numSteps := ts.NumSteps(startTime, endTime, series.MillisPerStep())
numSteps := ts.NumSteps(bootstrapStartTime, bootstrapEndTime, series.MillisPerStep())
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
bs = ts.NewSeries(ctx, series.Name(), startTime, vals)
bs = ts.NewSeries(ctx, series.Name(), bootstrapStartTime, vals)
} else {
// Delete from the lookup so we can fill in
// the bootstrapped time series with NaNs if
// the original series list is missing the series.
delete(nameToSeries, series.Name())
}
bootstrapList = append(bootstrapList, bs)
}

var err error
newSeriesList := make([]*ts.Series, len(seriesList.Values))
newSeriesList := make([]*ts.Series, 0, len(seriesList.Values)+len(nameToSeries))
for i, bootstrap := range bootstrapList {
original := seriesList.Values[i]
if bootstrap.MillisPerStep() < original.MillisPerStep() {
Expand All @@ -1759,8 +1764,8 @@ func combineBootstrapWithOriginal(
return ts.NewSeriesList(), err
}
}
bootstrapEndStep := endTime.Truncate(original.Resolution())
if bootstrapEndStep.Before(endTime) {
bootstrapEndStep := bootstrapEndTime.Truncate(original.Resolution())
if bootstrapEndStep.Before(bootstrapEndTime) {
bootstrapEndStep = bootstrapEndStep.Add(original.Resolution())
}
// NB(braskin): using bootstrap.Len() is incorrect as it will include all
Expand All @@ -1778,9 +1783,32 @@ func combineBootstrapWithOriginal(
for j := numBootstrapValues; j < numCombinedValues; j++ {
values.SetValueAt(j, original.ValueAt(j-numBootstrapValues))
}
newSeries := ts.NewSeries(ctx, original.Name(), startTime, values)
newSeries := ts.NewSeries(ctx, original.Name(), bootstrapStartTime, values)
newSeries.Specification = original.Specification
newSeriesList[i] = newSeries
newSeriesList = append(newSeriesList, newSeries)
}
// Now add any series in bootstrap list but not original series,
// need to iterate the bootstrapped.Values to retain order
// but can check if they are in the nameToSeries map still and if so
// then there was no matching original series.
for _, series := range bootstrapped.Values {
bs, found := nameToSeries[series.Name()]
if !found {
// Processed already.
continue
}
// Extend the bootstrap series to include steps covered by original
// time range since the original series is missing from fetch.
needSteps := bs.StepAtTime(originalEndTime)
if currSteps := bs.Len(); needSteps > currSteps {
// Need to resize.
vals := ts.NewValues(ctx, bs.MillisPerStep(), needSteps)
for i := 0; i < currSteps; i++ {
vals.SetValueAt(i, bs.ValueAt(i))
}
bs = bs.DerivedSeries(bs.StartTime(), vals)
}
newSeriesList = append(newSeriesList, bs)
}

r := ts.SeriesList(seriesList)
Expand Down Expand Up @@ -2231,6 +2259,15 @@ func movingSumHelper(window []float64, vals ts.MutableValues, windowPoints int,
}
}

// movingAverageHelper given a slice of floats, calculates the average and assigns it into vals as index i
func movingAverageHelper(window []float64, vals ts.MutableValues, windowPoints int, i int, xFilesFactor float64) {
avg, nans := common.SafeAverage(window)

if nans < windowPoints && effectiveXFF(windowPoints, nans, xFilesFactor) {
vals.SetValueAt(i, avg)
}
}

// movingMaxHelper given a slice of floats, finds the max and assigns it into vals as index i
func movingMaxHelper(window []float64, vals ts.MutableValues, windowPoints int, i int, xFilesFactor float64) {
max, nans := common.SafeMax(window)
Expand Down Expand Up @@ -2278,12 +2315,14 @@ func newMovingBinaryTransform(
return childCtx
}

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
originalStart, originalEnd := ctx.StartTime, ctx.EndTime
bootstrapStartTime, bootstrapEndTime := originalStart.Add(-interval), originalStart
return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
ctx.StartTime, ctx.EndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
Expand All @@ -2292,7 +2331,15 @@ func newMovingBinaryTransform(
results := make([]*ts.Series, 0, original.Len())
maxWindowPoints := 0
for i := range bootstrapList.Values {
series := original.Values[i]
var series *ts.Series
if i < original.Len() {
// Existing series exists, prefer that resolution.
series = original.Values[i]
} else {
// No existing series use resolution from bootstrapped.
series = bootstrapList.Values[i]
}

windowPoints := windowPointsLength(series, interval)
if windowPoints <= 0 {
err := xerrors.NewInvalidParamsError(fmt.Errorf(
Expand All @@ -2307,7 +2354,20 @@ func newMovingBinaryTransform(

windowPoints := make([]float64, maxWindowPoints)
for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
var series *ts.Series
if i < original.Len() {
// Existing series exists, prefer that.
series = original.Values[i]
} else {
// No existing series, use an intersected
// version of the bootstrapped series as a
// reference for values to compute.
series, err = bootstrap.IntersectAndResize(originalStart,
originalEnd, bootstrap.MillisPerStep(), bootstrap.ConsolidationFunc())
if err != nil {
return ts.NewSeriesList(), err
}
}
currWindowPoints := windowPointsLength(series, interval)
window := windowPoints[:currWindowPoints]
util.Memset(window, math.NaN())
Expand Down Expand Up @@ -2350,25 +2410,56 @@ func newMovingBinaryTransform(
}

// movingMedian calculates the moving median of a metric (or metrics) over a time interval.
func movingMedian(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingMedian(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMedian", xFilesFactor,
movingImplementationFn(movingMedianHelper))
}

// movingSum calculates the moving sum of a metric (or metrics) over a time interval.
func movingSum(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingSum(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingSum", xFilesFactor,
movingImplementationFn(movingSumHelper))
}

// movingAverage calculates the moving average of a metric (or metrics) over a time interval.
func movingAverage(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingAverage", xFilesFactor,
movingImplementationFn(movingAverageHelper))
}

// movingMax calculates the moving maximum of a metric (or metrics) over a time interval.
func movingMax(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingMax(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMax", xFilesFactor,
movingImplementationFn(movingMaxHelper))
}

// movingMin calculates the moving minimum of a metric (or metrics) over a time interval.
func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
func movingMin(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
xFilesFactor float64,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", xFilesFactor,
movingImplementationFn(movingMinHelper))
}
Expand Down
54 changes: 53 additions & 1 deletion src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,8 @@ func TestCombineBootstrapWithOriginal(t *testing.T) {

defer func() { _ = ctx.Close() }()

output, err := combineBootstrapWithOriginal(ctx, bootstrapStartTime, bootstrapEndTime, bootstrappedSeriesList, originalSeriesList)
output, err := combineBootstrapWithOriginal(ctx, bootstrapStartTime, bootstrapEndTime,
contextStart, contextEnd, bootstrappedSeriesList, originalSeriesList)
assert.Equal(t, output.Values[0], expectedSeries)
assert.Nil(t, err)
}
Expand Down Expand Up @@ -1032,6 +1033,57 @@ func TestMovingSumError(t *testing.T) {
testMovingFunctionError(t, "movingSum(foo.bar.baz, 0)")
}

// TestMovingSumOriginalIDsMissingFromBootstrapIDs tests the case for the
// "moving" function families where the bootstrap of the time range that
// expands back returns timeseries not present from the original series list
// which can happen when using a temporal index (i.e. latest time window
// does not include the same timeseries as the time window from the bootstrap
// list, say using movingSum 1h but the query is only for the last few minutes
// and in the last few minutes data for a series from an hour ago exists
// but is not present in the last few minutes; for this case the results
// from the preceding hour should still be evaluated as part of the movingSum
// calculation).
func TestMovingSumOriginalIDsMissingFromBootstrapIDs(t *testing.T) {
ctx := common.NewTestContext()
defer func() { _ = ctx.Close() }()

end := time.Now().Truncate(time.Minute)
start := end.Add(-3 * time.Minute)
bootstrapStart := start.Add(-10 * time.Minute)

engine := NewEngine(&common.MovingFunctionStorage{
StepMillis: 60000,
Bootstrap: []float64{1, 1, 1, 1, 1, 2, 2, 2, 2, 2},
BootstrapStart: bootstrapStart,
Values: []float64{3, 3, 3},
OriginalIDs: []string{"foo.bar"},
BootstrapIDs: []string{"foo.bar", "foo.baz"},
}, CompileOptions{})
phonyContext := common.NewContext(common.ContextOptions{
Start: start,
End: end,
Engine: engine,
})

target := "movingSum(foo.*, '10min')"
expr, err := phonyContext.Engine.(*Engine).Compile(target)
require.NoError(t, err)
res, err := expr.Execute(phonyContext)
require.NoError(t, err)
expected := []common.TestSeries{
{
Name: "movingSum(foo.bar,\"10min\")",
Data: []float64{15, 17, 19},
},
{
Name: "movingSum(foo.baz,\"10min\")",
Data: []float64{15, 14, 13},
},
}
common.CompareOutputsAndExpected(t, 60000, start,
expected, res.Values)
}

func TestMovingMaxSuccess(t *testing.T) {
values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
bootstrap := []float64{3.0, 4.0, 5.0}
Expand Down

0 comments on commit eb8cc84

Please sign in to comment.