Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid line copy during LogQL line_format #6104

Merged
merged 1 commit into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions pkg/logql/log/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,7 @@ func (lf *LineFormatter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool)
lbs.SetErr(errTemplateFormat)
return line, true
}
// todo(cyriltovena): we might want to reuse the input line or a bytes buffer.
res := make([]byte, len(lf.buf.Bytes()))
copy(res, lf.buf.Bytes())
return res, true
return lf.buf.Bytes(), true
}

func (lf *LineFormatter) RequiredLabelNames() []string {
Expand Down
10 changes: 5 additions & 5 deletions pkg/logql/log/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Pipeline interface {
// A StreamPipeline never mutate the received line.
type StreamPipeline interface {
BaseLabels() LabelsResult
// Process processes a log line and returns the transformed line and the labels.
// The buffer returned for the log line can be reused on subsequent calls to Process and therefore must be copied.
Process(ts int64, line []byte) (resultLine []byte, resultLabels LabelsResult, matches bool)
ProcessString(ts int64, line string) (resultLine string, resultLabels LabelsResult, matches bool)
}
Expand Down Expand Up @@ -149,11 +151,9 @@ func (p *streamPipeline) Process(_ int64, line []byte) ([]byte, LabelsResult, bo

func (p *streamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) {
// Stages only read from the line.
lb := unsafeGetBytes(line)
lb, lr, ok := p.Process(ts, lb)
// either the line is unchanged and we can just send back the same string.
// or we created a new buffer for it in which case it is still safe to avoid the string(byte) copy.
return unsafeGetString(lb), lr, ok
lb, lr, ok := p.Process(ts, unsafeGetBytes(line))
// but the returned line needs to be copied.
return string(lb), lr, ok
}

func (p *streamPipeline) BaseLabels() LabelsResult { return p.builder.currentResult }
Expand Down
59 changes: 59 additions & 0 deletions pkg/logql/syntax/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3050,6 +3050,65 @@ func Test_PipelineCombined(t *testing.T) {
require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line))
}

func Benchmark_PipelineCombined(b *testing.B) {
query := `{job="cortex-ops/query-frontend"} |= "logging.go" | logfmt | line_format "{{.msg}}" | regexp "(?P<method>\\w+) (?P<path>[\\w|/]+) \\((?P<status>\\d+?)\\) (?P<duration>.*)" | (duration > 1s or status==200) and method="POST" | line_format "{{.duration}}|{{.method}}|{{.status}}"`

expr, err := ParseLogSelector(query, true)
require.Nil(b, err)

p, err := expr.Pipeline()
require.Nil(b, err)
sp := p.ForStream(labels.Labels{})
var (
line []byte
lbs log.LabelsResult
matches bool
)
in := []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
line, lbs, matches = sp.Process(0, in)
}
require.True(b, matches)
require.Equal(
b,
labels.Labels{labels.Label{Name: "caller", Value: "logging.go:66"}, labels.Label{Name: "duration", Value: "1.5s"}, labels.Label{Name: "level", Value: "debug"}, labels.Label{Name: "method", Value: "POST"}, labels.Label{Name: "msg", Value: "POST /api/prom/api/v1/query_range (200) 1.5s"}, labels.Label{Name: "path", Value: "/api/prom/api/v1/query_range"}, labels.Label{Name: "status", Value: "200"}, labels.Label{Name: "traceID", Value: "a9d4d8a928d8db1"}, labels.Label{Name: "ts", Value: "2020-10-02T10:10:42.092268913Z"}},
lbs.Labels(),
)
require.Equal(b, string([]byte(`1.5s|POST|200`)), string(line))
}

func Benchmark_MetricPipelineCombined(b *testing.B) {
query := `count_over_time({job="cortex-ops/query-frontend"} |= "logging.go" | logfmt | line_format "{{.msg}}" | regexp "(?P<method>\\w+) (?P<path>[\\w|/]+) \\((?P<status>\\d+?)\\) (?P<duration>.*)" | (duration > 1s or status==200) and method="POST" | line_format "{{.duration}}|{{.method}}|{{.status}}"[1m])`

expr, err := ParseSampleExpr(query)
require.Nil(b, err)

p, err := expr.Extractor()
require.Nil(b, err)
sp := p.ForStream(labels.Labels{})
var (
v float64
lbs log.LabelsResult
matches bool
)
in := []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
v, lbs, matches = sp.Process(0, in)
}
require.True(b, matches)
require.Equal(
b,
labels.Labels{labels.Label{Name: "caller", Value: "logging.go:66"}, labels.Label{Name: "duration", Value: "1.5s"}, labels.Label{Name: "level", Value: "debug"}, labels.Label{Name: "method", Value: "POST"}, labels.Label{Name: "msg", Value: "POST /api/prom/api/v1/query_range (200) 1.5s"}, labels.Label{Name: "path", Value: "/api/prom/api/v1/query_range"}, labels.Label{Name: "status", Value: "200"}, labels.Label{Name: "traceID", Value: "a9d4d8a928d8db1"}, labels.Label{Name: "ts", Value: "2020-10-02T10:10:42.092268913Z"}},
lbs.Labels(),
)
require.Equal(b, 1.0, v)
}

var c []*labels.Matcher

func Benchmark_ParseMatchers(b *testing.B) {
Expand Down