From 16548e8de1bda24f03bfbe8d22d08ece082ca48e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 13 Apr 2022 09:49:16 +0200 Subject: [PATCH] Detect new files under known paths in filestream input (#31268) This PR fixes the `FileWatcher` of `filestream` input. Now a file is considered new if the underlying file is different, even if the scanner has already found the same path in the previous iteration. In the PR the file comparator function is passed as a parameter to make unit testing easier. The problem is if an input file is renamed and a new file shows up, Filebeat did not register it as a new file. The new file was either considered updated or, if the new file was smaller than the previous file, the file was deemed truncated and the complete contents of the previous file were reread from the beginning. (cherry picked from commit 54997acd2c75f5dde1ae0fe6229e34dfcff74a19) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/filestream/fswatch.go | 22 +++--- filebeat/input/filestream/fswatch_test.go | 15 +++-- .../filestream/fswatch_test_non_windows.go | 67 ++++++++++++++++++- 4 files changed, 89 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5afa21a4f583..cac717ef478d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix Azure activitylogs identity field type and several missing fields. {pull}31170[31170] - checkpoint: Fix ingest error when a message contains trailing spaces {pull}31197[31197] - m365_defender: Fix processing when alerts.entities is an empty list. {issue}31223[31223] {pull}31227[31227] +- Prevent filestream from rereading whole files if they are rotated using rename. 
{pull}31268[31268] *Heartbeat* diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index 26ad77cb3306..50aa25980280 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -77,6 +77,7 @@ type fileWatcher struct { scanner loginp.FSScanner log *logp.Logger events chan loginp.FSEvent + sameFileFunc func(os.FileInfo, os.FileInfo) bool } func newFileWatcher(paths []string, ns *common.ConfigNamespace) (loginp.FSWatcher, error) { @@ -110,6 +111,7 @@ func newScannerWatcher(paths []string, c *common.Config) (loginp.FSWatcher, erro prev: make(map[string]os.FileInfo, 0), scanner: scanner, events: make(chan loginp.FSEvent), + sameFileFunc: os.SameFile, }, nil } @@ -127,7 +129,7 @@ func (w *fileWatcher) Run(ctx unison.Canceler) { // run initial scan before starting regular w.watch(ctx) - timed.Periodic(ctx, w.interval, func() error { + _ = timed.Periodic(ctx, w.interval, func() error { w.watch(ctx) return nil @@ -143,12 +145,16 @@ func (w *fileWatcher) watch(ctx unison.Canceler) { for path, info := range paths { + // if the scanner found a new path or an existing path + // with a different file, it is a new file prevInfo, ok := w.prev[path] - if !ok { - newFiles[path] = paths[path] + if !ok || !w.sameFileFunc(prevInfo, info) { + newFiles[path] = info continue } + // if the two infos belong to the same file and it has been modified + // if the size is smaller than before, it is truncated, if bigger, it is a write event if prevInfo.ModTime() != info.ModTime() { if prevInfo.Size() > info.Size() || w.resendOnModTime && prevInfo.Size() == info.Size() { select { @@ -173,7 +179,7 @@ func (w *fileWatcher) watch(ctx unison.Canceler) { // either because they have been deleted or renamed for removedPath, removedInfo := range w.prev { for newPath, newInfo := range newFiles { - if os.SameFile(removedInfo, newInfo) { + if w.sameFileFunc(removedInfo, newInfo) { select { case <-ctx.Done(): return @@ -293,13 +299,13 @@ 
func (s *fileScanner) resolveRecursiveGlobs(c fileScannerConfig) error { // normalizeGlobPatterns calls `filepath.Abs` on all the globs from config func (s *fileScanner) normalizeGlobPatterns() error { - var paths []string - for _, path := range s.paths { + paths := make([]string, len(s.paths)) + for i, path := range s.paths { pathAbs, err := filepath.Abs(path) if err != nil { - return fmt.Errorf("failed to get the absolute path for %s: %v", path, err) + return fmt.Errorf("failed to get the absolute path for %s: %w", path, err) } - paths = append(paths, pathAbs) + paths[i] = pathAbs } s.paths = paths return nil diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index 54fe3804f02d..56e3d8314508 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -105,7 +105,7 @@ func TestFileScanner(t *testing.T) { } func setupFilesForScannerTest(t *testing.T, tmpDir string) { - err := os.Mkdir(filepath.Join(tmpDir, directoryPath), 750) + err := os.Mkdir(filepath.Join(tmpDir, directoryPath), 0750) if err != nil { t.Fatalf("cannot create non harvestable directory: %v", err) } @@ -201,10 +201,11 @@ func TestFileWatchNewDeleteModified(t *testing.T) { t.Run(name, func(t *testing.T) { w := fileWatcher{ - log: logp.L(), - prev: test.prevFiles, - scanner: &mockScanner{test.nextFiles}, - events: make(chan loginp.FSEvent), + log: logp.L(), + prev: test.prevFiles, + scanner: &mockScanner{test.nextFiles}, + events: make(chan loginp.FSEvent), + sameFileFunc: testSameFile, } go w.watch(context.Background()) @@ -242,6 +243,10 @@ func (t testFileInfo) ModTime() time.Time { return t.time } func (t testFileInfo) IsDir() bool { return false } func (t testFileInfo) Sys() interface{} { return t.sys } +func testSameFile(fi1, fi2 os.FileInfo) bool { + return fi1.Name() == fi2.Name() +} + func mustDuration(durStr string) time.Duration { dur, err := time.ParseDuration(durStr) if err != nil { diff --git 
a/filebeat/input/filestream/fswatch_test_non_windows.go b/filebeat/input/filestream/fswatch_test_non_windows.go index ec8045f7c533..ddd3be068ddd 100644 --- a/filebeat/input/filestream/fswatch_test_non_windows.go +++ b/filebeat/input/filestream/fswatch_test_non_windows.go @@ -22,6 +22,7 @@ package filestream import ( "context" + "errors" "io/ioutil" "os" "path/filepath" @@ -29,6 +30,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" "github.com/elastic/beats/v7/libbeat/common/match" @@ -144,9 +146,10 @@ func TestFileWatcherRenamedFile(t *testing.T) { t.Fatal(err) } w := fileWatcher{ - log: logp.L(), - scanner: scanner, - events: make(chan loginp.FSEvent), + log: logp.L(), + scanner: scanner, + events: make(chan loginp.FSEvent), + sameFileFunc: testSameFile, } go w.watch(context.Background()) @@ -170,6 +173,64 @@ func TestFileWatcherRenamedFile(t *testing.T) { assert.Equal(t, renamedPath, evt.NewPath) } +// this test is only supported on non Windows for now. 
+func TestFileWatcherRenamedTruncated(t *testing.T) { + tmpDir := t.TempDir() + + fs, err := newFileScanner([]string{filepath.Join(tmpDir, "app.log*")}, fileScannerConfig{}) + if err != nil { + t.Fatal(err) + } + w := fileWatcher{ + log: logp.L(), + scanner: fs, + events: make(chan loginp.FSEvent), + sameFileFunc: os.SameFile, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.watch(ctx) + + appLogPath := filepath.Join(tmpDir, "app.log") + rotatedAppLogPath := filepath.Join(tmpDir, "app.log.1") + err = os.WriteFile(appLogPath, []byte("my longer log line"), 0o600) + if err != nil { + t.Fatal(err) + } + + evt := w.Event() + require.Equal(t, loginp.OpCreate, evt.Op, "new file should be detected") + require.Equal(t, "", evt.OldPath, "new file does not have an old path set") + require.Equal(t, appLogPath, evt.NewPath, "new file does not have an old path set") + + go w.watch(ctx) + + err = os.Rename(appLogPath, rotatedAppLogPath) + if err != nil { + t.Fatalf("failed to rotate active file: %v", err) + } + + if _, err := os.Stat(appLogPath); !errors.Is(err, os.ErrNotExist) { + t.Fatalf("app.log should not exist") + } + + err = os.WriteFile(appLogPath, []byte("shorter line"), 0o600) + if err != nil { + t.Fatal(err) + } + + evt = w.Event() + require.Equal(t, loginp.OpRename, evt.Op, "app.log has been renamed to app.log.1, got: %s old_path=%s new_path=%s", evt.Op.String(), evt.OldPath, evt.NewPath) + require.Equal(t, appLogPath, evt.OldPath, "old_path should be set to app.log because of rename") + require.Equal(t, rotatedAppLogPath, evt.NewPath, "new_path should be set to app.log.1 because of rename") + + evt = w.Event() + require.Equal(t, loginp.OpCreate, evt.Op, "new file app.log should be detected, got: %s for old_path=%s new_path=%s", evt.Op.String(), evt.OldPath, evt.NewPath) + require.Equal(t, "", evt.OldPath, "new file should not have an old path set") + require.Equal(t, appLogPath, evt.NewPath, "new file should be called app.log") 
+} + func mustAbsPath(filename string) string { abspath, err := filepath.Abs(filename) if err != nil {