From 40f4f1479170a90b39c005292e11a3ec4db4bc34 Mon Sep 17 00:00:00 2001 From: Johan Lindvall Date: Mon, 19 Aug 2024 17:07:29 +0200 Subject: [PATCH] fix: Improve execution speed for queries with label filters (#13922) Co-authored-by: Travis Patterson --- pkg/logql/syntax/ast.go | 51 ++++++++++++++++++------------------ pkg/logql/syntax/ast_test.go | 32 +++++++++++++++++++++- 2 files changed, 57 insertions(+), 26 deletions(-) diff --git a/pkg/logql/syntax/ast.go b/pkg/logql/syntax/ast.go index e391291c2324..38231a936a02 100644 --- a/pkg/logql/syntax/ast.go +++ b/pkg/logql/syntax/ast.go @@ -133,53 +133,54 @@ func (m MultiStageExpr) stages() ([]log.Stage, error) { // are as close to the front of the filter as possible. func (m MultiStageExpr) reorderStages() []StageExpr { var ( - result = make([]StageExpr, 0, len(m)) - filters = make([]*LineFilterExpr, 0, len(m)) - rest = make([]StageExpr, 0, len(m)) + result = make([]StageExpr, 0, len(m)) + lineFilters = make([]*LineFilterExpr, 0, len(m)) + notLineFilters = make([]StageExpr, 0, len(m)) ) + combineFilters := func() { + if len(lineFilters) > 0 { + result = append(result, combineFilters(lineFilters)) + } + + result = append(result, notLineFilters...) + + lineFilters = lineFilters[:0] + notLineFilters = notLineFilters[:0] + } + for _, s := range m { switch f := s.(type) { + case *LabelFilterExpr: + combineFilters() + result = append(result, f) case *LineFilterExpr: - filters = append(filters, f) + lineFilters = append(lineFilters, f) case *LineFmtExpr: // line_format modifies the contents of the line so any line filter // originally after a line_format must still be after the same // line_format. - rest = append(rest, f) + notLineFilters = append(notLineFilters, f) - if len(filters) > 0 { - result = append(result, combineFilters(filters)) - } - result = append(result, rest...) - - filters = filters[:0] - rest = rest[:0] + combineFilters() case *LabelParserExpr: - rest = append(rest, f) + notLineFilters = append(notLineFilters, f) // unpack modifies the contents of the line so any line filter // originally after an unpack must still be after the same // unpack. if f.Op == OpParserTypeUnpack { - if len(filters) > 0 { - result = append(result, combineFilters(filters)) - } - result = append(result, rest...) - - filters = filters[:0] - rest = rest[:0] + combineFilters() } default: - rest = append(rest, f) + notLineFilters = append(notLineFilters, f) } } - if len(filters) > 0 { - result = append(result, combineFilters(filters)) - } - return append(result, rest...) + combineFilters() + + return result } func combineFilters(in []*LineFilterExpr) StageExpr { diff --git a/pkg/logql/syntax/ast_test.go b/pkg/logql/syntax/ast_test.go index b9f6dcdc46bd..98987e38c4be 100644 --- a/pkg/logql/syntax/ast_test.go +++ b/pkg/logql/syntax/ast_test.go @@ -988,7 +988,7 @@ func Test_MergeBinOpVectors_Filter(t *testing.T) { require.Equal(t, &promql.Sample{F: 2}, res) } -func TestFilterReodering(t *testing.T) { +func TestFilterReordering(t *testing.T) { t.Run("it makes sure line filters are as early in the pipeline stages as possible", func(t *testing.T) { logExpr := `{container_name="app"} |= "foo" |= "next" | logfmt |="bar" |="baz" | line_format "{{.foo}}" |="1" |="2" | logfmt |="3"` l, err := ParseExpr(logExpr) @@ -1008,6 +1008,36 @@ func TestFilterReodering(t *testing.T) { require.Len(t, stages, 5) require.Equal(t, `|= "06497595" | unpack != "message" | json | line_format "new log: {{.foo}}"`, MultiStageExpr(stages).String()) }) + + t.Run("it makes sure label filter order is kept", func(t *testing.T) { + logExpr := `{container_name="app"} | bar="next" |= "foo" | logfmt |="bar" |="baz" | line_format "{{.foo}}" |="1" |="2" | logfmt |="3"` + l, err := ParseExpr(logExpr) + require.NoError(t, err) + + stages := l.(*PipelineExpr).MultiStages.reorderStages() + require.Len(t, stages, 6) + require.Equal(t, `| bar="next" |= "foo" |= "bar" |= "baz" | logfmt | line_format "{{.foo}}" |= "1" |= "2" |= "3" | logfmt`, MultiStageExpr(stages).String()) + }) + + t.Run("it makes sure line filters before labels filters keeps correct ordering", func(t *testing.T) { + logExpr := `{container_name="app"} |= "foo" |bar="next"` + l, err := ParseExpr(logExpr) + require.NoError(t, err) + + stages := l.(*PipelineExpr).MultiStages.reorderStages() + require.Len(t, stages, 2) + require.Equal(t, `|= "foo" | bar="next"`, MultiStageExpr(stages).String()) + }) + + t.Run("it makes sure json before label filter keeps correct ordering", func(t *testing.T) { + logExpr := `{container_name="app"} | json | bar="next"` + l, err := ParseExpr(logExpr) + require.NoError(t, err) + + stages := l.(*PipelineExpr).MultiStages.reorderStages() + require.Len(t, stages, 2) + require.Equal(t, `| json | bar="next"`, MultiStageExpr(stages).String()) + }) } var result bool