Skip to content

Commit

Permalink
[query] Cleanup exponentialMovingAverage implementation to use standa…
Browse files Browse the repository at this point in the history
…rd moving function semantics (#3370)
  • Loading branch information
robskillington authored Mar 17, 2021
1 parent 0826447 commit 965f566
Showing 1 changed file with 134 additions and 88 deletions.
222 changes: 134 additions & 88 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ import (
)

const (
millisPerSecond = 1000
secondsPerDay = 24 * 3600
daysPerWeek = 7
secondsPerWeek = secondsPerDay * daysPerWeek
cactiStyleFormat = "%.2f"
wrappingFmt = "%s(%s)"
alpha = 0.1
gamma = 0.1
beta = 0.0035
millisPerSecond = 1000
secondsPerDay = 24 * 3600
daysPerWeek = 7
secondsPerWeek = secondsPerDay * daysPerWeek
cactiStyleFormat = "%.2f"
wrappingFmt = "%s(%s)"
alpha = 0.1
gamma = 0.1
beta = 0.0035
defaultXFilesFactor = 0.0
)

func joinPathExpr(series ts.SeriesList) string {
Expand Down Expand Up @@ -955,84 +956,73 @@ func movingAverage(ctx *common.Context, input singlePathSpec, windowSizeValue ge
// The `constant` is calculated as:
// constant = 2 / (windowSize + 1)
// the first period EMA uses a simple moving average for its value.
func exponentialMovingAverage(ctx *common.Context, input singlePathSpec, windowSizeValue genericInterface) (*binaryContextShifter, error) {
if len(input.Values) == 0 {
return nil, nil
}
func exponentialMovingAverage(
ctx *common.Context,
input singlePathSpec,
windowSize genericInterface,
) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize,
"exponentialMovingAverage", defaultXFilesFactor,
newExponentialMovingAverageImpl())
}

windowSize, err := parseWindowSize(windowSizeValue, input)
if err != nil {
return nil, err
}
var _ movingImpl = (*exponentialMovingAverageImpl)(nil)

contextShiftingFn := func(c *common.Context) *common.Context {
opts := common.NewChildContextOptions()
opts.AdjustTimeRange(0, 0, windowSize.deltaValue, 0)
childCtx := c.NewChildContext(opts)
return childCtx
}
type exponentialMovingAverageImpl struct {
bootstrap *ts.Series
offset int
ema float64
emaConstant float64
}

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-windowSize.deltaValue), ctx.StartTime
transformerFn := func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
}
func newExponentialMovingAverageImpl() *exponentialMovingAverageImpl {
return &exponentialMovingAverageImpl{}
}

results := make([]*ts.Series, 0, original.Len())
for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
stepSize := series.MillisPerStep()
windowPoints := windowSize.windowSizeFunc(stepSize)
if windowPoints == 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
"windowSize should not be smaller than stepSize, windowSize=%v, stepSize=%d",
windowSizeValue, stepSize))
return ts.NewSeriesList(), err
}
emaConstant := 2.0 / (float64(windowPoints) + 1.0)
func (impl *exponentialMovingAverageImpl) Reset(opts movingImplResetOptions) error {
impl.emaConstant = 2.0 / (float64(opts.WindowPoints) + 1.0)

numSteps := series.Len()
offset := bootstrap.Len() - numSteps
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
firstWindow, err := bootstrap.Slice(0, offset)
if err != nil {
return ts.NewSeriesList(), err
}
numSteps := opts.Original.Len()
impl.bootstrap = opts.Bootstrap
impl.offset = impl.bootstrap.Len() - numSteps

// the first value is just a regular moving average
ema := firstWindow.SafeAvg()
if math.IsNaN(ema) {
ema = 0
}
vals.SetValueAt(0, ema)
for i := 1; i < numSteps; i++ {
curr := bootstrap.ValueAt(i + offset)
if !math.IsNaN(curr) {
// formula: ema(current) = constant * (Current Value) + (1 - constant) * ema(previous)
ema = emaConstant*curr + (1-emaConstant)*ema
vals.SetValueAt(i, ema)
} else {
vals.SetValueAt(i, math.NaN())
}
firstWindow, err := impl.bootstrap.Slice(0, impl.offset)
if err != nil {
return err
}

}
// The first value is just a regular moving average.
impl.ema = firstWindow.SafeAvg()
if !math.IsNaN(impl.ema) {
return nil
}

name := fmt.Sprintf("exponentialMovingAverage(%s,%s)", series.Name(), windowSize.stringValue)
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
results = append(results, newSeries)
}
// Use zero if NaN.
impl.ema = 0
return nil
}

original.Values = results
return original, nil
func (impl *exponentialMovingAverageImpl) Evaluate(
window []float64,
values ts.MutableValues,
windowPoints int,
i int,
_ float64,
) {
if i == 0 {
// First value is the first moving average value.
values.SetValueAt(i, impl.ema)
return
}

return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: transformerFn,
}, nil
curr := impl.bootstrap.ValueAt(i + impl.offset)
if !math.IsNaN(curr) {
// Formula: ema(current) = constant * (Current Value) + (1 - constant) * ema(previous).
impl.ema = impl.emaConstant*curr + (1-impl.emaConstant)*impl.ema
values.SetValueAt(i, impl.ema)
} else {
values.SetValueAt(i, math.NaN())
}
}

// totalFunc takes an index and returns a total value for that index
Expand Down Expand Up @@ -2079,7 +2069,50 @@ func windowPointsLength(series *ts.Series, interval time.Duration) int {
return int(interval / (time.Duration(series.MillisPerStep()) * time.Millisecond))
}

type movingImplementationFn func(window []float64, values ts.MutableValues, windowPoints int, i int, xFilesFactor float64)
type movingImpl interface {
Reset(
opts movingImplResetOptions,
) error

Evaluate(
window []float64,
values ts.MutableValues,
windowPoints int,
i int,
xFilesFactor float64,
)
}

type movingImplResetOptions struct {
Original *ts.Series
Bootstrap *ts.Series
WindowPoints int
}

// Ensure movingImplementationFn implements movingImpl.
var _ movingImpl = movingImplementationFn(nil)

type movingImplementationFn func(
window []float64,
values ts.MutableValues,
windowPoints int,
i int,
xFilesFactor float64,
)

func (f movingImplementationFn) Reset(opts movingImplResetOptions) error {
return nil
}

func (f movingImplementationFn) Evaluate(
window []float64,
values ts.MutableValues,
windowPoints int,
i int,
xFilesFactor float64,
) {
f(window, values, windowPoints, i, xFilesFactor)
}

// movingMedianHelper given a slice of floats, calculates the median and assigns it into vals as index i
func movingMedianHelper(window []float64, vals ts.MutableValues, windowPoints int, i int, xFilesFactor float64) {
Expand Down Expand Up @@ -2125,7 +2158,7 @@ func newMovingBinaryTransform(
windowSizeValue genericInterface,
movingFunctionName string,
xFilesFactor float64,
impl movingImplementationFn,
impl movingImpl,
) (*binaryContextShifter, error) {
if len(input.Values) == 0 {
return nil, nil
Expand Down Expand Up @@ -2181,9 +2214,18 @@ func newMovingBinaryTransform(
currWindowPoints := windowPointsLength(series, interval)
window := windowPoints[:currWindowPoints]
util.Memset(window, math.NaN())

numSteps := series.Len()
offset := bootstrap.Len() - numSteps
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)

if err := impl.Reset(movingImplResetOptions{
Original: series,
Bootstrap: bootstrap,
WindowPoints: currWindowPoints,
}); err != nil {
return ts.SeriesList{}, err
}
for i := 0; i < numSteps; i++ {
for j := i + offset - currWindowPoints; j < i+offset; j++ {
if j < 0 || j >= bootstrap.Len() {
Expand All @@ -2197,7 +2239,7 @@ func newMovingBinaryTransform(

window[idx] = bootstrap.ValueAt(j)
}
impl(window, vals, currWindowPoints, i, xFilesFactor)
impl.Evaluate(window, vals, currWindowPoints, i, xFilesFactor)
}
name := fmt.Sprintf("%s(%s,%s)", movingFunctionName, series.Name(), windowSize.stringValue)
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
Expand All @@ -2212,22 +2254,26 @@ func newMovingBinaryTransform(

// movingMedian calculates the moving median of a metric (or metrics) over a time interval.
func movingMedian(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMedian", xFilesFactor, movingMedianHelper)
return newMovingBinaryTransform(ctx, input, windowSize, "movingMedian", xFilesFactor,
movingImplementationFn(movingMedianHelper))
}

// movingSum calculates the moving sum of a metric (or metrics) over a time interval.
func movingSum(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingSum", xFilesFactor, movingSumHelper)
return newMovingBinaryTransform(ctx, input, windowSize, "movingSum", xFilesFactor,
movingImplementationFn(movingSumHelper))
}

// movingMax calculates the moving maximum of a metric (or metrics) over a time interval.
func movingMax(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMax", xFilesFactor, movingMaxHelper)
return newMovingBinaryTransform(ctx, input, windowSize, "movingMax", xFilesFactor,
movingImplementationFn(movingMaxHelper))
}

// movingMin calculates the moving minimum of a metric (or metrics) over a time interval.
func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInterface, xFilesFactor float64) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", xFilesFactor, movingMinHelper)
return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", xFilesFactor,
movingImplementationFn(movingMinHelper))
}

// legendValue takes one metric or a wildcard seriesList and a string in quotes.
Expand Down Expand Up @@ -2511,19 +2557,19 @@ func init() {
MustRegisterFunction(minimumAbove)
MustRegisterFunction(mostDeviant)
MustRegisterFunction(movingAverage).WithDefaultParams(map[uint8]interface{}{
3: 0.0, // XFilesFactor
3: defaultXFilesFactor, // XFilesFactor
})
MustRegisterFunction(movingMedian).WithDefaultParams(map[uint8]interface{}{
3: 0.0, // XFilesFactor
3: defaultXFilesFactor, // XFilesFactor
})
MustRegisterFunction(movingSum).WithDefaultParams(map[uint8]interface{}{
3: 0.0, // XFilesFactor
3: defaultXFilesFactor, // XFilesFactor
})
MustRegisterFunction(movingMax).WithDefaultParams(map[uint8]interface{}{
3: 0.0, // XFilesFactor
3: defaultXFilesFactor, // XFilesFactor
})
MustRegisterFunction(movingMin).WithDefaultParams(map[uint8]interface{}{
3: 0.0, // XFilesFactor
3: defaultXFilesFactor, // XFilesFactor
})
MustRegisterFunction(multiplySeries)
MustRegisterFunction(nonNegativeDerivative).WithDefaultParams(map[uint8]interface{}{
Expand Down

0 comments on commit 965f566

Please sign in to comment.