Skip to content

Commit

Permalink
Add support for last_over_time
Browse files Browse the repository at this point in the history
This `<aggregation>_over_time` function was added in
Prometheus v2.26.0.
  • Loading branch information
ryansammonaiven committed Nov 17, 2021
1 parent c2fb55c commit 641ed97
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 36 deletions.
13 changes: 13 additions & 0 deletions src/query/functions/temporal/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (
// StdVarType calculates the standard variance of all values in the specified interval.
StdVarType = "stdvar_over_time"

// LastType returns the most recent value in the specified interval.
LastType = "last_over_time"

// QuantileType calculates the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.
QuantileType = "quantile_over_time"
)
Expand All @@ -67,6 +70,7 @@ var (
SumType: sumOverTime,
StdDevType: stddevOverTime,
StdVarType: stdvarOverTime,
LastType: lastOverTime,
}
)

Expand Down Expand Up @@ -220,6 +224,15 @@ func stdvarOverTime(values []float64) float64 {
return aux / count
}

func lastOverTime(values []float64) float64 {
length := len(values)
if length == 0 {
return math.NaN()
}

return values[length-1]
}

func sumAndCount(values []float64) (float64, float64) {
sum := 0.0
count := 0.0
Expand Down
36 changes: 36 additions & 0 deletions src/query/functions/temporal/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,42 @@ var aggregationTestCases = []testCase{
{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
},
},
{
name: "last_over_time",
opType: LastType,
vals: [][]float64{
{nan, 1, 2, 3, 4, 0, 1, 2, 3, 4},
{5, 6, 7, 8, 9, 5, 6, 7, 8, 9},
},
expected: [][]float64{
{nan, 1, 2, 3, 4, 0, 1, 2, 3, 4},
{5, 6, 7, 8, 9, 5, 6, 7, 8, 9},
},
},
{
name: "last_over_time leading NaNs",
opType: LastType,
vals: [][]float64{
{nan, 1, nan, 3, nan, nan, 2, nan, nan, nan},
{5, nan, nan, nan, nan, nan, nan, 7, nan, nan},
},
expected: [][]float64{
{nan, 1, nan, 3, nan, nan, 2, nan, nan, nan},
{5, nan, nan, nan, nan, nan, nan, 7, nan, nan},
},
},
{
name: "last_over_time all NaNs",
opType: LastType,
vals: [][]float64{
{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
},
expected: [][]float64{
{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
},
},
{
name: "quantile_over_time",
opType: QuantileType,
Expand Down
38 changes: 26 additions & 12 deletions src/query/functions/temporal/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (c *baseNode) batchProcess(
idx = idx + batch.Size
p := c.makeProcessor.initialize(c.op.duration, c.transformOpts)
go func() {
err := parallelProcess(ctx, loopIndex, batch.Iter, builder, m, p, &mu)
err := parallelProcess(ctx, c.op.OpType(), loopIndex, batch.Iter, builder, m, p, &mu)
if err != nil {
mu.Lock()
// NB: this no-ops if the error is nil.
Expand All @@ -226,6 +226,7 @@ func (c *baseNode) batchProcess(

func parallelProcess(
ctx context.Context,
opType string,
idx int,
iter block.SeriesIter,
builder block.Builder,
Expand Down Expand Up @@ -275,10 +276,15 @@ func parallelProcess(
decodeDuration += stats.DecodeDuration
}

// rename series to exclude their __name__ tag as
// part of function processing.
seriesMeta.Tags = seriesMeta.Tags.WithoutName()
seriesMeta.Name = seriesMeta.Tags.ID()
// The last_over_time function acts like offset;
// thus, it should keep the metric name.
// For all other functions,
// rename series to exclude their __name__ tag as part of function processing.
if opType != LastType {
seriesMeta.Tags = seriesMeta.Tags.WithoutName()
seriesMeta.Name = seriesMeta.Tags.ID()
}

values = values[:0]
for i := 0; i < blockMeta.steps; i++ {
iterBounds := iterationBounds{
Expand Down Expand Up @@ -342,14 +348,22 @@ func (c *baseNode) singleProcess(
return nil, err
}

// The last_over_time function acts like offset;
// thus, it should keep the metric name.
// For all other functions,
// rename series to exclude their __name__ tag as part of function processing.
resultSeriesMeta := make([]block.SeriesMeta, 0, len(seriesIter.SeriesMeta()))
for _, m := range seriesIter.SeriesMeta() {
tags := m.Tags.WithoutName()
resultSeriesMeta = append(resultSeriesMeta, block.SeriesMeta{
Name: tags.ID(),
Tags: tags,
})
var resultSeriesMeta []block.SeriesMeta
if c.op.OpType() != LastType {
resultSeriesMeta = make([]block.SeriesMeta, 0, len(seriesIter.SeriesMeta()))
for _, m := range seriesIter.SeriesMeta() {
tags := m.Tags.WithoutName()
resultSeriesMeta = append(resultSeriesMeta, block.SeriesMeta{
Name: tags.ID(),
Tags: tags,
})
}
} else {
resultSeriesMeta = seriesIter.SeriesMeta()
}

meta := b.Meta()
Expand Down
14 changes: 11 additions & 3 deletions src/query/functions/temporal/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,17 @@ func testTemporalFunc(t *testing.T, opGen opGenerator, tests []testCase) {
Value: []byte("v2"),
}})}

// NB: name should be dropped from series tags, and the name
// should be the updated ID.
expectedSeriesMetas := []block.SeriesMeta{metaOne, metaTwo}
// The last_over_time function acts like offset;
// thus, it should keep the metric name.
// For all other functions,
// name should be dropped from series tags,
// and the name should be the updated ID.
var expectedSeriesMetas []block.SeriesMeta
if tt.opType != LastType {
expectedSeriesMetas = []block.SeriesMeta{metaOne, metaTwo}
} else {
expectedSeriesMetas = seriesMetas
}
require.Equal(t, expectedSeriesMetas, sink.Metas)
})
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/parser/promql/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func NewFunctionExpr(

case temporal.AvgType, temporal.CountType, temporal.MinType,
temporal.MaxType, temporal.SumType, temporal.StdDevType,
temporal.StdVarType:
temporal.StdVarType, temporal.LastType:
p, err = temporal.NewAggOp(argValues, name)
return p, true, err

Expand Down
1 change: 1 addition & 0 deletions src/query/parser/promql/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ var temporalParseTests = []struct {
{"sum_over_time(up[5m])", temporal.SumType},
{"stddev_over_time(up[5m])", temporal.StdDevType},
{"stdvar_over_time(up[5m])", temporal.StdVarType},
{"last_over_time(up[5m])", temporal.LastType},
{"quantile_over_time(0.2, up[5m])", temporal.QuantileType},
{"irate(up[5m])", temporal.IRateType},
{"idelta(up[5m])", temporal.IDeltaType},
Expand Down
40 changes: 20 additions & 20 deletions src/query/test/compatibility/testdata/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -593,26 +593,26 @@ load 10s
data{type="some_nan3"} NaN 0 1
data{type="only_nan"} NaN NaN NaN

# Failing with keepNaN feature. eval instant at 1m min_over_time(data[1m])
# {type="numbers"} 0
# {type="some_nan"} 0
# {type="some_nan2"} 1
# {type="some_nan3"} 0
# {type="only_nan"} NaN

# Failing with keepNaN feature. eval instant at 1m max_over_time(data[1m])
# {type="numbers"} 3
# {type="some_nan"} 2
# {type="some_nan2"} 2
# {type="some_nan3"} 1
# {type="only_nan"} NaN

#eval instant at 1m last_over_time(data[1m])
# data{type="numbers"} 3
# data{type="some_nan"} NaN
# data{type="some_nan2"} 1
# data{type="some_nan3"} 1
# data{type="only_nan"} NaN
eval instant at 1m min_over_time(data[1m])
{type="numbers"} 0
{type="some_nan"} 0
{type="some_nan2"} 1
{type="some_nan3"} 0
# Failing with keepNaN feature. {type="only_nan"} NaN

eval instant at 1m max_over_time(data[1m])
{type="numbers"} 3
{type="some_nan"} 2
{type="some_nan2"} 2
{type="some_nan3"} 1
# Failing with keepNaN feature. {type="only_nan"} NaN

eval instant at 1m last_over_time(data[1m])
data{type="numbers"} 3
data{type="some_nan2"} 1
data{type="some_nan3"} 1
# Failing with keepNaN feature. data{type="some_nan"} NaN
# Failing with keepNaN feature. data{type="only_nan"} NaN

clear

Expand Down

0 comments on commit 641ed97

Please sign in to comment.