Skip to content

Commit

Permalink
Fix goroutine leak on non-explicit finalization of log inputs (elasti…
Browse files Browse the repository at this point in the history
…c#12164)

If log inputs were finished because their context, or one of their
ouleters have been finished, then it wasn't stopping its harvesters,
leaking resources.

(cherry picked from commit 6914806)
  • Loading branch information
jsoriano committed May 20, 2019
1 parent cfbfeb6 commit 85dd9f4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/v7.0.0...7.1[Check the HEAD diff]

- Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125]
- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063]
- Fix goroutine leak on non-explicit finalization of log input. {pull}12164[12164]

*Heartbeat*

Expand Down
33 changes: 25 additions & 8 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"time"

"github.com/elastic/beats/filebeat/channel"
Expand Down Expand Up @@ -68,6 +69,7 @@ type Input struct {
done chan struct{}
numHarvesters atomic.Uint32
meta map[string]string
stopOnce sync.Once
}

// NewInput instantiates a new Log
Expand Down Expand Up @@ -146,6 +148,8 @@ func NewInput(
logp.Info("Configured paths: %v", p.config.Paths)

cleanupNeeded = false
go p.stopWhenDone()

return p, nil
}

Expand Down Expand Up @@ -727,14 +731,27 @@ func (p *Input) Wait() {

// Stop stops all harvesters and then stops the input
func (p *Input) Stop() {
// Stop all harvesters
// In case the beatDone channel is closed, this will not wait for completion
// Otherwise Stop will wait until output is complete
p.harvesters.Stop()
p.stopOnce.Do(func() {
// Stop all harvesters
// In case the beatDone channel is closed, this will not wait for completion
// Otherwise Stop will wait until output is complete
p.harvesters.Stop()

// close state updater
p.stateOutlet.Close()

// stop all communication between harvesters and publisher pipeline
p.outlet.Close()
})
}

// close state updater
p.stateOutlet.Close()
// stopWhenDone takes care of stopping the input if some of the contexts are done
func (p *Input) stopWhenDone() {
select {
case <-p.done:
case <-p.stateOutlet.Done():
case <-p.outlet.Done():
}

// stop all communication between harvesters and publisher pipeline
p.outlet.Close()
p.Wait()
}

0 comments on commit 85dd9f4

Please sign in to comment.