From 83ab7e98b64dceec4310383b3a8fcc8ecd9ee4e2 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Thu, 27 May 2021 12:52:46 +0200 Subject: [PATCH] Log input: remove states more eagerly (#25756) ## What does this PR do? The change introduced here more eagerly removes the state if clean_removed is set, reduing the time window of detection and state removal to `scan_frequency` in the log input. ## Why is it important? The log input removes states on clean_removed by setting the TTL to 0, and wait for another scan to finally remove the state for the from the state registry. The total time window for a state removal thusly was `2*scan_frequency`. The disk scan always is subject to race conditions with the actual on disk state. In case of inode reuse an increased time window might might lead to filebeat detecting a new file as a rename of an old file. Next Filebeat truncate detection would try to handle the case, but if logs are written and rotated very fast the new file might already be bigger then the old file. By removing the state more eagerly we reduce the risk of running into a race condition reopening a new file as old. --- CHANGELOG.next.asciidoc | 1 + filebeat/input/log/input.go | 23 ++++++++++++++++++----- filebeat/registrar/registrar.go | 7 +++---- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 53195008707..5e81dfc10f1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -273,6 +273,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix integer overflow in S3 offsets when collecting very large files. {pull}22523[22523] - Fix CredentialsJSON unpacking for `gcp-pubsub` and `httpjson` inputs. {pull}23277[23277] - Fix issue with m365_defender, when parsing incidents that has no alerts attached: {pull}25421[25421] +- Improve inode reuse handling by removing state for removed files more eagerly from the internal state table in the logs inputs. {pull}25756[25756] - Mitigate deadlock is aws-s3 input when SQS visibility timeout is exceeded. {issue}25750[25750] - Fix httpjson cursor override with empty values by adding `ignore_empty_value` option. {pull}25802[25802] diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index b5a5cc7543f..919c66396cb 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -231,14 +231,13 @@ func (p *Input) Run() { // It is important that a first scan is run before cleanup to make sure all new states are read first if p.config.CleanInactive > 0 || p.config.CleanRemoved { - beforeCount := p.states.Count() - cleanedStates, pendingClean := p.states.Cleanup() - logger.Debugf("input states cleaned up. Before: %d, After: %d, Pending: %d", - beforeCount, beforeCount-cleanedStates, pendingClean) + p.cleanupStates() } - // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first + // Marking removed files to be cleaned up. if p.config.CleanRemoved { + var removed uint + for _, state := range p.states.GetStates() { stateLogger := loggerWithState(logger, state) @@ -246,6 +245,7 @@ func (p *Input) Run() { stat, err := os.Stat(state.Source) if err != nil { if os.IsNotExist(err) { + removed++ p.removeState(stateLogger, state) stateLogger.Debugf("Remove state for file as file removed: %s", state.Source) } else { @@ -259,14 +259,27 @@ func (p *Input) Run() { state.Id, state.IdentifierName = p.fileStateIdentifier.GenerateID(state) } if !state.IsEqual(&newState) { + removed++ p.removeState(stateLogger, state) stateLogger.Debugf("Remove state of file as its identity has changed: %s", state.Source) } } } + + if removed > 0 { + logger.Debugf("%v entries marked as removed. Trigger state cleanup.", removed) + p.cleanupStates() + } } } +func (p *Input) cleanupStates() { + beforeCount := p.states.Count() + cleanedStates, pendingClean := p.states.Cleanup() + p.logger.Debugf("input states cleaned up. Before: %d, After: %d, Pending: %d", + beforeCount, beforeCount-cleanedStates, pendingClean) +} + func (p *Input) removeState(logger *logp.Logger, state file.State) { // Only clean up files where state is Finished if !state.Finished { diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index fa07048d205..27c292886ca 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -175,6 +175,7 @@ func (r *Registrar) Run() { // flush timeout configured. Only update internal state and track pending // updates to be written to registry. r.onEvents(states) + r.gcStates() if flushC == nil && len(states) > 0 { timer = time.NewTimer(r.flushTimeout) flushC = timer.C @@ -197,14 +198,12 @@ func (r *Registrar) commitStateUpdates() { statesCurrent.Set(int64(len(states))) registryWrites.Inc() - - r.log.Debugf("Registry file updated. %d active states.", len(states)) - registrySuccess.Inc() - if err := writeStates(r.store, states); err != nil { r.log.Errorf("Error writing registrar state to statestore: %v", err) registryFails.Inc() } + r.log.Debugf("Registry file updated. %d active states.", len(states)) + registrySuccess.Inc() if r.out != nil { r.out.Published(r.bufferedStateUpdates)