From 689d4ef6723dfe30dccf3cd24a8abdf25b8c044f Mon Sep 17 00:00:00 2001 From: ruflin Date: Tue, 19 Jan 2016 17:47:02 +0100 Subject: [PATCH] Refactor beat exit * Introduce Signal function which is called if using CTRL-C or similar * Run now returns an error and doesn't exist itself anymore * Fix spooler and crawler shutdown issue * Update mockbeat to check Run return error. Thanks to @cyrilleverrier for his contribution here. --- CHANGELOG.asciidoc | 1 + filebeat/beat/spooler.go | 16 ++++++++------- filebeat/crawler/crawler.go | 3 +-- libbeat/beat/beat.go | 38 ++++++++++++++++++++--------------- libbeat/libbeat.go | 7 ++++++- winlogbeat/beat/winlogbeat.go | 6 ++++-- winlogbeat/main.go | 7 ++++++- 7 files changed, 49 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 363a87a41c1..13194fc9085 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] *Affecting all Beats* - Some publisher options refactoring in libbeat {pull}684[684] +- Run function to start a beat no returns an error instead of directly exiting. {pull}771[771] *Packetbeat* diff --git a/filebeat/beat/spooler.go b/filebeat/beat/spooler.go index 860cd47d50e..61c7fc3df89 100644 --- a/filebeat/beat/spooler.go +++ b/filebeat/beat/spooler.go @@ -79,13 +79,15 @@ func (s *Spooler) Run() { case <-s.exit: break - case event := <-s.Channel: - s.spool = append(s.spool, event) - - // Spooler is full -> flush - if len(s.spool) == cap(s.spool) { - logp.Debug("spooler", "Flushing spooler because spooler full. Events flushed: %v", len(s.spool)) - s.flush() + case event, ok := <-s.Channel: + if ok { + s.spool = append(s.spool, event) + + // Spooler is full -> flush + if len(s.spool) == cap(s.spool) { + logp.Debug("spooler", "Flushing spooler because spooler full. Events flushed: %v", len(s.spool)) + s.flush() + } } case <-ticker.C: // Flush periodically diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index f47600c06c4..f32fb368eca 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -42,11 +42,10 @@ func (c *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan c logp.Debug("prospector", "File Configs: %v", prospectorConfig.Paths) prospector, err := NewProspector(prospectorConfig, c.Registrar, eventChan) - c.prospectors = append(c.prospectors, prospector) - if err != nil { return fmt.Errorf("Error in initing prospector: %s", err) } + c.prospectors = append(c.prospectors, prospector) } logp.Info("Loading Prospectors completed. Number of prospectors: %v", len(c.prospectors)) diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index ec0a94d1c94..7dde956ebb4 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -25,8 +25,8 @@ package beat import ( "flag" "fmt" - "os" "runtime" + "sync" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/logp" @@ -63,7 +63,9 @@ type Beat struct { Events publisher.Client UUID uuid.UUID - exit chan struct{} + exit chan struct{} + error error + callback sync.Once } // Basic configuration of every beat @@ -98,7 +100,7 @@ func NewBeat(name string, version string, bt Beater) *Beat { } // Initiates and runs a new beat object -func Run(name string, version string, bt Beater) { +func Run(name string, version string, bt Beater) error { b := NewBeat(name, version, bt) @@ -110,7 +112,7 @@ func Run(name string, version string, bt Beater) { // TODO: detect if logging was already fully setup or not fmt.Printf("Start error: %v\n", err) logp.Critical("Start error: %v", err) - os.Exit(1) + b.error = err } // If start finishes, exit has to be called. This requires start to be blocking @@ -123,7 +125,7 @@ func Run(name string, version string, bt Beater) { case <-b.exit: b.Stop() logp.Info("Exit beat completed") - return + return b.error } } @@ -248,27 +250,31 @@ func (b *Beat) Run() error { logp.Critical("Running the beat returned an error: %v", err) } + return err +} + +// Stop calls the beater Stop action. +// It can happen that this function is called more then once. +func (b *Beat) Stop() { + logp.Info("Stopping Beat") + b.BT.Stop() + service.Cleanup() logp.Info("Cleaning up %s before shutting down.", b.Name) // Call beater cleanup function - err = b.BT.Cleanup(b) + err := b.BT.Cleanup(b) if err != nil { logp.Err("Cleanup returned an error: %v", err) } - return err -} - -// Stop calls the beater Stop action. -// It can happen that this function is called more then once. -func (beat *Beat) Stop() { - logp.Info("Stopping Beat") - beat.BT.Stop() } // Exiting beat -> shutdown func (b *Beat) Exit() { - logp.Info("Start exiting beat") - close(b.exit) + + b.callback.Do(func() { + logp.Info("Start exiting beat") + close(b.exit) + }) } diff --git a/libbeat/libbeat.go b/libbeat/libbeat.go index 18c04eef8eb..02e6b29635f 100644 --- a/libbeat/libbeat.go +++ b/libbeat/libbeat.go @@ -1,10 +1,15 @@ package main import ( + "os" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/mock" ) func main() { - beat.Run(mock.Name, mock.Version, &mock.Mockbeat{}) + err := beat.Run(mock.Name, mock.Version, &mock.Mockbeat{}) + if err != nil { + os.Exit(1) + } } diff --git a/winlogbeat/beat/winlogbeat.go b/winlogbeat/beat/winlogbeat.go index f8de9c93829..8d4f66495cf 100644 --- a/winlogbeat/beat/winlogbeat.go +++ b/winlogbeat/beat/winlogbeat.go @@ -182,8 +182,10 @@ func (eb *Winlogbeat) Cleanup(b *beat.Beat) error { } func (eb *Winlogbeat) Stop() { - logp.Info("Initiating shutdown, please wait.") - close(eb.done) + logp.Info("Stopping Winlogbeat") + if eb.done != nil { + close(eb.done) + } } func (eb *Winlogbeat) processEventLog( diff --git a/winlogbeat/main.go b/winlogbeat/main.go index d8a54e6a23a..a7197621660 100644 --- a/winlogbeat/main.go +++ b/winlogbeat/main.go @@ -1,6 +1,8 @@ package main import ( + "os" + "github.com/elastic/beats/libbeat/beat" winlogbeat "github.com/elastic/beats/winlogbeat/beat" ) @@ -9,5 +11,8 @@ import ( var Name = "winlogbeat" func main() { - beat.Run(Name, "", winlogbeat.New()) + err := beat.Run(Name, "", winlogbeat.New()) + if err != nil { + os.Exit(1) + } }