Skip to content

Commit

Permalink
feat(processors.batch): create batch processor
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsStegman committed Sep 11, 2024
1 parent 7dbe28d commit 7bb64c2
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 0 deletions.
5 changes: 5 additions & 0 deletions plugins/processors/all/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || processors || processors.batch

package all

import _ "github.com/influxdata/telegraf/plugins/processors/batch" // register plugin
56 changes: 56 additions & 0 deletions plugins/processors/batch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Batch Processor Plugin

The batch processor batches metrics into
batches by adding a batch tag to the metrics.

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Configuration

```toml @sample.conf
## Batch metrics into separate batches by adding a tag indicating the batch index.
## Can be used to route batches of metrics to different outputs
## to parallelize writing of metrics to an output
## Metrics are distributed across batches using the round-robin scheme.
[[processors.batch]]
## The name of the tag to use for adding the batch index
batch_tag = "my_batch"

## The number of batches to create
num_batches = 16
```

## Example

The example below uses these settings:

```toml
[[processors.batch]]
## The tag key to use for batching
batch_tag = "batch"

## The number of batches to create
num_batches = 3
```

```diff
- temperature cpu=25
- temperature cpu=50
- temperature cpu=75
- temperature cpu=25
- temperature cpu=50
- temperature cpu=75
+ temperature,batch=0 cpu=25
+ temperature,batch=1 cpu=50
+ temperature,batch=2 cpu=75
+ temperature,batch=0 cpu=25
+ temperature,batch=1 cpu=50
+ temperature,batch=2 cpu=75
```
41 changes: 41 additions & 0 deletions plugins/processors/batch/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package batch

import (
_ "embed"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
"strconv"
)

//go:embed sample.conf
var sampleConfig string

type Batch struct {
BatchTag string `toml:"batch_tag"`
NumBatches uint64 `toml:"num_batches"`

// the number of metrics that have been processed so far
count uint64
}

func (*Batch) SampleConfig() string {
return sampleConfig
}

func (b *Batch) Apply(in ...telegraf.Metric) []telegraf.Metric {
out := make([]telegraf.Metric, 0, len(in))
for _, m := range in {
batchId := b.count % b.NumBatches
b.count++
m.AddTag(b.BatchTag, strconv.FormatUint(batchId, 10))
out = append(out, m)
}

return out
}

func init() {
processors.Add("batch", func() telegraf.Processor {
return &Batch{}
})
}
69 changes: 69 additions & 0 deletions plugins/processors/batch/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package batch

import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"testing"
)

const batchTag = "?internal_batch_idx"

func MakeBatching(batches uint64) *Batch {
return &Batch{
BatchTag: batchTag,
NumBatches: batches,
}
}

func MakeXMetrics(count int) []telegraf.Metric {
ms := make([]telegraf.Metric, 0, count)
for range count {
ms = append(ms, testutil.MockMetrics()...)
}

return ms
}

func requireMetricInBatch(t *testing.T, m telegraf.Metric, batch string) {
batchTagValue, ok := m.GetTag(batchTag)
require.True(t, ok)
require.Equal(t, batch, batchTagValue)
}

func Test_SingleMetricPutInBatch0(t *testing.T) {
b := MakeBatching(1)
m := testutil.MockMetricsWithValue(1)
expectedM := testutil.MockMetricsWithValue(1)
expectedM[0].AddTag(batchTag, "0")

res := b.Apply(m...)
testutil.RequireMetricsEqual(t, expectedM, res)
}

func Test_MetricsSmallerThanBatchSizeAreInDifferentBatches(t *testing.T) {
b := MakeBatching(3)
ms := MakeXMetrics(2)
res := b.Apply(ms...)
requireMetricInBatch(t, res[0], "0")
requireMetricInBatch(t, res[1], "1")
}

func Test_MetricsEqualToBatchSizeInDifferentBatches(t *testing.T) {
b := MakeBatching(3)
ms := MakeXMetrics(3)
res := b.Apply(ms...)
requireMetricInBatch(t, res[0], "0")
requireMetricInBatch(t, res[1], "1")
requireMetricInBatch(t, res[2], "2")
}

func Test_MetricsMoreThanBatchSizeInSameBatch(t *testing.T) {
b := MakeBatching(2)
ms := MakeXMetrics(3)
res := b.Apply(ms...)

requireMetricInBatch(t, res[0], "0")
requireMetricInBatch(t, res[1], "1")
requireMetricInBatch(t, res[2], "0")
}
10 changes: 10 additions & 0 deletions plugins/processors/batch/sample.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
## Batch metrics into separate batches by adding a tag indicating the batch index.
## Can be used to route batches of metrics to different outputs
## to parallelize writing of metrics to an output
## Metrics are distributed across batches using the round-robin scheme.
[[processors.batch]]
## The name of the tag to use for adding the batch index
batch_tag = "my_batch"

## The number of batches to create
num_batches = 16

0 comments on commit 7bb64c2

Please sign in to comment.