Skip to content

Commit

Permalink
Fix memory leak in Filebeat pipeline acker (elastic#12063)
Browse files Browse the repository at this point in the history
* Fix memory leak in Filebeat pipeline acker

Before this change acker goroutine was kept forever as processed events
count was not correctly updated.

Filebeat sends an empty event to update file states, this event is not
published, but treated as dropped, without updating counters.

This change makes sures that `a.events` count gets updated for dropped
events also, so the acker gets closed after all ACKs happen.

(cherry picked from commit 9653105)
  • Loading branch information
exekias authored and Carlos Pérez-Aradros Herce committed May 9, 2019
1 parent d0179c5 commit 9136aa8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
10 changes: 10 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ https://github.com/elastic/beats/compare/v7.0.0...7.1[Check the HEAD diff]

*Filebeat*

- Add support for Cisco syslog format used by their switch. {pull}10760[10760]
- Cover empty request data, url and version in Apache2 module{pull}10730[10730]
- Fix registry entries not being cleaned due to race conditions. {pull}10747[10747]
- Improve detection of file deletion on Windows. {pull}10747[10747]
- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263]
- Fix `add_docker_metadata` source matching, using `log.file.path` field now. {pull}11577[11577]
- Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591]
- Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524]
- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063]

*Heartbeat*

*Journalbeat*
Expand Down
9 changes: 8 additions & 1 deletion libbeat/publisher/pipeline/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,26 @@ func (a *gapCountACK) ackLoop() {
case <-a.done:
closing = true
a.done = nil
if a.events.Load() == 0 {
// stop worker, if all events accounted for have been ACKed.
// If new events are added after this acker won't handle them, which may
// result in duplicates
return
}

case <-a.pipeline.ackDone:
return

case n := <-acks:
empty := a.handleACK(n)
if empty && closing && a.events.Load() == 0 {
// stop worker, iff all events accounted for have been ACKed
// stop worker, if and only if all events accounted for have been ACKed
return
}

case <-drop:
// TODO: accumulate multiple drop events + flush count with timer
a.events.Sub(1)
a.fn(1, 0)
}
}
Expand Down

0 comments on commit 9136aa8

Please sign in to comment.