Skip to content

Commit

Permalink
Fix bug that state was not persisted for rotated files.
Browse files Browse the repository at this point in the history
Closes #1010
  • Loading branch information
ruflin committed Feb 29, 2016
1 parent 41e1a6a commit 118e967
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
.DS_Store
/glide.lock
/beats.iml
*.dev.yml

# Editor swap files
*.swp
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
- Stop filebeat if filebeat is started without any prospectors defined or empty prospectors {pull}644[644] {pull}647[647]
- Improve shutdown of crawler and prospector to wait for clean completion {pull}720[720]
- Omit `fields` from Filebeat events when null {issue}899[899]
- - Fix registrar bug for rotated files {pull}1010[1010]

*Winlogbeat*

Expand Down
7 changes: 3 additions & 4 deletions filebeat/crawler/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,12 @@ func (p ProspectorLog) checkNewFile(h *harvester.Harvester) {

logp.Debug("prospector", "Start harvesting unknown file: %s", h.Path)

// Call crawler if there if there exists a state for the given file
offset, resuming := p.Prospector.registrar.fetchState(h.Path, h.Stat.Fileinfo)

if p.checkOldFile(h) {

logp.Debug("prospector", "Fetching old state of file to resume: %s", h.Path)
// Call crawler if there if there exists a state for the given file
offset, resuming := p.Prospector.registrar.fetchState(h.Path, h.Stat.Fileinfo)

// Are we resuming a dead file? We have to resume even if dead so we catch any old updates to the file
// This is safe as the harvester, once it hits the EOF and a timeout, will stop harvesting
Expand All @@ -172,8 +173,6 @@ func (p ProspectorLog) checkNewFile(h *harvester.Harvester) {
} else if previousFile, err := p.getPreviousFile(h.Path, h.Stat.Fileinfo); err == nil {
p.continueExistingFile(h, previousFile)
} else {
// Call crawler if there if there exists a state for the given file
offset, _ := p.Prospector.registrar.fetchState(h.Path, h.Stat.Fileinfo)
p.resumeHarvesting(h, offset)
}
}
Expand Down
44 changes: 44 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,47 @@ def test_custom_registry_file_location(self):
filebeat.check_kill_and_wait()

assert os.path.isfile(os.path.join(self.working_dir, "a/b/c/registry"))

def test_rotating_file(self):
"""
Checks that the registry is properly updated after a file is rotated
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*"
)

os.mkdir(self.working_dir + "/log/")
testfile = self.working_dir + "/log/test.log"

filebeat = self.start_beat()

with open(testfile, 'w') as f:
f.write("offset 9\n")

self.wait_until(
lambda: self.output_has(lines=1),
max_timeout=10)


testfilerenamed = self.working_dir + "/log/test.1.log"
os.rename(testfile, testfilerenamed)

with open(testfile, 'w') as f:
f.write("offset 10\n")


self.wait_until(
lambda: self.output_has(lines=2),
max_timeout=10)

filebeat.check_kill_and_wait()

# Check that file exist
data = self.get_dot_filebeat()

# Make sure the offsets are correctly set
data[os.path.abspath(testfile)]["offset"] = 10
data[os.path.abspath(testfilerenamed)]["offset"] = 9

# Check that 2 files are port of the registrar file
assert len(data) == 2

0 comments on commit 118e967

Please sign in to comment.