Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add entry IDs to the memory queue #32541

Merged
merged 29 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4392736
rename queue.Batch.ACK -> queue.Batch.Done
faec Jun 9, 2022
56ca444
Merge branch 'main' of github.com:elastic/beats
faec Jun 16, 2022
b4b8c7f
working on memory queue updates
faec Jun 22, 2022
9c1e575
Merge branch 'main' of github.com:elastic/beats
faec Jun 22, 2022
a3ef8e6
Merge branch 'main' into queue-batch
faec Jun 28, 2022
af24d15
adding queue entry ids to the api
faec Jul 14, 2022
92383f1
trying to untangle the memqueue producer seq numbers
faec Jul 14, 2022
0737f73
Merge branch 'main' of github.com:elastic/beats
faec Jul 25, 2022
64357db
Merge branch 'main' into queue-batch
faec Jul 27, 2022
f69d7ec
mid-refactor
faec Jul 27, 2022
939b702
it builds again
faec Jul 27, 2022
c31f51b
fix a variety of bugs
faec Jul 28, 2022
1cc14cf
more fixes
faec Jul 28, 2022
6c70ebc
queue tests pass
faec Jul 28, 2022
37bb4b1
pipeline tests pass but maybe shouldn't
faec Jul 28, 2022
d41fa54
pipeline tests pass for real
faec Jul 28, 2022
f1ee576
lint
faec Jul 29, 2022
f2d2146
return valid event ids from forgetfulProducer
faec Jul 29, 2022
4e729c6
remove commented code
faec Aug 2, 2022
740fb36
review comments
faec Aug 2, 2022
e907952
add comments
faec Aug 2, 2022
7b66aeb
report acked entry IDs from the queue metrics call
faec Aug 2, 2022
edd9ec4
make memqueue.Broker private again
faec Aug 2, 2022
f96e6b3
unit test in progress
faec Aug 3, 2022
7784252
add (non-passing) unit tests
faec Aug 3, 2022
139fbe2
fix testBackward
faec Aug 3, 2022
da9eff4
working on making the tests deterministic
faec Aug 3, 2022
c0e89d0
make producer callbacks more deterministic
faec Aug 3, 2022
387923d
fix remaining tests
faec Aug 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ func (c *client) publish(e beat.Event) {

var published bool
if c.canDrop {
published = c.producer.TryPublish(pubEvent)
_, published = c.producer.TryPublish(pubEvent)
} else {
published = c.producer.Publish(pubEvent)
_, published = c.producer.Publish(pubEvent)
}

if published {
Expand Down
3 changes: 2 additions & 1 deletion libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ func TestClientWaitClose(t *testing.T) {
defer client.Close()

// Send an event which gets acknowledged immediately.
client.Publish(beat.Event{})
output := newMockClient(func(batch publisher.Batch) error {
batch.ACK()
return nil
Expand All @@ -196,6 +195,8 @@ func TestClientWaitClose(t *testing.T) {
pipeline.output.Set(outputs.Group{Clients: []outputs.Client{output}})
defer pipeline.output.Set(outputs.Group{})

client.Publish(beat.Event{})

closed := make(chan struct{})
go func() {
defer close(closed)
Expand Down
5 changes: 3 additions & 2 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,10 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
producerCfg := queue.ProducerConfig{}

if reportEvents || cfg.Events != nil {
producerCfg.OnDrop = func(event beat.Event) {
producerCfg.OnDrop = func(event interface{}) {
publisherEvent, _ := event.(publisher.Event)
if cfg.Events != nil {
cfg.Events.DroppedOnPublish(event)
cfg.Events.DroppedOnPublish(publisherEvent.Content)
}
if reportEvents {
p.waitCloseGroup.Add(-1)
Expand Down
16 changes: 8 additions & 8 deletions libbeat/publisher/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type testQueue struct {
}

type testProducer struct {
publish func(try bool, event interface{}) bool
publish func(try bool, event interface{}) (queue.EntryID, bool)
cancel func() int
}

Expand Down Expand Up @@ -68,18 +68,18 @@ func (q *testQueue) Get(sz int) (queue.Batch, error) {
return nil, nil
}

func (p *testProducer) Publish(event interface{}) bool {
func (p *testProducer) Publish(event interface{}) (queue.EntryID, bool) {
if p.publish != nil {
return p.publish(false, event)
}
return false
return 0, false
}

func (p *testProducer) TryPublish(event interface{}) bool {
func (p *testProducer) TryPublish(event interface{}) (queue.EntryID, bool) {
if p.publish != nil {
return p.publish(true, event)
}
return false
return 0, false
}

func (p *testProducer) Cancel() int {
Expand Down Expand Up @@ -114,7 +114,7 @@ func makeTestQueue() queue.Queue {
var producer *testProducer
p := blockingProducer(cfg)
producer = &testProducer{
publish: func(try bool, event interface{}) bool {
publish: func(try bool, event interface{}) (queue.EntryID, bool) {
if try {
return p.TryPublish(event)
}
Expand Down Expand Up @@ -146,10 +146,10 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer {
waiting := atomic.MakeInt(0)

return &testProducer{
publish: func(_ bool, _ interface{}) bool {
publish: func(_ bool, _ interface{}) (queue.EntryID, bool) {
waiting.Inc()
<-sig
return false
return 0, false
},

cancel: func() int {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
count := original.Count()
events := make([]publisher.Event, 0, count)
for i := 0; i < count; i++ {
event, ok := original.Event(i).(publisher.Event)
event, ok := original.Entry(i).(publisher.Event)
if ok {
// In Beats this conversion will always succeed because only
// publisher.Event objects are inserted into the queue, but
Expand Down
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func publishEvents(p queue.Producer, num int, protobuf bool) {
} else {
e = makePublisherEvent()
}
ok := p.Publish(e)
_, ok := p.Publish(e)
if !ok {
panic("didn't publish")
}
Expand Down
6 changes: 5 additions & 1 deletion libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,14 @@ func (batch *diskQueueBatch) Count() int {
return len(batch.frames)
}

func (batch *diskQueueBatch) Event(i int) interface{} {
func (batch *diskQueueBatch) Entry(i int) interface{} {
return batch.frames[i].event
}

func (batch *diskQueueBatch) ID(i int) queue.EntryID {
return 0
rdner marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I correct in assuming a "real" event ids for the disk queue would be in a follow on PR? If so can we open an issue and make sure it is marked as "related" in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

func (batch *diskQueueBatch) Done() {
batch.queue.acks.addFrames(batch.frames)
}
8 changes: 4 additions & 4 deletions libbeat/publisher/queue/diskqueue/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ type producerWriteRequest struct {
// diskQueueProducer implementation of the queue.Producer interface
//

func (producer *diskQueueProducer) Publish(event interface{}) bool {
return producer.publish(event, true)
func (producer *diskQueueProducer) Publish(event interface{}) (queue.EntryID, bool) {
return 0, producer.publish(event, true)
}

func (producer *diskQueueProducer) TryPublish(event interface{}) bool {
return producer.publish(event, false)
func (producer *diskQueueProducer) TryPublish(event interface{}) (queue.EntryID, bool) {
return 0, producer.publish(event, false)
}

func (producer *diskQueueProducer) publish(
Expand Down
13 changes: 4 additions & 9 deletions libbeat/publisher/queue/memqueue/batchbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

package memqueue

type queueEntry struct {
event interface{}
client clientState
}

type batchBuffer struct {
next *batchBuffer
flushed bool
Expand All @@ -34,20 +29,20 @@ func newBatchBuffer(sz int) *batchBuffer {
return b
}

func (b *batchBuffer) add(event interface{}, st clientState) {
b.entries = append(b.entries, queueEntry{event, st})
func (b *batchBuffer) add(entry queueEntry) {
b.entries = append(b.entries, entry)
}

func (b *batchBuffer) length() int {
return len(b.entries)
}

func (b *batchBuffer) cancel(st *produceState) int {
func (b *batchBuffer) cancel(producer *ackProducer) int {
entries := b.entries[:0]

removedCount := 0
for _, entry := range b.entries {
if entry.client.state == st {
if entry.producer == producer {
removedCount++
continue
}
Expand Down
17 changes: 15 additions & 2 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ type Settings struct {
InputQueueSize int
}

type queueEntry struct {
event interface{}
id queue.EntryID

producer *ackProducer
producerID producerID // The order of this entry within its producer
}

type batch struct {
queue *broker
entries []queueEntry
Expand Down Expand Up @@ -153,7 +161,7 @@ func create(
func NewQueue(
logger *logp.Logger,
settings Settings,
) queue.Queue {
) *broker {
var (
sz = settings.Events
minEvents = settings.FlushMinEvents
Expand Down Expand Up @@ -271,6 +279,7 @@ func (b *broker) Metrics() (queue.Metrics, error) {
EventCount: opt.UintWith(uint64(resp.currentQueueSize)),
EventLimit: opt.UintWith(uint64(b.bufSize)),
UnackedConsumedEvents: opt.UintWith(uint64(resp.occupiedRead)),
OldestEntryID: resp.oldestEntryID,
}, nil
}

Expand Down Expand Up @@ -380,10 +389,14 @@ func (b *batch) Count() int {
return len(b.entries)
}

func (b *batch) Event(i int) interface{} {
func (b *batch) Entry(i int) interface{} {
return b.entries[i].event
}

func (b *batch) ID(i int) queue.EntryID {
return b.entries[i].id
}

rdner marked this conversation as resolved.
Show resolved Hide resolved
func (b *batch) Done() {
b.doneChan <- batchDoneMsg{}
}
Loading