From 2875faf126d6b32e86ecc42a8f532bd3217ed030 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?=
Date: Wed, 7 Apr 2021 11:43:48 +0200
Subject: [PATCH] Add tests and support for truncated files in filestream input (#24424)

## What does this PR do?

Adds support for truncated files and migrates the related tests from `test_harvester.py`. It also adds more tests to cover the following cases:

* truncation is detected only by the `Prospector`
* truncation is detected first by the `Harvester`, then by the `Prospector`
* truncation is detected first by the `Prospector`, then by the `Harvester`
* the file gets truncated while the output is unable to accept events

## Why is it important?

Support for stopping reading from truncated files was already implemented. However, the `filestream` input could not start reading the file again from the beginning.

A new file system event called `OpTruncate` is added. When the size of a file has shrunk compared to the last time the scanner encountered it, an `OpTruncate` event is emitted. When the prospector gets this event, the `HarvesterGroup` restarts the `Harvester` of the file. Restarting means that the new `Harvester` cancels the previous reader and starts reading the file from the beginning.
---
 filebeat/input/filestream/environment_test.go | 114 +++++++--
 filebeat/input/filestream/fswatch.go          |  50 ++--
 filebeat/input/filestream/identifier.go       |   9 +-
 .../filestream/identifier_inode_deviceid.go   |   1 +
 filebeat/input/filestream/input.go            |  49 ++--
 .../filestream/input_integration_test.go      | 219 ++++++++++++++++++
 .../internal/input-logfile/fswatch.go         |   1 +
 .../internal/input-logfile/harvester.go       |  49 +++-
 .../internal/input-logfile/harvester_test.go  |  27 +++
 .../internal/input-logfile/prospector.go      |   3 +
 .../internal/input-logfile/publish.go         |   6 +-
 .../internal/input-logfile/store.go           |  34 ++-
 .../internal/input-logfile/store_test.go      |  93 ++++++++
 filebeat/input/filestream/prospector.go       |   7 +-
 filebeat/input/filestream/prospector_test.go  |  23 ++
 15 files changed, 624 insertions(+), 61 deletions(-)

diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go
index ad15c5fefc7..f337bc06411 100644
--- a/filebeat/input/filestream/environment_test.go
+++ b/filebeat/input/filestream/environment_test.go
@@ -34,6 +34,7 @@ import (
 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common"
+	"github.com/elastic/beats/v7/libbeat/common/acker"
 	"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
 	"github.com/elastic/beats/v7/libbeat/logp"
 	"github.com/elastic/beats/v7/libbeat/statestore"
@@ -56,7 +57,7 @@ type registryEntry struct {
 	Cursor struct {
 		Offset int `json:"offset"`
 	} `json:"cursor"`
-	Meta interface{} `json:"meta"`
+	Meta interface{} `json:"meta,omitempty"`
 }
 
 func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
@@ -223,6 +224,23 @@ func requireMetadataEquals(one, other fileMeta) bool {
 	return one == other
 }
 
+// waitUntilOffsetInRegistry waits until the expected offset is set for a file.
+func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(filename string, expectedOffset int) {
+	filepath := e.abspath(filename)
+	fi, err := os.Stat(filepath)
+	if err != nil {
+		e.t.Fatalf("cannot stat file when checking for offset: %+v", err)
+	}
+
+	id := getIDFromPath(filepath, fi)
+	entry, err := e.getRegistryState(id)
+	for err != nil || entry.Cursor.Offset != expectedOffset {
+		entry, err = e.getRegistryState(id)
+	}
+
+	require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
+}
+
 func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) {
 	filepath := e.abspath(filename)
 	fi, err := os.Stat(filepath)
@@ -231,14 +249,12 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) {
 	}
 
 	inputStore, _ := e.stateStore.Access()
-
-	identifier, _ := newINodeDeviceIdentifier(nil)
-	src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: filepath})
+	id := getIDFromPath(filepath, fi)
 
 	var entry registryEntry
-	err = inputStore.Get(src.Name(), &entry)
+	err = inputStore.Get(id, &entry)
 	if err == nil {
-		e.t.Fatalf("key is not expected to be present '%s'", src.Name())
+		e.t.Fatalf("key is not expected to be present '%s'", id)
 	}
 }
 
@@ -273,10 +289,7 @@ func getIDFromPath(filepath string, fi os.FileInfo) string {
 
 // waitUntilEventCount waits until total count events arrive to the client.
 func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
 	for {
-		sum := 0
-		for _, c := range e.pipeline.clients {
-			sum += len(c.GetEvents())
-		}
+		sum := len(e.pipeline.GetAllEvents())
 		if sum == count {
 			return
 		}
@@ -355,10 +368,12 @@ func (s *testInputStore) CleanupInterval() time.Duration {
 }
 
 type mockClient struct {
-	publishes  []beat.Event
+	publishing []beat.Event
+	published  []beat.Event
 	ackHandler beat.ACKer
 	closed     bool
 	mtx        sync.Mutex
+	canceler   context.CancelFunc
 }
 
 // GetEvents returns the published events
@@ -366,7 +381,7 @@ func (c *mockClient) GetEvents() []beat.Event {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 
-	return c.publishes
+	return c.published
 }
 
 // Publish mocks the Client Publish method
@@ -379,11 +394,21 @@ func (c *mockClient) PublishAll(events []beat.Event) {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 
+	c.publishing = append(c.publishing, events...)
 	for _, event := range events {
-		c.publishes = append(c.publishes, event)
 		c.ackHandler.AddEvent(event, true)
 	}
 	c.ackHandler.ACKEvents(len(events))
+
+	for _, event := range events {
+		c.published = append(c.published, event)
+	}
+}
+
+func (c *mockClient) waitUntilPublishingHasStarted() {
+	for len(c.publishing) == 0 {
+		time.Sleep(10 * time.Millisecond)
+	}
 }
 
 // Close mocks the Client Close method
@@ -401,12 +426,16 @@ func (c *mockClient) Close() error {
 
 // mockPipelineConnector mocks the PipelineConnector interface
 type mockPipelineConnector struct {
-	clients []*mockClient
-	mtx     sync.Mutex
+	blocking bool
+	clients  []*mockClient
+	mtx      sync.Mutex
 }
 
 // GetAllEvents returns all events associated with a pipeline
 func (pc *mockPipelineConnector) GetAllEvents() []beat.Event {
+	pc.mtx.Lock()
+	defer pc.mtx.Unlock()
+
 	var evList []beat.Event
 	for _, clientEvents := range pc.clients {
 		evList = append(evList, clientEvents.GetEvents()...)
@@ -425,11 +454,64 @@ func (pc *mockPipelineConnector) ConnectWith(config beat.ClientConfig) (beat.Cli
 	pc.mtx.Lock()
 	defer pc.mtx.Unlock()
 
+	ctx, cancel := context.WithCancel(context.Background())
 	c := &mockClient{
-		ackHandler: config.ACKHandler,
+		canceler:   cancel,
+		ackHandler: newMockACKHandler(ctx, pc.blocking, config),
 	}
 	pc.clients = append(pc.clients, c)
 
 	return c, nil
+
+}
+
+func (pc *mockPipelineConnector) cancelAllClients() {
+	pc.mtx.Lock()
+	defer pc.mtx.Unlock()
+
+	for _, client := range pc.clients {
+		client.canceler()
+	}
+}
+
+func (pc *mockPipelineConnector) cancelClient(i int) {
+	pc.mtx.Lock()
+	defer pc.mtx.Unlock()
+
+	if len(pc.clients) < i+1 {
+		return
+	}
+
+	pc.clients[i].canceler()
+}
+
+func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.ACKer {
+	if !blocking {
+		return config.ACKHandler
+	}
+
+	return acker.Combine(blockingACKer(starter), config.ACKHandler)
+
+}
+
+func blockingACKer(starter context.Context) beat.ACKer {
+	return acker.EventPrivateReporter(func(acked int, private []interface{}) {
+		for starter.Err() == nil {
+		}
+	})
+}
+
+func (pc *mockPipelineConnector) clientsCount() int {
+	pc.mtx.Lock()
+	defer pc.mtx.Unlock()
+
+	return len(pc.clients)
+}
+
+func (pc *mockPipelineConnector) invertBlocking() {
+	pc.mtx.Lock()
+	defer pc.mtx.Unlock()
+
+	pc.blocking = !pc.blocking
 }
diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go
index 0b1e5df46f4..19614063db8 100644
--- a/filebeat/input/filestream/fswatch.go
+++ b/filebeat/input/filestream/fswatch.go
@@ -59,6 +59,9 @@ type fileScanner struct {
 type fileWatcherConfig struct {
 	// Interval is the time between two scans.
 	Interval time.Duration `config:"check_interval"`
+	// ResendOnModTime: if a file changed according to its modtime but its size
+	// stayed the same, it is still treated as truncated and is resent.
+	ResendOnModTime bool `config:"resend_on_touch"`
 	// Scanner is the configuration of the scanner.
 	Scanner fileScannerConfig `config:",inline"`
 }
@@ -66,11 +69,12 @@
 // fileWatcher gets the list of files from a FSWatcher and creates events by
 // comparing the files between its last two runs.
 type fileWatcher struct {
-	interval time.Duration
-	prev     map[string]os.FileInfo
-	scanner  loginp.FSScanner
-	log      *logp.Logger
-	events   chan loginp.FSEvent
+	interval        time.Duration
+	resendOnModTime bool
+	prev            map[string]os.FileInfo
+	scanner         loginp.FSScanner
+	log             *logp.Logger
+	events          chan loginp.FSEvent
 }
 
 func newFileWatcher(paths []string, ns *common.ConfigNamespace) (loginp.FSWatcher, error) {
@@ -98,18 +102,20 @@ func newScannerWatcher(paths []string, c *common.Config) (loginp.FSWatcher, erro
 		return nil, err
 	}
 	return &fileWatcher{
-		log:      logp.NewLogger(watcherDebugKey),
-		interval: config.Interval,
-		prev:     make(map[string]os.FileInfo, 0),
-		scanner:  scanner,
-		events:   make(chan loginp.FSEvent),
+		log:             logp.NewLogger(watcherDebugKey),
+		interval:        config.Interval,
+		resendOnModTime: config.ResendOnModTime,
+		prev:            make(map[string]os.FileInfo),
+		scanner:         scanner,
+		events:          make(chan loginp.FSEvent),
 	}, nil
 }
 
 func defaultFileWatcherConfig() fileWatcherConfig {
 	return fileWatcherConfig{
-		Interval: 10 * time.Second,
-		Scanner:  defaultFileScannerConfig(),
+		Interval:        10 * time.Second,
+		ResendOnModTime: false,
+		Scanner:         defaultFileScannerConfig(),
 	}
 }
 
@@ -142,10 +148,18 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
 		}
 
 		if prevInfo.ModTime() != info.ModTime() {
-			select {
-			case <-ctx.Done():
-				return
-			case w.events <- writeEvent(path, info):
+			if prevInfo.Size() > info.Size() || (w.resendOnModTime && prevInfo.Size() == info.Size()) {
+				select {
+				case <-ctx.Done():
+					return
+				case w.events <- truncateEvent(path, info):
+				}
+			} else {
+				select {
+				case <-ctx.Done():
+					return
+				case w.events <- writeEvent(path, info):
+				}
 			}
 		}
 
@@ -198,6 +212,10 @@ func writeEvent(path string, fi os.FileInfo) loginp.FSEvent {
 	return loginp.FSEvent{Op: loginp.OpWrite, OldPath: path, NewPath: path, Info: fi}
 }
 
+func truncateEvent(path string, fi os.FileInfo) loginp.FSEvent {
+	return loginp.FSEvent{Op: loginp.OpTruncate, OldPath: path, NewPath: path, Info: fi}
+}
+
 func renamedEvent(oldPath, path string, fi os.FileInfo) loginp.FSEvent {
 	return loginp.FSEvent{Op: loginp.OpRename, OldPath: oldPath, NewPath: path, Info: fi}
 }
diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go
index 331b9cedc9f..bde88aa03fe 100644
--- a/filebeat/input/filestream/identifier.go
+++ b/filebeat/input/filestream/identifier.go
@@ -60,9 +60,10 @@ type fileIdentifier interface {
 
 // fileSource implements the Source interface
 // It is required to identify and manage file sources.
 type fileSource struct {
-	info    os.FileInfo
-	newPath string
-	oldPath string
+	info      os.FileInfo
+	newPath   string
+	oldPath   string
+	truncated bool
 
 	name                string
 	identifierGenerator string
@@ -103,6 +104,7 @@ func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource {
 		info:                e.Info,
 		newPath:             e.NewPath,
 		oldPath:             e.OldPath,
+		truncated:           e.Op == loginp.OpTruncate,
 		name:                i.name + identitySep + file.GetOSState(e.Info).String(),
 		identifierGenerator: i.name,
 	}
@@ -140,6 +142,7 @@ func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
 		info:                e.Info,
 		newPath:             e.NewPath,
 		oldPath:             e.OldPath,
+		truncated:           e.Op == loginp.OpTruncate,
 		name:                p.name + identitySep + path,
 		identifierGenerator: p.name,
 	}
diff --git a/filebeat/input/filestream/identifier_inode_deviceid.go b/filebeat/input/filestream/identifier_inode_deviceid.go
index 459ae90348b..fb87708dd18 100644
--- a/filebeat/input/filestream/identifier_inode_deviceid.go
+++ b/filebeat/input/filestream/identifier_inode_deviceid.go
@@ -98,6 +98,7 @@ func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource {
 		info:                e.Info,
 		newPath:             e.NewPath,
 		oldPath:             e.OldPath,
+		truncated:           e.Op == loginp.OpTruncate,
 		name:                i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(),
 		identifierGenerator: i.name,
 	}
diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go
index b63f28ff7e6..1a093602fb8 100644
--- a/filebeat/input/filestream/input.go
+++ b/filebeat/input/filestream/input.go
@@ -29,6 +29,7 @@ import (
 	input "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common"
+	"github.com/elastic/beats/v7/libbeat/common/cleanup"
 	"github.com/elastic/beats/v7/libbeat/common/match"
 	"github.com/elastic/beats/v7/libbeat/feature"
 	"github.com/elastic/beats/v7/libbeat/logp"
@@ -162,7 +163,7 @@ func (inp *filestream) Run(
 func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
 	var state state
-	if c.IsNew() {
+	if c.IsNew() || s.truncated {
 		return state
 	}
 
@@ -175,7 +176,7 @@ func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
 }
 
 func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path string, offset int64) (reader.Reader, error) {
-	f, err := inp.openFile(path, offset)
+	f, err := inp.openFile(log, path, offset)
 	if err != nil {
 		return nil, err
 	}
@@ -224,20 +225,41 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
 // or the file cannot be opened because for example of failing read permissions, an error
 // is returned and the harvester is closed. The file will be picked up again the next time
 // the file system is scanned
-func (inp *filestream) openFile(path string, offset int64) (*os.File, error) {
-	err := inp.checkFileBeforeOpening(path)
+func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*os.File, error) {
+	fi, err := os.Stat(path)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to stat source file %s: %s", path, err)
+	}
+
+	// check that the file is not a named pipe before trying to open it:
+	// opening a named pipe for reading can block indefinitely, so it must not be attempted.
+ if fi.Mode()&os.ModeNamedPipe != 0 { + return nil, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name()) } + ok := false f, err := os.OpenFile(path, os.O_RDONLY, os.FileMode(0)) if err != nil { return nil, fmt.Errorf("failed opening %s: %s", path, err) } + defer cleanup.IfNot(&ok, cleanup.IgnoreError(f.Close)) + fi, err = f.Stat() + if err != nil { + return nil, fmt.Errorf("failed to stat source file %s: %s", path, err) + } + + err = checkFileBeforeOpening(fi) + if err != nil { + return nil, err + } + + if fi.Size() < offset { + log.Infof("File was truncated. Reading file from offset 0. Path=%s", path) + offset = 0 + } err = inp.initFileOffset(f, offset) if err != nil { - f.Close() return nil, err } @@ -249,24 +271,16 @@ func (inp *filestream) openFile(path string, offset int64) (*os.File, error) { } return nil, fmt.Errorf("initialising encoding for '%v' failed: %v", f, err) } + ok = true return f, nil } -func (inp *filestream) checkFileBeforeOpening(path string) error { - fi, err := os.Stat(path) - if err != nil { - return fmt.Errorf("failed to stat source file %s: %v", path, err) - } - +func checkFileBeforeOpening(fi os.FileInfo) error { if !fi.Mode().IsRegular() { return fmt.Errorf("tried to open non regular file: %q %s", fi.Mode(), fi.Name()) } - if fi.Mode()&os.ModeNamedPipe != 0 { - return fmt.Errorf("failed to open file %s, named pipes are not supported", path) - } - return nil } @@ -294,8 +308,7 @@ func (inp *filestream) readFromSource( if err != nil { switch err { case ErrFileTruncate: - log.Info("File was truncated. Begin reading file from offset 0.") - s.Offset = 0 + log.Infof("File was truncated. Begin reading file from offset 0. Path=%s", path) case ErrClosed: log.Info("Reader was closed. Closing.") default: diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 67436446cde..c768abfa32b 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -25,6 +25,7 @@ import ( "os" "runtime" "testing" + "time" "github.com/stretchr/testify/require" "golang.org/x/text/encoding" @@ -517,3 +518,221 @@ func TestFilestreamCloseAfterIntervalRotatedAndNewRemoved(t *testing.T) { cancelInput() env.waitUntilInputStops() } + +// test_truncated_file_open from test_harvester.py +func TestFilestreamTruncatedFileOpen(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.resend_on_touch": "true", + }) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + testlines := []byte("first line\nsecond line\nthird line\n") + env.mustWriteLinesToFile(testlogName, testlines) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + env.mustTruncateFile(testlogName, 0) + time.Sleep(5 * time.Millisecond) + + truncatedTestLines := []byte("truncated first line\n") + env.mustWriteLinesToFile(testlogName, truncatedTestLines) + env.waitUntilEventCount(4) + + cancelInput() + env.waitUntilInputStops() + env.requireOffsetInRegistry(testlogName, len(truncatedTestLines)) +} + +// test_truncated_file_closed from test_harvester.py +func TestFilestreamTruncatedFileClosed(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := 
env.mustCreateInput(map[string]interface{}{
+		"paths":                              []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":  "1ms",
+		"prospector.scanner.resend_on_touch": "true",
+		"close.reader.on_eof":                "true",
+	})
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	testlines := []byte("first line\nsecond line\nthird line\n")
+	env.mustWriteLinesToFile(testlogName, testlines)
+
+	env.waitUntilEventCount(3)
+	env.requireOffsetInRegistry(testlogName, len(testlines))
+
+	env.waitUntilHarvesterIsDone()
+
+	env.mustTruncateFile(testlogName, 0)
+	time.Sleep(5 * time.Millisecond)
+
+	truncatedTestLines := []byte("truncated first line\n")
+	env.mustWriteLinesToFile(testlogName, truncatedTestLines)
+	env.waitUntilEventCount(4)
+
+	cancelInput()
+	env.waitUntilInputStops()
+	env.requireOffsetInRegistry(testlogName, len(truncatedTestLines))
+}
+
+// test_truncate from test_harvester.py
+func TestFilestreamTruncateWithSymlink(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	symlinkName := "test.log.symlink"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"paths": []string{
+			env.abspath(testlogName),
+			env.abspath(symlinkName),
+		},
+		"prospector.scanner.check_interval":  "1ms",
+		"prospector.scanner.resend_on_touch": "true",
+		"prospector.scanner.symlinks":        "true",
+	})
+
+	lines := []byte("first line\nsecond line\nthird line\n")
+	env.mustWriteLinesToFile(testlogName, lines)
+
+	env.mustSymlink(testlogName, symlinkName)
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	env.waitUntilEventCount(3)
+
+	env.requireOffsetInRegistry(testlogName, len(lines))
+
+	// remove symlink
+	env.mustRemoveFile(symlinkName)
+	env.mustTruncateFile(testlogName, 0)
+	env.waitUntilOffsetInRegistry(testlogName, 0)
+
+	moreLines := []byte("fourth line\nfifth line\n")
+	env.mustWriteLinesToFile(testlogName, moreLines)
+
+	env.waitUntilEventCount(5)
+	env.requireOffsetInRegistry(testlogName, len(moreLines))
+
+	cancelInput()
+	env.waitUntilInputStops()
+
+	env.requireRegistryEntryCount(1)
+}
+
+func TestFilestreamTruncateBigScannerInterval(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"paths":                              []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":  "5s",
+		"prospector.scanner.resend_on_touch": "true",
+	})
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	testlines := []byte("first line\nsecond line\nthird line\n")
+	env.mustWriteLinesToFile(testlogName, testlines)
+
+	env.waitUntilEventCount(3)
+	env.requireOffsetInRegistry(testlogName, len(testlines))
+
+	env.mustTruncateFile(testlogName, 0)
+
+	truncatedTestLines := []byte("truncated first line\n")
+	env.mustWriteLinesToFile(testlogName, truncatedTestLines)
+
+	env.waitUntilEventCount(3)
+
+	cancelInput()
+	env.waitUntilInputStops()
+}
+
+func TestFilestreamTruncateCheckOffset(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"paths":                              []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":  "1ms",
+		"prospector.scanner.resend_on_touch": "true",
+	})
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	testlines := []byte("first line\nsecond line\nthird line\n")
+	env.mustWriteLinesToFile(testlogName, testlines)
+
+	env.waitUntilEventCount(3)
+	env.requireOffsetInRegistry(testlogName, len(testlines))
+
+	env.mustTruncateFile(testlogName, 0)
+
+	env.waitUntilOffsetInRegistry(testlogName, 0)
+
+	cancelInput()
+	env.waitUntilInputStops()
+}
+
+func TestFilestreamTruncateBlockedOutput(t *testing.T) {
+	env := newInputTestingEnvironment(t)
+	env.pipeline = &mockPipelineConnector{blocking: true}
+
+	testlogName := "test.log"
+	inp := env.mustCreateInput(map[string]interface{}{
+		"paths":                              []string{env.abspath(testlogName)},
+		"prospector.scanner.check_interval":  "1ms",
+		"prospector.scanner.resend_on_touch": "true",
+	})
+
+	testlines := []byte("first line\nsecond line\n")
+	env.mustWriteLinesToFile(testlogName, testlines)
+
+	ctx, cancelInput := context.WithCancel(context.Background())
+	env.startInput(ctx, inp)
+
+	for env.pipeline.clientsCount() != 1 {
+		time.Sleep(10 * time.Millisecond)
+	}
+	env.pipeline.clients[0].waitUntilPublishingHasStarted()
+	env.pipeline.clients[0].canceler()
+
+	env.waitUntilEventCount(2)
+	env.requireOffsetInRegistry(testlogName, len(testlines))
+
+	// extra lines are appended after the first lines have been processed,
+	// so they can interfere with the truncation of the file
+	env.mustAppendLinesToFile(testlogName, []byte("third line\n"))
+
+	env.mustTruncateFile(testlogName, 0)
+
+	env.waitUntilOffsetInRegistry(testlogName, 0)
+
+	// all newly started clients have to be cancelled so events can be processed
+	env.pipeline.cancelAllClients()
+	// if a new client shows up, it should not block
+	env.pipeline.invertBlocking()
+
+	truncatedTestLines := []byte("truncated line\n")
+	env.mustWriteLinesToFile(testlogName, truncatedTestLines)
+
+	env.waitUntilEventCount(3)
+	env.waitUntilOffsetInRegistry(testlogName, len(truncatedTestLines))
+
+	cancelInput()
+	env.waitUntilInputStops()
+}
diff --git a/filebeat/input/filestream/internal/input-logfile/fswatch.go b/filebeat/input/filestream/internal/input-logfile/fswatch.go
index eb080bad292..56235e6c4bc 100644
--- a/filebeat/input/filestream/internal/input-logfile/fswatch.go
+++ b/filebeat/input/filestream/internal/input-logfile/fswatch.go
@@ -29,6 +29,7 @@ const (
 	OpWrite
 	OpDelete
 	OpRename
+	OpTruncate
 )
 
 // Operation describes what happened to a file.
diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go
index 72d0c27e4c8..00b14bc498a 100644
--- a/filebeat/input/filestream/internal/input-logfile/harvester.go
+++ b/filebeat/input/filestream/internal/input-logfile/harvester.go
@@ -117,6 +117,8 @@ func (r *readerGroup) hasID(id string) bool {
 type HarvesterGroup interface {
 	// Start starts a Harvester and adds it to the readers list.
 	Start(input.Context, Source)
+	// Restart starts a Harvester for a Source even if one might already be running.
+	Restart(input.Context, Source)
 	// Stop cancels the reader of a given Source.
 	Stop(Source)
 	// StopGroup cancels all running Harvesters.
@@ -139,25 +141,48 @@ func (hg *defaultHarvesterGroup) Start(ctx input.Context, s Source) {
 	ctx.Logger = ctx.Logger.With("source", sourceName)
 	ctx.Logger.Debug("Starting harvester for file")
 
-	hg.tg.Go(func(canceler unison.Canceler) error {
+	hg.tg.Go(startHarvester(ctx, hg, s, false))
+}
+
+// Restart starts the Harvester for a Source. If a Harvester is already running,
+// it waits for that Harvester to shut down first. Restart does not block.
+func (hg *defaultHarvesterGroup) Restart(ctx input.Context, s Source) { + sourceName := hg.identifier.ID(s) + + ctx.Logger = ctx.Logger.With("source", sourceName) + ctx.Logger.Debug("Restarting harvester for file") + + hg.tg.Go(startHarvester(ctx, hg, s, true)) +} + +func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, restart bool) func(canceler unison.Canceler) error { + srcID := hg.identifier.ID(s) + + return func(canceler unison.Canceler) error { defer func() { if v := recover(); v != nil { err := fmt.Errorf("harvester panic with: %+v\n%s", v, debug.Stack()) ctx.Logger.Errorf("Harvester crashed with: %+v", err) + hg.readers.remove(srcID) } }() + + if restart { + // stop previous harvester + hg.readers.remove(srcID) + } defer ctx.Logger.Debug("Stopped harvester for file") - harvesterCtx, cancelHarvester, err := hg.readers.newContext(sourceName, canceler) + harvesterCtx, cancelHarvester, err := hg.readers.newContext(srcID, canceler) if err != nil { return fmt.Errorf("error while adding new reader to the bookkeeper %v", err) } ctx.Cancelation = harvesterCtx defer cancelHarvester() - defer hg.readers.remove(sourceName) - resource, err := lock(ctx, hg.store, sourceName) + resource, err := lock(ctx, hg.store, srcID) if err != nil { + hg.readers.remove(srcID) return fmt.Errorf("error while locking resource: %v", err) } defer releaseResource(resource) @@ -167,6 +192,7 @@ func (hg *defaultHarvesterGroup) Start(ctx input.Context, s Source) { ACKHandler: newInputACKHandler(ctx.Logger), }) if err != nil { + hg.readers.remove(srcID) return fmt.Errorf("error while connecting to output with pipeline: %v", err) } defer client.Close() @@ -177,10 +203,18 @@ func (hg *defaultHarvesterGroup) Start(ctx input.Context, s Source) { err = hg.harvester.Run(ctx, s, cursor, publisher) if err != nil && err != context.Canceled { + hg.readers.remove(srcID) return fmt.Errorf("error while running harvester: %v", err) } + // If the context was not cancelled it means that the Harvester is stopping because of + // some internal decision, not due to outside interaction. + // If it is stopping itself, it must clean up the bookkeeper. + if ctx.Cancelation.Err() != context.Canceled { + hg.readers.remove(srcID) + } + return nil - }) + } } // Stop stops the running Harvester for a given Source. 
@@ -205,6 +239,11 @@ func lock(ctx input.Context, store *store, key string) (*resource, error) {
 		resource.Release()
 		return nil, err
 	}
+
+	resource.stateMutex.Lock()
+	resource.lockedVersion = resource.version
+	resource.stateMutex.Unlock()
+
 	return resource, nil
 }
 
diff --git a/filebeat/input/filestream/internal/input-logfile/harvester_test.go b/filebeat/input/filestream/internal/input-logfile/harvester_test.go
index 9425c30be4b..079291ad4a4 100644
--- a/filebeat/input/filestream/internal/input-logfile/harvester_test.go
+++ b/filebeat/input/filestream/internal/input-logfile/harvester_test.go
@@ -271,6 +271,33 @@ func TestDefaultHarvesterGroup(t *testing.T) {
 
 		require.Equal(t, 0, mockHarvester.getRunCount())
 	})
+
+	t.Run("assert harvester can be restarted", func(t *testing.T) {
+		var wg sync.WaitGroup
+		mockHarvester := &mockHarvester{onRun: blockUntilCancelOnRun, wg: &wg}
+		hg := testDefaultHarvesterGroup(t, mockHarvester)
+		inputCtx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
+
+		goroutineChecker := resources.NewGoroutinesChecker()
+		defer goroutineChecker.WaitUntilOriginalCount()
+
+		wg.Add(2)
+		hg.Start(inputCtx, source)
+		hasRun := mockHarvester.getRunCount()
+		for hasRun == 0 {
+			hasRun = mockHarvester.getRunCount()
+		}
+		hg.Restart(inputCtx, source)
+
+		for hasRun != 2 {
+			hasRun = mockHarvester.getRunCount()
+		}
+		require.NoError(t, hg.StopGroup())
+
+		wg.Wait()
+
+		require.Equal(t, 2, mockHarvester.getRunCount())
+	})
 }
 
 func testDefaultHarvesterGroup(t *testing.T, mockHarvester Harvester) *defaultHarvesterGroup {
diff --git a/filebeat/input/filestream/internal/input-logfile/prospector.go b/filebeat/input/filestream/internal/input-logfile/prospector.go
index fbd1dd5906c..7414176b0ec 100644
--- a/filebeat/input/filestream/internal/input-logfile/prospector.go
+++ b/filebeat/input/filestream/internal/input-logfile/prospector.go
@@ -43,6 +43,9 @@ type StateMetadataUpdater interface {
 	UpdateMetadata(s Source, v interface{}) error
 	// Remove marks a state for deletion of a given Source.
 	Remove(s Source) error
+	// ResetCursor resets the cursor in the registry and drops previous state
+	// updates that are not yet ACKed.
+	ResetCursor(s Source, cur interface{}) error
 }
 
 // ProspectorCleaner cleans the state store before it starts running.
diff --git a/filebeat/input/filestream/internal/input-logfile/publish.go b/filebeat/input/filestream/internal/input-logfile/publish.go
index 547a82c479f..ddc389321b1 100644
--- a/filebeat/input/filestream/internal/input-logfile/publish.go
+++ b/filebeat/input/filestream/internal/input-logfile/publish.go
@@ -124,11 +124,15 @@ func (op *updateOp) done(n uint) {
 // Execute updates the persistent store with the scheduled changes and releases the resource.
 func (op *updateOp) Execute(n uint) {
 	resource := op.resource
-	defer op.done(n)
 
 	resource.stateMutex.Lock()
 	defer resource.stateMutex.Unlock()
 
+	if resource.lockedVersion != resource.version {
+		return
+	}
+
+	defer op.done(n)
 	resource.activeCursorOperations -= n
 	if resource.activeCursorOperations == 0 {
 		resource.cursor = resource.pendingCursor
diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go
index 6cd0028c61c..fe149f59d77 100644
--- a/filebeat/input/filestream/internal/input-logfile/store.go
+++ b/filebeat/input/filestream/internal/input-logfile/store.go
@@ -75,6 +75,10 @@ type resource struct {
 	// as long as pending is > 0 the resource is in used and must not be garbage collected.
 	pending atomic.Uint64
 
+	// version is the current identity version; stateMutex must be locked when it is
+	// updated. Pending updates are discarded if it is increased.
+	version, lockedVersion uint
+
 	// lock guarantees only one input create updates for this entry
 	lock unison.Mutex
 
@@ -84,7 +88,7 @@ type resource struct {
 	// stateMutex is used to lock the resource when it is update/read from
 	// multiple go-routines like the ACK handler or the input publishing an
 	// event.
-	// stateMutex is used to access the fields 'stored', 'state' and 'internalInSync'
+	// stateMutex is used to access the fields 'stored', 'state', 'internalInSync' and 'version'.
 	stateMutex sync.Mutex
 
 	// stored indicates that the state is available in the registry file. It is false for new entries.
@@ -181,6 +185,11 @@ func (s *sourceStore) Remove(src Source) error {
 	return s.store.remove(key)
 }
 
+func (s *sourceStore) ResetCursor(src Source, cur interface{}) error {
+	key := s.identifier.ID(src)
+	return s.store.resetCursor(key, cur)
+}
+
 // CleanIf sets the TTL of a resource if the predicate return true.
 func (s *sourceStore) CleanIf(pred func(v Value) bool) {
 	s.store.ephemeralStore.mu.Lock()
@@ -294,6 +303,29 @@ func (s *store) writeState(r *resource) {
 	}
 }
 
+// resetCursor sets the cursor to the value in cur in the persistent store and
+// drops all pending cursor operations.
+func (s *store) resetCursor(key string, cur interface{}) error {
+	r := s.ephemeralStore.Find(key, false)
+	if r == nil {
+		return fmt.Errorf("resource '%s' not found", key)
+	}
+	defer r.Release()
+
+	r.stateMutex.Lock()
+	defer r.stateMutex.Unlock()
+
+	r.version++
+	r.UpdatesReleaseN(r.activeCursorOperations)
+	r.activeCursorOperations = 0
+	r.pendingCursor = nil
+	typeconv.Convert(&r.cursor, cur)
+
+	s.writeState(r)
+
+	return nil
+}
+
 // Removes marks an entry for removal by setting its TTL to zero.
func (s *store) remove(key string) error { resource := s.ephemeralStore.Find(key, false) diff --git a/filebeat/input/filestream/internal/input-logfile/store_test.go b/filebeat/input/filestream/internal/input-logfile/store_test.go index d55162bd6c4..0f5a2372a2d 100644 --- a/filebeat/input/filestream/internal/input-logfile/store_test.go +++ b/filebeat/input/filestream/internal/input-logfile/store_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" @@ -227,6 +228,98 @@ func TestStore_UpdateTTL(t *testing.T) { }) } +func TestStore_ResetCursor(t *testing.T) { + type cur struct { + Offset int + } + t.Run("reset cursor empty and lock it", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": { + TTL: 60 * time.Second, + }, + })) + defer store.Release() + + res := store.Get("test::key") + require.Equal(t, uint(0), res.version) + require.Equal(t, uint(0), res.lockedVersion) + require.Equal(t, nil, res.cursor) + require.Equal(t, nil, res.pendingCursor) + + store.resetCursor("test::key", cur{Offset: 10}) + + res = store.Get("test::key") + require.Equal(t, uint(1), res.version) + require.Equal(t, uint(0), res.lockedVersion) + require.Equal(t, map[string]interface{}{"offset": int64(10)}, res.cursor) + + res, err := lock(input.Context{}, store, "test::key") + require.NoError(t, err) + require.Equal(t, uint(1), res.version) + require.Equal(t, uint(1), res.lockedVersion) + }) + + t.Run("reset cursor with no pending updates", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": { + TTL: 60 * time.Second, + Cursor: cur{Offset: 6}, + }, + })) + defer store.Release() + + res := store.Get("test::key") + require.Equal(t, uint(0), res.version) + require.Equal(t, uint(0), res.lockedVersion) + require.Equal(t, map[string]interface{}{"offset": int64(6)}, res.cursor) + require.Equal(t, nil, res.pendingCursor) + + store.resetCursor("test::key", cur{Offset: 0}) + + res = store.Get("test::key") + require.Equal(t, uint(1), res.version) + require.Equal(t, uint(0), res.lockedVersion) + require.Equal(t, map[string]interface{}{"offset": int64(0)}, res.cursor) + + res, err := lock(input.Context{}, store, "test::key") + require.NoError(t, err) + require.Equal(t, uint(1), res.version) + require.Equal(t, uint(1), res.lockedVersion) + }) + + t.Run("reset cursor with pending updates", func(t *testing.T) { + store := testOpenStore(t, "test", createSampleStore(t, map[string]state{ + "test::key": { + TTL: 60 * time.Second, + Cursor: cur{Offset: 6}, + }, + })) + defer store.Release() + + res := store.Get("test::key") + + // lock before creating a new update operation + res, err := lock(input.Context{}, store, "test::key") + require.NoError(t, err) + op, err := createUpdateOp(store, res, cur{Offset: 42}) + require.NoError(t, err) + + store.resetCursor("test::key", cur{Offset: 0}) + + // try to update cursor after it has been reset + op.Execute(1) + releaseResource(res) + + res = store.Get("test::key") + require.Equal(t, uint(1), res.version) + require.Equal(t, uint(0), res.lockedVersion) + require.Equal(t, uint(0), res.activeCursorOperations) + require.Equal(t, map[string]interface{}{"offset": int64(0)}, res.cursor) + require.Equal(t, nil, res.pendingCursor) + + 
}) +} + type testMeta struct { IdentifierName string } diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index f08e4346c74..322004421bf 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -130,9 +130,14 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h break } } - hg.Start(ctx, src) + case loginp.OpTruncate: + log.Debugf("File %s has been truncated", fe.NewPath) + + s.ResetCursor(src, state{Offset: 0}) + hg.Restart(ctx, src) + case loginp.OpDelete: log.Debugf("File %s has been removed", fe.OldPath) diff --git a/filebeat/input/filestream/prospector_test.go b/filebeat/input/filestream/prospector_test.go index d22f7c3538a..ffcdbcf31f9 100644 --- a/filebeat/input/filestream/prospector_test.go +++ b/filebeat/input/filestream/prospector_test.go @@ -188,6 +188,17 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { harvesterGroupStop{}, }, }, + "one updated then truncated file": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpWrite, NewPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpTruncate, NewPath: "/path/to/file"}, + }, + expectedEvents: []harvesterEvent{ + harvesterStart("path::/path/to/file"), + harvesterRestart("path::/path/to/file"), + harvesterGroupStop{}, + }, + }, "old files with ignore older configured": { events: []loginp.FSEvent{ loginp.FSEvent{ @@ -381,6 +392,10 @@ type harvesterStart string func (h harvesterStart) String() string { return string(h) } +type harvesterRestart string + +func (h harvesterRestart) String() string { return string(h) } + type harvesterStop string func (h harvesterStop) String() string { return string(h) } @@ -401,6 +416,10 @@ func (t *testHarvesterGroup) Start(_ input.Context, s loginp.Source) { t.events = append(t.events, harvesterStart(s.Name())) } +func (t *testHarvesterGroup) Restart(_ input.Context, s loginp.Source) { + t.events = append(t.events, harvesterRestart(s.Name())) +} + func (t *testHarvesterGroup) Stop(s loginp.Source) { t.events = append(t.events, harvesterStop(s.Name())) } @@ -454,6 +473,10 @@ func (mu *mockMetadataUpdater) FindCursorMeta(s loginp.Source, v interface{}) er return nil } +func (mu *mockMetadataUpdater) ResetCursor(s loginp.Source, cur interface{}) error { + return nil +} + func (mu *mockMetadataUpdater) UpdateMetadata(s loginp.Source, v interface{}) error { mu.table[s.Name()] = v return nil
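
---

Note for reviewers: the heart of the change is the comparison rule added to `fileWatcher.watch` in `fswatch.go` above. Here it is condensed into a minimal, self-contained sketch; the `decide` helper, its string results, and the parameter names are illustrative stand-ins for this write-up, not the actual Beats types:

```go
package main

import (
	"fmt"
	"time"
)

// decide mirrors the branch added to fileWatcher.watch: the rule only fires
// when the modtime differs from the previous scan. A smaller size means the
// file was truncated; with resend_on_touch enabled, an unchanged size is
// treated as truncation as well (the file was merely "touched"); any other
// modtime change is a plain write.
func decide(prevSize, currSize int64, prevMod, currMod time.Time, resendOnTouch bool) string {
	if prevMod.Equal(currMod) {
		return "none" // file unchanged since the last scan
	}
	if prevSize > currSize || (resendOnTouch && prevSize == currSize) {
		return "truncate" // prospector resets the cursor and restarts the harvester
	}
	return "write"
}

func main() {
	t0 := time.Now()
	t1 := t0.Add(time.Second)
	fmt.Println(decide(100, 40, t0, t1, false))  // truncate: file shrank
	fmt.Println(decide(100, 100, t0, t1, true))  // truncate: touched, size unchanged
	fmt.Println(decide(100, 100, t0, t1, false)) // write: resend_on_touch disabled
	fmt.Println(decide(100, 160, t0, t1, false)) // write: file grew
}
```

The equal-size case stays opt-in: as the integration tests above show, it is enabled with `prospector.scanner.resend_on_touch: true`, typically alongside a short `prospector.scanner.check_interval`, while a shrinking file is always reported as `OpTruncate`.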