Skip to content

Commit

Permalink
Define a queue metrics reporter interface (#31289)
Browse files Browse the repository at this point in the history
* first pass at a queue metrics reporter

* change interface

* change monitored types

* change timestamp field name

* add bare queue implementations

* make linter happy

* fix linter ignore

* remove QueueLag for now
  • Loading branch information
fearful-symmetry authored and chrisberkhout committed Jun 1, 2023
1 parent 2990607 commit 96de144
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 1 deletion.
4 changes: 4 additions & 0 deletions libbeat/publisher/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type testConsumer struct {
close func() error
}

func (q *testQueue) Metrics() (queue.Metrics, error) {
return queue.Metrics{}, nil
}

func (q *testQueue) Close() error {
if q.close != nil {
return q.close()
Expand Down
7 changes: 6 additions & 1 deletion libbeat/publisher/queue/diskqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func NewQueue(logger *logp.Logger, settings Settings) (*diskQueue, error) {
// and could also prevent us from creating new ones, so we treat this as a
// fatal error on startup rather than quietly providing degraded
// performance.
return nil, fmt.Errorf("couldn't write to state file: %v", err)
return nil, fmt.Errorf("couldn't write to state file: %w", err)
}

// Index any existing data segments to be placed in segments.reading.
Expand Down Expand Up @@ -193,6 +193,7 @@ func NewQueue(logger *logp.Logger, settings Settings) (*diskQueue, error) {
// events that are still present on disk but were already sent and
// acknowledged on a previous run (we probably want to track these as well
// in the future.)
//nolint:godox // Ignore This
// TODO: pass in a context that queues can use to report these events.
activeFrameCount := 0
for _, segment := range initialSegments {
Expand Down Expand Up @@ -277,3 +278,7 @@ func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
func (dq *diskQueue) Consumer() queue.Consumer {
return &diskQueueConsumer{queue: dq, done: make(chan struct{})}
}

func (dq *diskQueue) Metrics() (queue.Metrics, error) {
return queue.Metrics{}, queue.ErrMetricsNotImplemented
}
4 changes: 4 additions & 0 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (b *broker) Consumer() queue.Consumer {
return newConsumer(b)
}

func (b *broker) Metrics() (queue.Metrics, error) {
return queue.Metrics{}, queue.ErrMetricsNotImplemented
}

var ackChanPool = sync.Pool{
New: func() interface{} {
return &ackChan{
Expand Down
22 changes: 22 additions & 0 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package queue

import (
"errors"
"io"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/opt"
"github.com/elastic/beats/v7/libbeat/publisher"
)

Expand All @@ -34,6 +36,24 @@ type ACKListener interface {
OnACK(eventCount int) // number of consecutively published events acked by producers
}

//Metrics is a set of basic-user friendly metrics that report the current state of the queue. These metrics are meant to be relatively generic and high-level, and when reported directly, can be comprehensible to a user.
type Metrics struct {
//EventCount is the total events currently in the queue
EventCount opt.Uint
//ByteCount is the total byte size of the queue
ByteCount opt.Uint
//ByteLimit is the user-configured byte limit of the queue
ByteLimit opt.Uint
//EventLimit is the user-configured event limit of the queue
EventLimit opt.Uint

//OldestActiveTimestamp is the timestamp of the oldest item in the queue.
OldestActiveTimestamp common.Time
}

// ErrMetricsNotImplemented is a hopefully temporary type to mark queue metrics as not yet implemented
var ErrMetricsNotImplemented = errors.New("Queue metrics not implemented")

// Queue is responsible for accepting, forwarding and ACKing events.
// A queue will receive and buffer single events from its producers.
// Consumers will receive events in batches from the queues buffers.
Expand All @@ -50,6 +70,8 @@ type Queue interface {

Producer(cfg ProducerConfig) Producer
Consumer() Consumer

Metrics() (Metrics, error)
}

// BufferConfig returns the pipelines buffering settings,
Expand Down

0 comments on commit 96de144

Please sign in to comment.