diff --git a/pkg/logql/log/filter.go b/pkg/logql/log/filter.go index 53851d009354..8378247df2f1 100644 --- a/pkg/logql/log/filter.go +++ b/pkg/logql/log/filter.go @@ -39,9 +39,11 @@ func (n notFilter) Filter(line []byte) bool { } func (n notFilter) ToStage() Stage { - return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) { - return line, n.Filter(line) - }) + return StageFunc{ + process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) { + return line, n.Filter(line) + }, + } } // newNotFilter creates a new filter which matches only if the base filter doesn't match. @@ -81,9 +83,11 @@ func (a andFilter) Filter(line []byte) bool { } func (a andFilter) ToStage() Stage { - return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) { - return line, a.Filter(line) - }) + return StageFunc{ + process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) { + return line, a.Filter(line) + }, + } } type orFilter struct { @@ -120,9 +124,11 @@ func (a orFilter) Filter(line []byte) bool { } func (a orFilter) ToStage() Stage { - return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) { - return line, a.Filter(line) - }) + return StageFunc{ + process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) { + return line, a.Filter(line) + }, + } } type regexpFilter struct { @@ -148,9 +154,11 @@ func (r regexpFilter) Filter(line []byte) bool { } func (r regexpFilter) ToStage() Stage { - return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) { - return line, r.Filter(line) - }) + return StageFunc{ + process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) { + return line, r.Filter(line) + }, + } } type containsFilter struct { @@ -166,9 +174,11 @@ func (l containsFilter) Filter(line []byte) bool { } func (l containsFilter) ToStage() Stage { - return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) { - return line, l.Filter(line) - }) + return StageFunc{ + process: func(line []byte, _ *LabelsBuilder) ([]byte, bool) { + return line, l.Filter(line) + }, + } } func (l containsFilter) String() string { diff --git a/pkg/logql/log/fmt.go b/pkg/logql/log/fmt.go index 745aed711974..e30d92a82334 100644 --- a/pkg/logql/log/fmt.go +++ b/pkg/logql/log/fmt.go @@ -6,6 +6,7 @@ import ( "regexp" "strings" "text/template" + "text/template/parse" ) var ( @@ -86,6 +87,61 @@ func (lf *LineFormatter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) return res, true } +func (lf *LineFormatter) RequiredLabelNames() []string { + return uniqueString(listNodeFields(lf.Root)) +} + +func listNodeFields(node parse.Node) []string { + var res []string + if node.Type() == parse.NodeAction { + res = append(res, listNodeFieldsFromPipe(node.(*parse.ActionNode).Pipe)...) + } + res = append(res, listNodeFieldsFromBranch(node)...) + if ln, ok := node.(*parse.ListNode); ok { + for _, n := range ln.Nodes { + res = append(res, listNodeFields(n)...) + } + } + return res +} + +func listNodeFieldsFromBranch(node parse.Node) []string { + var res []string + var b parse.BranchNode + switch node.Type() { + case parse.NodeIf: + b = node.(*parse.IfNode).BranchNode + case parse.NodeWith: + b = node.(*parse.WithNode).BranchNode + case parse.NodeRange: + b = node.(*parse.RangeNode).BranchNode + default: + return res + } + if b.Pipe != nil { + res = append(res, listNodeFieldsFromPipe(b.Pipe)...) + } + if b.List != nil { + res = append(res, listNodeFields(b.List)...) + } + if b.ElseList != nil { + res = append(res, listNodeFields(b.ElseList)...) + } + return res +} + +func listNodeFieldsFromPipe(p *parse.PipeNode) []string { + var res []string + for _, c := range p.Cmds { + for _, a := range c.Args { + if f, ok := a.(*parse.FieldNode); ok { + res = append(res, f.Ident...) + } + } + } + return res +} + // LabelFmt is a configuration struct for formatting a label. type LabelFmt struct { Name string @@ -187,6 +243,11 @@ func (lf *LabelsFormatter) Process(l []byte, lbs *LabelsBuilder) ([]byte, bool) return l, true } +func (lf *LabelsFormatter) RequiredLabelNames() []string { + var names []string + return names +} + func trunc(c int, s string) string { runes := []rune(s) l := len(runes) diff --git a/pkg/logql/log/fmt_test.go b/pkg/logql/log/fmt_test.go index 784c1434983f..0e9a3e3a8483 100644 --- a/pkg/logql/log/fmt_test.go +++ b/pkg/logql/log/fmt_test.go @@ -290,3 +290,22 @@ func Test_substring(t *testing.T) { }) } } + +func TestLineFormatter_RequiredLabelNames(t *testing.T) { + tests := []struct { + fmt string + want []string + }{ + {`{{.foo}} and {{.bar}}`, []string{"foo", "bar"}}, + {`{{ .foo | ToUpper | .buzz }} and {{.bar}}`, []string{"foo", "buzz", "bar"}}, + {`{{ regexReplaceAllLiteral "(p)" .foo "${1}" }}`, []string{"foo"}}, + {`{{ if .foo | hasSuffix "Ip" }} {{.bar}} {{end}}-{{ if .foo | hasSuffix "pw"}}no{{end}}`, []string{"foo", "bar"}}, + {`{{with .foo}}{{printf "%q" .}} {{end}}`, []string{"foo"}}, + {`{{with .foo}}{{printf "%q" .}} {{else}} {{ .buzz | lower }} {{end}}`, []string{"foo", "buzz"}}, + } + for _, tt := range tests { + t.Run(tt.fmt, func(t *testing.T) { + require.Equal(t, tt.want, newMustLineFormatter(tt.fmt).RequiredLabelNames()) + }) + } +} diff --git a/pkg/logql/log/label_filter.go b/pkg/logql/log/label_filter.go index 5edca0f4f49e..d433e48a9753 100644 --- a/pkg/logql/log/label_filter.go +++ b/pkg/logql/log/label_filter.go @@ -95,6 +95,13 @@ func (b *BinaryLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bo return line, lok && rok } +func (b *BinaryLabelFilter) RequiredLabelNames() []string { + var names []string + names = append(names, b.Left.RequiredLabelNames()...) + names = append(names, b.Right.RequiredLabelNames()...) + return uniqueString(names) +} + func (b *BinaryLabelFilter) String() string { var sb strings.Builder sb.WriteString("( ") @@ -113,6 +120,7 @@ type noopLabelFilter struct{} func (noopLabelFilter) String() string { return "" } func (noopLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { return line, true } +func (noopLabelFilter) RequiredLabelNames() []string { return []string{} } // ReduceAndLabelFilter Reduces multiple label filterer into one using binary and operation. func ReduceAndLabelFilter(filters []LabelFilterer) LabelFilterer { @@ -179,6 +187,10 @@ func (d *BytesLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, boo } } +func (d *BytesLabelFilter) RequiredLabelNames() []string { + return []string{d.Name} +} + func (d *BytesLabelFilter) String() string { b := strings.Map(func(r rune) rune { if unicode.IsSpace(r) { @@ -239,6 +251,10 @@ func (d *DurationLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, } } +func (d *DurationLabelFilter) RequiredLabelNames() []string { + return []string{d.Name} +} + func (d *DurationLabelFilter) String() string { return fmt.Sprintf("%s%s%s", d.Name, d.Type, d.Value) } @@ -294,6 +310,10 @@ func (n *NumericLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, b } +func (n *NumericLabelFilter) RequiredLabelNames() []string { + return []string{n.Name} +} + func (n *NumericLabelFilter) String() string { return fmt.Sprintf("%s%s%s", n.Name, n.Type, strconv.FormatFloat(n.Value, 'f', -1, 64)) } @@ -318,3 +338,7 @@ func (s *StringLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bo v, _ := lbs.Get(s.Name) return line, s.Matches(v) } + +func (s *StringLabelFilter) RequiredLabelNames() []string { + return []string{s.Name} +} diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index df63afc86e79..3d6d2bd833d2 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -68,6 +68,7 @@ type BaseLabelsBuilder struct { err string groups []string + parserKeyHints []string // label key hints for metric queries that allows to limit parser extractions to only this list of labels. without, noLabels bool resultCache map[uint64]LabelsResult @@ -84,21 +85,22 @@ type LabelsBuilder struct { } // NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results. -func NewBaseLabelsBuilderWithGrouping(groups []string, without, noLabels bool) *BaseLabelsBuilder { +func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints []string, without, noLabels bool) *BaseLabelsBuilder { return &BaseLabelsBuilder{ - del: make([]string, 0, 5), - add: make([]labels.Label, 0, 16), - resultCache: make(map[uint64]LabelsResult), - hasher: newHasher(), - groups: groups, - noLabels: noLabels, - without: without, + del: make([]string, 0, 5), + add: make([]labels.Label, 0, 16), + resultCache: make(map[uint64]LabelsResult), + hasher: newHasher(), + groups: groups, + parserKeyHints: parserKeyHints, + noLabels: noLabels, + without: without, } } // NewLabelsBuilder creates a new base labels builder. func NewBaseLabelsBuilder() *BaseLabelsBuilder { - return NewBaseLabelsBuilderWithGrouping(nil, false, false) + return NewBaseLabelsBuilderWithGrouping(nil, nil, false, false) } // ForLabels creates a labels builder for a given labels set as base. @@ -129,6 +131,12 @@ func (b *LabelsBuilder) Reset() { b.err = "" } +// ParserLabelHints returns a limited list of expected labels to extract for metric queries. +// Returns nil when it's impossible to hint labels extractions. +func (b *BaseLabelsBuilder) ParserLabelHints() []string { + return b.parserKeyHints +} + // SetErr sets the error label. func (b *LabelsBuilder) SetErr(err string) *LabelsBuilder { b.err = err diff --git a/pkg/logql/log/labels_test.go b/pkg/logql/log/labels_test.go index 493fb617bb51..69c446982ae2 100644 --- a/pkg/logql/log/labels_test.go +++ b/pkg/logql/log/labels_test.go @@ -88,7 +88,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { labels.Label{Name: "cluster", Value: "us-central1"}, } sort.Sort(lbs) - b := NewBaseLabelsBuilderWithGrouping([]string{"namespace"}, false, false).ForLabels(lbs, lbs.Hash()) + b := NewBaseLabelsBuilderWithGrouping([]string{"namespace"}, nil, false, false).ForLabels(lbs, lbs.Hash()) b.Reset() assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "loki"}}, b.GroupedLabels()) b.SetErr("err") @@ -109,7 +109,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { // cached. assertLabelResult(t, expected, b.GroupedLabels()) - b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, false, false).ForLabels(lbs, lbs.Hash()) + b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, nil, false, false).ForLabels(lbs, lbs.Hash()) assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels()) assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels()) b.Del("job") @@ -118,7 +118,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { b.Set("namespace", "tempo") assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels()) - b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, true, false).ForLabels(lbs, lbs.Hash()) + b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, nil, true, false).ForLabels(lbs, lbs.Hash()) b.Del("job") b.Set("foo", "bar") b.Set("job", "something") @@ -130,7 +130,7 @@ func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { sort.Sort(expected) assertLabelResult(t, expected, b.GroupedLabels()) - b = NewBaseLabelsBuilderWithGrouping(nil, false, false).ForLabels(lbs, lbs.Hash()) + b = NewBaseLabelsBuilderWithGrouping(nil, nil, false, false).ForLabels(lbs, lbs.Hash()) b.Set("foo", "bar") b.Set("job", "something") expected = labels.Labels{ diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index 58cacf503816..f710b47a1824 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -48,10 +48,16 @@ type lineSampleExtractor struct { // NewLineSampleExtractor creates a SampleExtractor from a LineExtractor. // Multiple log stages are run before converting the log line. func NewLineSampleExtractor(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) { + s := ReduceStages(stages) + var expectedLabels []string + if !without { + expectedLabels = append(expectedLabels, s.RequiredLabelNames()...) + expectedLabels = uniqueString(append(expectedLabels, groups...)) + } return &lineSampleExtractor{ - Stage: ReduceStages(stages), + Stage: s, LineExtractor: ex, - baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, without, noLabels), + baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, expectedLabels, without, noLabels), streamExtractors: make(map[uint64]StreamSampleExtractor), }, nil } @@ -132,12 +138,20 @@ func LabelExtractorWithStages( groups = append(groups, labelName) sort.Strings(groups) } + preStage := ReduceStages(preStages) + var expectedLabels []string + if !without { + expectedLabels = append(expectedLabels, preStage.RequiredLabelNames()...) + expectedLabels = append(expectedLabels, groups...) + expectedLabels = append(expectedLabels, postFilter.RequiredLabelNames()...) + expectedLabels = uniqueString(expectedLabels) + } return &labelSampleExtractor{ - preStage: ReduceStages(preStages), + preStage: preStage, conversionFn: convFn, labelName: labelName, postFilter: postFilter, - baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, without, noLabels), + baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, expectedLabels, without, noLabels), streamExtractors: make(map[uint64]StreamSampleExtractor), }, nil } diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index aa9ef3d28eda..47bd612c534f 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -3,8 +3,8 @@ package log import ( "errors" "fmt" + "io" "regexp" - "strconv" "strings" "github.com/grafana/loki/pkg/logql/log/logfmt" @@ -14,8 +14,10 @@ import ( ) const ( - jsonSpacer = "_" + jsonSpacer = '_' duplicateSuffix = "_extracted" + trueString = "true" + falseString = "false" ) var ( @@ -26,67 +28,169 @@ var ( errMissingCapture = errors.New("at least one named capture must be supplied") ) -func addLabel(lbs *LabelsBuilder, key, value string) { - key = sanitizeKey(key) - if lbs.BaseHas(key) { - key = fmt.Sprintf("%s%s", key, duplicateSuffix) +type JSONParser struct { + buf []byte // buffer used to build json keys + lbs *LabelsBuilder +} + +// NewJSONParser creates a log stage that can parse a json log line and add properties as labels. +func NewJSONParser() *JSONParser { + return &JSONParser{ + buf: make([]byte, 0, 1024), } - lbs.Set(key, value) } -func sanitizeKey(key string) string { - if len(key) == 0 { - return key +func (j *JSONParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { + it := jsoniter.ConfigFastest.BorrowIterator(line) + defer jsoniter.ConfigFastest.ReturnIterator(it) + + // reset the state. + j.buf = j.buf[:0] + j.lbs = lbs + + if err := j.readObject(it); err != nil { + lbs.SetErr(errJSON) + return line, true + } + return line, true +} + +func (j *JSONParser) readObject(it *jsoniter.Iterator) error { + // we only care about object and values. + if nextType := it.WhatIsNext(); nextType != jsoniter.ObjectValue { + return fmt.Errorf("expecting json object(%d), got %d", jsoniter.ObjectValue, nextType) } - key = strings.TrimSpace(key) - if key[0] >= '0' && key[0] <= '9' { - key = "_" + key + _ = it.ReadMapCB(j.parseMap("")) + if it.Error != nil && it.Error != io.EOF { + return it.Error } - return strings.Map(func(r rune) rune { - if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || r == '_' || (r >= '0' && r <= '9') { - return r + return nil +} + +func (j *JSONParser) parseMap(prefix string) func(iter *jsoniter.Iterator, field string) bool { + return func(iter *jsoniter.Iterator, field string) bool { + switch iter.WhatIsNext() { + // are we looking at a value that needs to be added ? + case jsoniter.StringValue, jsoniter.NumberValue, jsoniter.BoolValue: + j.parseLabelValue(iter, prefix, field) + // Or another new object based on a prefix. + case jsoniter.ObjectValue: + if key, ok := j.nextKeyPrefix(prefix, field); ok { + return iter.ReadMapCB(j.parseMap(key)) + } + // If this keys is not expected we skip the object + iter.Skip() + default: + iter.Skip() } - return '_' - }, key) + return true + } } -type JSONParser struct{} +func (j *JSONParser) nextKeyPrefix(prefix, field string) (string, bool) { + // first time we add return the field as prefix. + if len(prefix) == 0 { + field = sanitizeLabelKey(field, true) + if isValidKeyPrefix(field, j.lbs.ParserLabelHints()) { + return field, true + } + return "", false + } + // otherwise we build the prefix and check using the buffer + j.buf = j.buf[:0] + j.buf = append(j.buf, prefix...) + j.buf = append(j.buf, byte(jsonSpacer)) + j.buf = append(j.buf, sanitizeLabelKey(field, false)...) + // if matches keep going + if isValidKeyPrefix(string(j.buf), j.lbs.ParserLabelHints()) { + return string(j.buf), true + } + return "", false -// NewJSONParser creates a log stage that can parse a json log line and add properties as labels. -func NewJSONParser() *JSONParser { - return &JSONParser{} } -func (j *JSONParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { - data := map[string]interface{}{} - err := jsoniter.ConfigFastest.Unmarshal(line, &data) - if err != nil { - lbs.SetErr(errJSON) - return line, true +// isValidKeyPrefix extract an object if the prefix is valid +func isValidKeyPrefix(objectprefix string, hints []string) bool { + if len(hints) == 0 { + return true } - parseMap("", data, lbs) - return line, true + for _, k := range hints { + if strings.HasPrefix(k, objectprefix) { + return true + } + } + return false +} + +func (j *JSONParser) parseLabelValue(iter *jsoniter.Iterator, prefix, field string) { + // the first time we use the field as label key. + if len(prefix) == 0 { + field = sanitizeLabelKey(field, true) + if !shouldExtractKey(field, j.lbs.ParserLabelHints()) { + // we can skip the value + iter.Skip() + return + + } + if j.lbs.BaseHas(field) { + field = field + duplicateSuffix + } + j.lbs.Set(field, readValue(iter)) + return + + } + // otherwise we build the label key using the buffer + j.buf = j.buf[:0] + j.buf = append(j.buf, prefix...) + j.buf = append(j.buf, byte(jsonSpacer)) + j.buf = append(j.buf, sanitizeLabelKey(field, false)...) + if j.lbs.BaseHas(string(j.buf)) { + j.buf = append(j.buf, duplicateSuffix...) + } + if !shouldExtractKey(string(j.buf), j.lbs.ParserLabelHints()) { + iter.Skip() + return + } + j.lbs.Set(string(j.buf), readValue(iter)) } -func parseMap(prefix string, data map[string]interface{}, lbs *LabelsBuilder) { - for key, val := range data { - switch concrete := val.(type) { - case map[string]interface{}: - parseMap(jsonKey(prefix, key), concrete, lbs) - case string: - addLabel(lbs, jsonKey(prefix, key), concrete) - case float64: - f := strconv.FormatFloat(concrete, 'f', -1, 64) - addLabel(lbs, jsonKey(prefix, key), f) +func (j *JSONParser) RequiredLabelNames() []string { return []string{} } + +func readValue(iter *jsoniter.Iterator) string { + switch iter.WhatIsNext() { + case jsoniter.StringValue: + return iter.ReadString() + case jsoniter.NumberValue: + return iter.ReadNumber().String() + case jsoniter.BoolValue: + if iter.ReadBool() { + return trueString } + return falseString + default: + iter.Skip() + return "" } } -func jsonKey(prefix, key string) string { - if prefix == "" { - return key +func shouldExtractKey(key string, hints []string) bool { + if len(hints) == 0 { + return true + } + for _, k := range hints { + if k == key { + return true + } + } + return false +} + +func addLabel(lbs *LabelsBuilder, key, value string) { + key = sanitizeLabelKey(key, true) + if lbs.BaseHas(key) { + key = fmt.Sprintf("%s%s", key, duplicateSuffix) } - return fmt.Sprintf("%s%s%s", prefix, jsonSpacer, key) + lbs.Set(key, value) } type RegexpParser struct { @@ -144,6 +248,8 @@ func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { return line, true } +func (r *RegexpParser) RequiredLabelNames() []string { return []string{} } + type LogfmtParser struct { dec *logfmt.Decoder } @@ -158,8 +264,10 @@ func NewLogfmtParser() *LogfmtParser { func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { l.dec.Reset(line) - for l.dec.ScanKeyval() { + if !shouldExtractKey(string(l.dec.Key()), lbs.ParserLabelHints()) { + continue + } key := string(l.dec.Key()) val := string(l.dec.Value()) addLabel(lbs, key, val) @@ -170,3 +278,5 @@ func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { } return line, true } + +func (l *LogfmtParser) RequiredLabelNames() []string { return []string{} } diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 659b7ce17a8a..d2b44940725d 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -61,7 +61,7 @@ func Test_jsonParser_Parse(t *testing.T) { }, { "duplicate extraction", - []byte(`{"app":"foo","namespace":"prod","pod":{"uuid":"foo","deployment":{"ref":"foobar"}}}`), + []byte(`{"app":"foo","namespace":"prod","pod":{"uuid":"foo","deployment":{"ref":"foobar"}},"next":{"err":false}}`), labels.Labels{ {Name: "app", Value: "bar"}, }, @@ -70,6 +70,7 @@ func Test_jsonParser_Parse(t *testing.T) { {Name: "app_extracted", Value: "foo"}, {Name: "namespace", Value: "prod"}, {Name: "pod_uuid", Value: "foo"}, + {Name: "next_err", Value: "false"}, {Name: "pod_deployment_ref", Value: "foobar"}, }, }, @@ -86,6 +87,57 @@ func Test_jsonParser_Parse(t *testing.T) { } } +func Benchmark_Parser(b *testing.B) { + + lbs := labels.Labels{ + {Name: "cluster", Value: "qa-us-central1"}, + {Name: "namespace", Value: "qa"}, + {Name: "filename", Value: "/var/log/pods/ingress-nginx_nginx-ingress-controller-7745855568-blq6t_1f8962ef-f858-4188-a573-ba276a3cacc3/ingress-nginx/0.log"}, + {Name: "job", Value: "ingress-nginx/nginx-ingress-controller"}, + {Name: "name", Value: "nginx-ingress-controller"}, + {Name: "pod", Value: "nginx-ingress-controller-7745855568-blq6t"}, + {Name: "pod_template_hash", Value: "7745855568"}, + {Name: "stream", Value: "stdout"}, + } + + jsonLine := `{"proxy_protocol_addr": "","remote_addr": "3.112.221.14","remote_user": "","upstream_addr": "10.12.15.234:5000","the_real_ip": "3.112.221.14","timestamp": "2020-12-11T16:20:07+00:00","protocol": "HTTP/1.1","upstream_name": "hosted-grafana-hosted-grafana-api-80","request": {"id": "c8eacb6053552c0cd1ae443bc660e140","time": "0.001","method" : "GET","host": "hg-api-qa-us-central1.grafana.net","uri": "/","size" : "128","user_agent": "worldping-api","referer": ""},"response": {"status": 200,"upstream_status": "200","size": "1155","size_sent": "265","latency_seconds": "0.001"}}` + logfmtLine := `level=info ts=2020-12-14T21:25:20.947307459Z caller=metrics.go:83 org_id=29 traceID=c80e691e8db08e2 latency=fast query="sum by (object_name) (rate(({container=\"metrictank\", cluster=\"hm-us-east2\"} |= \"PANIC\")[5m]))" query_type=metric range_type=range length=5m0s step=15s duration=322.623724ms status=200 throughput=1.2GB total_bytes=375MB` + nginxline := `10.1.0.88 - - [14/Dec/2020:22:56:24 +0000] "GET /static/img/about/bob.jpg HTTP/1.1" 200 60755 "https://grafana.com/go/observabilitycon/grafana-the-open-and-composable-observability-platform/?tech=ggl-o&pg=oss-graf&plcmt=hero-txt" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.1 Safari/605.1.15" "123.123.123.123, 35.35.122.223" "TLSv1.3"` + + for _, tt := range []struct { + name string + line string + s Stage + LabelParseHints []string // hints to reduce label extractions. + }{ + {"json", jsonLine, NewJSONParser(), []string{"response_latency_seconds"}}, + {"logfmt", logfmtLine, NewLogfmtParser(), []string{"info", "throughput", "org_id"}}, + {"regex greedy", nginxline, mustNewRegexParser(`GET (?P.*?)/\?`), []string{"path"}}, + {"regex status digits", nginxline, mustNewRegexParser(`HTTP/1.1" (?P\d{3}) `), []string{"statuscode"}}, + } { + b.Run(tt.name, func(b *testing.B) { + line := []byte(tt.line) + b.Run("no labels hints", func(b *testing.B) { + builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) + for n := 0; n < b.N; n++ { + builder.Reset() + _, _ = tt.s.Process(line, builder) + } + }) + + b.Run("labels hints", func(b *testing.B) { + builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) + builder.parserKeyHints = tt.LabelParseHints + for n := 0; n < b.N; n++ { + builder.Reset() + _, _ = tt.s.Process(line, builder) + } + }) + }) + } + +} + func TestNewRegexpParser(t *testing.T) { tests := []struct { name string @@ -289,24 +341,3 @@ func Test_logfmtParser_Parse(t *testing.T) { }) } } - -func Test_sanitizeKey(t *testing.T) { - tests := []struct { - key string - want string - }{ - {"1", "_1"}, - {"1 1 1", "_1_1_1"}, - {"abc", "abc"}, - {"$a$bc", "_a_bc"}, - {"$a$bc", "_a_bc"}, - {" 1 1 1 \t", "_1_1_1"}, - } - for _, tt := range tests { - t.Run(tt.key, func(t *testing.T) { - if got := sanitizeKey(tt.key); got != tt.want { - t.Errorf("sanitizeKey() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index 3dafcd77f9a8..874b3b540dd1 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -28,6 +28,7 @@ type StreamPipeline interface { // return the line unchanged or allocate a new line. type Stage interface { Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) + RequiredLabelNames() []string } // NewNoopPipeline creates a pipelines that does not process anything and returns log streams as is. @@ -74,11 +75,22 @@ type noopStage struct{} func (noopStage) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { return line, true } +func (noopStage) RequiredLabelNames() []string { return []string{} } -type StageFunc func(line []byte, lbs *LabelsBuilder) ([]byte, bool) +type StageFunc struct { + process func(line []byte, lbs *LabelsBuilder) ([]byte, bool) + requiredLabels []string +} func (fn StageFunc) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { - return fn(line, lbs) + return fn.process(line, lbs) +} + +func (fn StageFunc) RequiredLabelNames() []string { + if fn.requiredLabels == nil { + return []string{} + } + return fn.requiredLabels } // pipeline is a combinations of multiple stages. @@ -147,16 +159,23 @@ func ReduceStages(stages []Stage) Stage { if len(stages) == 0 { return NoopStage } - return StageFunc(func(line []byte, lbs *LabelsBuilder) ([]byte, bool) { - var ok bool - for _, p := range stages { - line, ok = p.Process(line, lbs) - if !ok { - return nil, false + var requiredLabelNames []string + for _, s := range stages { + requiredLabelNames = append(requiredLabelNames, s.RequiredLabelNames()...) + } + return StageFunc{ + process: func(line []byte, lbs *LabelsBuilder) ([]byte, bool) { + var ok bool + for _, p := range stages { + line, ok = p.Process(line, lbs) + if !ok { + return nil, false + } } - } - return line, true - }) + return line, true + }, + requiredLabels: requiredLabelNames, + } } func unsafeGetBytes(s string) []byte { diff --git a/pkg/logql/log/util.go b/pkg/logql/log/util.go new file mode 100644 index 000000000000..d12ed21525f7 --- /dev/null +++ b/pkg/logql/log/util.go @@ -0,0 +1,35 @@ +package log + +import ( + "strings" +) + +func uniqueString(s []string) []string { + unique := make(map[string]bool, len(s)) + us := make([]string, len(unique)) + for _, elem := range s { + if len(elem) != 0 { + if !unique[elem] { + us = append(us, elem) + unique[elem] = true + } + } + } + return us +} + +func sanitizeLabelKey(key string, isPrefix bool) string { + if len(key) == 0 { + return key + } + key = strings.TrimSpace(key) + if isPrefix && key[0] >= '0' && key[0] <= '9' { + key = "_" + key + } + return strings.Map(func(r rune) rune { + if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || r == '_' || (r >= '0' && r <= '9') { + return r + } + return '_' + }, key) +} diff --git a/pkg/logql/log/util_test.go b/pkg/logql/log/util_test.go new file mode 100644 index 000000000000..b16172abf9e2 --- /dev/null +++ b/pkg/logql/log/util_test.go @@ -0,0 +1,24 @@ +package log + +import "testing" + +func Test_sanitizeLabelKey(t *testing.T) { + tests := []struct { + key string + want string + }{ + {"1", "_1"}, + {"1 1 1", "_1_1_1"}, + {"abc", "abc"}, + {"$a$bc", "_a_bc"}, + {"$a$bc", "_a_bc"}, + {" 1 1 1 \t", "_1_1_1"}, + } + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + if got := sanitizeLabelKey(tt.key, true); got != tt.want { + t.Errorf("sanitizeKey() = %v, want %v", got, tt.want) + } + }) + } +}