Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-2.6.x] Fix panic in instant query splitting when using unwrapped rate #6557

Merged
merged 1 commit into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
{`min_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second},
{`rate({a=~".+"}[2s])`, time.Second},
{`rate({a=~".+"} | unwrap b [2s])`, time.Second},
{`bytes_rate({a=~".+"}[2s])`, time.Second},

// sum
Expand All @@ -137,6 +138,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum(rate({a=~".+"}[2s]))`, time.Second},
{`sum(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum(bytes_rate({a=~".+"}[2s]))`, time.Second},

// sum by
Expand All @@ -148,6 +150,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`sum by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`sum by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// count
Expand All @@ -159,6 +162,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count(rate({a=~".+"}[2s]))`, time.Second},
{`count(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count(bytes_rate({a=~".+"}[2s]))`, time.Second},

// count by
Expand All @@ -170,6 +174,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`count by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`count by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// max
Expand All @@ -181,6 +186,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max(rate({a=~".+"}[2s]))`, time.Second},
{`max(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max(bytes_rate({a=~".+"}[2s]))`, time.Second},

// max by
Expand All @@ -192,6 +198,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`max by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`max by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// min
Expand All @@ -203,6 +210,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min(rate({a=~".+"}[2s]))`, time.Second},
{`min(rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min(bytes_rate({a=~".+"}[2s]))`, time.Second},

// min by
Expand All @@ -214,6 +222,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
{`min by (a) (rate({a=~".+"}[2s]))`, time.Second},
{`min by (a) (rate({a=~".+"} | unwrap b [2s]))`, time.Second},
{`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},

// Label extraction stage
Expand All @@ -228,6 +237,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum(rate({a=~".+"} | logfmt[2s]))`, time.Second},
{`sum(rate({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum(bytes_rate({a=~".+"} | logfmt[2s]))`, time.Second},
{`sum by (a) (bytes_over_time({a=~".+"} | logfmt [2s]))`, time.Second},
{`sum by (a) (count_over_time({a=~".+"} | logfmt [2s]))`, time.Second},
Expand All @@ -237,6 +247,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
{`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
{`sum by (a) (rate({a=~".+"} | logfmt[2s]))`, time.Second},
{`sum by (a) (rate({a=~".+"} | logfmt | unwrap line [2s]))`, time.Second},
{`sum by (a) (bytes_rate({a=~".+"} | logfmt[2s]))`, time.Second},

{`count(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
Expand Down
14 changes: 12 additions & 2 deletions pkg/logql/rangemapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,11 @@ func hasLabelExtractionStage(expr syntax.SampleExpr) bool {
// sumOverFullRange returns an expression that sums up individual downstream queries (with preserving labels)
// and dividing it by the full range in seconds to calculate a rate value.
// The operation defines the range aggregation operation of the downstream queries.
// Example:
// Examples:
// rate({app="foo"}[2m])
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120)
// rate({app="foo"} | unwrap bar [2m])
// => (sum without (sum_over_time({app="foo"}[1m]) ++ sum_over_time({app="foo"}[1m]) offset 1m) / 120)
func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{
Left: expr.Left,
Expand Down Expand Up @@ -373,7 +375,15 @@ func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum {
return expr
}
return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval, recorder)
// rate({app="foo"}[2m]) =>
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m]) offset 1m) / 120)
op := syntax.OpRangeTypeCount
if expr.Left.Unwrap != nil {
// rate({app="foo"} | unwrap bar [2m])
// => (sum without (sum_over_time({app="foo"}[1m]) ++ sum_over_time({app="foo"}[1m]) offset 1m) / 120)
op = syntax.OpRangeTypeSum
}
return m.sumOverFullRange(expr, vectorAggrPushdown, op, rangeInterval, recorder)
case syntax.OpRangeTypeBytesRate:
if labelExtractor && vectorAggrPushdown.Operation != syntax.OpTypeSum {
return expr
Expand Down
29 changes: 25 additions & 4 deletions pkg/logql/rangemapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
++ downstream<count_over_time({app="foo"}[1m]), shard=<nil>>
) / 180)`,
},
{
`rate({app="foo"} | unwrap bar[3m])`,
`(sum without(
downstream<sum_over_time({app="foo"} | unwrap bar [1m] offset 2m0s), shard=<nil>>
++ downstream<sum_over_time({app="foo"} | unwrap bar [1m] offset 1m0s), shard=<nil>>
++ downstream<sum_over_time({app="foo"} | unwrap bar [1m]), shard=<nil>>
) / 180)`,
},
{
`bytes_rate({app="foo"}[3m])`,
`(sum without(
Expand Down Expand Up @@ -1471,6 +1479,23 @@ func Test_SplitRangeVectorMapping(t *testing.T) {
)
)`,
},

// regression test queries
{
`topk(10,sum by (org_id) (rate({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [3m])))`,
`topk(10,
sum by (org_id) (
(
sum without(
downstream<sum by(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [1m] offset 2m0s)),shard=<nil>>
++ downstream<sum by(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [1m] offset 1m0s)),shard=<nil>>
++ downstream<sum by(org_id)(sum_over_time({container="query-frontend",namespace="loki"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__="" [1m])),shard=<nil>>
)
/ 180
)
)
)`,
},
} {
tc := tc
t.Run(tc.expr, func(t *testing.T) {
Expand Down Expand Up @@ -1500,10 +1525,6 @@ func Test_SplitRangeVectorMapping_Noop(t *testing.T) {
`sum(avg_over_time({app="foo"} | unwrap bar[3m]))`,
`sum(avg_over_time({app="foo"} | unwrap bar[3m]))`,
},
{ // this query caused a panic in ops
`topk(10,sum by (cluster,org_id) (rate({container="query-frontend",namespace="loki-prod",cluster="prod-us-central-0"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__=""[1h])))`,
`topk(10,sum by (cluster,org_id) (rate({container="query-frontend",namespace="loki-prod",cluster="prod-us-central-0"} |= "metrics.go" | logfmt | unwrap bytes(total_bytes) | __error__=""[1h])))`,
},

// should be noop if range interval is lower or equal to split interval (1m)
{
Expand Down