diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index dde3c6c5421..ef255243b4c 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -30,16 +30,16 @@ import ( // State is used to communicate the reading state of a file type State struct { - Id string `json:"-"` // local unique id to make comparison more efficient - Finished bool `json:"-"` // harvester state - Fileinfo os.FileInfo `json:"-"` // the file info - Source string `json:"source"` - Offset int64 `json:"offset"` - Timestamp time.Time `json:"timestamp"` - TTL time.Duration `json:"ttl"` - Type string `json:"type"` - Meta map[string]string `json:"meta"` - FileStateOS file.StateOS + Id string `json:"-" struct:"-"` // local unique id to make comparison more efficient + Finished bool `json:"-" struct:"-"` // harvester state + Fileinfo os.FileInfo `json:"-" struct:"-"` // the file info + Source string `json:"source" struct:"source"` + Offset int64 `json:"offset" struct:"offset"` + Timestamp time.Time `json:"timestamp" struct:"timestamp"` + TTL time.Duration `json:"ttl" struct:"ttl"` + Type string `json:"type" struct:"type"` + Meta map[string]string `json:"meta" struct:"meta,omitempty"` + FileStateOS file.StateOS `json:"FileStateOS" struct:"FileStateOS"` } // NewState creates a new file state diff --git a/filebeat/input/file/states.go b/filebeat/input/file/states.go index fc50dd904c0..34704b41dba 100644 --- a/filebeat/input/file/states.go +++ b/filebeat/input/file/states.go @@ -94,6 +94,12 @@ func (s *States) findPrevious(id string) int { // The number of states that were cleaned up and number of states that can be // cleaned up in the future is returned. func (s *States) Cleanup() (int, int) { + return s.CleanupWith(nil) +} + +// CleanupWith cleans up the state array. It calls `fn` with the state ID, for +// each entry to be removed. +func (s *States) CleanupWith(fn func(string)) (int, int) { s.Lock() defer s.Unlock() @@ -114,7 +120,11 @@ func (s *States) Cleanup() (int, int) { continue } - delete(s.idx, state.ID()) + id := state.ID() + delete(s.idx, id) + if fn != nil { + fn(id) + } logp.Debug("state", "State removed for %v because of older: %v", state.Source, state.TTL) L--