From ff0081185894f51da5d343ba08754ed467f55b62 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 5 May 2022 17:26:02 +0200 Subject: [PATCH] Avoid line copy during LogQL line_format --- pkg/logql/log/fmt.go | 5 +-- pkg/logql/log/pipeline.go | 10 +++--- pkg/logql/syntax/parser_test.go | 59 +++++++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 9 deletions(-) diff --git a/pkg/logql/log/fmt.go b/pkg/logql/log/fmt.go index 600d12d7ab55..93498e20c608 100644 --- a/pkg/logql/log/fmt.go +++ b/pkg/logql/log/fmt.go @@ -131,10 +131,7 @@ func (lf *LineFormatter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) lbs.SetErr(errTemplateFormat) return line, true } - // todo(cyriltovena): we might want to reuse the input line or a bytes buffer. - res := make([]byte, len(lf.buf.Bytes())) - copy(res, lf.buf.Bytes()) - return res, true + return lf.buf.Bytes(), true } func (lf *LineFormatter) RequiredLabelNames() []string { diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index fa6f4180ff23..610e9c20f9d2 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -19,6 +19,8 @@ type Pipeline interface { // A StreamPipeline never mutate the received line. type StreamPipeline interface { BaseLabels() LabelsResult + // Process processes a log line and returns the transformed line and the labels. + // The buffer returned for the log line can be reused on subsequent calls to Process and therefore must be copied. Process(ts int64, line []byte) (resultLine []byte, resultLabels LabelsResult, matches bool) ProcessString(ts int64, line string) (resultLine string, resultLabels LabelsResult, matches bool) } @@ -149,11 +151,9 @@ func (p *streamPipeline) Process(_ int64, line []byte) ([]byte, LabelsResult, bo func (p *streamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) { // Stages only read from the line. 
- lb := unsafeGetBytes(line) - lb, lr, ok := p.Process(ts, lb) - // either the line is unchanged and we can just send back the same string. - // or we created a new buffer for it in which case it is still safe to avoid the string(byte) copy. - return unsafeGetString(lb), lr, ok + lb, lr, ok := p.Process(ts, unsafeGetBytes(line)) + // but the returned line needs to be copied. + return string(lb), lr, ok } func (p *streamPipeline) BaseLabels() LabelsResult { return p.builder.currentResult } diff --git a/pkg/logql/syntax/parser_test.go b/pkg/logql/syntax/parser_test.go index 56d09b4e41b0..7eda4d6296b9 100644 --- a/pkg/logql/syntax/parser_test.go +++ b/pkg/logql/syntax/parser_test.go @@ -3050,6 +3050,65 @@ func Test_PipelineCombined(t *testing.T) { require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line)) } +func Benchmark_PipelineCombined(b *testing.B) { + query := `{job="cortex-ops/query-frontend"} |= "logging.go" | logfmt | line_format "{{.msg}}" | regexp "(?P<method>\\w+) (?P<path>[\\w|/]+) \\((?P<status>\\d+?)\\) (?P<duration>.*)" | (duration > 1s or status==200) and method="POST" | line_format "{{.duration}}|{{.method}}|{{.status}}"` + + expr, err := ParseLogSelector(query, true) + require.Nil(b, err) + + p, err := expr.Pipeline() + require.Nil(b, err) + sp := p.ForStream(labels.Labels{}) + var ( + line []byte + lbs log.LabelsResult + matches bool + ) + in := []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + line, lbs, matches = sp.Process(0, in) + } + require.True(b, matches) + require.Equal( + b, + labels.Labels{labels.Label{Name: "caller", Value: "logging.go:66"}, labels.Label{Name: "duration", Value: "1.5s"}, labels.Label{Name: "level", Value: "debug"}, labels.Label{Name: "method", Value: "POST"}, labels.Label{Name: "msg", Value: "POST /api/prom/api/v1/query_range (200) 1.5s"}, labels.Label{Name: "path", Value: 
"/api/prom/api/v1/query_range"}, labels.Label{Name: "status", Value: "200"}, labels.Label{Name: "traceID", Value: "a9d4d8a928d8db1"}, labels.Label{Name: "ts", Value: "2020-10-02T10:10:42.092268913Z"}}, + lbs.Labels(), + ) + require.Equal(b, string([]byte(`1.5s|POST|200`)), string(line)) +} + +func Benchmark_MetricPipelineCombined(b *testing.B) { + query := `count_over_time({job="cortex-ops/query-frontend"} |= "logging.go" | logfmt | line_format "{{.msg}}" | regexp "(?P<method>\\w+) (?P<path>[\\w|/]+) \\((?P<status>\\d+?)\\) (?P<duration>.*)" | (duration > 1s or status==200) and method="POST" | line_format "{{.duration}}|{{.method}}|{{.status}}"[1m])` + + expr, err := ParseSampleExpr(query) + require.Nil(b, err) + + p, err := expr.Extractor() + require.Nil(b, err) + sp := p.ForStream(labels.Labels{}) + var ( + v float64 + lbs log.LabelsResult + matches bool + ) + in := []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + v, lbs, matches = sp.Process(0, in) + } + require.True(b, matches) + require.Equal( + b, + labels.Labels{labels.Label{Name: "caller", Value: "logging.go:66"}, labels.Label{Name: "duration", Value: "1.5s"}, labels.Label{Name: "level", Value: "debug"}, labels.Label{Name: "method", Value: "POST"}, labels.Label{Name: "msg", Value: "POST /api/prom/api/v1/query_range (200) 1.5s"}, labels.Label{Name: "path", Value: "/api/prom/api/v1/query_range"}, labels.Label{Name: "status", Value: "200"}, labels.Label{Name: "traceID", Value: "a9d4d8a928d8db1"}, labels.Label{Name: "ts", Value: "2020-10-02T10:10:42.092268913Z"}}, + lbs.Labels(), + ) + require.Equal(b, 1.0, v) +} + var c []*labels.Matcher func Benchmark_ParseMatchers(b *testing.B) {