Skip to content

Commit

Permalink
Significant performance refactor of running_output buffers
Browse files Browse the repository at this point in the history
closes #914
closes #967
  • Loading branch information
sparrc committed Apr 26, 2016
1 parent 44c945b commit 15e053c
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 194 deletions.
16 changes: 4 additions & 12 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,13 @@ var header = `# Telegraf Configuration
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to output in batch of at
## Telegraf will send metrics to outputs in batches of at
## most metric_batch_size metrics.
metric_batch_size = 1000
## Telegraf will cache metric_buffer_limit metrics for each output, and will
## flush this buffer on a successful write. This should be a multiple of
## metric_batch_size and could not be less than 2 times metric_batch_size
## metric_batch_size and can not be less than 2 times metric_batch_size.
metric_buffer_limit = 10000
## Flush the buffer whenever full, regardless of flush_interval.
flush_buffer_when_full = true
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
Expand Down Expand Up @@ -535,14 +533,8 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return err
}

ro := internal_models.NewRunningOutput(name, output, outputConfig)
if c.Agent.MetricBatchSize > 0 {
ro.MetricBatchSize = c.Agent.MetricBatchSize
}
if c.Agent.MetricBufferLimit > 0 {
ro.MetricBufferLimit = c.Agent.MetricBufferLimit
}
ro.FlushBufferWhenFull = c.Agent.FlushBufferWhenFull
ro := internal_models.NewRunningOutput(name, output, outputConfig,
c.Agent.MetricBatchSize, c.Agent.MetricBufferLimit)
c.Outputs = append(c.Outputs, ro)
return nil
}
Expand Down
64 changes: 64 additions & 0 deletions internal/models/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package internal_models

import (
"github.com/influxdata/telegraf"
)

type Buffer struct {
buf chan telegraf.Metric
// total dropped metrics
drops int
// total metrics added
total int
}

func NewBuffer(size int) *Buffer {
return &Buffer{
buf: make(chan telegraf.Metric, size),
}
}

func (b *Buffer) IsEmpty() bool {
return len(b.buf) == 0
}

func (b *Buffer) Len() int {
return len(b.buf)
}

func (b *Buffer) Drops() int {
return b.drops
}

func (b *Buffer) Total() int {
return b.total
}

func (b *Buffer) Add(metrics ...telegraf.Metric) {
for i, _ := range metrics {
b.total++
select {
case b.buf <- metrics[i]:
default:
b.drops++
<-b.buf
b.buf <- metrics[i]
}
}
}

func (b *Buffer) Batch(batchSize int) []telegraf.Metric {
n := min(len(b.buf), batchSize)
out := make([]telegraf.Metric, n)
for i := 0; i < n; i++ {
out[i] = <-b.buf
}
return out
}

func min(a, b int) int {
if b < a {
return b
}
return a
}
156 changes: 68 additions & 88 deletions internal/models/running_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,53 @@ package internal_models

import (
"log"
"sync"
"time"

"github.com/influxdata/telegraf"
)

const (

// Default size of metrics batch size.
DEFAULT_METRIC_BATCH_SIZE = 1000

// Default number of metrics kept. It should be a multiple of batch size.
DEFAULT_METRIC_BUFFER_LIMIT = 10000
)

// tmpmetrics point to batch of metrics ready to be wrote to output.
// readI point to the oldest batch of metrics (the first to sent to output). It
// may point to nil value if tmpmetrics is empty.
// writeI point to the next slot to buffer a batch of metrics is output fail to
// write.
// RunningOutput contains the output configuration
type RunningOutput struct {
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
MetricBufferLimit int
MetricBatchSize int
FlushBufferWhenFull bool

metrics []telegraf.Metric
tmpmetrics []([]telegraf.Metric)
writeI int
readI int

sync.Mutex
Name string
Output telegraf.Output
Config *OutputConfig
Quiet bool
MetricBufferLimit int
MetricBatchSize int

metrics *Buffer
failMetrics *Buffer
}

func NewRunningOutput(
name string,
output telegraf.Output,
conf *OutputConfig,
batchSize int,
bufferLimit int,
) *RunningOutput {
if bufferLimit == 0 {
bufferLimit = DEFAULT_METRIC_BUFFER_LIMIT
}
if batchSize == 0 {
batchSize = DEFAULT_METRIC_BATCH_SIZE
}
ro := &RunningOutput{
Name: name,
metrics: make([]telegraf.Metric, 0),
metrics: NewBuffer(batchSize),
failMetrics: NewBuffer(bufferLimit),
Output: output,
Config: conf,
MetricBufferLimit: DEFAULT_METRIC_BUFFER_LIMIT,
MetricBatchSize: DEFAULT_METRIC_BATCH_SIZE,
MetricBufferLimit: bufferLimit,
MetricBatchSize: batchSize,
}
return ro
}
Expand All @@ -63,19 +61,6 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
return
}
}
ro.Lock()
defer ro.Unlock()

if ro.tmpmetrics == nil {
size := ro.MetricBufferLimit / ro.MetricBatchSize
// ro.metrics already contains one batch
size = size - 1

if size < 1 {
size = 1
}
ro.tmpmetrics = make([]([]telegraf.Metric), size)
}

// Filter any tagexclude/taginclude parameters before adding metric
if len(ro.Config.Filter.TagExclude) != 0 || len(ro.Config.Filter.TagInclude) != 0 {
Expand All @@ -90,69 +75,64 @@ func (ro *RunningOutput) AddMetric(metric telegraf.Metric) {
metric, _ = telegraf.NewMetric(name, tags, fields, t)
}

if len(ro.metrics) < ro.MetricBatchSize {
ro.metrics = append(ro.metrics, metric)
} else {
flushSuccess := true
if ro.FlushBufferWhenFull {
err := ro.write(ro.metrics)
if err != nil {
log.Printf("ERROR writing full metric buffer to output %s, %s",
ro.Name, err)
flushSuccess = false
}
} else {
flushSuccess = false
}
if !flushSuccess {
if ro.tmpmetrics[ro.writeI] != nil && ro.writeI == ro.readI {
log.Printf("WARNING: overwriting cached metrics, you may want to " +
"increase the metric_buffer_limit setting in your [agent] " +
"config if you do not wish to overwrite metrics.\n")
ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics)
}
ro.tmpmetrics[ro.writeI] = ro.metrics
ro.writeI = (ro.writeI + 1) % cap(ro.tmpmetrics)
ro.metrics.Add(metric)
if ro.metrics.Len() == ro.MetricBatchSize {
batch := ro.metrics.Batch(ro.MetricBatchSize)
err := ro.write(batch)
if err != nil {
ro.failMetrics.Add(batch...)
}
ro.metrics = make([]telegraf.Metric, 0)
ro.metrics = append(ro.metrics, metric)
}
}

// Write writes all cached points to this output.
func (ro *RunningOutput) Write() error {
ro.Lock()
defer ro.Unlock()

if ro.tmpmetrics == nil {
size := ro.MetricBufferLimit / ro.MetricBatchSize
// ro.metrics already contains one batch
size = size - 1

if size < 1 {
size = 1
}
ro.tmpmetrics = make([]([]telegraf.Metric), size)
if !ro.Quiet {
log.Printf("Output [%s] buffer fullness: %d / %d metrics. "+
"Total gathered metrics: %d. Total dropped metrics: %d.",
ro.Name,
ro.failMetrics.Len()+ro.metrics.Len(),
ro.MetricBufferLimit,
ro.metrics.Total(),
ro.metrics.Drops()+ro.failMetrics.Drops())
}

// Write any cached metric buffers before, as those metrics are the
// oldest
for ro.tmpmetrics[ro.readI] != nil {
if err := ro.write(ro.tmpmetrics[ro.readI]); err != nil {
return err
} else {
ro.tmpmetrics[ro.readI] = nil
ro.readI = (ro.readI + 1) % cap(ro.tmpmetrics)
var err error
if !ro.failMetrics.IsEmpty() {
bufLen := ro.failMetrics.Len()
// how many batches of failed writes we need to write.
nBatches := bufLen/ro.MetricBatchSize + 1
batchSize := ro.MetricBatchSize

for i := 0; i < nBatches; i++ {
// If it's the last batch, only grab the metrics that have not had
// a write attempt already (this is primarily to preserve order).
if i == nBatches-1 {
batchSize = bufLen % ro.MetricBatchSize
}
batch := ro.failMetrics.Batch(batchSize)
// If we've already failed previous writes, don't bother trying to
// write to this output again. We are not exiting the loop just so
// that we can rotate the metrics to preserve order.
if err == nil {
err = ro.write(batch)
}
if err != nil {
ro.failMetrics.Add(batch...)
}
}
}

err := ro.write(ro.metrics)
batch := ro.metrics.Batch(ro.MetricBatchSize)
// see comment above about not trying to write to an already failed output.
// if ro.failMetrics is empty then err will always be nil.
if err == nil {
err = ro.write(batch)
}
if err != nil {
ro.failMetrics.Add(batch...)
return err
} else {
ro.metrics = make([]telegraf.Metric, 0)
}

return nil
}

Expand All @@ -165,8 +145,8 @@ func (ro *RunningOutput) write(metrics []telegraf.Metric) error {
elapsed := time.Since(start)
if err == nil {
if !ro.Quiet {
log.Printf("Wrote %d metrics to output %s in %s\n",
len(metrics), ro.Name, elapsed)
log.Printf("Output [%s] wrote batch of %d metrics in %s\n",
ro.Name, len(metrics), elapsed)
}
}
return err
Expand Down
Loading

0 comments on commit 15e053c

Please sign in to comment.