Skip to content

Commit

Permalink
Fix panic in instant query splitting when using unwrapped rate
Browse files Browse the repository at this point in the history
The range aggregation `rate()` supports both log ranges and unwrapped
ranges, e.g.

`rate({app="foo"} [$__interval])`
and
`rate({app="foo"} | unwrap bar [$__interval])`

Since `rate()` was split into multiple `count_over_time()` expressions whose
sum is divided by the total duration, but `count_over_time()` does not
support `unwrap`, unwrapped rate queries caused panics.

This fix changes the splitting of `rate({app="foo"} | unwrap bar [$__interval])`
to use multiple `sum_over_time()` expressions, summed and divided by the total duration.

Fixes #6344

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Jun 19, 2022
1 parent a1c72ad commit 2c5931e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
14 changes: 12 additions & 2 deletions pkg/logql/rangemapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,11 @@ func hasLabelExtractionStage(expr syntax.SampleExpr) bool {
// sumOverFullRange returns an expression that sums up individual downstream queries (preserving labels)
// and divides the sum by the full range in seconds to calculate a rate value.
// The operation defines the range aggregation operation of the downstream queries.
// Example:
// Examples:
// rate({app="foo"}[2m])
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120)
// rate({app="foo"} | unwrap bar [2m])
// => (sum without (sum_over_time({app="foo"} | unwrap bar [1m]) ++ sum_over_time({app="foo"} | unwrap bar [1m]) offset 1m) / 120)
func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{
Left: expr.Left,
Expand Down Expand Up @@ -373,7 +375,15 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum {
return expr
}
return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval, recorder)
// rate({app="foo"}[2m])
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120)
op := syntax.OpRangeTypeCount
if expr.Left.Unwrap != nil {
// rate({app="foo"} | unwrap bar [2m])
// => (sum without (sum_over_time({app="foo"} | unwrap bar [1m]) ++ sum_over_time({app="foo"} | unwrap bar [1m]) offset 1m) / 120)
op = syntax.OpRangeTypeSum
}
return m.sumOverFullRange(expr, vectorAggrPushdown, op, rangeInterval, recorder)
case syntax.OpRangeTypeBytesRate:
if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum {
return expr
Expand Down
29 changes: 25 additions & 4 deletions pkg/logql/rangemapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
++ downstream<count_over_time({app="foo"}[1m]), shard=<nil>>
) / 180)`,
},
{
`rate({app="foo"} | unwrap bar[3m])`,
`(sum without(
downstream<sum_over_time({app="foo"} | unwrap bar [1m] offset 2m0s), shard=<nil>>
++ downstream<sum_over_time({app="foo"} | unwrap bar [1m] offset 1m0s), shard=<nil>>
++ downstream<sum_over_time({app="foo"} | unwrap bar [1m]), shard=<nil>>
) / 180)`,
},
{
`bytes_rate({app="foo"}[3m])`,
`(sum without(
Expand Down Expand Up @@ -1471,6 +1479,23 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
)
)`,
},

// regression test queries
{
`topk(10,sum by (org_id) (rate({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [3m])))`,
`topk(10,
sum by (org_id) (
(
sum without(
downstream<sumby(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrapbytes(total_bytes) | __error__="" [1m] offset 2m0s)),shard=<nil>>
++ downstream<sumby(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrapbytes(total_bytes) | __error__="" [1m] offset 1m0s)),shard=<nil>>
++ downstream<sumby(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrapbytes(total_bytes) | __error__="" [1m])),shard=<nil>>
)
/ 180
)
)
)`,
},
} {
tc := tc
t.Run(tc.expr, func(t *testing.T) {
Expand Down Expand Up @@ -1500,10 +1525,6 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) {
`sum(avg_over_time({app="foo"} | unwrap bar[3m]))`,
`sum(avg_over_time({app="foo"} | unwrap bar[3m]))`,
},
{ // this query caused a panic in ops
`topk(10,sum by (cluster,org_id) (rate({container="query-frontend",namespace="loki-prod",cluster="prod-us-central-0"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__=""[1h])))`,
`topk(10,sum by (cluster,org_id) (rate({container="query-frontend",namespace="loki-prod",cluster="prod-us-central-0"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__=""[1h])))`,
},

// should be noop if range interval is lower or equal to split interval (1m)
{
Expand Down

0 comments on commit 2c5931e

Please sign in to comment.