diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4dec4b62bbe..ef876ec72f3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -93,6 +93,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524] - Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] - Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125] +- Fix goroutine leak on non-explicit finalization of log input. {pull}12164[12164] *Heartbeat* diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index ea19c20033b..6c19762ede9 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -24,6 +24,7 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/elastic/beats/filebeat/channel" @@ -68,6 +69,7 @@ type Input struct { done chan struct{} numHarvesters atomic.Uint32 meta map[string]string + stopOnce sync.Once } // NewInput instantiates a new Log @@ -146,6 +148,8 @@ func NewInput( logp.Info("Configured paths: %v", p.config.Paths) cleanupNeeded = false + go p.stopWhenDone() + return p, nil } @@ -727,14 +731,27 @@ func (p *Input) Wait() { // Stop stops all harvesters and then stops the input func (p *Input) Stop() { - // Stop all harvesters - // In case the beatDone channel is closed, this will not wait for completion - // Otherwise Stop will wait until output is complete - p.harvesters.Stop() + p.stopOnce.Do(func() { + // Stop all harvesters + // In case the beatDone channel is closed, this will not wait for completion + // Otherwise Stop will wait until output is complete + p.harvesters.Stop() + + // close state updater + p.stateOutlet.Close() + + // stop all communication between harvesters and publisher pipeline + p.outlet.Close() + }) +} - // close state updater - p.stateOutlet.Close() +// stopWhenDone takes care of stopping the input if some of the contexts are done +func (p *Input) stopWhenDone() { + select { + case <-p.done: + case <-p.stateOutlet.Done(): + case <-p.outlet.Done(): + } - // stop all communication between harvesters and publisher pipeline - p.outlet.Close() + p.Wait() }