Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: Apply rate limiting to all responses (#…
Browse files Browse the repository at this point in the history
…38161)

- Drain the body before rate limiting.
- Apply rate limiting to all responses, waiting immediately. Retry if
  the response was not successful and there was a rate limit wait (even
  if immediately expired), otherwise return.
- Improve names of variables `epoch` and `activeLimit`.

Co-authored-by: Dan Kortschak <dan.kortschak@elastic.co>
  • Loading branch information
chrisberkhout and efd6 authored Mar 8, 2024
1 parent 07e231b commit ca07b8e
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 80 deletions.
73 changes: 39 additions & 34 deletions x-pack/filebeat/input/httpjson/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,37 @@ func (r *rateLimiter) execute(ctx context.Context, f func() (*http.Response, err
for {
resp, err := f()
if err != nil {
return nil, fmt.Errorf("failed to read http.response.body: %w", err)
return nil, err
}

if r == nil || resp.StatusCode == http.StatusOK {
if r == nil {
return resp, nil
}

if resp.StatusCode != http.StatusTooManyRequests {
return nil, fmt.Errorf("http request was unsuccessful with a status code %d", resp.StatusCode)
applied, err := r.applyRateLimit(ctx, resp)
if err != nil {
return nil, fmt.Errorf("error applying rate limit: %w", err)
}

if err := r.applyRateLimit(ctx, resp); err != nil {
return nil, err
if resp.StatusCode == http.StatusOK || !applied {
return resp, nil
}
}
}

// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response
func (r *rateLimiter) applyRateLimit(ctx context.Context, resp *http.Response) error {
epoch, err := r.getRateLimit(resp)
// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response.
// It returns a bool indicating whether a limit was reached.
func (r *rateLimiter) applyRateLimit(ctx context.Context, resp *http.Response) (bool, error) {
limitReached, resumeAt, err := r.getRateLimit(resp)
if err != nil {
return err
return limitReached, err
}

t := time.Unix(epoch, 0)
t := time.Unix(resumeAt, 0)
w := time.Until(t)
if epoch == 0 || w <= 0 {
if resumeAt == 0 || w <= 0 {
r.log.Debugf("Rate Limit: No need to apply rate limit.")
return nil
return limitReached, nil
}
r.log.Debugf("Rate Limit: Wait until %v for the rate limit to reset.", t)
timer := time.NewTimer(w)
Expand All @@ -80,24 +82,25 @@ func (r *rateLimiter) applyRateLimit(ctx context.Context, resp *http.Response) e
<-timer.C
}
r.log.Info("Context done.")
return nil
return limitReached, nil
case <-timer.C:
r.log.Debug("Rate Limit: time is up.")
return nil
return limitReached, nil
}
}

// getRateLimit gets the rate limit value if specified in the response,
// and returns an int64 value in seconds since unix epoch for rate limit reset time.
// and returns a bool indicating whether a limit was reached, and
// an int64 value in seconds since unix epoch for rate limit reset time.
// When there is a remaining rate limit quota, or when the rate limit reset time has expired, it
// returns 0 for the epoch value.
func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) {
func (r *rateLimiter) getRateLimit(resp *http.Response) (bool, int64, error) {
if r == nil {
return 0, nil
return false, 0, nil
}

if r.remaining == nil {
return 0, nil
return false, 0, nil
}

tr := transformable{}
Expand All @@ -106,54 +109,56 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) {

remaining, _ := r.remaining.Execute(ctx, tr, "rate-limit_remaining", nil, r.log)
if remaining == "" {
return 0, errors.New("remaining value is empty")
r.log.Infow("get rate limit", "error", errors.New("remaining value is empty"))
return false, 0, nil
}
m, err := strconv.ParseInt(remaining, 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse rate-limit remaining value: %w", err)
return false, 0, fmt.Errorf("failed to parse rate-limit remaining value: %w", err)
}

// by default, httpjson will continue requests until Limit is 0
// can optionally stop requests "early"
var activeLimit int64 = 0
var minRemaining int64 = 0
if r.earlyLimit != nil {
earlyLimit := *r.earlyLimit
if earlyLimit > 0 && earlyLimit < 1 {
limit, _ := r.limit.Execute(ctx, tr, "early_limit", nil, r.log)
if limit != "" {
l, err := strconv.ParseInt(limit, 10, 64)
if err == nil {
activeLimit = l - int64(earlyLimit*float64(l))
minRemaining = l - int64(earlyLimit*float64(l))
}
}
} else if earlyLimit >= 1 {
activeLimit = int64(earlyLimit)
minRemaining = int64(earlyLimit)
}
}

r.log.Debugf("Rate Limit: Using active Early Limit: %f", activeLimit)
if m > activeLimit {
return 0, nil
r.log.Debugf("Rate Limit: Using active Early Limit: %d", minRemaining)
if m > minRemaining {
return false, 0, nil
}

if r.reset == nil {
r.log.Warn("reset rate limit is not set")
return 0, nil
return false, 0, nil
}

reset, _ := r.reset.Execute(ctx, tr, "rate-limit_reset", nil, r.log)
if reset == "" {
return 0, errors.New("reset value is empty")
r.log.Infow("get rate limit", "error", errors.New("reset value is empty"))
return false, 0, nil
}

epoch, err := strconv.ParseInt(reset, 10, 64)
resumeAt, err := strconv.ParseInt(reset, 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse rate-limit reset value: %w", err)
return false, 0, fmt.Errorf("failed to parse rate-limit reset value: %w", err)
}

if timeNow().Unix() > epoch {
return 0, nil
if timeNow().Unix() > resumeAt {
return true, 0, nil
}

return epoch, nil
return true, resumeAt, nil
}
78 changes: 43 additions & 35 deletions x-pack/filebeat/input/httpjson/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

// Test getRateLimit function with a remaining quota, expect to receive 0, nil.
func TestGetRateLimitReturns0IfRemainingQuota(t *testing.T) {
func TestGetRateLimitReturnsFalse0IfRemainingQuota(t *testing.T) {
header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "118")
Expand All @@ -34,12 +34,13 @@ func TestGetRateLimitReturns0IfRemainingQuota(t *testing.T) {
log: logp.NewLogger(""),
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
assert.False(t, applied)
assert.EqualValues(t, 0, resumeAt)
}

func TestGetRateLimitReturns0IfEpochInPast(t *testing.T) {
func TestGetRateLimitReturnsTrue0IfResumeAtInPast(t *testing.T) {
header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "10")
header.Add("X-Rate-Limit-Remaining", "0")
Expand All @@ -57,20 +58,21 @@ func TestGetRateLimitReturns0IfEpochInPast(t *testing.T) {
log: logp.NewLogger(""),
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
assert.True(t, applied)
assert.EqualValues(t, 0, resumeAt)
}

func TestGetRateLimitReturnsResetValue(t *testing.T) {
epoch := int64(1604582732 + 100)
reset := int64(1604582732 + 100)
timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "10")
header.Add("X-Rate-Limit-Remaining", "0")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(epoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(reset, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
Expand All @@ -84,22 +86,23 @@ func TestGetRateLimitReturnsResetValue(t *testing.T) {
log: logp.NewLogger(""),
}
resp := &http.Response{Header: header}
epoch2, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 1604582832, epoch2)
assert.True(t, applied)
assert.EqualValues(t, reset, resumeAt)
}

// Test getRateLimit function with a remaining quota, using default early limit
// expect to receive 0, nil.
// expect to receive false, 0, nil.
func TestGetRateLimitReturns0IfEarlyLimit0(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
resetAt := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
Expand All @@ -115,22 +118,23 @@ func TestGetRateLimitReturns0IfEarlyLimit0(t *testing.T) {
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
assert.False(t, applied)
assert.EqualValues(t, 0, resumeAt)
}

// Test getRateLimit function with a remaining limit, but early limit
// expect to receive Reset Time
// expect to receive true, Reset Time
func TestGetRateLimitReturnsResetValueIfEarlyLimit1(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
resetAt := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
Expand All @@ -146,22 +150,23 @@ func TestGetRateLimitReturnsResetValueIfEarlyLimit1(t *testing.T) {
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, resetEpoch, epoch)
assert.True(t, applied)
assert.EqualValues(t, resetAt, resumeAt)
}

// Test getRateLimit function with a remaining quota, using 90% early limit
// expect to receive 0, nil.
// expect to receive false, 0, nil.
func TestGetRateLimitReturns0IfEarlyLimitPercent(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
resetAt := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "13")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
Expand All @@ -177,22 +182,23 @@ func TestGetRateLimitReturns0IfEarlyLimitPercent(t *testing.T) {
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
assert.False(t, applied)
assert.EqualValues(t, 0, resumeAt)
}

// Test getRateLimit function with a remaining limit, but early limit of 90%
// expect to receive Reset Time
// expect to receive true, Reset Time
func TestGetRateLimitReturnsResetValueIfEarlyLimitPercent(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
resetAt := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "12")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
Expand All @@ -208,21 +214,22 @@ func TestGetRateLimitReturnsResetValueIfEarlyLimitPercent(t *testing.T) {
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, resetEpoch, epoch)
assert.True(t, applied)
assert.EqualValues(t, resetAt, resumeAt)
}

// Test getRateLimit function when "Limit" header is missing, when using a Percentage early-limit
// expect to receive 0, nil. (default rate-limiting)
// expect to receive false, 0, nil. (default rate-limiting)
func TestGetRateLimitWhenMissingLimit(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
reset := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(reset, 10))
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(0.9)
Expand All @@ -236,7 +243,8 @@ func TestGetRateLimitWhenMissingLimit(t *testing.T) {
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
assert.False(t, applied)
assert.EqualValues(t, 0, resumeAt)
}
Loading

0 comments on commit ca07b8e

Please sign in to comment.