Skip to content

Commit

Permalink
Move tail_files to prospector level (#2932)
Browse files Browse the repository at this point in the history
tail_files is now only applied on the first run and after that ignored. Also the state for all files falling under tail_files and not having a state, a state will directly be written.

* Implement tail_files by setting ignore_older to 1ns for the first run
* Fix typo in stats variable names

Closes #2613 and #2788
  • Loading branch information
ruflin authored and Steffen Siering committed Nov 7, 2016
1 parent 7385fd5 commit 71fd436
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff]
- If a file is falling under ignore_older during startup, offset is now set to end of file instead of 0.
With the previous logic the whole file was sent in case a line was added and it was inconsitent with
files which were harvested previously. {pull}2907[2907]
- tail_files is now only applied on the first scan and not for all new files. {pull}2932[2932]

*Winlogbeat*

Expand Down
2 changes: 0 additions & 2 deletions filebeat/harvester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ var (
BufferSize: 16 * humanize.KiByte,
DocumentType: "log",
InputType: cfg.DefaultInputType,
TailFiles: false,
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 10 * time.Second,
Expand All @@ -38,7 +37,6 @@ type harvesterConfig struct {
DocumentType string `config:"document_type"`
Encoding string `config:"encoding"`
InputType string `config:"input_type"`
TailFiles bool `config:"tail_files"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Expand Down
6 changes: 0 additions & 6 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,6 @@ func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
return file.Seek(h.state.Offset, os.SEEK_SET)
}

// tail file if file is new and tail_files config is set
if h.config.TailFiles {
logp.Debug("harvester", "Setting offset for tailing file: %s.", h.state.Source)
return file.Seek(0, os.SEEK_END)
}

// get offset from file in case of encoding factory was required to read some data.
logp.Debug("harvester", "Setting offset for file based on seek: %s", h.state.Source)
return file.Seek(0, os.SEEK_CUR)
Expand Down
2 changes: 2 additions & 0 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
CleanRemoved: true,
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,
}
)

Expand All @@ -30,6 +31,7 @@ type prospectorConfig struct {
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
}

func (config *prospectorConfig) Validate() error {
Expand Down
6 changes: 3 additions & 3 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Prospector struct {
}

type Prospectorer interface {
Init(states []file.State) error
Init(states file.States) error
Run()
}

Expand All @@ -61,7 +61,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
return nil, err
}

err := prospector.Init(states.GetStates())
err := prospector.Init(states)
if err != nil {
return nil, err
}
Expand All @@ -72,7 +72,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
}

// Init sets up default config for prospector
func (p *Prospector) Init(states []file.State) error {
func (p *Prospector) Init(states file.States) error {

var prospectorer Prospectorer
var err error
Expand Down
24 changes: 19 additions & 5 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

var (
filesRenamed = expvar.NewInt("filebeat.prospector.log.files.renamed")
filesTrucated = expvar.NewInt("filebeat.prospector.log.files.truncated")
filesRenamed = expvar.NewInt("filebeat.prospector.log.files.renamed")
filesTruncated = expvar.NewInt("filebeat.prospector.log.files.truncated")
)

type ProspectorLog struct {
Expand All @@ -37,13 +37,14 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {
// Init sets up the prospector
// It goes through all states coming from the registry. Only the states which match the glob patterns of
// the prospector will be loaded and updated. All other states will not be touched.
func (p *ProspectorLog) Init(states []file.State) error {
func (p *ProspectorLog) Init(states file.States) error {
logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles)

for _, state := range states {
for _, state := range states.GetStates() {
// Check if state source belongs to this prospector. If yes, update the state.
if p.matchesFile(state.Source) {
state.TTL = -1

// Update prospector states and send new states to registry
err := p.Prospector.updateState(input.NewEvent(state))
if err != nil {
Expand All @@ -60,6 +61,19 @@ func (p *ProspectorLog) Init(states []file.State) error {
func (p *ProspectorLog) Run() {
logp.Debug("prospector", "Start next scan")

// TailFiles is like ignore_older = 1ns and only on startup
if p.config.TailFiles {
ignoreOlder := p.config.IgnoreOlder

// Overwrite ignore_older for the first scan
p.config.IgnoreOlder = 1
defer func() {
// Reset ignore_older after first run
p.config.IgnoreOlder = ignoreOlder
// Disable tail_files after the first run
p.config.TailFiles = false
}()
}
p.scan()

// It is important that a first scan is run before cleanup to make sure all new states are read first
Expand Down Expand Up @@ -246,7 +260,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
}

filesTrucated.Add(1)
filesTruncated.Add(1)
return
}

Expand Down
4 changes: 3 additions & 1 deletion filebeat/prospector/prospector_log_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ func TestInit(t *testing.T) {
Paths: test.paths,
},
}
err := p.Init(test.states)
states := file.NewStates()
states.SetStates(test.states)
err := p.Init(*states)
assert.NoError(t, err)
assert.Equal(t, test.count, p.Prospector.states.Count())
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/prospector/prospector_stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) {
return prospectorer, nil
}

func (p *ProspectorStdin) Init(states []file.State) error {
func (p *ProspectorStdin) Init(states file.States) error {
p.started = false
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion filebeat/prospector/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ func TestProspectorInitInputTypeLogError(t *testing.T) {
config: prospectorConfig{},
}

err := prospector.Init([]file.State{})
states := file.NewStates()
states.SetStates([]file.State{})
err := prospector.Init(*states)
// Error should be returned because no path is set
assert.Error(t, err)
}
Expand Down

0 comments on commit 71fd436

Please sign in to comment.