Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.17](backport #31268) Detect new files under known paths in filestream input #31275

Merged
merged 1 commit into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix Azure activitylogs identity field type and several missing fields. {pull}31170[31170]
- checkpoint: Fix ingest error when a message contains trailing spaces {pull}31197[31197]
- m365_defender: Fix processing when alerts.entities is an empty list. {issue}31223[31223] {pull}31227[31227]
- Prevent filestream from rereading whole files if they are rotated using rename. {pull}31268[31268]

*Heartbeat*

Expand Down
22 changes: 14 additions & 8 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type fileWatcher struct {
scanner loginp.FSScanner
log *logp.Logger
events chan loginp.FSEvent
sameFileFunc func(os.FileInfo, os.FileInfo) bool
}

func newFileWatcher(paths []string, ns *common.ConfigNamespace) (loginp.FSWatcher, error) {
Expand Down Expand Up @@ -110,6 +111,7 @@ func newScannerWatcher(paths []string, c *common.Config) (loginp.FSWatcher, erro
prev: make(map[string]os.FileInfo, 0),
scanner: scanner,
events: make(chan loginp.FSEvent),
sameFileFunc: os.SameFile,
}, nil
}

Expand All @@ -127,7 +129,7 @@ func (w *fileWatcher) Run(ctx unison.Canceler) {
// run initial scan before starting regular
w.watch(ctx)

timed.Periodic(ctx, w.interval, func() error {
_ = timed.Periodic(ctx, w.interval, func() error {
w.watch(ctx)

return nil
Expand All @@ -143,12 +145,16 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {

for path, info := range paths {

// if the scanner found a new path or an existing path
// with a different file, it is a new file
prevInfo, ok := w.prev[path]
if !ok {
newFiles[path] = paths[path]
if !ok || !w.sameFileFunc(prevInfo, info) {
newFiles[path] = info
continue
}

// if the two infos belong to the same file and it has been modified
// if the size is smaller than before, it is truncated, if bigger, it is a write event
if prevInfo.ModTime() != info.ModTime() {
if prevInfo.Size() > info.Size() || w.resendOnModTime && prevInfo.Size() == info.Size() {
select {
Expand All @@ -173,7 +179,7 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
// either because they have been deleted or renamed
for removedPath, removedInfo := range w.prev {
for newPath, newInfo := range newFiles {
if os.SameFile(removedInfo, newInfo) {
if w.sameFileFunc(removedInfo, newInfo) {
select {
case <-ctx.Done():
return
Expand Down Expand Up @@ -293,13 +299,13 @@ func (s *fileScanner) resolveRecursiveGlobs(c fileScannerConfig) error {

// normalizeGlobPatterns calls `filepath.Abs` on all the globs from config
func (s *fileScanner) normalizeGlobPatterns() error {
var paths []string
for _, path := range s.paths {
paths := make([]string, len(s.paths))
for i, path := range s.paths {
pathAbs, err := filepath.Abs(path)
if err != nil {
return fmt.Errorf("failed to get the absolute path for %s: %v", path, err)
return fmt.Errorf("failed to get the absolute path for %s: %w", path, err)
}
paths = append(paths, pathAbs)
paths[i] = pathAbs
}
s.paths = paths
return nil
Expand Down
15 changes: 10 additions & 5 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestFileScanner(t *testing.T) {
}

func setupFilesForScannerTest(t *testing.T, tmpDir string) {
err := os.Mkdir(filepath.Join(tmpDir, directoryPath), 750)
err := os.Mkdir(filepath.Join(tmpDir, directoryPath), 0750)
if err != nil {
t.Fatalf("cannot create non harvestable directory: %v", err)
}
Expand Down Expand Up @@ -201,10 +201,11 @@ func TestFileWatchNewDeleteModified(t *testing.T) {

t.Run(name, func(t *testing.T) {
w := fileWatcher{
log: logp.L(),
prev: test.prevFiles,
scanner: &mockScanner{test.nextFiles},
events: make(chan loginp.FSEvent),
log: logp.L(),
prev: test.prevFiles,
scanner: &mockScanner{test.nextFiles},
events: make(chan loginp.FSEvent),
sameFileFunc: testSameFile,
}

go w.watch(context.Background())
Expand Down Expand Up @@ -242,6 +243,10 @@ func (t testFileInfo) ModTime() time.Time { return t.time }
func (t testFileInfo) IsDir() bool { return false }
func (t testFileInfo) Sys() interface{} { return t.sys }

func testSameFile(fi1, fi2 os.FileInfo) bool {
return fi1.Name() == fi2.Name()
}

func mustDuration(durStr string) time.Duration {
dur, err := time.ParseDuration(durStr)
if err != nil {
Expand Down
67 changes: 64 additions & 3 deletions filebeat/input/filestream/fswatch_test_non_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ package filestream

import (
"context"
"errors"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/common/match"
Expand Down Expand Up @@ -144,9 +146,10 @@ func TestFileWatcherRenamedFile(t *testing.T) {
t.Fatal(err)
}
w := fileWatcher{
log: logp.L(),
scanner: scanner,
events: make(chan loginp.FSEvent),
log: logp.L(),
scanner: scanner,
events: make(chan loginp.FSEvent),
sameFileFunc: testSameFile,
}

go w.watch(context.Background())
Expand All @@ -170,6 +173,64 @@ func TestFileWatcherRenamedFile(t *testing.T) {
assert.Equal(t, renamedPath, evt.NewPath)
}

// this test is only supported on non Windows for now.
func TestFileWatcherRenamedTruncated(t *testing.T) {
tmpDir := t.TempDir()

fs, err := newFileScanner([]string{filepath.Join(tmpDir, "app.log*")}, fileScannerConfig{})
if err != nil {
t.Fatal(err)
}
w := fileWatcher{
log: logp.L(),
scanner: fs,
events: make(chan loginp.FSEvent),
sameFileFunc: os.SameFile,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go w.watch(ctx)

appLogPath := filepath.Join(tmpDir, "app.log")
rotatedAppLogPath := filepath.Join(tmpDir, "app.log.1")
err = os.WriteFile(appLogPath, []byte("my longer log line"), 0o600)
if err != nil {
t.Fatal(err)
}

evt := w.Event()
require.Equal(t, loginp.OpCreate, evt.Op, "new file should be detected")
require.Equal(t, "", evt.OldPath, "new file does not have an old path set")
require.Equal(t, appLogPath, evt.NewPath, "new file does not have an old path set")

go w.watch(ctx)

err = os.Rename(appLogPath, rotatedAppLogPath)
if err != nil {
t.Fatalf("failed to rotate active file: %v", err)
}

if _, err := os.Stat(appLogPath); !errors.Is(err, os.ErrNotExist) {
t.Fatalf("app.log should not exist")
}

err = os.WriteFile(appLogPath, []byte("shorter line"), 0o600)
if err != nil {
t.Fatal(err)
}

evt = w.Event()
require.Equal(t, loginp.OpRename, evt.Op, "app.log has been renamed to app.log.1, got: %s old_path=%s new_path=%s", evt.Op.String(), evt.OldPath, evt.NewPath)
require.Equal(t, appLogPath, evt.OldPath, "old_path should be set to app.log because of rename")
require.Equal(t, rotatedAppLogPath, evt.NewPath, "new_path should be set to app.log.1 because of rename")

evt = w.Event()
require.Equal(t, loginp.OpCreate, evt.Op, "new file app.log should be detected, got: %s for old_path=%s new_path=%s", evt.Op.String(), evt.OldPath, evt.NewPath)
require.Equal(t, "", evt.OldPath, "new file should not have an old path set")
require.Equal(t, appLogPath, evt.NewPath, "new file should be called app.log")
}

func mustAbsPath(filename string) string {
abspath, err := filepath.Abs(filename)
if err != nil {
Expand Down