Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly update offset in case of unparsable line #22685

Merged
merged 8 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix handling of missing eventtime and assignip field being set to N/A for fortinet module. {pull}22361[22361]
- Fix Zeek dashboard reference to `zeek.ssl.server.name` field. {pull}21696[21696]
- Fix for `field [source] not present as part of path [source.ip]` error in azure pipelines. {pull}22377[22377]
- Properly update offset in case of unparsable line. {pull}22685[22685]
- Drop aws.vpcflow.pkt_srcaddr and aws.vpcflow.pkt_dstaddr when equal to "-". {pull}22721[22721] {issue}22716[22716]
- Fix cisco umbrella module config by adding input variable. {pull}22892[22892]
- Fix network.direction logic in zeek connection fileset. {pull}22967[22967]
Expand Down
3 changes: 0 additions & 3 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,6 @@ func (inp *filestream) readFromSource(
s.Offset = 0
case ErrClosed:
log.Info("Reader was closed. Closing.")
case reader.ErrLineUnparsable:
log.Info("Skipping unparsable line in file.")
continue
default:
log.Errorf("Read line error: %v", err)
}
Expand Down
4 changes: 0 additions & 4 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,6 @@ func (h *Harvester) Run() error {
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
case ErrInactive:
logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
case reader.ErrLineUnparsable:
logp.Info("Skipping unparsable line in file: %v", h.state.Source)
//line unparsable, go to next line
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love when issues get solved by removing code 🙂

default:
logp.Err("Read line error: %v; File: %v", err, h.state.Source)
}
Expand Down
21 changes: 21 additions & 0 deletions filebeat/tests/files/logs/docker_corrupted.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"}
{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"}
{"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"}
{"log":"cp etc/packetbeat.template.json /build/packetbeat.template.json\n","stream":"stdout","time":"2016-03-02T22:59:04.639782988Z"}
{"log":"# linux\n","stream":"stdout","time":"2016-03-02T22:59:04.646276053Z"}
"log":"cp packetbeat.yml /build/packetbeat-linux.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.647847045Z"}
{"log":"# binary\n","stream":"stdout","time":"2016-03-02T22:59:04.653740138Z"}
{"log":"cp packetbeat.yml /build/packetbeat-binary.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.655979016Z"}
{"log":"# darwin\n","stream":"stdout","time":"2016-03-02T22:59:04.661181197Z"}
{"log":"cp packetbeat.yml /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.662859769Z"}
{"log":"sed -i.bk 's/device: any/device: en0/' /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.66649744Z"}
{"log":"rm /build/packetbeat-darwin.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.701199002Z"}
{"log":"# win\n","stream":"stdout","time":"2016-03-02T22:59:04.705067809Z"}
{"log":"cp packetbeat.yml /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.706629907Z"}
{"log":"sed -i.bk 's/device: any/device: 0/' /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.711993313Z"}
{"log":"rm /build/packetbeat-win.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.757913979Z"}
{"log":"Compiling for windows/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:04.761895467Z"}
{"log":"Compiling for windows/386...\n","stream":"stdout","time":"2016-03-02T22:59:29.481736885Z"}
{"log":"Compiling for darwin/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:55.205334574Z"}
{"log":"Moving binaries to host...\n","stream":"stdout","time":"2016-03-02T23:00:15.140397826Z"}
39 changes: 39 additions & 0 deletions filebeat/tests/system/test_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,42 @@ def test_container_input_cri(self):
output = self.read_output()
assert len(output) == 1
assert output[0]["stream"] == "stdout"

# System test: run the container input over logs/docker_corrupted.log and
# verify that the registry offset still covers the whole file even though
# one line cannot be parsed.
def test_container_input_registry_for_unparsable_lines(self):
"""
Test container input properly updates registry offset in case
of unparsable lines
"""
input_raw = """
- type: container
paths:
- {}/logs/*.log
"""
self.render_config_template(
input_raw=input_raw.format(os.path.abspath(self.working_dir)),
inputs=False,
)

os.mkdir(self.working_dir + "/logs/")
self.copy_files(["logs/docker_corrupted.log"],
target_dir="logs")

filebeat = self.start_beat()

# The fixture holds 21 lines, one of which is missing its opening brace,
# so only 20 events are expected in the output.
self.wait_until(lambda: self.output_has(lines=20))

filebeat.check_kill_and_wait()

output = self.read_output()
assert len(output) == 20
assert output[19]["message"] == "Moving binaries to host..."
for o in output:
assert o["stream"] == "stdout"

# Check that the parse error was logged and that the registry offset
# accounts for the skipped line as well.
data = self.get_registry()
logs = self.log_access()
assert logs.contains("Parse line error") == True
# The healthy file is 2244 bytes; the corrupted copy has one character
# removed, so the expected offset is 2244 - 1 = 2243.
assert data[0]["offset"] == 2243
6 changes: 0 additions & 6 deletions libbeat/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package reader

import (
"errors"
"io"
)

Expand All @@ -30,8 +29,3 @@ type Reader interface {
io.Closer
Next() (Message, error)
}

var (
//ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed
ErrLineUnparsable = errors.New("line is unparsable")
)
4 changes: 2 additions & 2 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
err = p.parseLine(&message, &logLine)
if err != nil {
p.logger.Errorf("Parse line error: %v", err)
return message, reader.ErrLineUnparsable
continue
}

// Handle multiline messages, join partial lines
Expand All @@ -219,7 +219,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
err = p.parseLine(&next, &logLine)
if err != nil {
p.logger.Errorf("Parse line error: %v", err)
return message, reader.ErrLineUnparsable
continue
}
message.Content = append(message.Content, next.Content...)
}
Expand Down
37 changes: 29 additions & 8 deletions libbeat/reader/readjson/docker_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package readjson

import (
"io"
"testing"
"time"

Expand Down Expand Up @@ -53,7 +54,7 @@ func TestDockerJSON(t *testing.T) {
name: "Wrong JSON",
input: [][]byte{[]byte(`this is not JSON`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 16,
},
Expand All @@ -73,7 +74,7 @@ func TestDockerJSON(t *testing.T) {
name: "Wrong CRI",
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 37,
},
Expand All @@ -82,7 +83,7 @@ func TestDockerJSON(t *testing.T) {
name: "Wrong CRI",
input: [][]byte{[]byte(`{this is not JSON nor CRI`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 25,
},
Expand All @@ -91,7 +92,7 @@ func TestDockerJSON(t *testing.T) {
name: "Missing time",
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 82,
},
Expand Down Expand Up @@ -218,7 +219,7 @@ func TestDockerJSON(t *testing.T) {
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
format: "cri",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 82,
},
Expand All @@ -228,7 +229,7 @@ func TestDockerJSON(t *testing.T) {
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
stream: "all",
format: "docker",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 115,
},
Expand Down Expand Up @@ -300,7 +301,7 @@ func TestDockerJSON(t *testing.T) {
[]byte(`{"log":"shutdown...\n","stream`),
},
stream: "stdout",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 139,
},
Expand All @@ -324,11 +325,25 @@ func TestDockerJSON(t *testing.T) {
name: "Corrupted log message line",
input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)},
stream: "all",
expectedError: reader.ErrLineUnparsable,
expectedError: io.EOF,
expectedMessage: reader.Message{
Bytes: 97,
},
},
{
name: "Corrupted log message line is skipped, keep correct bytes count",
input: [][]byte{
[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
},
stream: "all",
expectedMessage: reader.Message{
Content: []byte("1:M 09 Nov 13:27:36.276 # User requested"),
Fields: common.MapStr{"stream": "stdout"},
Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC),
Bytes: 205,
},
},
}

for _, test := range tests {
Expand Down Expand Up @@ -358,6 +373,12 @@ type mockReader struct {
}

func (m *mockReader) Next() (reader.Message, error) {
if len(m.messages) < 1 {
return reader.Message{
Content: []byte{},
Bytes: 0,
}, io.EOF
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test where the first line is broken (from a truncate) but the next one is valid? We should get the content from the valid one only, but bytes should account for both.

message := m.messages[0]
m.messages = m.messages[1:]
return reader.Message{
Expand Down