Skip to content

Commit

Permalink
Avoid line copy during LogQL line_format (#6104)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Jun 3, 2022
1 parent d9cc6a8 commit 7e53381
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 9 deletions.
5 changes: 1 addition & 4 deletions pkg/logql/log/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ func (lf *LineFormatter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool)
lbs.SetErr(errTemplateFormat)
return line, true
}
// todo(cyriltovena): we might want to reuse the input line or a bytes buffer.
res := make([]byte, len(lf.buf.Bytes()))
copy(res, lf.buf.Bytes())
return res, true
return lf.buf.Bytes(), true
}

func (lf *LineFormatter) RequiredLabelNames() []string {
Expand Down
10 changes: 5 additions & 5 deletions pkg/logql/log/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Pipeline interface {
// A StreamPipeline never mutates the received line.
type StreamPipeline interface {
BaseLabels() LabelsResult
// Process processes a log line and returns the transformed line and the labels.
// The buffer backing the returned line can be reused on subsequent calls to Process,
// so a caller that needs to retain the line beyond the next call must copy it.
Process(ts int64, line []byte) (resultLine []byte, resultLabels LabelsResult, matches bool)
// ProcessString is the string variant of Process: it takes the line as a string
// and returns the resulting line as a string.
ProcessString(ts int64, line string) (resultLine string, resultLabels LabelsResult, matches bool)
}
Expand Down Expand Up @@ -149,11 +151,9 @@ func (p *streamPipeline) Process(_ int64, line []byte) ([]byte, LabelsResult, bo

// ProcessString is the string counterpart of Process: it hands the bytes of line to
// Process and returns the resulting line as a string.
// NOTE(review): this listing is a diff rendered without +/- markers; the
// unsafeGetString return and the string(lb) return look like the pre- and post-change
// versions of the same statement — confirm against the repository.
func (p *streamPipeline) ProcessString(ts int64, line string) (string, LabelsResult, bool) {
// Stages only read from the line.
lb := unsafeGetBytes(line)
lb, lr, ok := p.Process(ts, lb)
// Either the line is unchanged and we can just send back the same string,
// or we created a new buffer for it, in which case it is still safe to avoid the string(byte) copy.
return unsafeGetString(lb), lr, ok
lb, lr, ok := p.Process(ts, unsafeGetBytes(line))
// The returned line, however, needs to be copied: string(lb) makes that copy.
return string(lb), lr, ok
}

// BaseLabels returns the current result held by the pipeline's labels builder.
func (p *streamPipeline) BaseLabels() LabelsResult {
	return p.builder.currentResult
}
Expand Down
59 changes: 59 additions & 0 deletions pkg/logql/syntax/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3072,6 +3072,65 @@ func Test_PipelineCombined(t *testing.T) {
require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line))
}

// Benchmark_PipelineCombined measures Process on a stream pipeline built from a
// log selector that chains a line filter, logfmt, line_format, regexp and label
// filter stages. Allocations are reported, and the result of the last iteration
// is verified once the loop has finished.
func Benchmark_PipelineCombined(b *testing.B) {
	const query = `{job="cortex-ops/query-frontend"} |= "logging.go" | logfmt | line_format "{{.msg}}" | regexp "(?P<method>\\w+) (?P<path>[\\w|/]+) \\((?P<status>\\d+?)\\) (?P<duration>.*)" | (duration > 1s or status==200) and method="POST" | line_format "{{.duration}}|{{.method}}|{{.status}}"`

	selector, err := ParseLogSelector(query, true)
	require.Nil(b, err)

	pipeline, err := selector.Pipeline()
	require.Nil(b, err)

	streamPipeline := pipeline.ForStream(labels.Labels{})
	input := []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)

	// Declared outside the loop so the outcome of the final iteration can be
	// checked below.
	var gotLine []byte
	var gotLabels log.LabelsResult
	var matched bool

	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		gotLine, gotLabels, matched = streamPipeline.Process(0, input)
	}

	require.True(b, matched)

	wantLabels := labels.Labels{
		{Name: "caller", Value: "logging.go:66"},
		{Name: "duration", Value: "1.5s"},
		{Name: "level", Value: "debug"},
		{Name: "method", Value: "POST"},
		{Name: "msg", Value: "POST /api/prom/api/v1/query_range (200) 1.5s"},
		{Name: "path", Value: "/api/prom/api/v1/query_range"},
		{Name: "status", Value: "200"},
		{Name: "traceID", Value: "a9d4d8a928d8db1"},
		{Name: "ts", Value: "2020-10-02T10:10:42.092268913Z"},
	}
	require.Equal(b, wantLabels, gotLabels.Labels())
	require.Equal(b, string([]byte(`1.5s|POST|200`)), string(gotLine))
}

// Benchmark_MetricPipelineCombined measures Process on a stream extractor built
// from a count_over_time sample expression wrapping the same chain of stages as
// Benchmark_PipelineCombined. Allocations are reported, and the result of the
// last iteration is verified once the loop has finished.
func Benchmark_MetricPipelineCombined(b *testing.B) {
	const query = `count_over_time({job="cortex-ops/query-frontend"} |= "logging.go" | logfmt | line_format "{{.msg}}" | regexp "(?P<method>\\w+) (?P<path>[\\w|/]+) \\((?P<status>\\d+?)\\) (?P<duration>.*)" | (duration > 1s or status==200) and method="POST" | line_format "{{.duration}}|{{.method}}|{{.status}}"[1m])`

	sampleExpr, err := ParseSampleExpr(query)
	require.Nil(b, err)

	extractor, err := sampleExpr.Extractor()
	require.Nil(b, err)

	streamExtractor := extractor.ForStream(labels.Labels{})
	input := []byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)

	// Declared outside the loop so the outcome of the final iteration can be
	// checked below.
	var value float64
	var gotLabels log.LabelsResult
	var matched bool

	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		value, gotLabels, matched = streamExtractor.Process(0, input)
	}

	require.True(b, matched)

	wantLabels := labels.Labels{
		{Name: "caller", Value: "logging.go:66"},
		{Name: "duration", Value: "1.5s"},
		{Name: "level", Value: "debug"},
		{Name: "method", Value: "POST"},
		{Name: "msg", Value: "POST /api/prom/api/v1/query_range (200) 1.5s"},
		{Name: "path", Value: "/api/prom/api/v1/query_range"},
		{Name: "status", Value: "200"},
		{Name: "traceID", Value: "a9d4d8a928d8db1"},
		{Name: "ts", Value: "2020-10-02T10:10:42.092268913Z"},
	}
	require.Equal(b, wantLabels, gotLabels.Labels())
	require.Equal(b, 1.0, value)
}

// c is a package-level slice of label matchers.
// NOTE(review): presumably a sink for benchmark results (e.g. Benchmark_ParseMatchers)
// so the compiler cannot discard the benchmarked call — confirm against its use.
var c []*labels.Matcher

func Benchmark_ParseMatchers(b *testing.B) {
Expand Down

0 comments on commit 7e53381

Please sign in to comment.