diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 53195008707..5e81dfc10f1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -273,6 +273,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix integer overflow in S3 offsets when collecting very large files. {pull}22523[22523] - Fix CredentialsJSON unpacking for `gcp-pubsub` and `httpjson` inputs. {pull}23277[23277] - Fix issue with m365_defender, when parsing incidents that has no alerts attached: {pull}25421[25421] +- Improve inode reuse handling by removing state for removed files more eagerly from the internal state table in the logs inputs. {pull}25756[25756] - Mitigate deadlock is aws-s3 input when SQS visibility timeout is exceeded. {issue}25750[25750] - Fix httpjson cursor override with empty values by adding `ignore_empty_value` option. {pull}25802[25802] diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index b5a5cc7543f..919c66396cb 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -231,14 +231,13 @@ func (p *Input) Run() { // It is important that a first scan is run before cleanup to make sure all new states are read first if p.config.CleanInactive > 0 || p.config.CleanRemoved { - beforeCount := p.states.Count() - cleanedStates, pendingClean := p.states.Cleanup() - logger.Debugf("input states cleaned up. Before: %d, After: %d, Pending: %d", - beforeCount, beforeCount-cleanedStates, pendingClean) + p.cleanupStates() } - // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first + // Marking removed files to be cleaned up. if p.config.CleanRemoved { + var removed uint + for _, state := range p.states.GetStates() { stateLogger := loggerWithState(logger, state) @@ -246,6 +245,7 @@ func (p *Input) Run() { stat, err := os.Stat(state.Source) if err != nil { if os.IsNotExist(err) { + removed++ p.removeState(stateLogger, state) stateLogger.Debugf("Remove state for file as file removed: %s", state.Source) } else { @@ -259,14 +259,27 @@ func (p *Input) Run() { state.Id, state.IdentifierName = p.fileStateIdentifier.GenerateID(state) } if !state.IsEqual(&newState) { + removed++ p.removeState(stateLogger, state) stateLogger.Debugf("Remove state of file as its identity has changed: %s", state.Source) } } } + + if removed > 0 { + logger.Debugf("%v entries marked as removed. Trigger state cleanup.", removed) + p.cleanupStates() + } } } +func (p *Input) cleanupStates() { + beforeCount := p.states.Count() + cleanedStates, pendingClean := p.states.Cleanup() + p.logger.Debugf("input states cleaned up. Before: %d, After: %d, Pending: %d", + beforeCount, beforeCount-cleanedStates, pendingClean) +} + func (p *Input) removeState(logger *logp.Logger, state file.State) { // Only clean up files where state is Finished if !state.Finished { diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index fa07048d205..27c292886ca 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -175,6 +175,7 @@ func (r *Registrar) Run() { // flush timeout configured. Only update internal state and track pending // updates to be written to registry. r.onEvents(states) + r.gcStates() if flushC == nil && len(states) > 0 { timer = time.NewTimer(r.flushTimeout) flushC = timer.C @@ -197,14 +198,12 @@ func (r *Registrar) commitStateUpdates() { statesCurrent.Set(int64(len(states))) registryWrites.Inc() - - r.log.Debugf("Registry file updated. %d active states.", len(states)) - registrySuccess.Inc() - if err := writeStates(r.store, states); err != nil { r.log.Errorf("Error writing registrar state to statestore: %v", err) registryFails.Inc() } + r.log.Debugf("Registry file updated. %d active states.", len(states)) + registrySuccess.Inc() if r.out != nil { r.out.Published(r.bufferedStateUpdates)