From 1f5198e0c3a21fa88082bd3dd8cc068f1367f058 Mon Sep 17 00:00:00 2001
From: Noémi Ványi
Date: Thu, 24 Jun 2021 15:56:45 +0200
Subject: [PATCH] Add support for copytruncate method when rotating input logs with an external tool in `filestream` input (#23457)

## What does this PR do?

The PR makes the `filestream` input log-rotation aware so that Filebeat can cooperate better with external log rotation tools. The first supported strategy is `copytruncate`.

When `logrotate` rotates e.g. `boot.log` with `copytruncate`, the following things happen:

1. all archived files are renamed, e.g. `boot.log.2` is renamed to `boot.log.3`, until `boot.log.1` no longer exists
2. `boot.log` is copied to `boot.log.1`
3. `boot.log` is truncated

Here is an example from my machine. Before rotation:

```
root@sleipnir:/home/n# ls -lisaht /var/log/boot.log*
130476 30K -rw------- 1 root root 28K Jan 29 08:59 /var/log/boot.log
130577 36K -rw------- 1 root root 34K Jan 29 08:59 /var/log/boot.log.1
130657 60K -rw------- 1 root root 57K Jan 7 09:51 /var/log/boot.log.2
```

After rotation:

```
root@sleipnir:/home/n# ls -lisaht /var/log/boot.log*
130476 0 -rw------- 1 root root 0 May 25 12:41 /var/log/boot.log
130430 30K -rw------- 1 root root 28K May 25 12:41 /var/log/boot.log.1
130577 36K -rw------- 1 root root 34K Jan 29 08:59 /var/log/boot.log.2
130657 60K -rw------- 1 root root 57K Jan 7 09:51 /var/log/boot.log.3
```

On rotation, reading the active file is continued, and archived files are kept open until EOF is reached.

### Configuration

```yaml
rotation.external.strategy.copytruncate:
  suffix_regex: \.\d$
  count: 10
```

Note: once Filebeat itself is able to rotate input logs, that configuration will live under `rotation.internal.*`.

## Why is it important?

Previously, Filebeat was not able to cooperate with external log rotation tools that use the `copytruncate` method.
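For context, here is a minimal sketch of a complete `filestream` input configuration using the new option. The log path and glob are illustrative only; the `rotation.external.strategy.copytruncate` keys are what this PR introduces. Note that the `paths` glob has to match the rotated files as well as the active one, otherwise the input never sees them:

```yaml
filebeat.inputs:
- type: filestream
  # The glob must cover both the active file and its rotated copies,
  # e.g. /var/log/boot.log, /var/log/boot.log.1, ...
  paths:
    - /var/log/boot.log*
  # Rotated files end in a dot followed by a digit, e.g. boot.log.1.
  rotation.external.strategy.copytruncate:
    suffix_regex: \.\d$
```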
--- CHANGELOG.next.asciidoc | 2 + .../config/filebeat.inputs.reference.yml.tmpl | 10 + .../input-filestream-file-options.asciidoc | 53 +++ filebeat/filebeat.reference.yml | 10 + filebeat/input/filestream/config.go | 12 + .../filestream/copytruncate_prospector.go | 361 ++++++++++++++++++ .../copytruncate_prospector_test.go | 276 +++++++++++++ filebeat/input/filestream/fswatch_test.go | 47 +-- filebeat/input/filestream/identifier.go | 3 + .../filestream/identifier_inode_deviceid.go | 1 + filebeat/input/filestream/input.go | 41 +- .../internal/input-logfile/fswatch.go | 21 + .../internal/input-logfile/harvester.go | 32 ++ .../internal/input-logfile/publish.go | 2 +- .../internal/input-logfile/store.go | 52 ++- filebeat/input/filestream/logger.go | 41 ++ filebeat/input/filestream/prospector.go | 131 ++++--- .../input/filestream/prospector_creator.go | 106 +++++ filebeat/input/filestream/prospector_test.go | 33 +- x-pack/filebeat/filebeat.reference.yml | 10 + 20 files changed, 1128 insertions(+), 116 deletions(-) create mode 100644 filebeat/input/filestream/copytruncate_prospector.go create mode 100644 filebeat/input/filestream/copytruncate_prospector_test.go create mode 100644 filebeat/input/filestream/logger.go create mode 100644 filebeat/input/filestream/prospector_creator.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1f158e37531..4653100fedd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -828,6 +828,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update grok patterns for HA Proxy module {issue}25827[25827] {pull}25835[25835] - Added dataset `anomalithreatstream` to the `threatintel` module to ingest indicators from Anomali ThreatStream {pull}26350[26350] +- Add support for `copytruncate` method when rotating input logs with an external tool in `filestream` input. {pull}23457[23457] + *Heartbeat* - Add mime type detection for http responses. {pull}22976[22976] diff --git a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl index 19176bfc39e..211292b9432 100644 --- a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl @@ -292,6 +292,16 @@ filebeat.inputs: # original for harvesting but will report the symlink name as source. #prospector.scanner.symlinks: false + ### Log rotation + + # When an external tool rotates the input files with copytruncate strategy + # use this section to help the input find the rotated files. + #rotation.external.strategy.copytruncate: + # Regex that matches the rotated files. + # suffix_regex: \.\d$ + # If the rotated filename suffix is a datetime, set it here. + # dateformat: -20060102 + ### State options # Files for the modification data is older then clean_inactive the state from the registry is removed diff --git a/filebeat/docs/inputs/input-filestream-file-options.asciidoc b/filebeat/docs/inputs/input-filestream-file-options.asciidoc index 3beb1f7fa98..4de04cc9d28 100644 --- a/filebeat/docs/inputs/input-filestream-file-options.asciidoc +++ b/filebeat/docs/inputs/input-filestream-file-options.asciidoc @@ -482,3 +482,56 @@ Set the location of the marker file the following way: ---- file_identity.inode_marker.path: /logs/.filebeat-marker ---- + +=== Log rotation + +As log files are constantly written, they must be rotated and purged to prevent +the logger application from filling up the disk. 
Rotation is done by an external +application, thus, {beatname_uc} needs information on how to cooperate with it. + +When reading from rotating files, make sure the paths configuration includes +both the active file and all rotated files. + +By default, {beatname_uc} is able to track files correctly with the following strategies: +* create: a new active file with a unique name is created on rotation +* rename: rotated files are renamed + +However, in case of the copytruncate strategy, you should provide additional configuration +to {beatname_uc}. + +[float] +==== rotation.external.strategy.copytruncate + +experimental[] + +If the log rotating application copies the contents of the active file and then +truncates the original file, use these options to help {beatname_uc} read files +correctly. + +Set the option `suffix_regex` so {beatname_uc} can tell active and rotated files apart. There are +two supported suffix types in the input: numeric and date. + +==== Numeric suffix + +If your rotated files have an incrementing index appended to the end of the filename, e.g. the +active file is `apache.log` and the rotated files are named `apache.log.1`, `apache.log.2`, etc., +use the following configuration. + +[source,yaml] +---- +rotation.external.strategy.copytruncate: + suffix_regex: \.\d$ +---- + +==== Date suffix + +If the rotation date is appended to the end of the filename, e.g. the active file is `apache.log` and the +rotated files are named `apache.log-20210526`, `apache.log-20210527`, etc., use the following configuration: + +[source,yaml] +---- +rotation.external.strategy.copytruncate: + suffix_regex: \-\d{8}$ + dateformat: -20060102 +---- + diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 51d42a93f2a..de1917041bd 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -699,6 +699,16 @@ filebeat.inputs: # original for harvesting but will report the symlink name as source. #prospector.scanner.symlinks: false + ### Log rotation + + # When an external tool rotates the input files with copytruncate strategy + # use this section to help the input find the rotated files. + #rotation.external.strategy.copytruncate: + # Regex that matches the rotated files. + # suffix_regex: \.\d$ + # If the rotated filename suffix is a datetime, set it here.
+ # dateformat: -20060102 + ### State options # Files for the modification data is older then clean_inactive the state from the registry is removed diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index cf41c97d080..007da10a045 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -41,6 +41,7 @@ type config struct { HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"` IgnoreOlder time.Duration `config:"ignore_older"` IgnoreInactive ignoreInactiveType `config:"ignore_inactive"` + Rotation *common.ConfigNamespace `config:"rotation"` } type closerConfig struct { @@ -78,6 +79,17 @@ type backoffConfig struct { Max time.Duration `config:"max" validate:"nonzero"` } +type rotationConfig struct { + Strategy *common.ConfigNamespace `config:"strategy" validate:"required"` +} + +type commonRotationConfig struct { + SuffixRegex string `config:"suffix_regex" validate:"required"` + DateFormat string `config:"dateformat"` +} + +type copyTruncateConfig commonRotationConfig + func defaultConfig() config { return config{ Reader: defaultReaderConfig(), diff --git a/filebeat/input/filestream/copytruncate_prospector.go b/filebeat/input/filestream/copytruncate_prospector.go new file mode 100644 index 00000000000..ba64778998c --- /dev/null +++ b/filebeat/input/filestream/copytruncate_prospector.go @@ -0,0 +1,361 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "os" + "regexp" + "sort" + "strconv" + "time" + + "github.com/urso/sderr" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-concert/unison" +) + +const ( + copyTruncateProspectorDebugKey = "copy_truncate_file_prospector" + copiedFileIdx = 0 +) + +var ( + numericSuffixRegexp = regexp.MustCompile("\\d*$") +) + +// sorter is required for ordering rotated log files +// The slice is ordered so the newest rotated file comes first. +type sorter interface { + sort([]rotatedFileInfo) +} + +// rotatedFileInfo stores the file information of a rotated file. +type rotatedFileInfo struct { + path string + src loginp.Source + + ts time.Time + idx int +} + +func (f rotatedFileInfo) String() string { + return f.path +} + +// rotatedFilestream includes the information of the original file +// and its identifier, and the rotated file. 
+type rotatedFilestream struct { + originalSrc loginp.Source + rotated []rotatedFileInfo +} + +func newRotatedFilestreams(cfg *copyTruncateConfig) *rotatedFilestreams { + var sorter sorter + sorter = newNumericSorter() + if cfg.DateFormat != "" { + sorter = &dateSorter{cfg.DateFormat} + } + return &rotatedFilestreams{ + table: make(map[string]*rotatedFilestream, 0), + sorter: sorter, + } +} + +// numericSorter sorts rotated log files that have a numeric suffix. +// Example: apache.log.1, apache.log.2 +type numericSorter struct { + suffix *regexp.Regexp +} + +func newNumericSorter() sorter { + return &numericSorter{ + suffix: numericSuffixRegexp, + } +} + +func (s *numericSorter) sort(files []rotatedFileInfo) { + sort.Slice( + files, + func(i, j int) bool { + return s.GetIdx(&files[i]) < s.GetIdx(&files[j]) + }, + ) +} + +func (s *numericSorter) GetIdx(fi *rotatedFileInfo) int { + if fi.idx > 0 { + return fi.idx + } + + idxStr := s.suffix.FindString(fi.path) + if idxStr == "" { + return -1 + } + idx, err := strconv.Atoi(idxStr) + if err != nil { + return -1 + } + fi.idx = idx + + return idx +} + +// dateSorter sorts rotated log files that have a date suffix +// based on the configured format. +// Example: apache.log-20210526, apache.log-20210527 +type dateSorter struct { + format string +} + +func (s *dateSorter) sort(files []rotatedFileInfo) { + sort.Slice( + files, + func(i, j int) bool { + return s.GetTs(&files[j]).Before(s.GetTs(&files[i])) + }, + ) +} + +func (s *dateSorter) GetTs(fi *rotatedFileInfo) time.Time { + if !fi.ts.IsZero() { + return fi.ts + } + fileTs := fi.path[len(fi.path)-len(s.format):] + + ts, err := time.Parse(s.format, fileTs) + if err != nil { + return time.Time{} + } + fi.ts = ts + return ts +} + +// rotatedFilestreams is a map of original files and their rotated instances. +type rotatedFilestreams struct { + table map[string]*rotatedFilestream + sorter sorter +} + +// addOriginalFile adds a new original file and its identifying information +// to the bookkeeper. +func (r rotatedFilestreams) addOriginalFile(path string, src loginp.Source) { + if _, ok := r.table[path]; ok { + return + } + r.table[path] = &rotatedFilestream{originalSrc: src, rotated: make([]rotatedFileInfo, 0)} +} + +// isOriginalAdded checks if an original file has been found. +func (r rotatedFilestreams) isOriginalAdded(path string) bool { + _, ok := r.table[path] + return ok +} + +// originalSrc returns the original Source information of a given +// original file path. +func (r rotatedFilestreams) originalSrc(path string) loginp.Source { + return r.table[path].originalSrc +} + +// addRotatedFile adds a new rotated file to the list and returns its index. +// If the file has already been added, its source is updated and the existing index is returned. +func (r rotatedFilestreams) addRotatedFile(original, rotated string, src loginp.Source) int { + for idx, fi := range r.table[original].rotated { + if fi.path == rotated { + r.table[original].rotated[idx].src = src + return idx + } + } + + r.table[original].rotated = append(r.table[original].rotated, rotatedFileInfo{rotated, src, time.Time{}, 0}) + r.sorter.sort(r.table[original].rotated) + + for idx, fi := range r.table[original].rotated { + if fi.path == rotated { + return idx + } + } + + return -1 +} + +// removeRotatedFile removes a rotated file from the list of rotated files of its original file.
+func (r rotatedFilestreams) removeRotatedFile(original, rotated string) { + for idx, fi := range r.table[original].rotated { + if fi.path == rotated { + r.table[original].rotated = append(r.table[original].rotated[:idx], r.table[original].rotated[idx+1:]...) + return + } + } +} + +type copyTruncateFileProspector struct { + fileProspector + rotatedSuffix *regexp.Regexp + rotatedFiles *rotatedFilestreams +} + +// Run starts the copyTruncateFileProspector, which accepts FS events from a file watcher. +func (p *copyTruncateFileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, hg loginp.HarvesterGroup) { + log := ctx.Logger.With("prospector", copyTruncateProspectorDebugKey) + log.Debug("Starting prospector") + defer log.Debug("Prospector has stopped") + + defer p.stopHarvesterGroup(log, hg) + + var tg unison.MultiErrGroup + + tg.Go(func() error { + p.filewatcher.Run(ctx.Cancelation) + return nil + }) + + tg.Go(func() error { + ignoreInactiveSince := getIgnoreSince(p.ignoreInactiveSince, ctx.Agent) + + for ctx.Cancelation.Err() == nil { + fe := p.filewatcher.Event() + + if fe.Op == loginp.OpDone { + return nil + } + + src := p.identifier.GetSource(fe) + log = loggerWithEvent(log, fe, src) + + switch fe.Op { + case loginp.OpCreate, loginp.OpWrite: + if fe.Op == loginp.OpCreate { + log.Debugf("A new file %s has been found", fe.NewPath) + + } else if fe.Op == loginp.OpWrite { + log.Debugf("File %s has been updated", fe.NewPath) + } + + if p.fileProspector.isFileIgnored(log, fe, ignoreInactiveSince) { + continue + } + + if fe.Op == loginp.OpCreate { + err := s.UpdateMetadata(src, fileMeta{Source: fe.NewPath, IdentifierName: p.identifier.Name()}) + if err != nil { + log.Errorf("Failed to set cursor meta data of entry %s: %v", src.Name(), err) + } + } + + // check if the event belongs to a rotated file + if p.isRotated(fe) { + log.Debugf("File %s is rotated", fe.NewPath) + + p.onRotatedFile(log, ctx, fe, src, hg) + + } else { + log.Debugf("File %s is original", fe.NewPath) + // if the file is an original, add it to the bookkeeper + p.rotatedFiles.addOriginalFile(fe.NewPath, src) + + hg.Start(ctx, src) + } + + case loginp.OpTruncate: + log.Debugf("File %s has been truncated", fe.NewPath) + + s.ResetCursor(src, state{Offset: 0}) + hg.Restart(ctx, src) + + case loginp.OpDelete: + log.Debugf("File %s has been removed", fe.OldPath) + + p.fileProspector.onRemove(log, fe, src, s, hg) + + case loginp.OpRename: + log.Debugf("File %s has been renamed to %s", fe.OldPath, fe.NewPath) + + // check if the event belongs to a rotated file + if p.isRotated(fe) { + log.Debugf("File %s is rotated", fe.NewPath) + + p.onRotatedFile(log, ctx, fe, src, hg) + } + + p.fileProspector.onRename(log, ctx, fe, src, s, hg) + + default: + log.Errorf("Unknown return value %v", fe.Op) + } + } + return nil + }) + + errs := tg.Wait() + if len(errs) > 0 { + log.Errorf("%s", sderr.WrapAll(errs, "running prospector failed")) + } +} + +func (p *copyTruncateFileProspector) isRotated(event loginp.FSEvent) bool { + if p.rotatedSuffix.MatchString(event.NewPath) { + return true + } + return false +} + +func (p *copyTruncateFileProspector) onRotatedFile( + log *logp.Logger, + ctx input.Context, + fe loginp.FSEvent, + src loginp.Source, + hg loginp.HarvesterGroup, +) { + // Continue reading the rotated file from where we have left off with the original. + // The original will be picked up again when updated and read from the beginning.
+ originalPath := p.rotatedSuffix.ReplaceAllLiteralString(fe.NewPath, "") + // if we haven't encountered the original file which was rotated, get its information + if !p.rotatedFiles.isOriginalAdded(originalPath) { + fi, err := os.Stat(originalPath) + if err != nil { + log.Errorf("Cannot continue reading the file, error while getting the information of the original file: %+v", err) + log.Debugf("Starting possibly rotated file from the beginning: %s", fe.NewPath) + hg.Start(ctx, src) + return + } + originalSrc := p.identifier.GetSource(loginp.FSEvent{NewPath: originalPath, Info: fi}) + p.rotatedFiles.addOriginalFile(originalPath, originalSrc) + p.rotatedFiles.addRotatedFile(originalPath, fe.NewPath, src) + hg.Start(ctx, src) + return + } + + idx := p.rotatedFiles.addRotatedFile(originalPath, fe.NewPath, src) + if idx == copiedFileIdx { + // if the file is the freshest rotated file, continue reading from + // where we have left off with the active file. + previousSrc := p.rotatedFiles.table[originalPath].originalSrc + hg.Continue(ctx, previousSrc, src) + } else { + // if the file is rotated but not the freshest rotated file, + // read it from where we have left off. + if fe.Op != loginp.OpRename { + hg.Start(ctx, src) + } + } +} diff --git a/filebeat/input/filestream/copytruncate_prospector_test.go b/filebeat/input/filestream/copytruncate_prospector_test.go new file mode 100644 index 00000000000..52d6b1e3391 --- /dev/null +++ b/filebeat/input/filestream/copytruncate_prospector_test.go @@ -0,0 +1,276 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package filestream + +import ( + "context" + "fmt" + "regexp" + "testing" + + "github.com/stretchr/testify/require" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestCopyTruncateProspector_Create(t *testing.T) { + testCases := map[string]struct { + events []loginp.FSEvent + expectedEvents []harvesterEvent + expectedRotatedFiles map[string][]string + }{ + "one new file, then rotated": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file.1"}, + }, + expectedEvents: []harvesterEvent{ + harvesterStart("path::/path/to/file"), + harvesterContinue("path::/path/to/file -> path::/path/to/file.1"), + harvesterGroupStop{}, + }, + expectedRotatedFiles: map[string][]string{ + "/path/to/file": []string{ + "/path/to/file.1", + }, + }, + }, + "one new file, then rotated twice in order": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file.1"}, + loginp.FSEvent{Op: loginp.OpTruncate, NewPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpRename, NewPath: "/path/to/file.2", OldPath: "/path/to/file.1"}, + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file.1"}, + loginp.FSEvent{Op: loginp.OpTruncate, NewPath: "/path/to/file"}, + }, + expectedEvents: []harvesterEvent{ + harvesterStart("path::/path/to/file"), + harvesterContinue("path::/path/to/file -> path::/path/to/file.1"), + harvesterRestart("path::/path/to/file"), + harvesterStop("path::/path/to/file.1"), + harvesterStart("path::/path/to/file.2"), + harvesterContinue("path::/path/to/file -> path::/path/to/file.1"), + harvesterRestart("path::/path/to/file"), + harvesterGroupStop{}, + }, + expectedRotatedFiles: map[string][]string{ + "/path/to/file": []string{ + "/path/to/file.1", + "/path/to/file.2", + }, + }, + }, + "one new file, then rotated twice with renaming": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file.2"}, + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file.1"}, + loginp.FSEvent{Op: loginp.OpRename, NewPath: "/path/to/file.3", OldPath: "/path/to/file.2"}, + loginp.FSEvent{Op: loginp.OpRename, NewPath: "/path/to/file.2", OldPath: "/path/to/file.1"}, + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file.1"}, + loginp.FSEvent{Op: loginp.OpTruncate, NewPath: "/path/to/file"}, + }, + expectedEvents: []harvesterEvent{ + harvesterStart("path::/path/to/file.2"), + harvesterStart("path::/path/to/file"), + harvesterContinue("path::/path/to/file -> path::/path/to/file.1"), + harvesterStop("path::/path/to/file.2"), + harvesterStart("path::/path/to/file.3"), + harvesterStop("path::/path/to/file.1"), + harvesterStart("path::/path/to/file.2"), + harvesterContinue("path::/path/to/file -> path::/path/to/file.1"), + harvesterRestart("path::/path/to/file"), + harvesterGroupStop{}, + }, + expectedRotatedFiles: map[string][]string{ + "/path/to/file": []string{ + "/path/to/file.1", + "/path/to/file.2", + "/path/to/file.3", + }, + }, + }, + "first rotated file, when rotated file not exist": { + events: []loginp.FSEvent{ + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file.1"}, + }, + expectedEvents: []harvesterEvent{ + harvesterStart("path::/path/to/file.1"), + 
harvesterGroupStop{}, + }, + expectedRotatedFiles: map[string][]string{}, + }, + } + + for name, test := range testCases { + test := test + + t.Run(name, func(t *testing.T) { + p := copyTruncateFileProspector{ + fileProspector{ + filewatcher: &mockFileWatcher{events: test.events}, + identifier: mustPathIdentifier(false), + }, + regexp.MustCompile("\\.\\d$"), + &rotatedFilestreams{make(map[string]*rotatedFilestream), newNumericSorter()}, + } + ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()} + hg := newTestHarvesterGroup() + + p.Run(ctx, newMockMetadataUpdater(), hg) + + require.Equal(t, len(test.expectedEvents), len(hg.events)) + for i := 0; i < len(test.expectedEvents); i++ { + require.Equal(t, test.expectedEvents[i], hg.events[i]) + } + + for originalFile, rotatedFiles := range test.expectedRotatedFiles { + rFile, ok := p.rotatedFiles.table[originalFile] + if !ok { + fmt.Printf("cannot find %s in original files\n", originalFile) + t.FailNow() + } + require.Equal(t, len(rotatedFiles), len(rFile.rotated)) + for i, rotatedFile := range rotatedFiles { + if rFile.rotated[i].path != rotatedFile { + fmt.Printf("%s is not a rotated file, instead %s is\n", rFile.rotated[i].path, rotatedFile) + t.FailNow() + } + } + } + }) + } +} + +func TestNumericSorter(t *testing.T) { + testCases := map[string]struct { + fileinfos []rotatedFileInfo + expectedOrder []string + }{ + "one fileinfo": { + fileinfos: []rotatedFileInfo{ + rotatedFileInfo{path: "/path/to/apache.log.1"}, + }, + expectedOrder: []string{ + "/path/to/apache.log.1", + }, + }, + "ordered fileinfos": { + fileinfos: []rotatedFileInfo{ + rotatedFileInfo{path: "/path/to/apache.log.1"}, + rotatedFileInfo{path: "/path/to/apache.log.2"}, + rotatedFileInfo{path: "/path/to/apache.log.3"}, + }, + expectedOrder: []string{ + "/path/to/apache.log.1", + "/path/to/apache.log.2", + "/path/to/apache.log.3", + }, + }, + "unordered fileinfos": { + fileinfos: []rotatedFileInfo{ + rotatedFileInfo{path: "/path/to/apache.log.3"}, + rotatedFileInfo{path: "/path/to/apache.log.1"}, + rotatedFileInfo{path: "/path/to/apache.log.2"}, + }, + expectedOrder: []string{ + "/path/to/apache.log.1", + "/path/to/apache.log.2", + "/path/to/apache.log.3", + }, + }, + "unordered fileinfos with numbers in filename": { + fileinfos: []rotatedFileInfo{ + rotatedFileInfo{path: "/path/to/apache42.log.3"}, + rotatedFileInfo{path: "/path/to/apache43.log.1"}, + rotatedFileInfo{path: "/path/to/apache44.log.2"}, + }, + expectedOrder: []string{ + "/path/to/apache43.log.1", + "/path/to/apache44.log.2", + "/path/to/apache42.log.3", + }, + }, + } + sorter := newNumericSorter() + + for name, test := range testCases { + test := test + t.Run(name, func(t *testing.T) { + sorter.sort(test.fileinfos) + for i, fi := range test.fileinfos { + require.Equal(t, test.expectedOrder[i], fi.path) + } + + }) + } +} +func TestDateSorter(t *testing.T) { + testCases := map[string]struct { + fileinfos []rotatedFileInfo + expectedOrder []string + }{ + "one fileinfo": { + fileinfos: []rotatedFileInfo{ + rotatedFileInfo{path: "/path/to/apache.log-20140506"}, + }, + expectedOrder: []string{ + "/path/to/apache.log-20140506", + }, + }, + "ordered fileinfos": { + fileinfos: []rotatedFileInfo{ + rotatedFileInfo{path: "/path/to/apache.log-20140506"}, + rotatedFileInfo{path: "/path/to/apache.log-20140507"}, + rotatedFileInfo{path: "/path/to/apache.log-20140508"}, + }, + expectedOrder: []string{ + "/path/to/apache.log-20140508", + "/path/to/apache.log-20140507", + "/path/to/apache.log-20140506", + }, + 
}, + "unordered fileinfos": { + fileinfos: []rotatedFileInfo{ + rotatedFileInfo{path: "/path/to/apache.log-20140507"}, + rotatedFileInfo{path: "/path/to/apache.log-20140508"}, + rotatedFileInfo{path: "/path/to/apache.log-20140506"}, + }, + expectedOrder: []string{ + "/path/to/apache.log-20140508", + "/path/to/apache.log-20140507", + "/path/to/apache.log-20140506", + }, + }, + } + sorter := dateSorter{"-20060102"} + + for name, test := range testCases { + test := test + t.Run(name, func(t *testing.T) { + sorter.sort(test.fileinfos) + for i, fi := range test.fileinfos { + require.Equal(t, test.expectedOrder[i], fi.path) + } + + }) + } +} diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index c2c01a53da2..54fe3804f02 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -129,69 +129,69 @@ func TestFileWatchNewDeleteModified(t *testing.T) { "one new file": { prevFiles: map[string]os.FileInfo{}, nextFiles: map[string]os.FileInfo{ - "new_path": testFileInfo{"new_path", 5, oldTs}, + "new_path": testFileInfo{"new_path", 5, oldTs, nil}, }, expectedEvents: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "new_path", Info: testFileInfo{"new_path", 5, oldTs}}, + loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "new_path", Info: testFileInfo{"new_path", 5, oldTs, nil}}, }, }, "one deleted file": { prevFiles: map[string]os.FileInfo{ - "old_path": testFileInfo{"old_path", 5, oldTs}, + "old_path": testFileInfo{"old_path", 5, oldTs, nil}, }, nextFiles: map[string]os.FileInfo{}, expectedEvents: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpDelete, OldPath: "old_path", NewPath: "", Info: testFileInfo{"old_path", 5, oldTs}}, + loginp.FSEvent{Op: loginp.OpDelete, OldPath: "old_path", NewPath: "", Info: testFileInfo{"old_path", 5, oldTs, nil}}, }, }, "one modified file": { prevFiles: map[string]os.FileInfo{ - "path": testFileInfo{"path", 5, oldTs}, + "path": testFileInfo{"path", 5, oldTs, nil}, }, nextFiles: map[string]os.FileInfo{ - "path": testFileInfo{"path", 10, newTs}, + "path": testFileInfo{"path", 10, newTs, nil}, }, expectedEvents: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 10, newTs}}, + loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 10, newTs, nil}}, }, }, "two modified files": { prevFiles: map[string]os.FileInfo{ - "path1": testFileInfo{"path1", 5, oldTs}, - "path2": testFileInfo{"path2", 5, oldTs}, + "path1": testFileInfo{"path1", 5, oldTs, nil}, + "path2": testFileInfo{"path2", 5, oldTs, nil}, }, nextFiles: map[string]os.FileInfo{ - "path1": testFileInfo{"path1", 10, newTs}, - "path2": testFileInfo{"path2", 10, newTs}, + "path1": testFileInfo{"path1", 10, newTs, nil}, + "path2": testFileInfo{"path2", 10, newTs, nil}, }, expectedEvents: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path1", NewPath: "path1", Info: testFileInfo{"path1", 10, newTs}}, - loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path2", NewPath: "path2", Info: testFileInfo{"path2", 10, newTs}}, + loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path1", NewPath: "path1", Info: testFileInfo{"path1", 10, newTs, nil}}, + loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path2", NewPath: "path2", Info: testFileInfo{"path2", 10, newTs, nil}}, }, }, "one modified file, one new file": { prevFiles: map[string]os.FileInfo{ - "path1": testFileInfo{"path1", 5, oldTs}, + 
"path1": testFileInfo{"path1", 5, oldTs, nil}, }, nextFiles: map[string]os.FileInfo{ - "path1": testFileInfo{"path1", 10, newTs}, - "path2": testFileInfo{"path2", 10, newTs}, + "path1": testFileInfo{"path1", 10, newTs, nil}, + "path2": testFileInfo{"path2", 10, newTs, nil}, }, expectedEvents: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path1", NewPath: "path1", Info: testFileInfo{"path1", 10, newTs}}, - loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "path2", Info: testFileInfo{"path2", 10, newTs}}, + loginp.FSEvent{Op: loginp.OpWrite, OldPath: "path1", NewPath: "path1", Info: testFileInfo{"path1", 10, newTs, nil}}, + loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "path2", Info: testFileInfo{"path2", 10, newTs, nil}}, }, }, "one new file, one deleted file": { prevFiles: map[string]os.FileInfo{ - "path_deleted": testFileInfo{"path_deleted", 5, oldTs}, + "path_deleted": testFileInfo{"path_deleted", 5, oldTs, nil}, }, nextFiles: map[string]os.FileInfo{ - "path_new": testFileInfo{"path_new", 10, newTs}, + "path_new": testFileInfo{"path_new", 10, newTs, nil}, }, expectedEvents: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpDelete, OldPath: "path_deleted", NewPath: "", Info: testFileInfo{"path_deleted", 5, oldTs}}, - loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "path_new", Info: testFileInfo{"path_new", 10, newTs}}, + loginp.FSEvent{Op: loginp.OpDelete, OldPath: "path_deleted", NewPath: "", Info: testFileInfo{"path_deleted", 5, oldTs, nil}}, + loginp.FSEvent{Op: loginp.OpCreate, OldPath: "", NewPath: "path_new", Info: testFileInfo{"path_new", 10, newTs, nil}}, }, }, } @@ -232,6 +232,7 @@ type testFileInfo struct { path string size int64 time time.Time + sys interface{} } func (t testFileInfo) Name() string { return t.path } @@ -239,7 +240,7 @@ func (t testFileInfo) Size() int64 { return t.size } func (t testFileInfo) Mode() os.FileMode { return 0 } func (t testFileInfo) ModTime() time.Time { return t.time } func (t testFileInfo) IsDir() bool { return false } -func (t testFileInfo) Sys() interface{} { return nil } +func (t testFileInfo) Sys() interface{} { return t.sys } func mustDuration(durStr string) time.Duration { dur, err := time.ParseDuration(durStr) diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go index bde88aa03fe..7b28a1d3cba 100644 --- a/filebeat/input/filestream/identifier.go +++ b/filebeat/input/filestream/identifier.go @@ -64,6 +64,7 @@ type fileSource struct { newPath string oldPath string truncated bool + archived bool name string identifierGenerator string @@ -105,6 +106,7 @@ func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource { newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, + archived: e.Op == loginp.OpArchived, name: i.name + identitySep + file.GetOSState(e.Info).String(), identifierGenerator: i.name, } @@ -143,6 +145,7 @@ func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource { newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, + archived: e.Op == loginp.OpArchived, name: p.name + identitySep + path, identifierGenerator: p.name, } diff --git a/filebeat/input/filestream/identifier_inode_deviceid.go b/filebeat/input/filestream/identifier_inode_deviceid.go index fb87708dd18..291bc0ad357 100644 --- a/filebeat/input/filestream/identifier_inode_deviceid.go +++ b/filebeat/input/filestream/identifier_inode_deviceid.go @@ -99,6 +99,7 @@ func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) 
fileSource { newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, + archived: e.Op == loginp.OpArchived, name: i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(), identifierGenerator: i.name, } diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 8294baa85d2..e143280e5b9 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -83,14 +83,9 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) return nil, nil, err } - filewatcher, err := newFileWatcher(config.Paths, config.FileWatcher) + prospector, err := newProspector(config) if err != nil { - return nil, nil, fmt.Errorf("error while creating filewatcher %v", err) - } - - identifier, err := newFileIdentifier(config.FileIdentity) - if err != nil { - return nil, nil, fmt.Errorf("error while creating file identifier: %v", err) + return nil, nil, fmt.Errorf("cannot create prospector: %w", err) } encodingFactory, ok := encoding.FindEncoding(config.Reader.Encoding) @@ -98,14 +93,6 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding) } - prospector := &fileProspector{ - filewatcher: filewatcher, - identifier: identifier, - ignoreOlder: config.IgnoreOlder, - cleanRemoved: config.CleanRemoved, - stateChangeCloser: config.Close.OnStateChange, - } - filestream := &filestream{ readerConfig: config.Reader, encodingFactory: encodingFactory, @@ -123,7 +110,7 @@ func (inp *filestream) Test(src loginp.Source, ctx input.TestContext) error { return fmt.Errorf("not file source") } - reader, err := inp.open(ctx.Logger, ctx.Cancelation, fs.newPath, 0) + reader, err := inp.open(ctx.Logger, ctx.Cancelation, fs, 0) if err != nil { return err } @@ -144,7 +131,7 @@ func (inp *filestream) Run( log := ctx.Logger.With("path", fs.newPath).With("state-id", src.Name()) state := initState(log, cursor, fs) - r, err := inp.open(log, ctx.Cancelation, fs.newPath, state.Offset) + r, err := inp.open(log, ctx.Cancelation, fs, state.Offset) if err != nil { log.Errorf("File could not be opened for reading: %v", err) return err @@ -176,18 +163,30 @@ func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state { return state } -func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path string, offset int64) (reader.Reader, error) { - f, err := inp.openFile(log, path, offset) +func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSource, offset int64) (reader.Reader, error) { + f, err := inp.openFile(log, fs.newPath, offset) if err != nil { return nil, err } log.Debug("newLogFileReader with config.MaxBytes:", inp.readerConfig.MaxBytes) + // if the file is archived, it means that it is not going to be updated in the future + // thus, when EOF is reached, it can be closed + closerCfg := inp.closerConfig + if fs.archived && !inp.closerConfig.Reader.OnEOF { + closerCfg = closerConfig{ + Reader: readerCloserConfig{ + OnEOF: true, + AfterInterval: inp.closerConfig.Reader.AfterInterval, + }, + OnStateChange: inp.closerConfig.OnStateChange, + } + } // TODO: NewLineReader uses additional buffering to deal with encoding and testing // for new lines in input stream. Simple 8-bit based encodings, or plain // don't require 'complicated' logic. 
- logReader, err := newFileReader(log, canceler, f, inp.readerConfig, inp.closerConfig) + logReader, err := newFileReader(log, canceler, f, inp.readerConfig, closerCfg) if err != nil { return nil, err } @@ -218,7 +217,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator) - r = readfile.NewFilemeta(r, path) + r = readfile.NewFilemeta(r, fs.newPath) r, err = newParsers(r, parserConfig{maxBytes: inp.readerConfig.MaxBytes, lineTerminator: inp.readerConfig.LineTerminator}, inp.readerConfig.Parsers) if err != nil { diff --git a/filebeat/input/filestream/internal/input-logfile/fswatch.go b/filebeat/input/filestream/internal/input-logfile/fswatch.go index 56235e6c4bc..9982f370e4f 100644 --- a/filebeat/input/filestream/internal/input-logfile/fswatch.go +++ b/filebeat/input/filestream/internal/input-logfile/fswatch.go @@ -30,11 +30,32 @@ const ( OpDelete OpRename OpTruncate + OpArchived +) + +var ( + operationNames = map[Operation]string{ + OpDone: "done", + OpCreate: "create", + OpWrite: "write", + OpDelete: "delete", + OpRename: "rename", + OpTruncate: "truncate", + OpArchived: "archive", + } ) // Operation describes what happened to a file. type Operation uint8 +func (o *Operation) String() string { + name, ok := operationNames[*o] + if !ok { + return "" + } + return name +} + // FSEvent returns information about file system changes. type FSEvent struct { // NewPath is the new path of the file. diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index 00b14bc498a..926485ab181 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -119,6 +119,8 @@ type HarvesterGroup interface { Start(input.Context, Source) // Restart starts a Harvester if it might be already running. Restart(input.Context, Source) + // Continue starts a new Harvester with the state information of the previous. + Continue(ctx input.Context, previous, next Source) // Stop cancels the reader of a given Source. Stop(Source) // StopGroup cancels all running Harvesters. @@ -217,6 +219,36 @@ func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, rest } } +// Continue starts a new Harvester with the state information from a different Source. +func (hg *defaultHarvesterGroup) Continue(ctx input.Context, previous, next Source) { + ctx.Logger.Debugf("Continue harvester for file prev=%s, next=%s", previous.Name(), next.Name()) + prevID := hg.identifier.ID(previous) + nextID := hg.identifier.ID(next) + + hg.tg.Go(func(canceler unison.Canceler) error { + previousResource, err := lock(ctx, hg.store, prevID) + if err != nil { + return fmt.Errorf("error while locking previous resource: %v", err) + } + // mark previous state out of date + // so when reading starts again the offset is set to zero + hg.store.remove(prevID) + + nextResource, err := lock(ctx, hg.store, nextID) + if err != nil { + return fmt.Errorf("error while locking next resource: %v", err) + } + hg.store.UpdateTTL(nextResource, hg.cleanTimeout) + + previousResource.copyInto(nextResource) + releaseResource(previousResource) + releaseResource(nextResource) + + hg.Start(ctx, next) + return nil + }) +} + // Stop stops the running Harvester for a given Source.
func (hg *defaultHarvesterGroup) Stop(s Source) { hg.tg.Go(func(_ unison.Canceler) error { diff --git a/filebeat/input/filestream/internal/input-logfile/publish.go b/filebeat/input/filestream/internal/input-logfile/publish.go index ddc389321b1..fa495061919 100644 --- a/filebeat/input/filestream/internal/input-logfile/publish.go +++ b/filebeat/input/filestream/internal/input-logfile/publish.go @@ -128,7 +128,7 @@ func (op *updateOp) Execute(n uint) { resource.stateMutex.Lock() defer resource.stateMutex.Unlock() - if resource.lockedVersion != op.resource.version { + if resource.lockedVersion != op.resource.version || resource.isDeleted() { return } diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index fe149f59d77..398c7db00ff 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -93,6 +93,9 @@ type resource struct { // stored indicates that the state is available in the registry file. It is false for new entries. stored bool + // invalid indicates if the resource has been marked for deletion, if yes, it cannot be overwritten + // in the persistent state. + invalid bool // internalInSync is true if all 'Internal' metadata like TTL or update timestamp are in sync. // Normally resources are added when being created. But if operations failed we will retry inserting @@ -291,8 +294,12 @@ func (s *store) updateMetadata(key string, meta interface{}) error { } // writeState writes the state to the persistent store. -// WARNING! it does not lock the store +// WARNING! it does not lock the store or the resource. func (s *store) writeState(r *resource) { + if r.invalid { + return + } + err := s.persistentStore.Set(r.key, r.inSyncStateSnapshot()) if err != nil { s.log.Errorf("Failed to update resource fields for '%v'", r.key) @@ -301,6 +308,7 @@ func (s *store) writeState(r *resource) { r.stored = true r.internalInSync = true } + } // resetCursor sets the cursor to the value in cur in the persistent store and @@ -332,7 +340,6 @@ func (s *store) remove(key string) error { if resource == nil { return fmt.Errorf("resource '%s' not found", key) } - s.UpdateTTL(resource, 0) return nil } @@ -341,6 +348,10 @@ func (s *store) remove(key string) error { // The TTL value is part of the internal state, and will be written immediately to the persistent store. // On update the resource its `cursor` state is used, to keep the cursor state in sync with the current known // on disk store state. +// +// If the TTL of the resource is set to 0, once it is persisted, it is going to be removed from the +// store in the next cleaner run. The resource also gets invalidated to make sure new updates are not +// saved to the registry. func (s *store) UpdateTTL(resource *resource, ttl time.Duration) { resource.stateMutex.Lock() defer resource.stateMutex.Unlock() @@ -354,6 +365,15 @@ func (s *store) UpdateTTL(resource *resource, ttl time.Duration) { } s.writeState(resource) + + if resource.isDeleted() { + // version must be incremented to make sure existing resource + // instances do not overwrite the removal of the entry + resource.version++ + // invalidate it after it has been persisted to make sure it cannot + //be overwritten in the persistent store + resource.invalid = true + } } // Find returns the resource for a given key. If the key is unknown and create is set to false nil will be returned. 
@@ -362,7 +382,7 @@ func (s *states) Find(key string, create bool) *resource { s.mu.Lock() defer s.mu.Unlock() - if resource := s.table[key]; resource != nil { + if resource := s.table[key]; resource != nil && !resource.isDeleted() { resource.Retain() return resource } @@ -389,6 +409,10 @@ func (r *resource) IsNew() bool { return r.pendingCursor == nil && r.cursor == nil } +func (r *resource) isDeleted() bool { + return !r.internalState.Updated.IsZero() && r.internalState.TTL == 0 +} + // Retain is used to indicate that 'resource' gets an additional 'owner'. // Owners of an resource can be active inputs or pending update operations // not yet written to disk. @@ -430,6 +454,27 @@ func (r *resource) inSyncStateSnapshot() state { } } +func (r *resource) copyInto(dst *resource) { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + + internalState := r.internalState + + // This is required to prevent the cleaner from removing the + // entry from the registry immediately. + // It still might be removed if the output is blocked for a long + // time. If removed the whole file is resent to the output when found/updated. + internalState.Updated = time.Now() + dst.stored = r.stored + dst.internalInSync = true + dst.internalState = internalState + dst.activeCursorOperations = r.activeCursorOperations + dst.cursor = r.cursor + dst.pendingCursor = nil + dst.cursorMeta = r.cursorMeta + dst.lock = unison.MakeMutex() +} + func (r *resource) copyWithNewKey(key string) *resource { internalState := r.internalState @@ -447,6 +492,7 @@ func (r *resource) copyWithNewKey(key string) *resource { cursor: r.cursor, pendingCursor: nil, cursorMeta: r.cursorMeta, + lock: unison.MakeMutex(), } } diff --git a/filebeat/input/filestream/logger.go b/filebeat/input/filestream/logger.go new file mode 100644 index 00000000000..ebb1a2bc71a --- /dev/null +++ b/filebeat/input/filestream/logger.go @@ -0,0 +1,41 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package filestream + +import ( + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + "github.com/elastic/beats/v7/libbeat/common/file" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func loggerWithEvent(logger *logp.Logger, event loginp.FSEvent, src loginp.Source) *logp.Logger { + log := logger.With( + "operation", event.Op, + "source_name", src.Name(), + ) + if event.Info != nil && event.Info.Sys() != nil { + log = log.With("os_id", file.GetOSState(event.Info)) + } + if event.NewPath != "" { + log = log.With("new_path", event.NewPath) + } + if event.OldPath != "" { + log = log.With("old_path", event.OldPath) + } + return log +} diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 97bf14efa7c..a820758f624 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -129,6 +129,8 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h } src := p.identifier.GetSource(fe) + log = loggerWithEvent(log, fe, src) + switch fe.Op { case loginp.OpCreate, loginp.OpWrite: if fe.Op == loginp.OpCreate { @@ -143,15 +145,7 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h log.Debugf("File %s has been updated", fe.NewPath) } - if p.ignoreOlder > 0 { - now := time.Now() - if now.Sub(fe.Info.ModTime()) > p.ignoreOlder { - log.Debugf("Ignore file because ignore_older reached. File %s", fe.NewPath) - break - } - } - if !ignoreInactiveSince.IsZero() && fe.Info.ModTime().Sub(ignoreInactiveSince) <= 0 { - log.Debugf("Ignore file because ignore_since.* reached time %v. File %s", p.ignoreInactiveSince, fe.NewPath) + if p.isFileIgnored(log, fe, ignoreInactiveSince) { break } @@ -166,58 +160,12 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h case loginp.OpDelete: log.Debugf("File %s has been removed", fe.OldPath) - if p.stateChangeCloser.Removed { - log.Debugf("Stopping harvester as file %s has been removed and close.on_state_change.removed is enabled.", src.Name()) - hg.Stop(src) - } - - if p.cleanRemoved { - log.Debugf("Remove state for file as file removed: %s", fe.OldPath) - - err := s.Remove(src) - if err != nil { - log.Errorf("Error while removing state from statestore: %v", err) - } - } + p.onRemove(log, fe, src, s, hg) case loginp.OpRename: log.Debugf("File %s has been renamed to %s", fe.OldPath, fe.NewPath) - // if file_identity is based on path, the current reader has to be cancelled - // and a new one has to start. 
- if !p.identifier.Supports(trackRename) { - prevSrc := p.identifier.GetSource(loginp.FSEvent{NewPath: fe.OldPath}) - hg.Stop(prevSrc) - - log.Debugf("Remove state for file as file renamed and path file_identity is configured: %s", fe.OldPath) - err := s.Remove(prevSrc) - if err != nil { - log.Errorf("Error while removing old state of renamed file (%s): %v", fe.OldPath, err) - } - - hg.Start(ctx, src) - } else { - // update file metadata as the path has changed - var meta fileMeta - err := s.FindCursorMeta(src, meta) - if err != nil { - log.Errorf("Error while getting cursor meta data of entry %s: %v", src.Name(), err) - - meta.IdentifierName = p.identifier.Name() - } - err = s.UpdateMetadata(src, fileMeta{Source: src.newPath, IdentifierName: meta.IdentifierName}) - if err != nil { - log.Errorf("Failed to update cursor meta data of entry %s: %v", src.Name(), err) - } - - if p.stateChangeCloser.Renamed { - log.Debugf("Stopping harvester as file %s has been renamed and close.on_state_change.renamed is enabled.", src.Name()) - - fe.Op = loginp.OpDelete - srcToClose := p.identifier.GetSource(fe) - hg.Stop(srcToClose) - } - } + p.onRename(log, ctx, fe, src, s, hg) default: log.Error("Unkown return value %v", fe.Op) @@ -232,6 +180,75 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h } } +func (p *fileProspector) isFileIgnored(log *logp.Logger, fe loginp.FSEvent, ignoreInactiveSince time.Time) bool { + if p.ignoreOlder > 0 { + now := time.Now() + if now.Sub(fe.Info.ModTime()) > p.ignoreOlder { + log.Debugf("Ignore file because ignore_older reached. File %s", fe.NewPath) + return true + } + } + if !ignoreInactiveSince.IsZero() && fe.Info.ModTime().Sub(ignoreInactiveSince) <= 0 { + log.Debugf("Ignore file because ignore_since.* reached time %v. File %s", p.ignoreInactiveSince, fe.NewPath) + return true + } + return false +} + +func (p *fileProspector) onRemove(log *logp.Logger, fe loginp.FSEvent, src loginp.Source, s loginp.StateMetadataUpdater, hg loginp.HarvesterGroup) { + if p.stateChangeCloser.Removed { + log.Debugf("Stopping harvester as file %s has been removed and close.on_state_change.removed is enabled.", src.Name()) + hg.Stop(src) + } + + if p.cleanRemoved { + log.Debugf("Remove state for file as file removed: %s", fe.OldPath) + + err := s.Remove(src) + if err != nil { + log.Errorf("Error while removing state from statestore: %v", err) + } + } +} + +func (p *fileProspector) onRename(log *logp.Logger, ctx input.Context, fe loginp.FSEvent, src loginp.Source, s loginp.StateMetadataUpdater, hg loginp.HarvesterGroup) { + // if file_identity is based on path, the current reader has to be cancelled + // and a new one has to start. 
+ if !p.identifier.Supports(trackRename) { + prevSrc := p.identifier.GetSource(loginp.FSEvent{NewPath: fe.OldPath}) + hg.Stop(prevSrc) + + log.Debugf("Remove state for file as file renamed and path file_identity is configured: %s", fe.OldPath) + err := s.Remove(prevSrc) + if err != nil { + log.Errorf("Error while removing old state of renamed file (%s): %v", fe.OldPath, err) + } + + hg.Start(ctx, src) + } else { + // update file metadata as the path has changed + var meta fileMeta + err := s.FindCursorMeta(src, meta) + if err != nil { + log.Errorf("Error while getting cursor meta data of entry %s: %v", src.Name(), err) + + meta.IdentifierName = p.identifier.Name() + } + err = s.UpdateMetadata(src, fileMeta{Source: fe.NewPath, IdentifierName: meta.IdentifierName}) + if err != nil { + log.Errorf("Failed to update cursor meta data of entry %s: %v", src.Name(), err) + } + + if p.stateChangeCloser.Renamed { + log.Debugf("Stopping harvester as file %s has been renamed and close.on_state_change.renamed is enabled.", src.Name()) + + fe.Op = loginp.OpDelete + srcToClose := p.identifier.GetSource(fe) + hg.Stop(srcToClose) + } + } +} + func (p *fileProspector) stopHarvesterGroup(log *logp.Logger, hg loginp.HarvesterGroup) { err := hg.StopGroup() if err != nil { diff --git a/filebeat/input/filestream/prospector_creator.go b/filebeat/input/filestream/prospector_creator.go new file mode 100644 index 00000000000..59f86d1426a --- /dev/null +++ b/filebeat/input/filestream/prospector_creator.go @@ -0,0 +1,106 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package filestream + +import ( + "fmt" + "regexp" + "sync" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" +) + +const ( + externalMode = "external" + internalMode = "internal" + + copytruncateStrategy = "copytruncate" +) + +var ( + experimentalWarning sync.Once +) + +func newProspector(config config) (loginp.Prospector, error) { + filewatcher, err := newFileWatcher(config.Paths, config.FileWatcher) + if err != nil { + return nil, fmt.Errorf("error while creating filewatcher %v", err) + } + + identifier, err := newFileIdentifier(config.FileIdentity) + if err != nil { + return nil, fmt.Errorf("error while creating file identifier: %v", err) + } + + fileprospector := fileProspector{ + filewatcher: filewatcher, + identifier: identifier, + ignoreOlder: config.IgnoreOlder, + cleanRemoved: config.CleanRemoved, + stateChangeCloser: config.Close.OnStateChange, + } + if config.Rotation == nil { + return &fileprospector, nil + } + + rotationMethod := config.Rotation.Name() + switch rotationMethod { + case "": + return &fileprospector, nil + + case internalMode: + return nil, fmt.Errorf("not implemented: internal log rotation") + + case externalMode: + externalConfig := config.Rotation.Config() + cfg := rotationConfig{} + err := externalConfig.Unpack(&cfg) + if err != nil { + return nil, fmt.Errorf("failed to unpack configuration of external rotation: %+v", err) + } + strategy := cfg.Strategy.Name() + switch strategy { + case copytruncateStrategy: + experimentalWarning.Do(func() { + cfgwarn.Experimental("rotation.external.copytruncate is used.") + }) + + cpCfg := ©TruncateConfig{} + err = cfg.Strategy.Config().Unpack(&cpCfg) + if err != nil { + return nil, fmt.Errorf("failed to unpack configuration of external copytruncate rotation: %+v", err) + } + suffix, err := regexp.Compile(cpCfg.SuffixRegex) + if err != nil { + return nil, fmt.Errorf("invalid suffix regex for copytruncate rotation") + } + fileprospector.stateChangeCloser.Renamed = false + return ©TruncateFileProspector{ + fileprospector, + suffix, + newRotatedFilestreams(cpCfg), + }, nil + default: + } + return nil, fmt.Errorf("no such external rotation strategy: %s", strategy) + + default: + } + return nil, fmt.Errorf("no such rotation method: %s", rotationMethod) +} diff --git a/filebeat/input/filestream/prospector_test.go b/filebeat/input/filestream/prospector_test.go index ffcdbcf31f9..83dc2055df0 100644 --- a/filebeat/input/filestream/prospector_test.go +++ b/filebeat/input/filestream/prospector_test.go @@ -170,8 +170,8 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { }{ "two new files": { events: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file"}, - loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/other/file"}, + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file", Info: testFileInfo{}}, + loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/other/file", Info: testFileInfo{}}, }, expectedEvents: []harvesterEvent{ harvesterStart("path::/path/to/file"), @@ -181,7 +181,7 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { }, "one updated file": { events: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpWrite, NewPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpWrite, NewPath: "/path/to/file", Info: testFileInfo{}}, }, expectedEvents: []harvesterEvent{ harvesterStart("path::/path/to/file"), @@ -190,8 +190,8 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { }, "one updated 
then truncated file": { events: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpWrite, NewPath: "/path/to/file"}, - loginp.FSEvent{Op: loginp.OpTruncate, NewPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpWrite, NewPath: "/path/to/file", Info: testFileInfo{}}, + loginp.FSEvent{Op: loginp.OpTruncate, NewPath: "/path/to/file", Info: testFileInfo{}}, }, expectedEvents: []harvesterEvent{ harvesterStart("path::/path/to/file"), @@ -204,12 +204,12 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { loginp.FSEvent{ Op: loginp.OpCreate, NewPath: "/path/to/file", - Info: testFileInfo{"/path/to/file", 5, minuteAgo}, + Info: testFileInfo{"/path/to/file", 5, minuteAgo, nil}, }, loginp.FSEvent{ Op: loginp.OpWrite, NewPath: "/path/to/other/file", - Info: testFileInfo{"/path/to/other/file", 5, minuteAgo}, + Info: testFileInfo{"/path/to/other/file", 5, minuteAgo, nil}, }, }, ignoreOlder: 10 * time.Second, @@ -222,12 +222,12 @@ func TestProspectorNewAndUpdatedFiles(t *testing.T) { loginp.FSEvent{ Op: loginp.OpCreate, NewPath: "/path/to/file", - Info: testFileInfo{"/path/to/file", 5, minuteAgo}, + Info: testFileInfo{"/path/to/file", 5, minuteAgo, nil}, }, loginp.FSEvent{ Op: loginp.OpWrite, NewPath: "/path/to/other/file", - Info: testFileInfo{"/path/to/other/file", 5, minuteAgo}, + Info: testFileInfo{"/path/to/other/file", 5, minuteAgo, nil}, }, }, ignoreOlder: 5 * time.Minute, @@ -265,13 +265,13 @@ func TestProspectorDeletedFile(t *testing.T) { }{ "one deleted file without clean removed": { events: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file", Info: testFileInfo{}}, }, cleanRemoved: false, }, "one deleted file with clean removed": { events: []loginp.FSEvent{ - loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"}, + loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file", Info: testFileInfo{}}, }, cleanRemoved: true, }, @@ -318,6 +318,7 @@ func TestProspectorRenamedFile(t *testing.T) { Op: loginp.OpRename, OldPath: "/old/path/to/file", NewPath: "/new/path/to/file", + Info: testFileInfo{}, }, }, expectedEvents: []harvesterEvent{ @@ -332,6 +333,7 @@ func TestProspectorRenamedFile(t *testing.T) { Op: loginp.OpRename, OldPath: "/old/path/to/file", NewPath: "/new/path/to/file", + Info: testFileInfo{}, }, }, trackRename: true, @@ -345,6 +347,7 @@ func TestProspectorRenamedFile(t *testing.T) { Op: loginp.OpRename, OldPath: "/old/path/to/file", NewPath: "/new/path/to/file", + Info: testFileInfo{}, }, }, trackRename: true, @@ -396,6 +399,10 @@ type harvesterRestart string func (h harvesterRestart) String() string { return string(h) } +type harvesterContinue string + +func (h harvesterContinue) String() string { return string(h) } + type harvesterStop string func (h harvesterStop) String() string { return string(h) } @@ -420,6 +427,10 @@ func (t *testHarvesterGroup) Restart(_ input.Context, s loginp.Source) { t.events = append(t.events, harvesterRestart(s.Name())) } +func (t *testHarvesterGroup) Continue(_ input.Context, p, s loginp.Source) { + t.events = append(t.events, harvesterContinue(p.Name()+" -> "+s.Name())) +} + func (t *testHarvesterGroup) Stop(s loginp.Source) { t.events = append(t.events, harvesterStop(s.Name())) } diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index e90809b1503..f55994e9c83 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -2750,6 +2750,16 @@ filebeat.inputs: # original 
for harvesting but will report the symlink name as source. #prospector.scanner.symlinks: false + ### Log rotation + + # When an external tool rotates the input files with copytruncate strategy + # use this section to help the input find the rotated files. + #rotation.external.strategy.copytruncate: + # Regex that matches the rotated files. + # suffix_regex: \.\d$ + # If the rotated filename suffix is a datetime, set it here. + # dateformat: -20060102 + ### State options # Files for the modification data is older then clean_inactive the state from the registry is removed
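Aside on the `dateformat` option shown above: the layout string (`-20060102`) follows Go's reference-time convention (`Mon Jan 2 15:04:05 MST 2006`), since `dateSorter.GetTs` hands it to `time.Parse`. A minimal, self-contained sketch of how such a suffix is parsed; the suffix value is hypothetical:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "-20060102" is a Go time layout: year 2006, month 01, day 02.
	// It describes rotated-file suffixes such as "-20210526".
	suffix := "-20210526" // hypothetical rotated-file suffix
	ts, err := time.Parse("-20060102", suffix)
	if err != nil {
		fmt.Println("suffix does not match the layout:", err)
		return
	}
	fmt.Println(ts.Format(time.RFC3339)) // 2021-05-26T00:00:00Z
}
```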