Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs.influxdb_v2): Add rate limit implementation #15742

Merged
merged 1 commit into the target branch on
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions plugins/common/ratelimiter/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ratelimiter

import (
"errors"
"time"

"github.com/influxdata/telegraf/config"
Expand All @@ -11,9 +12,12 @@ type RateLimitConfig struct {
Period config.Duration `toml:"rate_limit_period"`
}

func (cfg *RateLimitConfig) CreateRateLimiter() *RateLimiter {
func (cfg *RateLimitConfig) CreateRateLimiter() (*RateLimiter, error) {
if cfg.Limit > 0 && cfg.Period <= 0 {
return nil, errors.New("invalid period for rate-limit")
}
return &RateLimiter{
limit: int64(cfg.Limit),
period: time.Duration(cfg.Period),
}
}, nil
}
24 changes: 18 additions & 6 deletions plugins/common/ratelimiter/limiters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@ import (
"github.com/stretchr/testify/require"
)

// TestInvalidPeriod ensures that configuring a rate limit without a
// corresponding period is rejected when creating the limiter.
func TestInvalidPeriod(t *testing.T) {
	// A limit without a time window can never be satisfied.
	limited := &RateLimitConfig{Limit: config.Size(1024)}
	_, err := limited.CreateRateLimiter()
	require.ErrorContains(t, err, "invalid period for rate-limit")
}

func TestUnlimited(t *testing.T) {
cfg := &RateLimitConfig{}
limiter := cfg.CreateRateLimiter()
limiter, err := cfg.CreateRateLimiter()
require.NoError(t, err)

start := time.Now()
end := start.Add(30 * time.Minute)
Expand All @@ -24,7 +31,8 @@ func TestUnlimitedWithPeriod(t *testing.T) {
cfg := &RateLimitConfig{
Period: config.Duration(5 * time.Minute),
}
limiter := cfg.CreateRateLimiter()
limiter, err := cfg.CreateRateLimiter()
require.NoError(t, err)

start := time.Now()
end := start.Add(30 * time.Minute)
Expand Down Expand Up @@ -67,7 +75,8 @@ func TestLimited(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name+" at period", func(t *testing.T) {
// Setup the limiter
limiter := tt.cfg.CreateRateLimiter()
limiter, err := tt.cfg.CreateRateLimiter()
require.NoError(t, err)

// Compute the actual values
start := time.Now().Truncate(tt.step)
Expand All @@ -85,7 +94,8 @@ func TestLimited(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Setup the limiter
limiter := tt.cfg.CreateRateLimiter()
limiter, err := tt.cfg.CreateRateLimiter()
require.NoError(t, err)

// Compute the actual values
start := time.Now().Truncate(tt.step).Add(1 * time.Second)
Expand Down Expand Up @@ -134,7 +144,8 @@ func TestUndo(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name+" at period", func(t *testing.T) {
// Setup the limiter
limiter := tt.cfg.CreateRateLimiter()
limiter, err := tt.cfg.CreateRateLimiter()
require.NoError(t, err)

// Compute the actual values
start := time.Now().Truncate(tt.step)
Expand All @@ -156,7 +167,8 @@ func TestUndo(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Setup the limiter
limiter := tt.cfg.CreateRateLimiter()
limiter, err := tt.cfg.CreateRateLimiter()
require.NoError(t, err)

// Compute the actual values
start := time.Now().Truncate(tt.step).Add(1 * time.Second)
Expand Down
6 changes: 6 additions & 0 deletions plugins/outputs/influxdb_v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ to use them.
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Rate limits for sending data (disabled by default)
## Maximum available, uncompressed payload size per period, e.g. "5MB"
# rate_limit = "unlimited"
## Fixed time-window for the available payload size e.g. "5m"
# rate_limit_period = "0s"
```

## Metrics
Expand Down
121 changes: 79 additions & 42 deletions plugins/outputs/influxdb_v2/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/common/ratelimiter"
)

type APIError struct {
Expand Down Expand Up @@ -59,8 +59,9 @@ type httpClient struct {
pingTimeout config.Duration
readIdleTimeout config.Duration
tlsConfig *tls.Config
serializer *influx.Serializer
encoder internal.ContentEncoder
serializer ratelimiter.Serializer
rateLimiter *ratelimiter.RateLimiter
client *http.Client
params url.Values
retryTime time.Time
Expand Down Expand Up @@ -160,52 +161,69 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
}

batches := make(map[string][]telegraf.Metric)
batchIndices := make(map[string][]int)
if c.bucketTag == "" {
err := c.writeBatch(ctx, c.bucket, metrics)
if err != nil {
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
}

return err
batches[c.bucket] = metrics
batchIndices[c.bucket] = make([]int, len(metrics))
for i := range metrics {
batchIndices[c.bucket][i] = i
}
} else {
for _, metric := range metrics {
for i, metric := range metrics {
bucket, ok := metric.GetTag(c.bucketTag)
if !ok {
bucket = c.bucket
}

if _, ok := batches[bucket]; !ok {
batches[bucket] = make([]telegraf.Metric, 0)
}

if c.excludeBucketTag {
// Avoid modifying the metric in case we need to retry the request.
} else if c.excludeBucketTag {
// Avoid modifying the metric if we do remove the tag
metric = metric.Copy()
metric.Accept()
metric.RemoveTag(c.bucketTag)
}

batches[bucket] = append(batches[bucket], metric)
batchIndices[c.bucket] = append(batchIndices[c.bucket], i)
}
}

var wErr internal.PartialWriteError
for bucket, batch := range batches {
err := c.writeBatch(ctx, bucket, batch)
if err == nil {
wErr.MetricsAccept = append(wErr.MetricsAccept, batchIndices[bucket]...)
continue
}

for bucket, batch := range batches {
err := c.writeBatch(ctx, bucket, batch)
if err != nil {
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
}

return err
// Check if the request was too large and split it
var apiErr *APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
return c.splitAndWriteBatch(ctx, c.bucket, metrics)
}
wErr.Err = err
wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...)
return &wErr
}

// Check if we got a write error and if so, translate the returned
// metric indices to return the original indices in case of bucketing
var writeErr *internal.PartialWriteError
if errors.As(err, &writeErr) {
wErr.Err = writeErr.Err
for _, idx := range writeErr.MetricsAccept {
wErr.MetricsAccept = append(wErr.MetricsAccept, batchIndices[bucket][idx])
}
for _, idx := range writeErr.MetricsReject {
wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket][idx])
}
if !errors.Is(writeErr.Err, internal.ErrSizeLimitReached) {
continue
}
return &wErr
}

// Return the error without special treatment
wErr.Err = err
return &wErr
}
return nil
}
Expand All @@ -222,11 +240,16 @@ func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metr
}

func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
// Serialize the metrics
body, err := c.serializer.SerializeBatch(metrics)
if err != nil {
return err
// Get the current limit for the outbound data
ratets := time.Now()
limit := c.rateLimiter.Remaining(ratets)

// Serialize the metrics with the remaining limit, exit early if nothing was serialized
body, werr := c.serializer.SerializeBatch(metrics, limit)
if werr != nil && !errors.Is(werr, internal.ErrSizeLimitReached) || len(body) == 0 {
return werr
}
used := int64(len(body))

// Encode the content if requested
if c.encoder != nil {
Expand All @@ -249,6 +272,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
c.addHeaders(req)

// Execute the request
c.rateLimiter.Accept(ratets, used)
resp, err := c.client.Do(req.WithContext(ctx))
if err != nil {
internal.OnClientError(c.client, err)
Expand All @@ -269,7 +293,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
http.StatusMultiStatus,
http.StatusAlreadyReported:
c.retryCount = 0
return nil
return werr
}

// We got an error and now try to decode further
Expand All @@ -294,11 +318,18 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
http.StatusBadRequest,
// request was received but server refused to process it due to a semantic problem with the request.
// for example, submitting metrics outside the retention period.
// Clients should *not* repeat the request and the metrics should be dropped.
http.StatusUnprocessableEntity,
http.StatusNotAcceptable:
c.log.Errorf("Failed to write metric to %s (will be dropped: %s): %s\n", bucket, resp.Status, desc)
return nil

// Clients should *not* repeat the request and the metrics should be dropped.
rejected := make([]int, 0, len(metrics))
for i := range len(metrics) {
rejected = append(rejected, i)
}
return &internal.PartialWriteError{
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
MetricsReject: rejected,
}
case http.StatusUnauthorized, http.StatusForbidden:
return fmt.Errorf("failed to write metric to %s (%s): %s", bucket, resp.Status, desc)
case http.StatusTooManyRequests,
Expand All @@ -316,8 +347,14 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
// if it's any other 4xx code, the client should not retry as it's the client's mistake.
// retrying will not make the request magically work.
if len(resp.Status) > 0 && resp.Status[0] == '4' {
c.log.Errorf("Failed to write metric to %s (will be dropped: %s): %s\n", bucket, resp.Status, desc)
return nil
rejected := make([]int, 0, len(metrics))
for i := range len(metrics) {
rejected = append(rejected, i)
}
return &internal.PartialWriteError{
Err: fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
MetricsReject: rejected,
}
}

// This is only until platform spec is fully implemented. As of the
Expand Down
22 changes: 17 additions & 5 deletions plugins/outputs/influxdb_v2/influxdb_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/ratelimiter"
commontls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/influx"
Expand Down Expand Up @@ -44,10 +45,11 @@ type InfluxDB struct {
ReadIdleTimeout config.Duration `toml:"read_idle_timeout"`
Log telegraf.Logger `toml:"-"`
commontls.ClientConfig
ratelimiter.RateLimitConfig

clients []*httpClient
encoder internal.ContentEncoder
serializer *influx.Serializer
serializer ratelimiter.Serializer
tlsCfg *tls.Config
}

Expand All @@ -65,7 +67,7 @@ func (i *InfluxDB) Init() error {
i.URLs = append(i.URLs, "http://localhost:8086")
}

// Check options
// Init encoding if configured
switch i.ContentEncoding {
case "", "gzip":
i.ContentEncoding = "gzip"
Expand All @@ -80,13 +82,14 @@ func (i *InfluxDB) Init() error {
}

// Setup the limited serializer
i.serializer = &influx.Serializer{
serializer := &influx.Serializer{
UintSupport: i.UintSupport,
OmitTimestamp: i.OmitTimestamp,
}
if err := i.serializer.Init(); err != nil {
if err := serializer.Init(); err != nil {
return fmt.Errorf("setting up serializer failed: %w", err)
}
i.serializer = ratelimiter.NewIndividualSerializer(serializer)

// Setup the client config
tlsCfg, err := i.ClientConfig.TLSConfig()
Expand Down Expand Up @@ -142,6 +145,10 @@ func (i *InfluxDB) Connect() error {

switch parts.Scheme {
case "http", "https", "unix":
limiter, err := i.RateLimitConfig.CreateRateLimiter()
if err != nil {
return err
}
c := &httpClient{
url: parts,
localAddr: localAddr,
Expand All @@ -158,8 +165,9 @@ func (i *InfluxDB) Connect() error {
tlsConfig: i.tlsCfg,
pingTimeout: i.PingTimeout,
readIdleTimeout: i.ReadIdleTimeout,
serializer: i.serializer,
encoder: i.encoder,
rateLimiter: limiter,
serializer: i.serializer,
log: i.Log,
}

Expand Down Expand Up @@ -191,6 +199,10 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
for _, n := range rand.Perm(len(i.clients)) {
client := i.clients[n]
if err := client.Write(ctx, metrics); err != nil {
var werr *internal.PartialWriteError
if errors.As(err, &werr) || errors.Is(err, internal.ErrSizeLimitReached) {
return err
}
i.Log.Errorf("When writing to [%s]: %v", client.url, err)
continue
}
Expand Down
Loading
Loading