From 8c14932457308a88b2f00260663834c76960e225 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Thu, 2 Jul 2020 17:21:20 +0200 Subject: [PATCH] Prepare input/file for changes in the registrar (#19516) Planned changes in the registrar will introduce a key-value store. This changes prepares the input/file package for upcoming updates. (cherry picked from commit 36e2978945a95f2adc75d321412d31926a4f4ce3) --- filebeat/input/file/state.go | 20 ++++++++++---------- filebeat/input/file/states.go | 12 +++++++++++- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index dde3c6c5421..ef255243b4c 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -30,16 +30,16 @@ import ( // State is used to communicate the reading state of a file type State struct { - Id string `json:"-"` // local unique id to make comparison more efficient - Finished bool `json:"-"` // harvester state - Fileinfo os.FileInfo `json:"-"` // the file info - Source string `json:"source"` - Offset int64 `json:"offset"` - Timestamp time.Time `json:"timestamp"` - TTL time.Duration `json:"ttl"` - Type string `json:"type"` - Meta map[string]string `json:"meta"` - FileStateOS file.StateOS + Id string `json:"-" struct:"-"` // local unique id to make comparison more efficient + Finished bool `json:"-" struct:"-"` // harvester state + Fileinfo os.FileInfo `json:"-" struct:"-"` // the file info + Source string `json:"source" struct:"source"` + Offset int64 `json:"offset" struct:"offset"` + Timestamp time.Time `json:"timestamp" struct:"timestamp"` + TTL time.Duration `json:"ttl" struct:"ttl"` + Type string `json:"type" struct:"type"` + Meta map[string]string `json:"meta" struct:"meta,omitempty"` + FileStateOS file.StateOS `json:"FileStateOS" struct:"FileStateOS"` } // NewState creates a new file state diff --git a/filebeat/input/file/states.go b/filebeat/input/file/states.go index fc50dd904c0..34704b41dba 100644 --- a/filebeat/input/file/states.go +++ b/filebeat/input/file/states.go @@ -94,6 +94,12 @@ func (s *States) findPrevious(id string) int { // The number of states that were cleaned up and number of states that can be // cleaned up in the future is returned. func (s *States) Cleanup() (int, int) { + return s.CleanupWith(nil) +} + +// CleanupWith cleans up the state array. It calls `fn` with the state ID, for +// each entry to be removed. +func (s *States) CleanupWith(fn func(string)) (int, int) { s.Lock() defer s.Unlock() @@ -114,7 +120,11 @@ func (s *States) Cleanup() (int, int) { continue } - delete(s.idx, state.ID()) + id := state.ID() + delete(s.idx, id) + if fn != nil { + fn(id) + } logp.Debug("state", "State removed for %v because of older: %v", state.Source, state.TTL) L--