Log input: remove states more eagerly (elastic#25756)
## What does this PR do?

The change introduced here removes the state more eagerly if
`clean_removed` is set, reducing the time window for detection and state
removal to `scan_frequency` in the log input.

## Why is it important?

The log input removes states on `clean_removed` by setting the TTL to 0 and waiting for another scan to finally remove the state from the state registry. The total time window for a state removal was therefore `2*scan_frequency`. The disk scan is always subject to race conditions with the actual on-disk state. In case of inode reuse, an increased time window might lead to Filebeat detecting a new file as a rename of an old file. Filebeat's truncate detection would then try to handle the case, but if logs are written and rotated very quickly, the new file might already be bigger than the old file.
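
The difference between the two removal strategies can be illustrated with a toy model. This is only a sketch under stated assumptions, not Filebeat's implementation: the `state` and `table` types and the `scan` and `scansUntilRemoved` functions are hypothetical names invented for this example. It counts how many scan cycles pass before the entry for a deleted file leaves the table.

```go
package main

import "fmt"

// state is a simplified, hypothetical stand-in for a registry entry.
type state struct {
	source string
	ttl    int // 0 means "expired, drop on next cleanup"; -1 means "no expiry"
}

// table is a toy state table keyed by file path.
type table map[string]*state

// cleanup drops every entry whose TTL is 0 and returns how many were dropped.
func (t table) cleanup() int {
	n := 0
	for k, s := range t {
		if s.ttl == 0 {
			delete(t, k) // deleting during range is safe in Go
			n++
		}
	}
	return n
}

// scan simulates one scan cycle. exists reports whether a path is still on disk.
// The cleanup at the top runs before removed files are marked, so without the
// eager step a marked entry survives until the following scan.
func (t table) scan(exists func(string) bool, eager bool) {
	t.cleanup()
	removed := 0
	for _, s := range t {
		if s.ttl != 0 && !exists(s.source) {
			s.ttl = 0 // mark as removed
			removed++
		}
	}
	if eager && removed > 0 {
		t.cleanup() // eager variant: drop marked entries in the same cycle
	}
}

// scansUntilRemoved counts scan cycles until the entry for a deleted file is gone.
func scansUntilRemoved(eager bool) int {
	t := table{"/var/log/app.log": {source: "/var/log/app.log", ttl: -1}}
	gone := func(string) bool { return false } // the file was deleted
	scans := 0
	for len(t) > 0 {
		t.scan(gone, eager)
		scans++
	}
	return scans
}

func main() {
	fmt.Println("scans without eager cleanup:", scansUntilRemoved(false))
	fmt.Println("scans with eager cleanup:", scansUntilRemoved(true))
}
```

In this model the lazy variant needs two scan cycles and the eager variant needs one, which mirrors the reduction from `2*scan_frequency` to `scan_frequency` described above.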

By removing the state more eagerly we reduce the risk of running into a race condition reopening a new file as old.
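
To make the race concrete, here is a small sketch of the decision a scan faces when a stale entry is still in the table. It is an illustration under assumptions, not Filebeat's actual code: `fileInfo`, `oldState`, and `classify` are hypothetical names, and real file identity checks involve more than the inode.

```go
package main

import "fmt"

// fileInfo is a simplified, hypothetical view of what a disk scan sees.
type fileInfo struct {
	inode uint64
	size  int64
}

// oldState is a simplified, hypothetical registry entry:
// the inode last seen and the offset read so far.
type oldState struct {
	inode  uint64
	offset int64
}

// classify is a toy version of the decision made when a scanned file is
// compared against a stale entry that has not been removed yet.
func classify(st oldState, f fileInfo) string {
	if st.inode != f.inode {
		return "new file"
	}
	if f.size < st.offset {
		// Smaller than the recorded offset: looks like a truncation.
		return "truncated, restart at 0"
	}
	// Same inode and at least as large: indistinguishable from the old file.
	return "same file, resume at offset"
}

func main() {
	stale := oldState{inode: 42, offset: 1000}

	// New file reused inode 42 but is still small: truncate detection catches it.
	fmt.Println(classify(stale, fileInfo{inode: 42, size: 200}))

	// New file reused inode 42 and already grew past the old offset:
	// it is mistaken for the old file and its first 1000 bytes are skipped.
	fmt.Println(classify(stale, fileInfo{inode: 42, size: 5000}))

	// Different inode: correctly treated as new.
	fmt.Println(classify(stale, fileInfo{inode: 43, size: 5000}))
}
```

The second case is the one this change targets: the shorter the stale entry lives, the less likely a fast-growing file with a reused inode is compared against it.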
Steffen Siering authored May 27, 2021
1 parent 57d57f7 commit 83ab7e9
Showing 3 changed files with 22 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -273,6 +273,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Fix integer overflow in S3 offsets when collecting very large files. {pull}22523[22523]
 - Fix CredentialsJSON unpacking for `gcp-pubsub` and `httpjson` inputs. {pull}23277[23277]
 - Fix issue with m365_defender, when parsing incidents that has no alerts attached: {pull}25421[25421]
+- Improve inode reuse handling by removing state for removed files more eagerly from the internal state table in the logs inputs. {pull}25756[25756]
 - Mitigate deadlock is aws-s3 input when SQS visibility timeout is exceeded. {issue}25750[25750]
 - Fix httpjson cursor override with empty values by adding `ignore_empty_value` option. {pull}25802[25802]

23 changes: 18 additions & 5 deletions filebeat/input/log/input.go
@@ -231,21 +231,21 @@ func (p *Input) Run() {

 	// It is important that a first scan is run before cleanup to make sure all new states are read first
 	if p.config.CleanInactive > 0 || p.config.CleanRemoved {
-		beforeCount := p.states.Count()
-		cleanedStates, pendingClean := p.states.Cleanup()
-		logger.Debugf("input states cleaned up. Before: %d, After: %d, Pending: %d",
-			beforeCount, beforeCount-cleanedStates, pendingClean)
+		p.cleanupStates()
 	}

-	// Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first
+	// Marking removed files to be cleaned up.
 	if p.config.CleanRemoved {
+		var removed uint
+
 		for _, state := range p.states.GetStates() {
 			stateLogger := loggerWithState(logger, state)

 			// os.Stat will return an error in case the file does not exist
 			stat, err := os.Stat(state.Source)
 			if err != nil {
 				if os.IsNotExist(err) {
+					removed++
 					p.removeState(stateLogger, state)
 					stateLogger.Debugf("Remove state for file as file removed: %s", state.Source)
 				} else {
@@ -259,14 +259,27 @@ func (p *Input) Run() {
 					state.Id, state.IdentifierName = p.fileStateIdentifier.GenerateID(state)
 				}
 				if !state.IsEqual(&newState) {
+					removed++
 					p.removeState(stateLogger, state)
 					stateLogger.Debugf("Remove state of file as its identity has changed: %s", state.Source)
 				}
 			}
 		}
+
+		if removed > 0 {
+			logger.Debugf("%v entries marked as removed. Trigger state cleanup.", removed)
+			p.cleanupStates()
+		}
 	}
 }

+func (p *Input) cleanupStates() {
+	beforeCount := p.states.Count()
+	cleanedStates, pendingClean := p.states.Cleanup()
+	p.logger.Debugf("input states cleaned up. Before: %d, After: %d, Pending: %d",
+		beforeCount, beforeCount-cleanedStates, pendingClean)
+}
+
 func (p *Input) removeState(logger *logp.Logger, state file.State) {
 	// Only clean up files where state is Finished
 	if !state.Finished {

7 changes: 3 additions & 4 deletions filebeat/registrar/registrar.go
@@ -175,6 +175,7 @@ func (r *Registrar) Run() {
 			// flush timeout configured. Only update internal state and track pending
 			// updates to be written to registry.
 			r.onEvents(states)
+			r.gcStates()
 			if flushC == nil && len(states) > 0 {
 				timer = time.NewTimer(r.flushTimeout)
 				flushC = timer.C
@@ -197,14 +198,12 @@ func (r *Registrar) commitStateUpdates() {
 	statesCurrent.Set(int64(len(states)))

 	registryWrites.Inc()
-
-	r.log.Debugf("Registry file updated. %d active states.", len(states))
-	registrySuccess.Inc()
-
 	if err := writeStates(r.store, states); err != nil {
 		r.log.Errorf("Error writing registrar state to statestore: %v", err)
 		registryFails.Inc()
 	}
+	r.log.Debugf("Registry file updated. %d active states.", len(states))
+	registrySuccess.Inc()

 	if r.out != nil {
 		r.out.Published(r.bufferedStateUpdates)
