From 871d8b60ae7a2d02e9e6eca05ffb6ac0e05d6e3d Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 20 May 2019 11:26:59 +0200 Subject: [PATCH] Fix goroutine leak on non-explicit finalization of log inputs (#12164) If log inputs were finished because their context, or one of their ouleters have been finished, then it wasn't stopping its harvesters, leaking resources. (cherry picked from commit 6914806fe935093b56c55c78be29d5584e3ed0c6) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/log/input.go | 33 +++++++++++++++++++++++++-------- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c8e5c91981c..e70d87263a9 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v6.7.2...6.8[Check the HEAD diff] - Fix initialization of the TCP input logger. {pull}11605[11605] - Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125] - Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] +- Fix goroutine leak on non-explicit finalization of log input. {pull}12164[12164] *Heartbeat* diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 6d5cf35ee69..c03427e4a4d 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -24,6 +24,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/elastic/beats/filebeat/channel" @@ -68,6 +69,7 @@ type Input struct { done chan struct{} numHarvesters atomic.Uint32 meta map[string]string + stopOnce sync.Once } // NewInput instantiates a new Log @@ -146,6 +148,8 @@ func NewInput( logp.Info("Configured paths: %v", p.config.Paths) cleanupNeeded = false + go p.stopWhenDone() + return p, nil } @@ -727,14 +731,27 @@ func (p *Input) Wait() { // Stop stops all harvesters and then stops the input func (p *Input) Stop() { - // Stop all harvesters - // In case the beatDone channel is closed, this will not wait for completion - // Otherwise Stop will wait until output is complete - p.harvesters.Stop() + p.stopOnce.Do(func() { + // Stop all harvesters + // In case the beatDone channel is closed, this will not wait for completion + // Otherwise Stop will wait until output is complete + p.harvesters.Stop() + + // close state updater + p.stateOutlet.Close() + + // stop all communication between harvesters and publisher pipeline + p.outlet.Close() + }) +} - // close state updater - p.stateOutlet.Close() +// stopWhenDone takes care of stopping the input if some of the contexts are done +func (p *Input) stopWhenDone() { + select { + case <-p.done: + case <-p.stateOutlet.Done(): + case <-p.outlet.Done(): + } - // stop all communication between harvesters and publisher pipeline - p.outlet.Close() + p.Wait() }