diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5afa21a4f583..cac717ef478d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix Azure activitylogs identity field type and several missing fields. {pull}31170[31170] - checkpoint: Fix ingest error when a message contains trailing spaces {pull}31197[31197] - m365_defender: Fix processing when alerts.entities is an empty list. {issue}31223[31223] {pull}31227[31227] +- Prevent filestream from rereading whole files if they are rotated using rename. {pull}31268[31268] *Heartbeat* diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index 26ad77cb3306..50aa25980280 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -77,6 +77,7 @@ type fileWatcher struct { scanner loginp.FSScanner log *logp.Logger events chan loginp.FSEvent + sameFileFunc func(os.FileInfo, os.FileInfo) bool } func newFileWatcher(paths []string, ns *common.ConfigNamespace) (loginp.FSWatcher, error) { @@ -110,6 +111,7 @@ func newScannerWatcher(paths []string, c *common.Config) (loginp.FSWatcher, erro prev: make(map[string]os.FileInfo, 0), scanner: scanner, events: make(chan loginp.FSEvent), + sameFileFunc: os.SameFile, }, nil } @@ -127,7 +129,7 @@ func (w *fileWatcher) Run(ctx unison.Canceler) { // run initial scan before starting regular w.watch(ctx) - timed.Periodic(ctx, w.interval, func() error { + _ = timed.Periodic(ctx, w.interval, func() error { w.watch(ctx) return nil @@ -143,12 +145,16 @@ func (w *fileWatcher) watch(ctx unison.Canceler) { for path, info := range paths { + // if the scanner found a new path or an existing path + // with a different file, it is a new file prevInfo, ok := w.prev[path] - if !ok { - newFiles[path] = paths[path] + if !ok || !w.sameFileFunc(prevInfo, info) { + newFiles[path] = info continue } + // if the two infos belong to the same file and it has been modified + // if the size is smaller than before, it is truncated, if bigger, it is a write event if prevInfo.ModTime() != info.ModTime() { if prevInfo.Size() > info.Size() || w.resendOnModTime && prevInfo.Size() == info.Size() { select { @@ -173,7 +179,7 @@ func (w *fileWatcher) watch(ctx unison.Canceler) { // either because they have been deleted or renamed for removedPath, removedInfo := range w.prev { for newPath, newInfo := range newFiles { - if os.SameFile(removedInfo, newInfo) { + if w.sameFileFunc(removedInfo, newInfo) { select { case <-ctx.Done(): return @@ -293,13 +299,13 @@ func (s *fileScanner) resolveRecursiveGlobs(c fileScannerConfig) error { // normalizeGlobPatterns calls `filepath.Abs` on all the globs from config func (s *fileScanner) normalizeGlobPatterns() error { - var paths []string - for _, path := range s.paths { + paths := make([]string, len(s.paths)) + for i, path := range s.paths { pathAbs, err := filepath.Abs(path) if err != nil { - return fmt.Errorf("failed to get the absolute path for %s: %v", path, err) + return fmt.Errorf("failed to get the absolute path for %s: %w", path, err) } - paths = append(paths, pathAbs) + paths[i] = pathAbs } s.paths = paths return nil diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index 54fe3804f02d..56e3d8314508 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -105,7 +105,7 @@ func TestFileScanner(t *testing.T) { } func setupFilesForScannerTest(t *testing.T, tmpDir string) { - err := os.Mkdir(filepath.Join(tmpDir, directoryPath), 750) + err := os.Mkdir(filepath.Join(tmpDir, directoryPath), 0750) if err != nil { t.Fatalf("cannot create non harvestable directory: %v", err) } @@ -201,10 +201,11 @@ func TestFileWatchNewDeleteModified(t *testing.T) { t.Run(name, func(t *testing.T) { w := fileWatcher{ - log: logp.L(), - prev: test.prevFiles, - scanner: &mockScanner{test.nextFiles}, - events: make(chan loginp.FSEvent), + log: logp.L(), + prev: test.prevFiles, + scanner: &mockScanner{test.nextFiles}, + events: make(chan loginp.FSEvent), + sameFileFunc: testSameFile, } go w.watch(context.Background()) @@ -242,6 +243,10 @@ func (t testFileInfo) ModTime() time.Time { return t.time } func (t testFileInfo) IsDir() bool { return false } func (t testFileInfo) Sys() interface{} { return t.sys } +func testSameFile(fi1, fi2 os.FileInfo) bool { + return fi1.Name() == fi2.Name() +} + func mustDuration(durStr string) time.Duration { dur, err := time.ParseDuration(durStr) if err != nil { diff --git a/filebeat/input/filestream/fswatch_test_non_windows.go b/filebeat/input/filestream/fswatch_test_non_windows.go index ec8045f7c533..ddd3be068ddd 100644 --- a/filebeat/input/filestream/fswatch_test_non_windows.go +++ b/filebeat/input/filestream/fswatch_test_non_windows.go @@ -22,6 +22,7 @@ package filestream import ( "context" + "errors" "io/ioutil" "os" "path/filepath" @@ -29,6 +30,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" "github.com/elastic/beats/v7/libbeat/common/match" @@ -144,9 +146,10 @@ func TestFileWatcherRenamedFile(t *testing.T) { t.Fatal(err) } w := fileWatcher{ - log: logp.L(), - scanner: scanner, - events: make(chan loginp.FSEvent), + log: logp.L(), + scanner: scanner, + events: make(chan loginp.FSEvent), + sameFileFunc: testSameFile, } go w.watch(context.Background()) @@ -170,6 +173,64 @@ func TestFileWatcherRenamedFile(t *testing.T) { assert.Equal(t, renamedPath, evt.NewPath) } +// this test is only supported on non Windows for now. +func TestFileWatcherRenamedTruncated(t *testing.T) { + tmpDir := t.TempDir() + + fs, err := newFileScanner([]string{filepath.Join(tmpDir, "app.log*")}, fileScannerConfig{}) + if err != nil { + t.Fatal(err) + } + w := fileWatcher{ + log: logp.L(), + scanner: fs, + events: make(chan loginp.FSEvent), + sameFileFunc: os.SameFile, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.watch(ctx) + + appLogPath := filepath.Join(tmpDir, "app.log") + rotatedAppLogPath := filepath.Join(tmpDir, "app.log.1") + err = os.WriteFile(appLogPath, []byte("my longer log line"), 0o600) + if err != nil { + t.Fatal(err) + } + + evt := w.Event() + require.Equal(t, loginp.OpCreate, evt.Op, "new file should be detected") + require.Equal(t, "", evt.OldPath, "new file does not have an old path set") + require.Equal(t, appLogPath, evt.NewPath, "new file does not have an old path set") + + go w.watch(ctx) + + err = os.Rename(appLogPath, rotatedAppLogPath) + if err != nil { + t.Fatalf("failed to rotate active file: %v", err) + } + + if _, err := os.Stat(appLogPath); !errors.Is(err, os.ErrNotExist) { + t.Fatalf("app.log should not exist") + } + + err = os.WriteFile(appLogPath, []byte("shorter line"), 0o600) + if err != nil { + t.Fatal(err) + } + + evt = w.Event() + require.Equal(t, loginp.OpRename, evt.Op, "app.log has been renamed to app.log.1, got: %s old_path=%s new_path=%s", evt.Op.String(), evt.OldPath, evt.NewPath) + require.Equal(t, appLogPath, evt.OldPath, "old_path should be set to app.log because of rename") + require.Equal(t, rotatedAppLogPath, evt.NewPath, "new_path should be set to app.log.1 because of rename") + + evt = w.Event() + require.Equal(t, loginp.OpCreate, evt.Op, "new file app.log should be detected, got: %s for old_path=%s new_path=%s", evt.Op.String(), evt.OldPath, evt.NewPath) + require.Equal(t, "", evt.OldPath, "new file should not have an old path set") + require.Equal(t, appLogPath, evt.NewPath, "new file should be called app.log") +} + func mustAbsPath(filename string) string { abspath, err := filepath.Abs(filename) if err != nil {