Update the buffer_size internal metric after writes #5314

Merged 2 commits on Jan 22, 2019
29 changes: 26 additions & 3 deletions internal/models/buffer.go
@@ -27,6 +27,8 @@ type Buffer struct {
MetricsAdded selfstat.Stat
MetricsWritten selfstat.Stat
MetricsDropped selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
}

// NewBuffer returns a new empty Buffer with the given capacity.
@@ -53,7 +55,19 @@ func NewBuffer(name string, capacity int) *Buffer {
"metrics_dropped",
map[string]string{"output": name},
),
BufferSize: selfstat.Register(
"write",
"buffer_size",
map[string]string{"output": name},
),
BufferLimit: selfstat.Register(
"write",
"buffer_limit",
map[string]string{"output": name},
),
}
b.BufferSize.Set(int64(0))
b.BufferLimit.Set(int64(capacity))
return b
}
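
For reference, the two new gauges are ordinary selfstat entries: buffer_limit is set once to the fixed capacity, while buffer_size is refreshed after every Add, Accept, and Reject. A minimal standalone sketch of the same registration pattern (the "example" output name and the Get calls are illustrative, assuming selfstat.Stat exposes Get/Set as elsewhere in Telegraf):

package main

import (
	"fmt"

	"github.com/influxdata/telegraf/selfstat"
)

func main() {
	// Same measurement/field names NewBuffer registers, with a stand-in output name.
	size := selfstat.Register("write", "buffer_size", map[string]string{"output": "example"})
	limit := selfstat.Register("write", "buffer_limit", map[string]string{"output": "example"})

	limit.Set(int64(10000)) // fixed capacity, set once at construction
	size.Set(int64(3))      // refreshed after every Add/Accept/Reject

	fmt.Println(size.Get(), limit.Get()) // 3 10000
}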

@@ -62,7 +76,11 @@ func (b *Buffer) Len() int {
b.Lock()
defer b.Unlock()

return b.size
return b.length()
}

func (b *Buffer) length() int {
return min(b.size+b.batchSize, b.cap)
}
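
The effect of the new accounting: metrics handed out via Batch still count toward Len until they are accepted, capped at the buffer capacity. A sketch in the style of the existing tests (setup, Metric, and the "test" name come from buffer_test.go; the test name here is hypothetical):

func TestBuffer_LenIncludesBatch(t *testing.T) {
	m := Metric()
	b := setup(NewBuffer("test", 5))
	b.Add(m, m, m, m, m)         // size = 5
	b.Batch(2)                   // batchSize = 2, size = 3
	require.Equal(t, 5, b.Len()) // min(3+2, 5) = 5; unchanged until Accept
}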

func (b *Buffer) metricAdded() {
@@ -112,6 +130,8 @@ func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i := range metrics {
b.add(metrics[i])
}

b.BufferSize.Set(int64(b.length()))
}

// Batch returns a slice containing up to batchSize of the most recently added
@@ -153,6 +173,7 @@ func (b *Buffer) Accept(batch []telegraf.Metric) {
}

b.resetBatch()
b.BufferSize.Set(int64(b.length()))
}

// Reject returns the batch, acquired from Batch(), to the buffer and marks it
@@ -176,6 +197,7 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {

if b.buf[re] != nil {
b.metricDropped(b.buf[re])
b.first = b.next(b.first)
}

b.buf[re] = b.buf[rp]
@@ -188,13 +210,14 @@ func (b *Buffer) Reject(batch []telegraf.Metric) {
if i < restore {
re = b.prev(re)
b.buf[re] = batch[i]
b.size++
b.size = min(b.size+1, b.cap)
} else {
b.metricDropped(batch[i])
}
}

b.resetBatch()
b.BufferSize.Set(int64(b.length()))
}

// dist returns the distance between two indexes. Because this data structure
@@ -204,7 +227,7 @@ func (b *Buffer) dist(begin, end int) int {
if begin <= end {
return end - begin
} else {
return b.cap - begin - 1 + end
return b.cap - begin + end
}
}
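
Worked example of the wrap-around fix in dist: with cap = 5, the range from begin = 3 up to (but not including) end = 1 covers indexes 3, 4, and 0.

// new: b.cap - begin + end     = 5 - 3 + 1     = 3  (correct)
// old: b.cap - begin - 1 + end = 5 - 3 - 1 + 1 = 2  (off by one whenever the range wraps)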

82 changes: 80 additions & 2 deletions internal/models/buffer_test.go
@@ -359,7 +359,7 @@ func TestBuffer_RejectPartialRoom(t *testing.T) {
}, batch)
}

func TestBuffer_RejectWrapped(t *testing.T) {
func TestBuffer_RejectNewMetricsWrapped(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
@@ -402,6 +402,84 @@ func TestBuffer_RejectWrapped(t *testing.T) {
}, batch)
}

func TestBuffer_RejectWrapped(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
b.Add(MetricTime(4))
b.Add(MetricTime(5))

b.Add(MetricTime(6))
b.Add(MetricTime(7))
b.Add(MetricTime(8))
batch := b.Batch(3)

b.Add(MetricTime(9))
b.Add(MetricTime(10))
b.Add(MetricTime(11))
b.Add(MetricTime(12))

b.Reject(batch)

batch = b.Batch(5)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(12),
MetricTime(11),
MetricTime(10),
MetricTime(9),
MetricTime(8),
}, batch)
}

func TestBuffer_RejectAdjustFirst(t *testing.T) {
b := setup(NewBuffer("test", 10))
b.Add(MetricTime(1))
b.Add(MetricTime(2))
b.Add(MetricTime(3))
batch := b.Batch(3)
b.Add(MetricTime(4))
b.Add(MetricTime(5))
b.Add(MetricTime(6))
b.Reject(batch)

b.Add(MetricTime(7))
b.Add(MetricTime(8))
b.Add(MetricTime(9))
batch = b.Batch(3)
b.Add(MetricTime(10))
b.Add(MetricTime(11))
b.Add(MetricTime(12))
b.Reject(batch)

b.Add(MetricTime(13))
b.Add(MetricTime(14))
b.Add(MetricTime(15))
batch = b.Batch(3)
b.Add(MetricTime(16))
b.Add(MetricTime(17))
b.Add(MetricTime(18))
b.Reject(batch)

b.Add(MetricTime(19))

batch = b.Batch(10)
testutil.RequireMetricsEqual(t,
[]telegraf.Metric{
MetricTime(19),
MetricTime(18),
MetricTime(17),
MetricTime(16),
MetricTime(15),
MetricTime(14),
MetricTime(13),
MetricTime(12),
MetricTime(11),
MetricTime(10),
}, batch)
}

func TestBuffer_AddDropsOverwrittenMetrics(t *testing.T) {
m := Metric()
b := setup(NewBuffer("test", 5))
@@ -509,7 +587,7 @@ func TestBuffer_BatchNotRemoved(t *testing.T) {
b := setup(NewBuffer("test", 5))
b.Add(m, m, m, m, m)
b.Batch(2)
require.Equal(t, 3, b.Len())
require.Equal(t, 5, b.Len())
}

func TestBuffer_BatchRejectAcceptNoop(t *testing.T) {
54 changes: 14 additions & 40 deletions internal/models/running_output.go
@@ -3,6 +3,7 @@ package models
import (
"log"
"sync"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
@@ -29,23 +30,23 @@ type OutputConfig struct {

// RunningOutput contains the output configuration
type RunningOutput struct {
// Must be 64-bit aligned
newMetricsCount int64

Name string
Output telegraf.Output
Config *OutputConfig
MetricBufferLimit int
MetricBatchSize int

MetricsFiltered selfstat.Stat
BufferSize selfstat.Stat
BufferLimit selfstat.Stat
WriteTime selfstat.Stat

batch []telegraf.Metric
buffer *Buffer
BatchReady chan time.Time

aggMutex sync.Mutex
batchMutex sync.Mutex
buffer *Buffer

aggMutex sync.Mutex
}

func NewRunningOutput(
@@ -69,7 +70,6 @@ }
}
ro := &RunningOutput{
Name: name,
batch: make([]telegraf.Metric, 0, batchSize),
buffer: NewBuffer(name, bufferLimit),
BatchReady: make(chan time.Time, 1),
Output: output,
@@ -81,24 +81,13 @@
"metrics_filtered",
map[string]string{"output": name},
),
BufferSize: selfstat.Register(
"write",
"buffer_size",
map[string]string{"output": name},
),
BufferLimit: selfstat.Register(
"write",
"buffer_limit",
map[string]string{"output": name},
),
WriteTime: selfstat.RegisterTiming(
"write",
"write_time_ns",
map[string]string{"output": name},
),
}

ro.BufferLimit.Set(int64(ro.MetricBufferLimit))
return ro
}

@@ -129,28 +118,16 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
return
}

ro.batchMutex.Lock()

ro.batch = append(ro.batch, metric)
if len(ro.batch) == ro.MetricBatchSize {
ro.addBatchToBuffer()

nBuffer := ro.buffer.Len()
ro.BufferSize.Set(int64(nBuffer))
ro.buffer.Add(metric)

count := atomic.AddInt64(&ro.newMetricsCount, 1)
if count == int64(ro.MetricBatchSize) {
atomic.StoreInt64(&ro.newMetricsCount, 0)
select {
case ro.BatchReady <- time.Now():
default:
}
}

ro.batchMutex.Unlock()
}
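
AddMetric no longer accumulates a local batch slice; it pushes straight into the buffer and uses an atomic counter plus a non-blocking send to signal BatchReady. A standalone sketch of that trigger pattern (the notifier type and its names are illustrative, not Telegraf APIs):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type notifier struct {
	count      int64 // must stay 64-bit aligned for atomic ops on 32-bit platforms
	batchSize  int64
	batchReady chan time.Time
}

func (n *notifier) add() {
	if atomic.AddInt64(&n.count, 1) == n.batchSize {
		atomic.StoreInt64(&n.count, 0)
		select {
		case n.batchReady <- time.Now(): // wake the writer
		default: // a notification is already pending; never block the caller
		}
	}
}

func main() {
	n := &notifier{batchSize: 3, batchReady: make(chan time.Time, 1)}
	for i := 0; i < 7; i++ {
		n.add()
	}
	fmt.Println(len(n.batchReady)) // 1: at most one pending notification
}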

// AddBatchToBuffer moves the metrics from the batch into the metric buffer.
func (ro *RunningOutput) addBatchToBuffer() {
ro.buffer.Add(ro.batch...)
ro.batch = ro.batch[:0]
}

// Write writes all metrics to the output, stopping when all have been sent on
@@ -163,15 +140,12 @@ func (ro *RunningOutput) Write() error {
output.Reset()
ro.aggMutex.Unlock()
}
// add and write can be called concurrently
ro.batchMutex.Lock()
ro.addBatchToBuffer()
ro.batchMutex.Unlock()

nBuffer := ro.buffer.Len()
atomic.StoreInt64(&ro.newMetricsCount, 0)

// Only process the metrics in the buffer now. Metrics added while we are
// writing will be sent on the next call.
nBuffer := ro.buffer.Len()
nBatches := nBuffer/ro.MetricBatchSize + 1
for i := 0; i < nBatches; i++ {
batch := ro.buffer.Batch(ro.MetricBatchSize)
@@ -189,7 +163,7 @@ func (ro *RunningOutput) Write() error {
return nil
}
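
To make the drain loop concrete: nBatches overshoots by one so the final partial batch is flushed too, and an empty buffer simply yields a zero-length batch. A sketch of the arithmetic:

// nBuffer = 2500, MetricBatchSize = 1000
// nBatches = 2500/1000 + 1 = 3  -> two full batches of 1000 plus one batch of 500
// nBuffer = 0
// nBatches = 0/1000 + 1 = 1     -> Batch returns an empty slice and nothing is written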

// WriteBatch writes only the batch metrics to the output.
// WriteBatch writes a single batch of metrics to the output.
func (ro *RunningOutput) WriteBatch() error {
batch := ro.buffer.Batch(ro.MetricBatchSize)
if len(batch) == 0 {