-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add tag "truncated" to "log.flags" if incoming line is longer than configured limit #7991
Changes from 13 commits
5aaadff
e13ab92
a81626a
af7c81d
f8eb9c7
b16cff8
84584c5
03ee796
496ebe4
a01440c
b9aa508
cd0561e
43b8c57
da33c4d
2870f20
5d8cb05
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,13 +49,28 @@ func (m *Message) IsEmpty() bool { | |
return false | ||
} | ||
|
||
func (msg *Message) AddFields(fields common.MapStr) { | ||
// AddFields adds fields to the message. | ||
func (m *Message) AddFields(fields common.MapStr) { | ||
if fields == nil { | ||
return | ||
} | ||
|
||
if msg.Fields == nil { | ||
msg.Fields = common.MapStr{} | ||
if m.Fields == nil { | ||
m.Fields = common.MapStr{} | ||
} | ||
msg.Fields.Update(fields) | ||
m.Fields.Update(fields) | ||
} | ||
|
||
// AddFlagsWithKey adds flags to the message under an arbitrary key. | ||
// If the field does not exist, it is created. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe make
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should name it in the code the same as it's in the event again. If we stay with Would |
||
func (m *Message) AddFlagsWithKey(key string, flags ...string) error { | ||
if len(flags) == 0 { | ||
return nil | ||
} | ||
|
||
if m.Fields == nil { | ||
m.Fields = common.MapStr{} | ||
} | ||
|
||
return common.AddTagsWithKey(m.Fields, key, flags) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,7 @@ type Reader struct { | |
separator []byte | ||
last []byte | ||
numLines int | ||
truncated int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I didn't think about multiline being able to capture truncated bytes as well. Good catch 👍 |
||
err error // last seen error | ||
state func(*Reader) (reader.Message, error) | ||
message reader.Message | ||
|
@@ -262,13 +263,19 @@ func (mlr *Reader) clear() { | |
mlr.message = reader.Message{} | ||
mlr.last = nil | ||
mlr.numLines = 0 | ||
mlr.truncated = 0 | ||
mlr.err = nil | ||
} | ||
|
||
// finalize writes the existing content into the returned message and resets all reader variables. | ||
func (mlr *Reader) finalize() reader.Message { | ||
if mlr.truncated > 0 { | ||
mlr.message.AddFlagsWithKey("log.flags", "truncated") | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
// Copy message from existing content | ||
msg := mlr.message | ||
|
||
mlr.clear() | ||
return msg | ||
} | ||
|
@@ -303,6 +310,16 @@ func (mlr *Reader) addLine(m reader.Message) { | |
} | ||
mlr.message.Content = append(tmp, m.Content[:space]...) | ||
mlr.numLines++ | ||
|
||
// add number of truncated bytes to fields | ||
diff := len(m.Content) - space | ||
if diff > 0 { | ||
mlr.truncated += diff | ||
} | ||
} else { | ||
// increase the number of skipped bytes, if cannot add | ||
mlr.truncated += len(m.Content) | ||
|
||
} | ||
|
||
mlr.last = m.Content | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -150,6 +150,41 @@ func TestMultilineBeforeNegateOKWithEmptyLine(t *testing.T) { | |
) | ||
} | ||
|
||
func TestMultilineAfterTruncated(t *testing.T) { | ||
pattern := match.MustCompile(`^[ ]`) // next line is indented a space | ||
maxLines := 2 | ||
testMultilineTruncated(t, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you also add a test where the truncated flag should be missing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
Config{ | ||
Pattern: &pattern, | ||
Match: "after", | ||
MaxLines: &maxLines, | ||
}, | ||
2, | ||
true, | ||
[]string{ | ||
"line1\n line1.1\n line1.2\n", | ||
"line2\n line2.1\n line2.2\n"}, | ||
[]string{ | ||
"line1\n line1.1", | ||
"line2\n line2.1"}, | ||
) | ||
testMultilineTruncated(t, | ||
Config{ | ||
Pattern: &pattern, | ||
Match: "after", | ||
MaxLines: &maxLines, | ||
}, | ||
2, | ||
false, | ||
[]string{ | ||
"line1\n line1.1\n", | ||
"line2\n line2.1\n"}, | ||
[]string{ | ||
"line1\n line1.1", | ||
"line2\n line2.1"}, | ||
) | ||
} | ||
|
||
func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) { | ||
_, buf := createLineBuffer(expected...) | ||
r := createMultilineTestReader(t, buf, cfg) | ||
|
@@ -177,6 +212,54 @@ func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) { | |
} | ||
} | ||
|
||
func testMultilineTruncated(t *testing.T, cfg Config, events int, truncated bool, input, expected []string) { | ||
_, buf := createLineBuffer(input...) | ||
r := createMultilineTestReader(t, buf, cfg) | ||
|
||
var messages []reader.Message | ||
for { | ||
message, err := r.Next() | ||
if err != nil { | ||
break | ||
} | ||
|
||
messages = append(messages, message) | ||
} | ||
|
||
if len(messages) != events { | ||
t.Fatalf("expected %v lines, read only %v line(s)", len(expected), len(messages)) | ||
} | ||
|
||
for _, message := range messages { | ||
found := false | ||
statusFlags, err := message.Fields.GetValue("log.flags") | ||
if err != nil { | ||
if !truncated { | ||
assert.False(t, found) | ||
return | ||
} | ||
t.Fatalf("error while getting log.status field: %v", err) | ||
} | ||
|
||
switch flags := statusFlags.(type) { | ||
case []string: | ||
for _, f := range flags { | ||
if f == "truncated" { | ||
found = true | ||
} | ||
} | ||
default: | ||
t.Fatalf("incorrect type for log.flags") | ||
} | ||
|
||
if truncated { | ||
assert.True(t, found) | ||
} else { | ||
assert.False(t, found) | ||
} | ||
} | ||
} | ||
|
||
func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reader.Reader { | ||
encFactory, ok := encoding.FindEncoding("plain") | ||
if !ok { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
// +build !integration | ||
|
||
package readfile | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
|
||
"github.com/elastic/beats/filebeat/reader" | ||
) | ||
|
||
type mockReader struct { | ||
line []byte | ||
} | ||
|
||
func (m *mockReader) Next() (reader.Message, error) { | ||
return reader.Message{ | ||
Content: m.line, | ||
}, nil | ||
} | ||
|
||
var limitTests = []struct { | ||
line string | ||
maxBytes int | ||
truncated bool | ||
}{ | ||
{"long-long-line", 5, true}, | ||
{"long-long-line", 3, true}, | ||
{"long-long-line", len("long-long-line"), false}, | ||
} | ||
|
||
func TestLimitReader(t *testing.T) { | ||
for _, test := range limitTests { | ||
r := NewLimitReader(&mockReader{[]byte(test.line)}, test.maxBytes) | ||
|
||
msg, err := r.Next() | ||
if err != nil { | ||
t.Fatalf("Error reading from mock reader: %v", err) | ||
} | ||
|
||
assert.Equal(t, test.maxBytes, len(msg.Content)) | ||
|
||
found := false | ||
statusFlags, err := msg.Fields.GetValue("log.flags") | ||
if err != nil { | ||
if !test.truncated { | ||
assert.False(t, found) | ||
return | ||
} | ||
t.Fatalf("Error getting truncated value: %v", err) | ||
} | ||
|
||
switch flags := statusFlags.(type) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can it be both types? Same question for the other test. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added it to mimic the testing of the old |
||
case []string: | ||
for _, f := range flags { | ||
if f == "truncated" { | ||
found = true | ||
} | ||
} | ||
default: | ||
t.Fatalf("incorrect type for log.flags") | ||
} | ||
|
||
if test.truncated { | ||
assert.True(t, found) | ||
} else { | ||
assert.False(t, found) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -305,25 +305,38 @@ func MergeFields(ms, fields MapStr, underRoot bool) error { | |
// exist then it will be created. If the tags field exists and is not a []string | ||
// then an error will be returned. It does not deduplicate the list of tags. | ||
func AddTags(ms MapStr, tags []string) error { | ||
return AddTagsWithKey(ms, TagsKey, tags) | ||
} | ||
|
||
// AddTagsWithKey appends a tag to the key field of ms. If the field does not | ||
// exist then it will be created. If the field exists and is not a []string | ||
// then an error will be returned. It does not deduplicate the list. | ||
func AddTagsWithKey(ms MapStr, key string, tags []string) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add a test for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: I would keep the naming here as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. Done. |
||
if ms == nil || len(tags) == 0 { | ||
return nil | ||
} | ||
eventTags, exists := ms[TagsKey] | ||
if !exists { | ||
ms[TagsKey] = tags | ||
|
||
k, subMap, oldTags, present, err := mapFind(key, ms, true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if this has a performance impact on the old There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. Keys without dots can be processed in the first loop on a fast path in // Fast path, key is present as is.
if v, exists := data[key]; exists {
return key, data, v, true, nil
} If not, the only additional cost is one idx := strings.IndexRune(key, '.')
if idx < 0 {
return key, data, nil, false, nil
} |
||
if err != nil { | ||
return err | ||
} | ||
|
||
if !present { | ||
subMap[k] = tags | ||
return nil | ||
} | ||
|
||
switch arr := eventTags.(type) { | ||
switch arr := oldTags.(type) { | ||
case []string: | ||
ms[TagsKey] = append(arr, tags...) | ||
subMap[k] = append(arr, tags...) | ||
case []interface{}: | ||
for _, tag := range tags { | ||
arr = append(arr, tag) | ||
} | ||
ms[TagsKey] = arr | ||
subMap[k] = arr | ||
default: | ||
return errors.Errorf("expected string array by type is %T", eventTags) | ||
return errors.Errorf("expected string array by type is %T", oldTags) | ||
|
||
} | ||
return nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment on exported method Message.AddFlagsWithKey should be of the form "AddFlagsWithKey ..."