Skip to content

Commit

Permalink
Use lifo order in metric buffer (#5287)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Jan 15, 2019
1 parent 193aba8 commit da80276
Show file tree
Hide file tree
Showing 4 changed files with 365 additions and 77 deletions.
145 changes: 88 additions & 57 deletions internal/models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ type Buffer struct {
cap int // the capacity of the buffer

batchFirst int // index of the first metric in the batch
batchLast int // one after the index of the last metric in the batch
batchSize int // number of metrics current in the batch
batchSize int // number of metrics currently in the batch

MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
Expand Down Expand Up @@ -82,46 +81,24 @@ func (b *Buffer) metricDropped(metric telegraf.Metric) {
metric.Reject()
}

func (b *Buffer) inBatch() bool {
if b.batchSize == 0 {
return false
}

if b.batchFirst < b.batchLast {
return b.last >= b.batchFirst && b.last < b.batchLast
} else {
return b.last >= b.batchFirst || b.last < b.batchLast
}
}

func (b *Buffer) add(m telegraf.Metric) {
// Check if Buffer is full
if b.size == b.cap {
if b.batchSize == 0 {
// No batch taken by the output, we can drop the metric now.
b.metricDropped(b.buf[b.last])
} else if b.inBatch() {
// There is an outstanding batch and this will overwrite a metric
// in it, delay the dropping only in case the batch gets rejected.
b.metricDropped(b.buf[b.last])

if b.last == b.batchFirst && b.batchSize > 0 {
b.batchSize--
b.batchFirst++
b.batchFirst %= b.cap
} else {
// There is an outstanding batch, but this overwrites a metric
// outside of it.
b.metricDropped(b.buf[b.last])
b.batchFirst = b.next(b.batchFirst)
}
}

b.metricAdded()

b.buf[b.last] = m
b.last++
b.last %= b.cap
b.last = b.next(b.last)

if b.size == b.cap {
b.first++
b.first %= b.cap
b.first = b.next(b.first)
}

b.size = min(b.size+1, b.cap)
Expand All @@ -138,10 +115,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) {
}

// Batch returns a slice containing up to batchSize of the most recently added
// metrics.
//
// The metrics contained in the batch are not removed from the buffer, instead
// the last batch is recorded and removed only if Accept is called.
// metrics. Metrics are ordered from newest to oldest in the batch. The
// batch must not be modified by the client.
func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
b.Lock()
defer b.Unlock()
Expand All @@ -152,21 +127,23 @@ func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
return out
}

b.batchFirst = b.first
b.batchLast = b.first + outLen
b.batchLast %= b.cap
b.batchFirst = b.cap + b.last - outLen
b.batchFirst %= b.cap
b.batchSize = outLen

until := min(b.cap, b.first+outLen)

n := copy(out, b.buf[b.first:until])
if n < outLen {
copy(out[n:], b.buf[:outLen-n])
batchIndex := b.batchFirst
for i := range out {
out[len(out)-1-i] = b.buf[batchIndex]
b.buf[batchIndex] = nil
batchIndex = b.next(batchIndex)
}

b.last = b.batchFirst
b.size -= outLen
return out
}

// Accept removes the metrics contained in the last batch.
// Accept marks the batch, acquired from Batch(), as successfully written.
func (b *Buffer) Accept(batch []telegraf.Metric) {
b.Lock()
defer b.Unlock()
Expand All @@ -175,35 +152,89 @@ func (b *Buffer) Accept(batch []telegraf.Metric) {
b.metricWritten(m)
}

b.size -= b.batchSize
for i := 0; i < b.batchSize; i++ {
b.buf[b.first] = nil
b.first++
b.first %= b.cap
}

b.resetBatch()
}

// Reject clears the current batch record so that calls to Accept will have no
// effect.
// Reject returns the batch, acquired from Batch(), to the buffer and marks it
// as unsent.
func (b *Buffer) Reject(batch []telegraf.Metric) {
b.Lock()
defer b.Unlock()

if len(batch) > b.batchSize {
// Part or all of the batch was dropped before reject was called.
for _, m := range batch[b.batchSize:] {
b.metricDropped(m)
older := b.dist(b.first, b.batchFirst)
free := b.cap - b.size
restore := min(len(batch), free+older)

// Rotate newer metrics forward the number of metrics that we can restore.
rb := b.batchFirst
rp := b.last
re := b.nextby(rp, restore)
b.last = re
for rb != rp {
rp = b.prev(rp)
re = b.prev(re)

if b.buf[re] != nil {
b.metricDropped(b.buf[re])
}

b.buf[re] = b.buf[rp]
b.buf[rp] = nil
}

// Copy metrics from the batch back into the buffer; recall that the
// batch is in reverse order compared to b.buf
for i := range batch {
if i < restore {
re = b.prev(re)
b.buf[re] = batch[i]
b.size++
} else {
b.metricDropped(batch[i])
}
}

b.resetBatch()
}

// dist returns the distance between two indexes. Because this data structure
// uses a half open range the arguments must both either left side or right
// side pairs.
func (b *Buffer) dist(begin, end int) int {
if begin <= end {
return end - begin
} else {
return b.cap - begin - 1 + end
}
}

// next returns the next index with wrapping.
func (b *Buffer) next(index int) int {
index++
if index == b.cap {
return 0
}
return index
}

// next returns the index that is count newer with wrapping.
func (b *Buffer) nextby(index, count int) int {
index += count
index %= b.cap
return index
}

// next returns the prev index with wrapping.
func (b *Buffer) prev(index int) int {
index--
if index < 0 {
return b.cap - 1
}
return index
}

func (b *Buffer) resetBatch() {
b.batchFirst = 0
b.batchLast = 0
b.batchSize = 0
}

Expand Down
Loading

0 comments on commit da80276

Please sign in to comment.