diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index b2ff293b69c..1f004c42d31 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -34,6 +34,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691] - Require logger as first parameter for `outputs.elasticsearch.client#BulkReadItemStatus`. {pull}16761[16761] - Extract Elasticsearch client logic from `outputs/elasticsearch` package into new `esclientleg` package. {pull}16150[16150] +- Rename `queue.BufferConfig.Events` to `queue.BufferConfig.MaxEvents`. {pull}17622[17622] ==== Bugfixes diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 7f462b91152..2e6d5b7b77e 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -183,11 +183,7 @@ func New( return nil, err } - if count := p.queue.BufferConfig().Events; count > 0 { - p.eventSema = newSema(count) - } - - maxEvents := p.queue.BufferConfig().Events + maxEvents := p.queue.BufferConfig().MaxEvents if maxEvents <= 0 { // Maximum number of events until acker starts blocking. // Only active if pipeline can drop events. diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index 7aecfe09bba..fdc2d8e27c9 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -191,7 +191,7 @@ func (b *broker) Close() error { func (b *broker) BufferConfig() queue.BufferConfig { return queue.BufferConfig{ - Events: b.bufSize, + MaxEvents: b.bufSize, } } diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index a99c81e3f9b..2e6cd8e395b 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -58,7 +58,9 @@ type Queue interface { // but still dropping events, the pipeline can use the buffer information, // to define an upper bound of events being active in the pipeline. type BufferConfig struct { - Events int // can be <= 0, if queue can not determine limit + // MaxEvents is the maximum number of events the queue can hold at capacity. + // A value <= 0 means there is no fixed limit. + MaxEvents int } // ProducerConfig as used by the Pipeline to configure some custom callbacks diff --git a/libbeat/publisher/queue/spool/spool.go b/libbeat/publisher/queue/spool/spool.go index 1a920e622b7..c796170fdc7 100644 --- a/libbeat/publisher/queue/spool/spool.go +++ b/libbeat/publisher/queue/spool/spool.go @@ -175,7 +175,7 @@ func (s *diskSpool) Close() error { // BufferConfig returns the queue initial buffer settings. func (s *diskSpool) BufferConfig() queue.BufferConfig { - return queue.BufferConfig{Events: -1} + return queue.BufferConfig{MaxEvents: -1} } // Producer creates a new queue producer for publishing events.