diff --git a/plugins/common/ratelimiter/config.go b/plugins/common/ratelimiter/config.go
index a2ca077c05f59..9ebbeb2704c16 100644
--- a/plugins/common/ratelimiter/config.go
+++ b/plugins/common/ratelimiter/config.go
@@ -1,6 +1,7 @@
 package ratelimiter
 
 import (
+	"errors"
 	"time"
 
 	"github.com/influxdata/telegraf/config"
@@ -11,9 +12,12 @@ type RateLimitConfig struct {
 	Period config.Duration `toml:"rate_limit_period"`
 }
 
-func (cfg *RateLimitConfig) CreateRateLimiter() *RateLimiter {
+func (cfg *RateLimitConfig) CreateRateLimiter() (*RateLimiter, error) {
+	if cfg.Limit > 0 && cfg.Period <= 0 {
+		return nil, errors.New("invalid period for rate-limit")
+	}
 	return &RateLimiter{
 		limit:  int64(cfg.Limit),
 		period: time.Duration(cfg.Period),
-	}
+	}, nil
 }
diff --git a/plugins/common/ratelimiter/limiters_test.go b/plugins/common/ratelimiter/limiters_test.go
index e886b1cc80221..28b53159ce448 100644
--- a/plugins/common/ratelimiter/limiters_test.go
+++ b/plugins/common/ratelimiter/limiters_test.go
@@ -9,9 +9,16 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+func TestInvalidPeriod(t *testing.T) {
+	cfg := &RateLimitConfig{Limit: config.Size(1024)}
+	_, err := cfg.CreateRateLimiter()
+	require.ErrorContains(t, err, "invalid period for rate-limit")
+}
+
 func TestUnlimited(t *testing.T) {
 	cfg := &RateLimitConfig{}
-	limiter := cfg.CreateRateLimiter()
+	limiter, err := cfg.CreateRateLimiter()
+	require.NoError(t, err)
 
 	start := time.Now()
 	end := start.Add(30 * time.Minute)
@@ -24,7 +31,8 @@ func TestUnlimitedWithPeriod(t *testing.T) {
 	cfg := &RateLimitConfig{
 		Period: config.Duration(5 * time.Minute),
 	}
-	limiter := cfg.CreateRateLimiter()
+	limiter, err := cfg.CreateRateLimiter()
+	require.NoError(t, err)
 
 	start := time.Now()
 	end := start.Add(30 * time.Minute)
@@ -67,7 +75,8 @@ func TestLimited(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name+" at period", func(t *testing.T) {
 			// Setup the limiter
-			limiter := tt.cfg.CreateRateLimiter()
+			limiter, err := tt.cfg.CreateRateLimiter()
+			require.NoError(t, err)
 
 			// Compute the actual values
 			start := time.Now().Truncate(tt.step)
@@ -85,7 +94,8 @@ func TestLimited(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			// Setup the limiter
-			limiter := tt.cfg.CreateRateLimiter()
+			limiter, err := tt.cfg.CreateRateLimiter()
+			require.NoError(t, err)
 
 			// Compute the actual values
 			start := time.Now().Truncate(tt.step).Add(1 * time.Second)
@@ -134,7 +144,8 @@ func TestUndo(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name+" at period", func(t *testing.T) {
 			// Setup the limiter
-			limiter := tt.cfg.CreateRateLimiter()
+			limiter, err := tt.cfg.CreateRateLimiter()
+			require.NoError(t, err)
 
 			// Compute the actual values
 			start := time.Now().Truncate(tt.step)
@@ -156,7 +167,8 @@ func TestUndo(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			// Setup the limiter
-			limiter := tt.cfg.CreateRateLimiter()
+			limiter, err := tt.cfg.CreateRateLimiter()
+			require.NoError(t, err)
 
 			// Compute the actual values
 			start := time.Now().Truncate(tt.step).Add(1 * time.Second)
diff --git a/plugins/outputs/influxdb_v2/README.md b/plugins/outputs/influxdb_v2/README.md
index 239e953e6fcc9..b9a78f0b00a04 100644
--- a/plugins/outputs/influxdb_v2/README.md
+++ b/plugins/outputs/influxdb_v2/README.md
@@ -101,6 +101,12 @@ to use them.
   # tls_key = "/etc/telegraf/key.pem"
   ## Use TLS but skip chain & host verification
   # insecure_skip_verify = false
+
+  ## Rate limits for sending data (disabled by default)
+  ## Available, uncompressed payload size e.g. "5Mb"
+  # rate_limit = "unlimited"
+  ## Fixed time-window for the available payload size e.g. "5m"
+  # rate_limit_period = "0s"
 ```
 
 ## Metrics
diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go
index 34e698dd75c94..8a622a5f4b522 100644
--- a/plugins/outputs/influxdb_v2/http.go
+++ b/plugins/outputs/influxdb_v2/http.go
@@ -22,7 +22,7 @@ import (
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/config"
 	"github.com/influxdata/telegraf/internal"
-	"github.com/influxdata/telegraf/plugins/serializers/influx"
+	"github.com/influxdata/telegraf/plugins/common/ratelimiter"
 )
 
 type APIError struct {
@@ -59,8 +59,9 @@ type httpClient struct {
 	pingTimeout     config.Duration
 	readIdleTimeout config.Duration
 	tlsConfig       *tls.Config
-	serializer      *influx.Serializer
 	encoder         internal.ContentEncoder
+	serializer      ratelimiter.Serializer
+	rateLimiter     *ratelimiter.RateLimiter
 	client          *http.Client
 	params          url.Values
 	retryTime       time.Time
@@ -160,52 +161,69 @@ func (c *httpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
 	}
 
 	batches := make(map[string][]telegraf.Metric)
+	batchIndices := make(map[string][]int)
 	if c.bucketTag == "" {
-		err := c.writeBatch(ctx, c.bucket, metrics)
-		if err != nil {
-			var apiErr *APIError
-			if errors.As(err, &apiErr) {
-				if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
-					return c.splitAndWriteBatch(ctx, c.bucket, metrics)
-				}
-			}
-
-			return err
+		batches[c.bucket] = metrics
+		batchIndices[c.bucket] = make([]int, len(metrics))
+		for i := range metrics {
+			batchIndices[c.bucket][i] = i
 		}
 	} else {
-		for _, metric := range metrics {
+		for i, metric := range metrics {
 			bucket, ok := metric.GetTag(c.bucketTag)
 			if !ok {
 				bucket = c.bucket
-			}
-
-			if _, ok := batches[bucket]; !ok {
-				batches[bucket] = make([]telegraf.Metric, 0)
-			}
-
-			if c.excludeBucketTag {
-				// Avoid modifying the metric in case we need to retry the request.
+			} else if c.excludeBucketTag {
+				// Avoid modifying the metric if we do remove the tag
 				metric = metric.Copy()
 				metric.Accept()
 				metric.RemoveTag(c.bucketTag)
 			}
 
 			batches[bucket] = append(batches[bucket], metric)
+			batchIndices[bucket] = append(batchIndices[bucket], i)
+		}
+	}
+
+	var wErr internal.PartialWriteError
+	for bucket, batch := range batches {
+		err := c.writeBatch(ctx, bucket, batch)
+		if err == nil {
+			wErr.MetricsAccept = append(wErr.MetricsAccept, batchIndices[bucket]...)
+			continue
 		}
 
-		for bucket, batch := range batches {
-			err := c.writeBatch(ctx, bucket, batch)
-			if err != nil {
-				var apiErr *APIError
-				if errors.As(err, &apiErr) {
-					if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
-						return c.splitAndWriteBatch(ctx, c.bucket, metrics)
-					}
-				}
-
-				return err
+		// Check if the request was too large and split it
+		var apiErr *APIError
+		if errors.As(err, &apiErr) {
+			if apiErr.StatusCode == http.StatusRequestEntityTooLarge {
+				return c.splitAndWriteBatch(ctx, c.bucket, metrics)
 			}
+			wErr.Err = err
+			wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket]...)
+			return &wErr
 		}
+
+		// Check if we got a write error and if so, translate the returned
+		// metric indices to return the original indices in case of bucketing
+		var writeErr *internal.PartialWriteError
+		if errors.As(err, &writeErr) {
+			wErr.Err = writeErr.Err
+			for _, idx := range writeErr.MetricsAccept {
+				wErr.MetricsAccept = append(wErr.MetricsAccept, batchIndices[bucket][idx])
+			}
+			for _, idx := range writeErr.MetricsReject {
+				wErr.MetricsReject = append(wErr.MetricsReject, batchIndices[bucket][idx])
+			}
+			if !errors.Is(writeErr.Err, internal.ErrSizeLimitReached) {
+				continue
+			}
+			return &wErr
+		}
+
+		// Return the error without special treatment
+		wErr.Err = err
+		return &wErr
 	}
 
 	return nil
 }
@@ -222,11 +240,16 @@ func (c *httpClient) splitAndWriteBatch(ctx context.Context, bucket string, metr
 }
 
 func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []telegraf.Metric) error {
-	// Serialize the metrics
-	body, err := c.serializer.SerializeBatch(metrics)
-	if err != nil {
-		return err
+	// Get the current limit for the outbound data
+	ratets := time.Now()
+	limit := c.rateLimiter.Remaining(ratets)
+
+	// Serialize the metrics with the remaining limit, exit early if nothing was serialized
+	body, werr := c.serializer.SerializeBatch(metrics, limit)
+	if werr != nil && !errors.Is(werr, internal.ErrSizeLimitReached) || len(body) == 0 {
+		return werr
 	}
+	used := int64(len(body))
 
 	// Encode the content if requested
 	if c.encoder != nil {
@@ -249,6 +272,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
 	c.addHeaders(req)
 
 	// Execute the request
+	c.rateLimiter.Accept(ratets, used)
 	resp, err := c.client.Do(req.WithContext(ctx))
 	if err != nil {
 		internal.OnClientError(c.client, err)
@@ -269,7 +293,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
 		http.StatusMultiStatus,
 		http.StatusAlreadyReported:
 		c.retryCount = 0
-		return nil
+		return werr
 	}
 
 	// We got an error and now try to decode further
@@ -294,11 +318,18 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
 		http.StatusBadRequest,
 		// request was received but server refused to process it due to a semantic problem with the request.
 		// for example, submitting metrics outside the retention period.
-		// Clients should *not* repeat the request and the metrics should be dropped.
 		http.StatusUnprocessableEntity,
 		http.StatusNotAcceptable:
+
+		// Clients should *not* repeat the request and the metrics should be dropped.
-		c.log.Errorf("Failed to write metric to %s (will be dropped: %s): %s\n", bucket, resp.Status, desc)
-		return nil
+		rejected := make([]int, 0, len(metrics))
+		for i := range len(metrics) {
+			rejected = append(rejected, i)
+		}
+		return &internal.PartialWriteError{
+			Err:           fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
+			MetricsReject: rejected,
+		}
 	case http.StatusUnauthorized, http.StatusForbidden:
 		return fmt.Errorf("failed to write metric to %s (%s): %s", bucket, resp.Status, desc)
 	case http.StatusTooManyRequests,
@@ -316,8 +347,14 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
 	// if it's any other 4xx code, the client should not retry as it's the client's mistake.
 	// retrying will not make the request magically work.
 	if len(resp.Status) > 0 && resp.Status[0] == '4' {
-		c.log.Errorf("Failed to write metric to %s (will be dropped: %s): %s\n", bucket, resp.Status, desc)
-		return nil
+		rejected := make([]int, 0, len(metrics))
+		for i := range len(metrics) {
+			rejected = append(rejected, i)
+		}
+		return &internal.PartialWriteError{
+			Err:           fmt.Errorf("failed to write metric to %s (will be dropped: %s): %s", bucket, resp.Status, desc),
+			MetricsReject: rejected,
+		}
 	}
 
 	// This is only until platform spec is fully implemented. As of the
diff --git a/plugins/outputs/influxdb_v2/influxdb_v2.go b/plugins/outputs/influxdb_v2/influxdb_v2.go
index 15a66632788e2..89b0d8d2f875a 100644
--- a/plugins/outputs/influxdb_v2/influxdb_v2.go
+++ b/plugins/outputs/influxdb_v2/influxdb_v2.go
@@ -17,6 +17,7 @@ import (
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/config"
 	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/common/ratelimiter"
 	commontls "github.com/influxdata/telegraf/plugins/common/tls"
 	"github.com/influxdata/telegraf/plugins/outputs"
 	"github.com/influxdata/telegraf/plugins/serializers/influx"
@@ -44,10 +45,11 @@ type InfluxDB struct {
 	ReadIdleTimeout config.Duration `toml:"read_idle_timeout"`
 	Log             telegraf.Logger `toml:"-"`
 	commontls.ClientConfig
+	ratelimiter.RateLimitConfig
 
 	clients    []*httpClient
 	encoder    internal.ContentEncoder
-	serializer *influx.Serializer
+	serializer ratelimiter.Serializer
 	tlsCfg     *tls.Config
 }
 
@@ -65,7 +67,7 @@ func (i *InfluxDB) Init() error {
 		i.URLs = append(i.URLs, "http://localhost:8086")
 	}
 
-	// Check options
+	// Init encoding if configured
 	switch i.ContentEncoding {
 	case "", "gzip":
 		i.ContentEncoding = "gzip"
@@ -80,13 +82,14 @@ func (i *InfluxDB) Init() error {
 	}
 
 	// Setup the limited serializer
-	i.serializer = &influx.Serializer{
+	serializer := &influx.Serializer{
 		UintSupport:   i.UintSupport,
 		OmitTimestamp: i.OmitTimestamp,
 	}
-	if err := i.serializer.Init(); err != nil {
+	if err := serializer.Init(); err != nil {
 		return fmt.Errorf("setting up serializer failed: %w", err)
 	}
+	i.serializer = ratelimiter.NewIndividualSerializer(serializer)
 
 	// Setup the client config
 	tlsCfg, err := i.ClientConfig.TLSConfig()
@@ -142,6 +145,10 @@ func (i *InfluxDB) Connect() error {
 
 		switch parts.Scheme {
 		case "http", "https", "unix":
+			limiter, err := i.RateLimitConfig.CreateRateLimiter()
+			if err != nil {
+				return err
+			}
 			c := &httpClient{
 				url:              parts,
 				localAddr:        localAddr,
@@ -158,8 +165,9 @@ func (i *InfluxDB) Connect() error {
 				tlsConfig:        i.tlsCfg,
 				pingTimeout:      i.PingTimeout,
 				readIdleTimeout:  i.ReadIdleTimeout,
-				serializer:       i.serializer,
 				encoder:          i.encoder,
+				rateLimiter:      limiter,
+				serializer:       i.serializer,
 				log:              i.Log,
 			}
 
@@ -191,6 +199,10 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
 	for _, n := range rand.Perm(len(i.clients)) {
 		client := i.clients[n]
 		if err := client.Write(ctx, metrics); err != nil {
+			var werr *internal.PartialWriteError
+			if errors.As(err, &werr) || errors.Is(err, internal.ErrSizeLimitReached) {
+				return err
+			}
 			i.Log.Errorf("When writing to [%s]: %v", client.url, err)
 			continue
 		}
diff --git a/plugins/outputs/influxdb_v2/influxdb_v2_test.go b/plugins/outputs/influxdb_v2/influxdb_v2_test.go
index 36c3c3b08e0d9..f93617a38744e 100644
--- a/plugins/outputs/influxdb_v2/influxdb_v2_test.go
+++ b/plugins/outputs/influxdb_v2/influxdb_v2_test.go
@@ -7,6 +7,7 @@ import (
 	"net/http/httptest"
 	"reflect"
 	"strings"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -14,7 +15,9 @@ import (
 
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/config"
+	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/metric"
+	"github.com/influxdata/telegraf/plugins/common/ratelimiter"
 	"github.com/influxdata/telegraf/plugins/common/tls"
 	"github.com/influxdata/telegraf/plugins/outputs"
 	influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2"
@@ -373,3 +376,113 @@ func TestTooLargeWriteRetry(t *testing.T) {
 	}
 	require.Error(t, plugin.Write(hugeMetrics))
 }
+
+func TestRateLimit(t *testing.T) {
+	// Setup a test server
+	var received atomic.Uint64
+	ts := httptest.NewServer(
+		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			switch r.URL.Path {
+			case "/api/v2/write":
+				if err := r.ParseForm(); err != nil {
+					w.WriteHeader(http.StatusUnprocessableEntity)
+					return
+				}
+
+				body, err := io.ReadAll(r.Body)
+				if err != nil {
+					w.WriteHeader(http.StatusUnprocessableEntity)
+					return
+				}
+				received.Add(uint64(len(body)))
+
+				w.WriteHeader(http.StatusNoContent)
+
+				return
+			default:
+				w.WriteHeader(http.StatusNotFound)
+				return
+			}
+		}),
+	)
+	defer ts.Close()
+
+	// Setup plugin and connect
+	plugin := &influxdb.InfluxDB{
+		URLs:            []string{"http://" + ts.Listener.Addr().String()},
+		Bucket:          "telegraf",
+		ContentEncoding: "identity",
+		RateLimitConfig: ratelimiter.RateLimitConfig{
+			Limit:  50,
+			Period: config.Duration(time.Second),
+		},
+		Log: &testutil.Logger{},
+	}
+	require.NoError(t, plugin.Init())
+	require.NoError(t, plugin.Connect())
+	defer plugin.Close()
+
+	// Together the metric batch size is too big, split up, we get success
+	metrics := []telegraf.Metric{
+		metric.New(
+			"cpu",
+			map[string]string{},
+			map[string]interface{}{
+				"value": 42.0,
+			},
+			time.Unix(0, 1),
+		),
+		metric.New(
+			"cpu",
+			map[string]string{},
+			map[string]interface{}{
+				"value": 99.0,
+			},
+			time.Unix(0, 2),
+		),
+		metric.New(
+			"operating_hours",
+			map[string]string{
+				"machine": "A",
+			},
+			map[string]interface{}{
+				"value": 123.456,
+			},
+			time.Unix(0, 3),
+		),
+		metric.New(
+			"status",
+			map[string]string{
+				"machine": "B",
+			},
+			map[string]interface{}{
+				"temp":      48.235,
+				"remaining": 999.999,
+			},
+			time.Unix(0, 4),
+		),
+	}
+
+	// Write the metrics the first time. Only the first two metrics should be
+	// received by the server due to the rate limit.
+	require.ErrorIs(t, plugin.Write(metrics), internal.ErrSizeLimitReached)
+	require.LessOrEqual(t, received.Load(), uint64(30))
+
+	// A direct follow-up write attempt with the remaining metrics should fail
+	// due to the rate limit being reached
+	require.ErrorIs(t, plugin.Write(metrics[2:]), internal.ErrSizeLimitReached)
+	require.LessOrEqual(t, received.Load(), uint64(30))
+
+	// Wait for at least the period (plus some safety margin) to write the third metric
+	time.Sleep(time.Duration(plugin.RateLimitConfig.Period) + 100*time.Millisecond)
+	require.ErrorIs(t, plugin.Write(metrics[2:]), internal.ErrSizeLimitReached)
+	require.Greater(t, received.Load(), uint64(30))
+	require.LessOrEqual(t, received.Load(), uint64(72))
+
+	// Wait again for at least the period (plus some safety margin)
+	// to write the last metric. This should finally succeed as all metrics
+	// are written.
+	time.Sleep(time.Duration(plugin.RateLimitConfig.Period) + 100*time.Millisecond)
+	require.NoError(t, plugin.Write(metrics[3:]))
+	require.Equal(t, uint64(121), received.Load())
+}
diff --git a/plugins/outputs/influxdb_v2/sample.conf b/plugins/outputs/influxdb_v2/sample.conf
index 5fc41a6613686..e5de679fcd7fb 100644
--- a/plugins/outputs/influxdb_v2/sample.conf
+++ b/plugins/outputs/influxdb_v2/sample.conf
@@ -71,3 +71,9 @@
   # tls_key = "/etc/telegraf/key.pem"
   ## Use TLS but skip chain & host verification
   # insecure_skip_verify = false
+
+  ## Rate limits for sending data (disabled by default)
+  ## Available, uncompressed payload size e.g. "5Mb"
+  # rate_limit = "unlimited"
+  ## Fixed time-window for the available payload size e.g. "5m"
+  # rate_limit_period = "0s"
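For reference, a minimal sketch of how the pieces added above are meant to fit together, assuming only the signatures visible in this diff (`CreateRateLimiter`, `NewIndividualSerializer`, `Remaining`, `Accept`, `SerializeBatch(metrics, limit)` and `internal.ErrSizeLimitReached`). The `sendLimited` helper and its `send` callback are hypothetical stand-ins for `writeBatch`'s HTTP request; the point is the order of operations: reserve the remaining budget, serialize at most that many bytes, account for what was actually produced, and propagate the size-limit error so unsent metrics stay buffered for the next flush.

```go
package main

import (
	"errors"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/common/ratelimiter"
	"github.com/influxdata/telegraf/plugins/serializers/influx"
)

// sendLimited mirrors the pattern used in writeBatch above: query the
// remaining budget, serialize no more than that, then record the bytes
// that were actually produced against the limiter.
func sendLimited(
	limiter *ratelimiter.RateLimiter,
	serializer ratelimiter.Serializer,
	metrics []telegraf.Metric,
	send func([]byte) error,
) error {
	ts := time.Now()
	limit := limiter.Remaining(ts)

	// A non-nil error other than ErrSizeLimitReached, or an empty body,
	// means nothing can be sent in this interval.
	body, werr := serializer.SerializeBatch(metrics, limit)
	if werr != nil && !errors.Is(werr, internal.ErrSizeLimitReached) || len(body) == 0 {
		return werr
	}

	// Account for the payload before sending it.
	limiter.Accept(ts, int64(len(body)))
	if err := send(body); err != nil {
		return err
	}

	// Propagate ErrSizeLimitReached so the caller knows the batch was only
	// partially written and keeps the remaining metrics for the next flush.
	return werr
}

func main() {
	// Roughly 1MB per minute, mirroring the new rate_limit options.
	cfg := ratelimiter.RateLimitConfig{
		Limit:  config.Size(1024 * 1024),
		Period: config.Duration(time.Minute),
	}
	limiter, err := cfg.CreateRateLimiter()
	if err != nil {
		panic(err)
	}

	base := &influx.Serializer{}
	if err := base.Init(); err != nil {
		panic(err)
	}
	serializer := ratelimiter.NewIndividualSerializer(base)

	// A real caller passes the current batch of metrics and an HTTP writer.
	_ = sendLimited(limiter, serializer, nil, func([]byte) error { return nil })
}
```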