
feat(inputs.influxdb_v2_listener): Add support for rate limiting (inf…
LarsStegman authored May 16, 2024
1 parent 5607934 commit dcb6177
Showing 4 changed files with 153 additions and 10 deletions.
5 changes: 5 additions & 0 deletions plugins/inputs/influxdb_v2_listener/README.md
@@ -40,6 +40,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## (Double check the port. Could be 9999 if using OSS Beta)
service_address = ":8086"

## Maximum undelivered metrics before rate limit kicks in.
## When the rate limit kicks in, HTTP status 429 will be returned.
## 0 disables rate limiting
# max_undelivered_metrics = 0

## Maximum duration before timing out read of the request
# read_timeout = "10s"
## Maximum duration before timing out write of the response
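For context, a minimal client-side sketch (not part of this commit) of how a writer could react to the new 429 response. The endpoint, bucket name and backoff policy are illustrative assumptions; only the 204/429 handling follows the behaviour documented above.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

// writeWithBackoff posts line protocol to the listener and backs off when it
// answers 429 (max_undelivered_metrics reached). URL, bucket and retry policy
// are illustrative assumptions, not part of the plugin.
func writeWithBackoff(lineProtocol string) error {
	url := "http://localhost:8086/api/v2/write?bucket=mybucket&precision=s"
	for attempt := 1; attempt <= 5; attempt++ {
		resp, err := http.Post(url, "text/plain; charset=utf-8", bytes.NewBufferString(lineProtocol))
		if err != nil {
			return err
		}
		resp.Body.Close()
		switch resp.StatusCode {
		case http.StatusNoContent: // 204: accepted
			return nil
		case http.StatusTooManyRequests: // 429: listener is over its undelivered-metrics budget
			time.Sleep(time.Duration(attempt) * time.Second)
		default:
			return fmt.Errorf("unexpected status %d", resp.StatusCode)
		}
	}
	return fmt.Errorf("still rate limited after retries")
}

func main() {
	if err := writeWithBackoff("example value=42\n"); err != nil {
		fmt.Println("write failed:", err)
	}
}
```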
87 changes: 77 additions & 10 deletions plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go
@@ -13,6 +13,8 @@ import (
"net"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
@@ -52,19 +54,27 @@ type InfluxDBV2Listener struct {
port int
tlsint.ServerConfig

ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
MaxBodySize config.Size `toml:"max_body_size"`
Token string `toml:"token"`
BucketTag string `toml:"bucket_tag"`
ParserType string `toml:"parser_type"`
MaxUndeliveredMetrics int `toml:"max_undelivered_metrics"`
ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
MaxBodySize config.Size `toml:"max_body_size"`
Token string `toml:"token"`
BucketTag string `toml:"bucket_tag"`
ParserType string `toml:"parser_type"`

ctx context.Context
cancel context.CancelFunc
trackingMetricCount map[telegraf.TrackingID]int64
countLock sync.Mutex
totalUndeliveredMetrics atomic.Int64

timeFunc influx.TimeFunc

listener net.Listener
server http.Server

acc telegraf.Accumulator
acc telegraf.Accumulator
trackingAcc telegraf.TrackingAccumulator

bytesRecv selfstat.Stat
requestsServed selfstat.Stat
@@ -135,6 +145,26 @@ func (h *InfluxDBV2Listener) Init() error {
// Start starts the InfluxDB listener service.
func (h *InfluxDBV2Listener) Start(acc telegraf.Accumulator) error {
h.acc = acc
h.ctx, h.cancel = context.WithCancel(context.Background())
if h.MaxUndeliveredMetrics > 0 {
h.trackingAcc = h.acc.WithTracking(h.MaxUndeliveredMetrics)
h.trackingMetricCount = make(map[telegraf.TrackingID]int64, h.MaxUndeliveredMetrics)
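// Release rate-limit capacity as tracked metric groups are delivered
// (or rejected) downstream.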
go func() {
for {
select {
case <-h.ctx.Done():
return
case info := <-h.trackingAcc.Delivered():
if count, ok := h.trackingMetricCount[info.ID()]; ok {
h.countLock.Lock()
h.totalUndeliveredMetrics.Add(-count)
delete(h.trackingMetricCount, info.ID())
h.countLock.Unlock()
}
}
}
}()
}

tlsConf, err := h.ServerConfig.TLSConfig()
if err != nil {
@@ -180,6 +210,7 @@ func (h *InfluxDBV2Listener) Start(acc telegraf.Accumulator) error {

// Stop cleans up all resources
func (h *InfluxDBV2Listener) Stop() {
h.cancel()
err := h.server.Shutdown(context.Background())
if err != nil {
h.Log.Infof("Error shutting down HTTP server: %v", err.Error())
@@ -219,6 +250,7 @@ func (h *InfluxDBV2Listener) handleDefault() http.HandlerFunc {
func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
return func(res http.ResponseWriter, req *http.Request) {
defer h.writesServed.Incr(1)

// Check that the content length is not too large for us to handle.
if req.ContentLength > int64(h.MaxBodySize) {
if err := tooLarge(res, int64(h.MaxBodySize)); err != nil {
@@ -308,13 +340,48 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
if h.BucketTag != "" && bucket != "" {
m.AddTag(h.BucketTag, bucket)
}
}

if h.MaxUndeliveredMetrics > 0 {
h.writeWithTracking(res, metrics)
} else {
h.write(res, metrics)
}
}
}

func (h *InfluxDBV2Listener) writeWithTracking(res http.ResponseWriter, metrics []telegraf.Metric) {
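// A batch larger than max_undelivered_metrics can never fit within the
// budget, so it is rejected outright with 413.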
if len(metrics) > h.MaxUndeliveredMetrics {
res.WriteHeader(http.StatusRequestEntityTooLarge)
h.Log.Debugf("status %d, always rejecting batch of %d metrics: larger than max_undelivered_metrics %d",
http.StatusRequestEntityTooLarge, len(metrics), h.MaxUndeliveredMetrics)
return
}

// Reject the batch with 429 if accepting it would push the number of
// undelivered metrics over max_undelivered_metrics.
pending := h.totalUndeliveredMetrics.Load()
remainingUndeliveredMetrics := int64(h.MaxUndeliveredMetrics) - pending
if int64(len(metrics)) > remainingUndeliveredMetrics {
res.WriteHeader(http.StatusTooManyRequests)
h.Log.Debugf("status %d, rejecting batch of %d metrics: larger than remaining undelivered metrics %d",
http.StatusTooManyRequests, len(metrics), remainingUndeliveredMetrics)
return
}

h.countLock.Lock()
trackingID := h.trackingAcc.AddTrackingMetricGroup(metrics)
h.trackingMetricCount[trackingID] = int64(len(metrics))
h.totalUndeliveredMetrics.Add(int64(len(metrics)))
h.countLock.Unlock()

res.WriteHeader(http.StatusNoContent)
}

func (h *InfluxDBV2Listener) write(res http.ResponseWriter, metrics []telegraf.Metric) {
for _, m := range metrics {
h.acc.AddMetric(m)
}

res.WriteHeader(http.StatusNoContent)
}

func tooLarge(res http.ResponseWriter, maxLength int64) error {
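The accounting added above, reduced to a standalone sketch for readability (the type and function names here are invented for illustration, not Telegraf's): an atomic counter tracks undelivered metrics, a mutex-guarded map remembers how many metrics each tracking ID covers, and capacity is returned when a delivery notification arrives.

```go
// Package capacity is an illustrative, standalone reduction of the
// accounting used by the listener; it is not Telegraf code.
package capacity

import (
	"sync"
	"sync/atomic"
)

type capacityGuard struct {
	max     int64
	pending atomic.Int64
	mu      sync.Mutex
	counts  map[uint64]int64 // tracking ID -> number of metrics in that batch
}

func newCapacityGuard(max int64) *capacityGuard {
	return &capacityGuard{max: max, counts: make(map[uint64]int64)}
}

// tryReserve mirrors writeWithTracking: it reserves capacity for a batch of n
// metrics, or reports that the caller should answer with HTTP 429.
func (g *capacityGuard) tryReserve(id uint64, n int64) bool {
	if n > g.max-g.pending.Load() {
		return false
	}
	g.mu.Lock()
	g.counts[id] = n
	g.pending.Add(n)
	g.mu.Unlock()
	return true
}

// release mirrors the goroutine started in Start(): it returns capacity once
// the batch identified by id has been delivered (or rejected) downstream.
func (g *capacityGuard) release(id uint64) {
	g.mu.Lock()
	if n, ok := g.counts[id]; ok {
		g.pending.Add(-n)
		delete(g.counts, id)
	}
	g.mu.Unlock()
}
```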
66 changes: 66 additions & 0 deletions plugins/inputs/influxdb_v2_listener/influxdb_v2_listener_test.go
@@ -67,6 +67,12 @@ func newTestAuthListener() *InfluxDBV2Listener {
return listener
}

func newRateLimitedTestListener(maxUndeliveredMetrics int) *InfluxDBV2Listener {
listener := newTestListener()
listener.MaxUndeliveredMetrics = maxUndeliveredMetrics
return listener
}

func newTestSecureListener() *InfluxDBV2Listener {
listener := &InfluxDBV2Listener{
Log: testutil.Logger{},
@@ -599,4 +605,64 @@ func TestWriteWithPrecisionNoTimestamp(t *testing.T) {
require.Equal(t, time.Unix(42, 0), acc.Metrics[0].Time)
}

func TestRateLimitedConnectionDropsSecondRequest(t *testing.T) {
listener := newRateLimitedTestListener(1)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

msg := "xyzzy value=42\n"
postURL := createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s")
resp, err := http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)

resp, err = http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 429, resp.StatusCode)
}

func TestRateLimitedConnectionAcceptsNewRequestOnDelivery(t *testing.T) {
listener := newRateLimitedTestListener(1)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

msg := "xyzzy value=42\n"
postURL := createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s")
resp, err := http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)

ms := acc.GetTelegrafMetrics()
for _, m := range ms {
m.Accept()
}

resp, err = http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
}

func TestRateLimitedConnectionRejectsBatchesLargerThanMaxUndeliveredMetrics(t *testing.T) {
listener := newRateLimitedTestListener(1)
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()

msg := "xyzzy value=42\nxyzzy value=43"
postURL := createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s")
resp, err := http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 413, resp.StatusCode)
}
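A possible companion test, not part of this commit: it would sit in the same test file and mirrors TestRateLimitedConnectionAcceptsNewRequestOnDelivery, but rejects the metrics instead, on the assumption that Reject() resolves the tracking group and frees capacity just as Accept() does.

```go
func TestRateLimitedConnectionAcceptsNewRequestOnRejection(t *testing.T) {
	listener := newRateLimitedTestListener(1)
	acc := &testutil.Accumulator{}
	require.NoError(t, listener.Init())
	require.NoError(t, listener.Start(acc))
	defer listener.Stop()

	msg := "xyzzy value=42\n"
	postURL := createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s")
	resp, err := http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 204, resp.StatusCode)

	// Assumes Reject() resolves delivery tracking just like Accept() does.
	for _, m := range acc.GetTelegrafMetrics() {
		m.Reject()
	}

	resp, err = http.Post(postURL, "", bytes.NewBuffer([]byte(msg))) // #nosec G107 -- url has to be dynamic due to dynamic port number
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
	require.EqualValues(t, 204, resp.StatusCode)
}
```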

5 changes: 5 additions & 0 deletions plugins/inputs/influxdb_v2_listener/sample.conf
@@ -4,6 +4,11 @@
## (Double check the port. Could be 9999 if using OSS Beta)
service_address = ":8086"

## Maximum undelivered metrics before rate limit kicks in.
## When the rate limit kicks in, HTTP status 429 will be returned.
## 0 disables rate limiting
# max_undelivered_metrics = 0

## Maximum duration before timing out read of the request
# read_timeout = "10s"
## Maximum duration before timing out write of the response
