feat(outputs): Add rate-limiting infrastructure #16258

Merged · 1 commit · Dec 6, 2024
6 changes: 5 additions & 1 deletion internal/errors.go
@@ -2,7 +2,11 @@ package internal

import "errors"

-var ErrNotConnected = errors.New("not connected")
+var (
+	ErrNotConnected     = errors.New("not connected")
+	ErrSerialization    = errors.New("serialization of metric(s) failed")
+	ErrSizeLimitReached = errors.New("size limit reached")
+)

// StartupError indicates an error that occurred during startup of a plugin
// e.g. due to connectivity issues or resources being not yet available.
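The two new sentinels pair with the `PartialWriteError` type used further down in this PR: an output that can only deliver part of a batch wraps `ErrSizeLimitReached` and reports per-metric indices. Here is a minimal sketch of the producing side; `limitedOutput`, `maxMetrics`, and `sink` are illustrative names, not part of the PR, and only `PartialWriteError` with its `Err`/`MetricsAccept` fields is taken from the diff below:

package example

import (
	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
)

type limitedOutput struct {
	maxMetrics int // hypothetical per-write cap
	sink       []telegraf.Metric
}

func (o *limitedOutput) Write(metrics []telegraf.Metric) error {
	if len(metrics) <= o.maxMetrics {
		o.sink = append(o.sink, metrics...)
		return nil
	}
	// Accept only the first maxMetrics metrics and tell the caller which
	// ones were handled. The tests below rely on
	// errors.Is(err, internal.ErrSizeLimitReached) matching, i.e. the
	// partial-write error wraps the sentinel.
	werr := &internal.PartialWriteError{Err: internal.ErrSizeLimitReached}
	for i := 0; i < o.maxMetrics; i++ {
		o.sink = append(o.sink, metrics[i])
		werr.MetricsAccept = append(werr.MetricsAccept, i)
	}
	return werr
}
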
227 changes: 207 additions & 20 deletions models/running_output_test.go
@@ -245,7 +245,7 @@ func TestRunningOutputWriteFail(t *testing.T) {
Filter: Filter{},
}

-m := &mockOutput{failWrite: true}
+m := &mockOutput{batchAcceptSize: -1}
ro := NewRunningOutput(m, conf, 4, 12)

// Fill buffer to limit twice
@@ -264,7 +264,7 @@ func TestRunningOutputWriteFail(t *testing.T) {
// no successful flush yet
require.Empty(t, m.Metrics())

-m.failWrite = false
+m.batchAcceptSize = 0
err = ro.Write()
require.NoError(t, err)

Expand All @@ -277,7 +277,7 @@ func TestRunningOutputWriteFailOrder(t *testing.T) {
Filter: Filter{},
}

-m := &mockOutput{failWrite: true}
+m := &mockOutput{batchAcceptSize: -1}
ro := NewRunningOutput(m, conf, 100, 1000)

// add 5 metrics
@@ -293,7 +293,8 @@ func TestRunningOutputWriteFailOrder(t *testing.T) {
// no successful flush yet
require.Empty(t, m.Metrics())

-m.failWrite = false
+m.batchAcceptSize = 0

// add 5 more metrics
for _, metric := range next5 {
ro.AddMetric(metric)
@@ -314,7 +315,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
Filter: Filter{},
}

-m := &mockOutput{failWrite: true}
+m := &mockOutput{batchAcceptSize: -1}
ro := NewRunningOutput(m, conf, 5, 100)

// add 5 metrics
@@ -357,7 +358,7 @@ func TestRunningOutputWriteFailOrder2(t *testing.T) {
// no successful flush yet
require.Empty(t, m.Metrics())

-m.failWrite = false
+m.batchAcceptSize = 0
err = ro.Write()
require.NoError(t, err)

Expand All @@ -377,7 +378,7 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) {
Filter: Filter{},
}

-m := &mockOutput{failWrite: true}
+m := &mockOutput{batchAcceptSize: -1}
ro := NewRunningOutput(m, conf, 5, 1000)

// add 5 metrics
@@ -399,7 +400,8 @@ func TestRunningOutputWriteFailOrder3(t *testing.T) {
require.Error(t, err)

// unset fail and write metrics
-m.failWrite = false
+m.batchAcceptSize = 0

err = ro.Write()
require.NoError(t, err)

Expand Down Expand Up @@ -620,7 +622,7 @@ func TestRunningOutputNonRetryableStartupBehaviorDefault(t *testing.T) {
}
}

-func TestRunningOutputUntypedtartupBehaviorIgnore(t *testing.T) {
+func TestRunningOutputUntypedStartupBehaviorIgnore(t *testing.T) {
serr := errors.New("untyped err")

for _, behavior := range []string{"", "error", "retry", "ignore"} {
@@ -692,12 +694,181 @@ func TestRunningOutputPartiallyStarted(t *testing.T) {
require.Equal(t, 3, mo.writes)
}

func TestRunningOutputWritePartialSuccess(t *testing.T) {
plugin := &mockOutput{
batchAcceptSize: 4,
}
model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10)
require.NoError(t, model.Init())
require.NoError(t, model.Connect())
defer model.Close()

// Fill buffer completely
for _, metric := range first5 {
model.AddMetric(metric)
}
for _, metric := range next5 {
model.AddMetric(metric)
}

// We do not expect any successful flush yet
require.Empty(t, plugin.Metrics())
require.Equal(t, 10, model.buffer.Len())

// Write to the output. This should only partially succeed with the first
// few metrics removed from buffer
require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 4)
require.Equal(t, 6, model.buffer.Len())

// The next write should remove the next metrics from the buffer
require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 8)
require.Equal(t, 2, model.buffer.Len())

// The last write should succeed straight away and all metrics should have
// been received by the output
require.NoError(t, model.Write())
testutil.RequireMetricsEqual(t, append(first5, next5...), plugin.metrics)
require.Zero(t, model.buffer.Len())
}

func TestRunningOutputWritePartialSuccessAndLoss(t *testing.T) {
lost := 0
plugin := &mockOutput{
batchAcceptSize: 4,
metricFatalIndex: &lost,
}
model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10)
require.NoError(t, model.Init())
require.NoError(t, model.Connect())
defer model.Close()

// Fill buffer completely
for _, metric := range first5 {
model.AddMetric(metric)
}
for _, metric := range next5 {
model.AddMetric(metric)
}
expected := []telegraf.Metric{
/* fatal, */ first5[1], first5[2], first5[3],
/* fatal, */ next5[0], next5[1], next5[2],
next5[3], next5[4],
}

// We do not expect any successful flush yet
require.Empty(t, plugin.Metrics())
require.Equal(t, 10, model.buffer.Len())

// Write to the output. This should only partially succeed with the first
// few metrics removed from buffer
require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 3)
require.Equal(t, 6, model.buffer.Len())

// The next write should remove the next metrics from the buffer
require.ErrorIs(t, model.Write(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 6)
require.Equal(t, 2, model.buffer.Len())

// The last write should succeed straight away and all metrics should have
// been received by the output
require.NoError(t, model.Write())
testutil.RequireMetricsEqual(t, expected, plugin.metrics)
require.Zero(t, model.buffer.Len())
}

func TestRunningOutputWriteBatchPartialSuccess(t *testing.T) {
plugin := &mockOutput{
batchAcceptSize: 4,
}
model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10)
require.NoError(t, model.Init())
require.NoError(t, model.Connect())
defer model.Close()

// Fill buffer completely
for _, metric := range first5 {
model.AddMetric(metric)
}
for _, metric := range next5 {
model.AddMetric(metric)
}

// We do not expect any successful flush yet
require.Empty(t, plugin.Metrics())
require.Equal(t, 10, model.buffer.Len())

// Write to the output. This should only partially succeed with the first
// few metrics removed from buffer
require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 4)
require.Equal(t, 6, model.buffer.Len())

// The next write should remove the next metrics from the buffer
require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 8)
require.Equal(t, 2, model.buffer.Len())

// The last write should succeed straight away and all metrics should have
// been received by the output
require.NoError(t, model.WriteBatch())
testutil.RequireMetricsEqual(t, append(first5, next5...), plugin.metrics)
require.Zero(t, model.buffer.Len())
}

func TestRunningOutputWriteBatchPartialSuccessAndLoss(t *testing.T) {
lost := 0
plugin := &mockOutput{
batchAcceptSize: 4,
metricFatalIndex: &lost,
}
model := NewRunningOutput(plugin, &OutputConfig{}, 5, 10)
require.NoError(t, model.Init())
require.NoError(t, model.Connect())
defer model.Close()

// Fill buffer completely
for _, metric := range first5 {
model.AddMetric(metric)
}
for _, metric := range next5 {
model.AddMetric(metric)
}
expected := []telegraf.Metric{
/* fatal, */ first5[1], first5[2], first5[3],
/* fatal, */ next5[0], next5[1], next5[2],
next5[3], next5[4],
}

// We do not expect any successful flush yet
require.Empty(t, plugin.Metrics())
require.Equal(t, 10, model.buffer.Len())

// Write to the output. This should only partially succeed with the first
// few metrics removed from buffer
require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 3)
require.Equal(t, 6, model.buffer.Len())

// The next write should remove the next metrics from the buffer
require.ErrorIs(t, model.WriteBatch(), internal.ErrSizeLimitReached)
require.Len(t, plugin.metrics, 6)
require.Equal(t, 2, model.buffer.Len())

// The last write should succeed straight away and all metrics should have
// been received by the output
require.NoError(t, model.WriteBatch())
testutil.RequireMetricsEqual(t, expected, plugin.metrics)
require.Zero(t, model.buffer.Len())
}

// Benchmark adding metrics.
func BenchmarkRunningOutputAddWrite(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{},
}

m := &perfOutput{}
ro := NewRunningOutput(m, conf, 1000, 10000)

@@ -712,7 +883,6 @@ func BenchmarkRunningOutputAddWriteEvery100(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{},
}

m := &perfOutput{}
ro := NewRunningOutput(m, conf, 1000, 10000)

@@ -729,10 +899,8 @@ func BenchmarkRunningOutputAddFailWrites(b *testing.B) {
conf := &OutputConfig{
Filter: Filter{},
}

m := &perfOutput{failWrite: true}
ro := NewRunningOutput(m, conf, 1000, 10000)

for n := 0; n < b.N; n++ {
ro.AddMetric(testutil.TestMetric(101, "metric1"))
}
@@ -743,9 +911,11 @@ type mockOutput struct {

metrics []telegraf.Metric

-// if true, mock write failure
-failWrite bool
+// Failing output simulation
+batchAcceptSize int
+metricFatalIndex *int

+// Startup error simulation
startupError error
startupErrorCount int
writes int
@@ -761,11 +931,11 @@ func (m *mockOutput) Connect() error {
return m.startupError
}

-func (m *mockOutput) Close() error {
+func (*mockOutput) Close() error {
return nil
}

-func (m *mockOutput) SampleConfig() string {
+func (*mockOutput) SampleConfig() string {
return ""
}

@@ -774,12 +944,29 @@ func (m *mockOutput) Write(metrics []telegraf.Metric) error {

m.Lock()
defer m.Unlock()
-if m.failWrite {
+
+// Simulate a failed write
+if m.batchAcceptSize < 0 {
return errors.New("failed write")
}

-m.metrics = append(m.metrics, metrics...)
-return nil
+// Simulate a successful write
+if m.batchAcceptSize == 0 || len(metrics) <= m.batchAcceptSize {
+m.metrics = append(m.metrics, metrics...)
+return nil
+}
+
+// Simulate a partially successful write
+werr := &internal.PartialWriteError{Err: internal.ErrSizeLimitReached}
+for i, x := range metrics {
+if m.metricFatalIndex != nil && i == *m.metricFatalIndex {
+werr.MetricsReject = append(werr.MetricsReject, i)
+} else if i < m.batchAcceptSize {
+m.metrics = append(m.metrics, x)
+werr.MetricsAccept = append(werr.MetricsAccept, i)
+}
+}
+return werr
}
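
On the consuming side, the tests above imply that both accepted and fatally rejected metrics leave the buffer while the remainder is retried on the next write. A sketch of that bookkeeping under those assumptions; this is not the RunningOutput implementation, just the accept/reject semantics the mock exercises (`retained` and `batch` are hypothetical names):

package example

import (
	"errors"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
)

// retained returns the metrics that must stay buffered after a failed
// write attempt (callers only invoke this with a non-nil error).
func retained(batch []telegraf.Metric, err error) []telegraf.Metric {
	var werr *internal.PartialWriteError
	if !errors.As(err, &werr) {
		return batch // full failure: retry the whole batch
	}
	handled := make(map[int]bool, len(werr.MetricsAccept)+len(werr.MetricsReject))
	for _, i := range werr.MetricsAccept {
		handled[i] = true // delivered successfully
	}
	for _, i := range werr.MetricsReject {
		handled[i] = true // fatal for this metric, do not retry
	}
	var retry []telegraf.Metric
	for i, m := range batch {
		if !handled[i] {
			retry = append(retry, m)
		}
	}
	return retry // only the unhandled tail is kept for the next write
}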

func (m *mockOutput) Metrics() []telegraf.Metric {
19 changes: 19 additions & 0 deletions plugins/common/ratelimiter/config.go
@@ -0,0 +1,19 @@
package ratelimiter

import (
"time"

"github.com/influxdata/telegraf/config"
)

type RateLimitConfig struct {
Limit config.Size `toml:"rate_limit"`
Period config.Duration `toml:"rate_limit_period"`
}

func (cfg *RateLimitConfig) CreateRateLimiter() *RateLimiter {
return &RateLimiter{
limit: int64(cfg.Limit),
period: time.Duration(cfg.Period),
}
}
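
For scale, a plugin would presumably opt in by embedding `RateLimitConfig`, which picks up the `rate_limit` and `rate_limit_period` TOML options, and building the limiter during initialization. The plugin shape below is illustrative only; the limiter's methods live in the rest of the `ratelimiter` package and are not part of this diff:

package myoutput // hypothetical plugin

import (
	"github.com/influxdata/telegraf/plugins/common/ratelimiter"
)

type MyOutput struct {
	Endpoint string `toml:"endpoint"` // illustrative plugin option

	// Embedding adds the rate_limit and rate_limit_period settings
	ratelimiter.RateLimitConfig

	limiter *ratelimiter.RateLimiter
}

func (o *MyOutput) Init() error {
	// Build the limiter from the TOML-provided settings, e.g.
	//   rate_limit = "1MB"
	//   rate_limit_period = "10s"
	o.limiter = o.CreateRateLimiter()
	return nil
}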