From 118e96781d4d181a0117ebd2cfe1493912b45bc3 Mon Sep 17 00:00:00 2001 From: ruflin Date: Mon, 29 Feb 2016 10:29:23 +0100 Subject: [PATCH] Fix bug that state was not persisted for rotated files. Closes #1010 --- .gitignore | 1 + CHANGELOG.asciidoc | 1 + filebeat/crawler/prospector_log.go | 7 ++-- filebeat/tests/system/test_registrar.py | 44 +++++++++++++++++++++++++ 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 0e6de4d38bd..323f0f0fd0e 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ .DS_Store /glide.lock /beats.iml +*.dev.yml # Editor swap files *.swp diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f9626ad0657..88b2164de43 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff] - Stop filebeat if filebeat is started without any prospectors defined or empty prospectors {pull}644[644] {pull}647[647] - Improve shutdown of crawler and prospector to wait for clean completion {pull}720[720] - Omit `fields` from Filebeat events when null {issue}899[899] +- - Fix registrar bug for rotated files {pull}1010[1010] *Winlogbeat* diff --git a/filebeat/crawler/prospector_log.go b/filebeat/crawler/prospector_log.go index b9778e72a85..5df7242cc71 100644 --- a/filebeat/crawler/prospector_log.go +++ b/filebeat/crawler/prospector_log.go @@ -149,11 +149,12 @@ func (p ProspectorLog) checkNewFile(h *harvester.Harvester) { logp.Debug("prospector", "Start harvesting unknown file: %s", h.Path) + // Call crawler if there if there exists a state for the given file + offset, resuming := p.Prospector.registrar.fetchState(h.Path, h.Stat.Fileinfo) + if p.checkOldFile(h) { logp.Debug("prospector", "Fetching old state of file to resume: %s", h.Path) - // Call crawler if there if there exists a state for the given file - offset, resuming := p.Prospector.registrar.fetchState(h.Path, h.Stat.Fileinfo) // Are we resuming a dead file? We have to resume even if dead so we catch any old updates to the file // This is safe as the harvester, once it hits the EOF and a timeout, will stop harvesting @@ -172,8 +173,6 @@ func (p ProspectorLog) checkNewFile(h *harvester.Harvester) { } else if previousFile, err := p.getPreviousFile(h.Path, h.Stat.Fileinfo); err == nil { p.continueExistingFile(h, previousFile) } else { - // Call crawler if there if there exists a state for the given file - offset, _ := p.Prospector.registrar.fetchState(h.Path, h.Stat.Fileinfo) p.resumeHarvesting(h, offset) } } diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index bbe9b493310..d2a71077b62 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -153,3 +153,47 @@ def test_custom_registry_file_location(self): filebeat.check_kill_and_wait() assert os.path.isfile(os.path.join(self.working_dir, "a/b/c/registry")) + + def test_rotating_file(self): + """ + Checks that the registry is properly updated after a file is rotated + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*" + ) + + os.mkdir(self.working_dir + "/log/") + testfile = self.working_dir + "/log/test.log" + + filebeat = self.start_beat() + + with open(testfile, 'w') as f: + f.write("offset 9\n") + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + + testfilerenamed = self.working_dir + "/log/test.1.log" + os.rename(testfile, testfilerenamed) + + with open(testfile, 'w') as f: + f.write("offset 10\n") + + + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=10) + + filebeat.check_kill_and_wait() + + # Check that file exist + data = self.get_dot_filebeat() + + # Make sure the offsets are correctly set + data[os.path.abspath(testfile)]["offset"] = 10 + data[os.path.abspath(testfilerenamed)]["offset"] = 9 + + # Check that 2 files are port of the registrar file + assert len(data) == 2