diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index ee61eb2f111..11070139c87 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -18,7 +18,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] *Packetbeat* *Topbeat* -- Rename proc.cpu.user_p with proc.cpu.total_p as includes CPU time spent in kernel space {pull}631[631] +- Rename proc.cpu.user_p with proc.cpu.total_p as includes CPU time spent in kernel space {pull}631[631] *Filebeat* @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] *Filebeat* - Add exclude_files configuration option {pull}563[563] - Stop filebeat if filebeat is started without any prospectors defined or empty prospectors {pull}644[644] {pull}647[647] +- Improve shutdown of crawler and prospector to wait for clean completion {pull}720[720] - Set spool_size default value to 2048 {pull}628[628] *Winlogbeat* diff --git a/filebeat/beat/filebeat.go b/filebeat/beat/filebeat.go index 4051daa2fa7..8f008b7f08e 100644 --- a/filebeat/beat/filebeat.go +++ b/filebeat/beat/filebeat.go @@ -19,8 +19,10 @@ type Filebeat struct { FbConfig *cfg.Config // Channel from harvesters to spooler publisherChan chan []*FileEvent - Spooler *Spooler + spooler *Spooler registrar *Registrar + cralwer *Crawler + done chan struct{} } func New() *Filebeat { @@ -44,6 +46,8 @@ func (fb *Filebeat) Config(b *beat.Beat) error { } func (fb *Filebeat) Setup(b *beat.Beat) error { + fb.done = make(chan struct{}) + return nil } @@ -61,7 +65,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { return err } - crawl := &Crawler{ + fb.cralwer = &Crawler{ Registrar: fb.registrar, } @@ -69,8 +73,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error { fb.registrar.LoadState() // Init and Start spooler: Harvesters dump events into the spooler. - fb.Spooler = NewSpooler(fb) - err = fb.Spooler.Config() + fb.spooler = NewSpooler(fb) + err = fb.spooler.Config() if err != nil { logp.Err("Could not init spooler: %v", err) @@ -78,9 +82,12 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } // Start up spooler - go fb.Spooler.Run() + go fb.spooler.Run() + + // registrar records last acknowledged positions in all files. + go fb.registrar.Run() - err = crawl.Start(fb.FbConfig.Filebeat.Prospectors, fb.Spooler.Channel) + err = fb.cralwer.Start(fb.FbConfig.Filebeat.Prospectors, fb.spooler.Channel) if err != nil { return err } @@ -88,8 +95,10 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // Publishes event to output go Publish(b, fb) - // registrar records last acknowledged positions in all files. - fb.registrar.Run() + // Blocks progressing + select { + case <-fb.done: + } return nil } @@ -101,17 +110,18 @@ func (fb *Filebeat) Cleanup(b *beat.Beat) error { // Stop is called on exit for cleanup func (fb *Filebeat) Stop() { - // Stop harvesters - // Stop prospectors + logp.Info("Stopping filebeat") + // Stop crawler -> stop prospectors -> stop harvesters + fb.cralwer.Stop() // Stopping spooler will flush items - fb.Spooler.Stop() + fb.spooler.Stop() // Stopping registrar will write last state fb.registrar.Stop() - // Close channels - //close(fb.publisherChan) + // Stop Filebeat + close(fb.done) } func Publish(beat *beat.Beat, fb *Filebeat) { diff --git a/filebeat/beat/spooler.go b/filebeat/beat/spooler.go index 0c3ca1a5c20..860cd47d50e 100644 --- a/filebeat/beat/spooler.go +++ b/filebeat/beat/spooler.go @@ -10,7 +10,7 @@ import ( type Spooler struct { Filebeat *Filebeat - running bool + exit chan struct{} nextFlushTime time.Time spool []*input.FileEvent Channel chan *input.FileEvent @@ -19,7 +19,7 @@ type Spooler struct { func NewSpooler(filebeat *Filebeat) *Spooler { spooler := &Spooler{ Filebeat: filebeat, - running: false, + exit: make(chan struct{}), } config := &spooler.Filebeat.FbConfig.Filebeat @@ -66,9 +66,6 @@ func (s *Spooler) Run() { config := &s.Filebeat.FbConfig.Filebeat - // Enable running - s.running = true - // Sets up ticket channel ticker := time.NewTicker(config.IdleTimeoutDuration / 2) @@ -78,11 +75,10 @@ func (s *Spooler) Run() { // Loops until running is set to false for { - if !s.running { - break - } - select { + + case <-s.exit: + break case event := <-s.Channel: s.spool = append(s.spool, event) @@ -99,18 +95,19 @@ func (s *Spooler) Run() { } } } +} +// Stop stops the spooler. Flushes events before stopping +func (s *Spooler) Stop() { logp.Info("Stopping spooler") + close(s.exit) // Flush again before exiting spooler and closes channel + logp.Info("Spooler last flush spooler") s.flush() close(s.Channel) } -// Stop stops the spooler. Flushes events before stopping -func (s *Spooler) Stop() { -} - // flush flushes all event and sends them to the publisher func (s *Spooler) flush() { // Checks if any new objects diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 2d548716736..f47600c06c4 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -2,6 +2,7 @@ package crawler import ( "fmt" + "sync" "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/input" @@ -22,20 +23,17 @@ import ( type Crawler struct { // Registrar object to persist the state - Registrar *Registrar - running bool + Registrar *Registrar + prospectors []*Prospector + wg sync.WaitGroup } -func (crawler *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan chan *input.FileEvent) error { - - crawler.running = true +func (c *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan chan *input.FileEvent) error { if len(prospectorConfigs) == 0 { return fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.") } - var prospectors []*Prospector - logp.Info("Loading Prospectors: %v", len(prospectorConfigs)) // Prospect the globs/paths given on the command line and launch harvesters @@ -43,26 +41,34 @@ func (crawler *Crawler) Start(prospectorConfigs []config.ProspectorConfig, event logp.Debug("prospector", "File Configs: %v", prospectorConfig.Paths) - prospector, err := NewProspector(prospectorConfig, crawler.Registrar, eventChan) - prospectors = append(prospectors, prospector) + prospector, err := NewProspector(prospectorConfig, c.Registrar, eventChan) + c.prospectors = append(c.prospectors, prospector) if err != nil { return fmt.Errorf("Error in initing prospector: %s", err) } } - logp.Info("Loading Prospectors completed") - logp.Info("Running Prospectors") - for _, prospector := range prospectors { - go prospector.Run() + logp.Info("Loading Prospectors completed. Number of prospectors: %v", len(c.prospectors)) + + c.wg = sync.WaitGroup{} + for _, prospector := range c.prospectors { + c.wg.Add(1) + go prospector.Run(&c.wg) } - logp.Info("All prospectors are running") - logp.Info("All prospectors initialised with %d states to persist", len(crawler.Registrar.State)) + logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.State)) return nil } -func (crawler *Crawler) Stop() { - // TODO: Properly stop prospectors and harvesters +func (c *Crawler) Stop() { + logp.Info("Stopping Crawler") + + logp.Info("Stopping %v prospectors", len(c.prospectors)) + for _, prospector := range c.prospectors { + prospector.Stop() + } + c.wg.Wait() + logp.Info("Crawler stopped") } diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index 7d4188da593..1819a8e53a8 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -2,6 +2,7 @@ package crawler import ( "fmt" + "sync" "time" cfg "github.com/elastic/beats/filebeat/config" @@ -15,7 +16,7 @@ type Prospector struct { prospectorer Prospectorer channel chan *input.FileEvent registrar *Registrar - running bool + done chan struct{} } type Prospectorer interface { @@ -28,6 +29,7 @@ func NewProspector(prospectorConfig cfg.ProspectorConfig, registrar *Registrar, ProspectorConfig: prospectorConfig, registrar: registrar, channel: channel, + done: make(chan struct{}), } err := prospector.Init() @@ -61,6 +63,7 @@ func (p *Prospector) Init() error { case cfg.LogInputType: prospectorer, err = NewProspectorLog(p.ProspectorConfig, p.channel, p.registrar) prospectorer.Init() + default: return fmt.Errorf("Invalid prospector type: %v", p.ProspectorConfig.Harvester.InputType) } @@ -71,21 +74,32 @@ func (p *Prospector) Init() error { } // Starts scanning through all the file paths and fetch the related files. Start a harvester for each file -func (p *Prospector) Run() { +func (p *Prospector) Run(wg *sync.WaitGroup) { + + // TODO: Defer the wg.Done() call to block shutdown + // Currently there are 2 cases where shutting down the prospector could be blocked: + // 1. reading from file + // 2. forwarding event to spooler + // As this is not implemented yet, no blocking on prospector shutdown is done. + wg.Done() - p.running = true logp.Info("Starting prospector of type: %v", p.ProspectorConfig.Harvester.InputType) for { - p.prospectorer.Run(p.channel) - if !p.running { - break + select { + case <-p.done: + logp.Info("Prospector stopped") + return + default: + logp.Info("Run prospector") + p.prospectorer.Run(p.channel) } } } func (p *Prospector) Stop() { - // TODO: Stopping is currently not implemented + logp.Info("Stopping Prospector") + close(p.done) } // Setup Prospector Config diff --git a/filebeat/harvester/util.go b/filebeat/harvester/util.go index d57a9873fd2..1ea9e6b6f74 100644 --- a/filebeat/harvester/util.go +++ b/filebeat/harvester/util.go @@ -17,7 +17,6 @@ func readLine(reader processor.LineProcessor) (time.Time, string, int, error) { // Full line read to be returned if l.Bytes != 0 && err == nil { - logp.Debug("harvester", "full line read") return l.Ts, string(l.Content), l.Bytes, err } diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index d66c015194d..6b75d494593 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -112,15 +112,12 @@ func Run(name string, version string, bt Beater) { }() // Waits until beats channel is closed - for { - select { - case <-b.exit: - b.Stop() - logp.Info("Exit beat completed") - return - } + select { + case <-b.exit: + b.Stop() + logp.Info("Exit beat completed") + return } - } func (b *Beat) Start() error {