From 3d45fdc574fef6bb5b6087cb59dbc75f01589b8c Mon Sep 17 00:00:00 2001 From: chrismark Date: Fri, 20 Nov 2020 13:00:13 +0200 Subject: [PATCH 1/7] Properly update offset in case of unparasable line Signed-off-by: chrismark --- filebeat/input/log/harvester.go | 3 +- .../tests/files/logs/docker_corrupted.log | 21 ++++++++++ filebeat/tests/system/test_container.py | 39 +++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 filebeat/tests/files/logs/docker_corrupted.log diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 6b16861f8ec..11c6ec000d7 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -333,7 +333,8 @@ func (h *Harvester) Run() error { logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) case reader.ErrLineUnparsable: logp.Info("Skipping unparsable line in file: %v", h.state.Source) - //line unparsable, go to next line + //line unparsable, update offset and go to next line + h.state.Offset += int64(message.Bytes) continue default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) diff --git a/filebeat/tests/files/logs/docker_corrupted.log b/filebeat/tests/files/logs/docker_corrupted.log new file mode 100644 index 00000000000..b241a2691b9 --- /dev/null +++ b/filebeat/tests/files/logs/docker_corrupted.log @@ -0,0 +1,21 @@ +{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"} +{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"} +{"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"} +{"log":"cp etc/packetbeat.template.json /build/packetbeat.template.json\n","stream":"stdout","time":"2016-03-02T22:59:04.639782988Z"} +{"log":"# linux\n","stream":"stdout","time":"2016-03-02T22:59:04.646276053Z"} +"log":"cp packetbeat.yml /build/packetbeat-linux.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.647847045Z"} +{"log":"# binary\n","stream":"stdout","time":"2016-03-02T22:59:04.653740138Z"} +{"log":"cp packetbeat.yml /build/packetbeat-binary.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.655979016Z"} +{"log":"# darwin\n","stream":"stdout","time":"2016-03-02T22:59:04.661181197Z"} +{"log":"cp packetbeat.yml /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.662859769Z"} +{"log":"sed -i.bk 's/device: any/device: en0/' /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.66649744Z"} +{"log":"rm /build/packetbeat-darwin.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.701199002Z"} +{"log":"# win\n","stream":"stdout","time":"2016-03-02T22:59:04.705067809Z"} +{"log":"cp packetbeat.yml /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.706629907Z"} +{"log":"sed -i.bk 's/device: any/device: 0/' /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.711993313Z"} +{"log":"rm /build/packetbeat-win.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.757913979Z"} +{"log":"Compiling for windows/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:04.761895467Z"} +{"log":"Compiling for windows/386...\n","stream":"stdout","time":"2016-03-02T22:59:29.481736885Z"} +{"log":"Compiling for darwin/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:55.205334574Z"} +{"log":"Moving binaries to host...\n","stream":"stdout","time":"2016-03-02T23:00:15.140397826Z"} diff --git a/filebeat/tests/system/test_container.py b/filebeat/tests/system/test_container.py index ee0df7eb8e9..ae868c0211f 100644 --- a/filebeat/tests/system/test_container.py +++ b/filebeat/tests/system/test_container.py @@ -66,3 +66,42 @@ def test_container_input_cri(self): output = self.read_output() assert len(output) == 1 assert output[0]["stream"] == "stdout" + + def test_container_input_registry_for_unparsable_lines(self): + """ + Test container input properly updates registry offset in case + of unparsable lines + """ + input_raw = """ +- type: container + paths: + - {}/logs/*.log +""" + self.render_config_template( + input_raw=input_raw.format(os.path.abspath(self.working_dir)), + inputs=False, + ) + + os.mkdir(self.working_dir + "/logs/") + self.copy_files(["logs/docker_corrupted.log"], + target_dir="logs") + + filebeat = self.start_beat() + + self.wait_until(lambda: self.output_has(lines=20)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + assert len(output) == 20 + assert output[19]["message"] == "Moving binaries to host..." + for o in output: + assert o["stream"] == "stdout" + + # Check that file exist + data = self.get_registry() + logs = self.log_access() + assert logs.contains("Skipping unparsable line in file") == True + # bytes of healthy file are 2244 so for the corrupted one should + # be 2244-1=2243 since we removed one character + assert data[0]["offset"] == 2243 From 015d379ad0c88ed8635f659d5c1fdc2c68059db1 Mon Sep 17 00:00:00 2001 From: chrismark Date: Fri, 20 Nov 2020 14:09:53 +0200 Subject: [PATCH 2/7] add changelog Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b77935ef89b..6bad58b8e7b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -332,6 +332,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix handing missing eventtime and assignip field being set to N/A for fortinet module. {pull}22361[22361] - Fix Zeek dashboard reference to `zeek.ssl.server.name` field. {pull}21696[21696] - Fix for `field [source] not present as part of path [source.ip]` error in azure pipelines. {pull}22377[22377] +- Properly update offset in case of unparasable line. {pull}22685[22685] *Heartbeat* From 6102beebd13976112bfe8f034a27797f2ec813cd Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 24 Nov 2020 11:48:52 +0200 Subject: [PATCH 3/7] Update metrics state Signed-off-by: chrismark --- filebeat/input/log/harvester.go | 1 + 1 file changed, 1 insertion(+) diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 11c6ec000d7..ac9c8029a02 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -335,6 +335,7 @@ func (h *Harvester) Run() error { logp.Info("Skipping unparsable line in file: %v", h.state.Source) //line unparsable, update offset and go to next line h.state.Offset += int64(message.Bytes) + h.metrics.readOffset.Set(h.state.Offset) continue default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) From d58c3508a666dc47437d37796748790a8ac2342d Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 30 Mar 2021 16:22:18 +0300 Subject: [PATCH 4/7] Ignore parseline error Signed-off-by: chrismark --- filebeat/input/filestream/input.go | 3 --- filebeat/input/log/harvester.go | 6 ------ filebeat/tests/system/test_container.py | 2 +- libbeat/reader/reader.go | 6 ------ libbeat/reader/readjson/docker_json.go | 13 ++++++++++-- libbeat/reader/readjson/docker_json_test.go | 23 ++++++++++++++------- 6 files changed, 27 insertions(+), 26 deletions(-) diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 9c23e18473a..b63f28ff7e6 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -298,9 +298,6 @@ func (inp *filestream) readFromSource( s.Offset = 0 case ErrClosed: log.Info("Reader was closed. Closing.") - case reader.ErrLineUnparsable: - log.Info("Skipping unparsable line in file.") - continue default: log.Errorf("Read line error: %v", err) } diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index ac9c8029a02..0d4e6d6b539 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -331,12 +331,6 @@ func (h *Harvester) Run() error { logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source) case ErrInactive: logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) - case reader.ErrLineUnparsable: - logp.Info("Skipping unparsable line in file: %v", h.state.Source) - //line unparsable, update offset and go to next line - h.state.Offset += int64(message.Bytes) - h.metrics.readOffset.Set(h.state.Offset) - continue default: logp.Err("Read line error: %v; File: %v", err, h.state.Source) } diff --git a/filebeat/tests/system/test_container.py b/filebeat/tests/system/test_container.py index ae868c0211f..067eabd1977 100644 --- a/filebeat/tests/system/test_container.py +++ b/filebeat/tests/system/test_container.py @@ -101,7 +101,7 @@ def test_container_input_registry_for_unparsable_lines(self): # Check that file exist data = self.get_registry() logs = self.log_access() - assert logs.contains("Skipping unparsable line in file") == True + assert logs.contains("Parse line error") == True # bytes of healthy file are 2244 so for the corrupted one should # be 2244-1=2243 since we removed one character assert data[0]["offset"] == 2243 diff --git a/libbeat/reader/reader.go b/libbeat/reader/reader.go index 81ae4ad8241..43c389ac7c6 100644 --- a/libbeat/reader/reader.go +++ b/libbeat/reader/reader.go @@ -18,7 +18,6 @@ package reader import ( - "errors" "io" ) @@ -30,8 +29,3 @@ type Reader interface { io.Closer Next() (Message, error) } - -var ( - //ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed - ErrLineUnparsable = errors.New("line is unparsable") -) diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index 59dded97ec3..bdba91ec0f6 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -20,6 +20,7 @@ package readjson import ( "bytes" "encoding/json" + "io" "runtime" "strings" "time" @@ -190,6 +191,10 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { for { message, err := p.reader.Next() + if message.Bytes == 0 || err == io.EOF { + message.Bytes = bytes + return message, io.EOF + } // keep the right bytes count even if we return an error bytes += message.Bytes message.Bytes = bytes @@ -202,13 +207,17 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { err = p.parseLine(&message, &logLine) if err != nil { p.logger.Errorf("Parse line error: %v", err) - return message, reader.ErrLineUnparsable + continue } // Handle multiline messages, join partial lines for p.partial && logLine.Partial { next, err := p.reader.Next() + if next.Bytes == 0 || err == io.EOF { + message.Bytes = bytes + return message, io.EOF + } // keep the right bytes count even if we return an error bytes += next.Bytes message.Bytes = bytes @@ -219,7 +228,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { err = p.parseLine(&next, &logLine) if err != nil { p.logger.Errorf("Parse line error: %v", err) - return message, reader.ErrLineUnparsable + continue } message.Content = append(message.Content, next.Content...) } diff --git a/libbeat/reader/readjson/docker_json_test.go b/libbeat/reader/readjson/docker_json_test.go index 2c9e2e71104..431f07a14f7 100644 --- a/libbeat/reader/readjson/docker_json_test.go +++ b/libbeat/reader/readjson/docker_json_test.go @@ -18,6 +18,7 @@ package readjson import ( + "io" "testing" "time" @@ -53,7 +54,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong JSON", input: [][]byte{[]byte(`this is not JSON`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 16, }, @@ -73,7 +74,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 37, }, @@ -82,7 +83,7 @@ func TestDockerJSON(t *testing.T) { name: "Wrong CRI", input: [][]byte{[]byte(`{this is not JSON nor CRI`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 25, }, @@ -91,7 +92,7 @@ func TestDockerJSON(t *testing.T) { name: "Missing time", input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 82, }, @@ -218,7 +219,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)}, stream: "all", format: "cri", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 82, }, @@ -228,7 +229,7 @@ func TestDockerJSON(t *testing.T) { input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)}, stream: "all", format: "docker", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 115, }, @@ -300,7 +301,7 @@ func TestDockerJSON(t *testing.T) { []byte(`{"log":"shutdown...\n","stream`), }, stream: "stdout", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 139, }, @@ -324,7 +325,7 @@ func TestDockerJSON(t *testing.T) { name: "Corrupted log message line", input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)}, stream: "all", - expectedError: reader.ErrLineUnparsable, + expectedError: io.EOF, expectedMessage: reader.Message{ Bytes: 97, }, @@ -358,6 +359,12 @@ type mockReader struct { } func (m *mockReader) Next() (reader.Message, error) { + if len(m.messages) < 1 { + return reader.Message{ + Content: []byte{}, + Bytes: 0, + }, nil + } message := m.messages[0] m.messages = m.messages[1:] return reader.Message{ From 324b490fc665021b4ef378b21cf5ad5acab9cfea Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 30 Mar 2021 16:28:34 +0300 Subject: [PATCH 5/7] fmt Signed-off-by: chrismark --- libbeat/reader/readjson/docker_json.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index bdba91ec0f6..e5120fa92bd 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -191,7 +191,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { for { message, err := p.reader.Next() - if message.Bytes == 0 || err == io.EOF { + if message.Bytes == 0 { message.Bytes = bytes return message, io.EOF } @@ -214,7 +214,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { for p.partial && logLine.Partial { next, err := p.reader.Next() - if next.Bytes == 0 || err == io.EOF { + if next.Bytes == 0 { message.Bytes = bytes return message, io.EOF } From 67dcd5f82724fdf6be526d57cb9c707f34157335 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 30 Mar 2021 16:33:03 +0300 Subject: [PATCH 6/7] fix Signed-off-by: chrismark --- libbeat/reader/readjson/docker_json.go | 9 --------- libbeat/reader/readjson/docker_json_test.go | 2 +- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go index e5120fa92bd..d57c61c6a26 100644 --- a/libbeat/reader/readjson/docker_json.go +++ b/libbeat/reader/readjson/docker_json.go @@ -20,7 +20,6 @@ package readjson import ( "bytes" "encoding/json" - "io" "runtime" "strings" "time" @@ -191,10 +190,6 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { for { message, err := p.reader.Next() - if message.Bytes == 0 { - message.Bytes = bytes - return message, io.EOF - } // keep the right bytes count even if we return an error bytes += message.Bytes message.Bytes = bytes @@ -214,10 +209,6 @@ func (p *DockerJSONReader) Next() (reader.Message, error) { for p.partial && logLine.Partial { next, err := p.reader.Next() - if next.Bytes == 0 { - message.Bytes = bytes - return message, io.EOF - } // keep the right bytes count even if we return an error bytes += next.Bytes message.Bytes = bytes diff --git a/libbeat/reader/readjson/docker_json_test.go b/libbeat/reader/readjson/docker_json_test.go index 431f07a14f7..d150e2fd3b7 100644 --- a/libbeat/reader/readjson/docker_json_test.go +++ b/libbeat/reader/readjson/docker_json_test.go @@ -363,7 +363,7 @@ func (m *mockReader) Next() (reader.Message, error) { return reader.Message{ Content: []byte{}, Bytes: 0, - }, nil + }, io.EOF } message := m.messages[0] m.messages = m.messages[1:] From e59afadd645916b3ad339eef042f3a0662bdbec1 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 31 Mar 2021 08:49:29 +0300 Subject: [PATCH 7/7] Add test Signed-off-by: chrismark --- libbeat/reader/readjson/docker_json_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/libbeat/reader/readjson/docker_json_test.go b/libbeat/reader/readjson/docker_json_test.go index d150e2fd3b7..de03b87da81 100644 --- a/libbeat/reader/readjson/docker_json_test.go +++ b/libbeat/reader/readjson/docker_json_test.go @@ -330,6 +330,20 @@ func TestDockerJSON(t *testing.T) { Bytes: 97, }, }, + { + name: "Corrupted log message line is skipped, keep correct bytes count", + input: [][]byte{ + []byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), + }, + stream: "all", + expectedMessage: reader.Message{ + Content: []byte("1:M 09 Nov 13:27:36.276 # User requested"), + Fields: common.MapStr{"stream": "stdout"}, + Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), + Bytes: 205, + }, + }, } for _, test := range tests {