Skip to content

Commit

Permalink
[libbeat] rename queue.Eventer -> queue.ACKListener (elastic#16691)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Feb 28, 2020
1 parent fa289ce commit 31b9426
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Deprecate test flags, `generate` and `update_expected`, in favor of `data`. {pull}15292[15292]
- Python 3 is required now to run python tests and tools. {pull}14798[14798]
- The type `memqueue.Broker` is no longer exported; instead of `memqueue.NewBroker`, call `memqueue.NewQueue` (which provides the same public interface). {pull}16667[16667]
- `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691]

==== Bugfixes

Expand Down
6 changes: 3 additions & 3 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
clients = append(clients, client)
}

queueFactory := func(e queue.Eventer) (queue.Queue, error) {
queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) {
return memqueue.NewQueue(log,
memqueue.Settings{
Eventer: e,
Events: 20,
ACKListener: ackListener,
Events: 20,
}), nil
}

Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestClient(t *testing.T) {
makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
p, err := New(beat.Info{},
Monitors{},
func(_ queue.Eventer) (queue.Queue, error) {
func(_ queue.ACKListener) (queue.Queue, error) {
return qu, nil
},
outputs.Group{},
Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func loadOutput(
func createQueueBuilder(
config common.ConfigNamespace,
monitors Monitors,
) (func(queue.Eventer) (queue.Queue, error), error) {
) (func(queue.ACKListener) (queue.Queue, error), error) {
queueType := defaultQueueType
if b := config.Name(); b != "" {
queueType = b
Expand All @@ -187,7 +187,7 @@ func createQueueBuilder(
monitoring.NewString(queueReg, "name").Set(queueType)
}

return func(eventer queue.Eventer) (queue.Queue, error) {
return queueFactory(eventer, monitors.Logger, queueConfig)
return func(ackListener queue.ACKListener) (queue.Queue, error) {
return queueFactory(ackListener, monitors.Logger, queueConfig)
}, nil
}
2 changes: 1 addition & 1 deletion libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type waitCloser struct {
events sync.WaitGroup
}

type queueFactory func(queue.Eventer) (queue.Queue, error)
type queueFactory func(queue.ACKListener) (queue.Queue, error)

// New create a new Pipeline instance from a queue instance and a set of outputs.
// The new pipeline will take ownership of queue and outputs. On Close, the
Expand Down
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (l *ackLoop) handleBatchSig() int {
}

if count > 0 {
if e := l.broker.eventer; e != nil {
e.OnACK(count)
if listener := l.broker.ackListener; listener != nil {
listener.OnACK(count)
}

// report acks to waiting clients
Expand Down
12 changes: 7 additions & 5 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ type broker struct {
acks chan int
scheduledACKs chan chanList

eventer queue.Eventer
ackListener queue.ACKListener

// wait group for worker shutdown
wg sync.WaitGroup
waitOnClose bool
}

type Settings struct {
Eventer queue.Eventer
ACKListener queue.ACKListener
Events int
FlushMinEvents int
FlushTimeout time.Duration
Expand All @@ -87,7 +87,9 @@ func init() {
queue.RegisterType("mem", create)
}

func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (queue.Queue, error) {
func create(
ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config,
) (queue.Queue, error) {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand All @@ -98,7 +100,7 @@ func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (que
}

return NewQueue(logger, Settings{
Eventer: eventer,
ACKListener: ackListener,
Events: config.Events,
FlushMinEvents: config.FlushMinEvents,
FlushTimeout: config.FlushTimeout,
Expand Down Expand Up @@ -152,7 +154,7 @@ func NewQueue(

waitOnClose: settings.WaitOnClose,

eventer: settings.Eventer,
ackListener: settings.ACKListener,
}

var eventLoop interface {
Expand Down
8 changes: 4 additions & 4 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
)

// Factory for creating a queue used by a pipeline instance.
type Factory func(Eventer, *logp.Logger, *common.Config) (Queue, error)
type Factory func(ACKListener, *logp.Logger, *common.Config) (Queue, error)

// Eventer listens to special events to be send by queue implementations.
type Eventer interface {
OnACK(int) // number of consecutively published messages, acked by producers
// ACKListener listens to special events to be send by queue implementations.
type ACKListener interface {
OnACK(eventCount int) // number of consecutively published events acked by producers
}

// Queue is responsible for accepting, forwarding and ACKing events.
Expand Down
16 changes: 8 additions & 8 deletions libbeat/publisher/queue/spool/inbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
)

type inBroker struct {
ctx *spoolCtx
eventer queue.Eventer
ctx *spoolCtx
ackListener queue.ACKListener

// active state handler
state func(*inBroker) bool
Expand Down Expand Up @@ -73,7 +73,7 @@ const (

func newInBroker(
ctx *spoolCtx,
eventer queue.Eventer,
ackListener queue.ACKListener,
qu *pq.Queue,
codec codecID,
flushTimeout time.Duration,
Expand All @@ -90,9 +90,9 @@ func newInBroker(
}

b := &inBroker{
ctx: ctx,
eventer: eventer,
state: (*inBroker).stateEmpty,
ctx: ctx,
ackListener: ackListener,
state: (*inBroker).stateEmpty,

// API
events: make(chan pushRequest, inEventChannelSize),
Expand Down Expand Up @@ -134,8 +134,8 @@ func (b *inBroker) onFlush(n uint) {
return
}

if b.eventer != nil {
b.eventer.OnACK(int(n))
if b.ackListener != nil {
b.ackListener.OnACK(int(n))
}
b.ctx.logger.Debug("inbroker: flushed events:", n)
b.bufferedEvents -= n
Expand Down
6 changes: 4 additions & 2 deletions libbeat/publisher/queue/spool/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func init() {
queue.RegisterType("spool", create)
}

func create(eventer queue.Eventer, logp *logp.Logger, cfg *common.Config) (queue.Queue, error) {
func create(
ackListener queue.ACKListener, logp *logp.Logger, cfg *common.Config,
) (queue.Queue, error) {
cfgwarn.Beta("Spooling to disk is beta")

config := defaultConfig()
Expand All @@ -63,7 +65,7 @@ func create(eventer queue.Eventer, logp *logp.Logger, cfg *common.Config) (queue
}

return NewSpool(log, path, Settings{
Eventer: eventer,
ACKListener: ackListener,
Mode: config.File.Permissions,
WriteBuffer: uint(config.Write.BufferSize),
WriteFlushTimeout: config.Write.FlushTimeout,
Expand Down
5 changes: 3 additions & 2 deletions libbeat/publisher/queue/spool/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Settings struct {
// buffer be flushed and reset to its original size.
WriteBuffer uint

Eventer queue.Eventer
ACKListener queue.ACKListener

WriteFlushTimeout time.Duration
WriteFlushEvents uint
Expand Down Expand Up @@ -134,7 +134,8 @@ func NewSpool(logger logger, path string, settings Settings) (*Spool, error) {
if inFlushTimeout < minInFlushTimeout {
inFlushTimeout = minInFlushTimeout
}
inBroker, err := newInBroker(inCtx, settings.Eventer, queue, settings.Codec,
inBroker, err := newInBroker(
inCtx, settings.ACKListener, queue, settings.Codec,
inFlushTimeout, settings.WriteFlushEvents)
if err != nil {
return nil, err
Expand Down

0 comments on commit 31b9426

Please sign in to comment.