Skip to content

Commit

Permalink
Add dryrun feature
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Jung <jungjust@amazon.com>
  • Loading branch information
justinjung04 committed Jun 14, 2024
1 parent 6c15c92 commit 6ae585a
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 14 deletions.
22 changes: 21 additions & 1 deletion pkg/storegateway/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (

"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/store"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -65,18 +67,36 @@ func (t *tokenBucketLimiter) ReserveWithType(num uint64, dataType store.StoreDat
return nil
}

// if request bucket is running low, check shared buckets
// if we can't retrieve from request bucket, check shared buckets
retrieved = t.userTokenBucket.Retrieve(tokensToRetrieve)
if !retrieved {
// if dry run, force retrieve all tokens and return nil
if t.dryRun {
t.requestTokenBucket.ForceRetrieve(tokensToRetrieve)
t.userTokenBucket.ForceRetrieve(tokensToRetrieve)
t.podTokenBucket.ForceRetrieve(tokensToRetrieve)
level.Warn(util_log.Logger).Log("msg", "not enough tokens in user token bucket", "dataType", dataType, "dataSize", num, "tokens", tokensToRetrieve)
return nil
}
return fmt.Errorf("not enough tokens in user token bucket")
}

retrieved = t.podTokenBucket.Retrieve(tokensToRetrieve)
if !retrieved {
t.userTokenBucket.Refund(tokensToRetrieve)

// if dry run, force retrieve all tokens and return nil
if t.dryRun {
// user bucket is already retrieved
t.requestTokenBucket.ForceRetrieve(tokensToRetrieve)
t.podTokenBucket.ForceRetrieve(tokensToRetrieve)
level.Warn(util_log.Logger).Log("msg", "not enough tokens in pod token bucket", "dataType", dataType, "dataSize", num, "tokens", tokensToRetrieve)
return nil
}
return fmt.Errorf("not enough tokens in pod token bucket")
}

t.requestTokenBucket.ForceRetrieve(tokensToRetrieve)
return nil
}

Expand Down
58 changes: 47 additions & 11 deletions pkg/storegateway/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestNewTokenBucketLimiter(t *testing.T) {
podTokenBucket := util.NewTokenBucket(3, 3, nil)
userTokenBucket := util.NewTokenBucket(2, 2, nil)
requestTokenBucket := util.NewTokenBucket(1, 1, nil)
l := newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket, true, func(tokens uint64, dataType store.StoreDataType) int64 {
l := newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket, false, func(tokens uint64, dataType store.StoreDataType) int64 {
if dataType == store.SeriesFetched {
return int64(tokens) * 5
}
Expand All @@ -45,30 +45,66 @@ func TestNewTokenBucketLimiter(t *testing.T) {

// should force retrieve tokens from all buckets upon succeeding
assert.NoError(t, l.ReserveWithType(2, store.PostingsFetched))
assert.False(t, podTokenBucket.Retrieve(2))
assert.Equal(t, int64(1), podTokenBucket.RemainingTokens())
assert.Equal(t, int64(0), userTokenBucket.RemainingTokens())
assert.Equal(t, int64(-1), requestTokenBucket.RemainingTokens())

// should fail if user token bucket is running low
podTokenBucket.Refund(3)
podTokenBucket.Refund(2)
userTokenBucket.Refund(2)
requestTokenBucket.Refund(1)
requestTokenBucket.Refund(2)
assert.ErrorContains(t, l.ReserveWithType(3, store.PostingsFetched), "not enough tokens in user token bucket")
assert.Equal(t, int64(3), podTokenBucket.RemainingTokens())
assert.Equal(t, int64(2), userTokenBucket.RemainingTokens())
assert.Equal(t, int64(1), requestTokenBucket.RemainingTokens())

// should fail if pod token bucket is running low
podTokenBucket.Refund(3)
userTokenBucket.Refund(2)
requestTokenBucket.Refund(1)
podTokenBucket.ForceRetrieve(3)
podTokenBucket.ForceRetrieve(2)
assert.ErrorContains(t, l.ReserveWithType(2, store.PostingsFetched), "not enough tokens in pod token bucket")
assert.Equal(t, int64(1), podTokenBucket.RemainingTokens())
assert.Equal(t, int64(2), userTokenBucket.RemainingTokens())
assert.Equal(t, int64(1), requestTokenBucket.RemainingTokens())

// should retrieve different amount of tokens based on data type
podTokenBucket.Refund(3)
userTokenBucket.Refund(2)
requestTokenBucket.Refund(1)
podTokenBucket.Refund(2)
assert.ErrorContains(t, l.ReserveWithType(1, store.SeriesFetched), "not enough tokens in user token bucket")
assert.Equal(t, int64(3), podTokenBucket.RemainingTokens())
assert.Equal(t, int64(2), userTokenBucket.RemainingTokens())
assert.Equal(t, int64(1), requestTokenBucket.RemainingTokens())

// should always succeed if retrieve token bucket has enough tokens, although shared buckets are empty
podTokenBucket.ForceRetrieve(3)
userTokenBucket.ForceRetrieve(2)
assert.NoError(t, l.ReserveWithType(1, store.PostingsFetched))
assert.Equal(t, int64(-1), podTokenBucket.RemainingTokens())
assert.Equal(t, int64(-1), userTokenBucket.RemainingTokens())
assert.Equal(t, int64(0), requestTokenBucket.RemainingTokens())
}

func TestNewTokenBucketLimter_DryRun(t *testing.T) {
podTokenBucket := util.NewTokenBucket(3, 3, nil)
userTokenBucket := util.NewTokenBucket(2, 2, nil)
requestTokenBucket := util.NewTokenBucket(1, 1, nil)
l := newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket, true, func(tokens uint64, dataType store.StoreDataType) int64 {
if dataType == store.SeriesFetched {
return int64(tokens) * 5
}
return int64(tokens)
})

// should force retrieve tokens from all buckets upon succeeding
assert.NoError(t, l.ReserveWithType(2, store.PostingsFetched))
assert.False(t, podTokenBucket.Retrieve(2))
assert.Equal(t, int64(1), podTokenBucket.RemainingTokens())
assert.Equal(t, int64(0), userTokenBucket.RemainingTokens())
assert.Equal(t, int64(-1), requestTokenBucket.RemainingTokens())

// should not fail even if tokens are not enough
podTokenBucket.Refund(2)
userTokenBucket.Refund(2)
requestTokenBucket.Refund(2)
assert.NoError(t, l.ReserveWithType(5, store.PostingsFetched))
assert.Equal(t, int64(-2), podTokenBucket.RemainingTokens())
assert.Equal(t, int64(-3), userTokenBucket.RemainingTokens())
assert.Equal(t, int64(-4), requestTokenBucket.RemainingTokens())
}
9 changes: 9 additions & 0 deletions pkg/util/token_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ func (t *TokenBucket) Refund(amount int64) {
}
}

func (t *TokenBucket) RemainingTokens() int64 {
t.mu.Lock()
defer t.mu.Unlock()

t.updateTokens()

return t.remainingTokens
}

func (t *TokenBucket) updateTokens() {
now := time.Now()
refilledTokens := int64(now.Sub(t.lastRefill).Seconds() * float64(t.refillRate))
Expand Down
8 changes: 6 additions & 2 deletions pkg/util/token_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,25 @@ func TestTokenBucket_Retrieve(t *testing.T) {
assert.False(t, bucket.Retrieve(10))
time.Sleep(time.Second)
assert.True(t, bucket.Retrieve(10))
assert.Equal(t, int64(0), bucket.RemainingTokens())
}

func TestTokenBucket_ForceRetrieve(t *testing.T) {
bucket := NewTokenBucket(10, 600, nil)

bucket.ForceRetrieve(20)
assert.Equal(t, int64(-10), bucket.RemainingTokens())
assert.False(t, bucket.Retrieve(10))
time.Sleep(time.Second)
assert.True(t, bucket.Retrieve(10))
assert.Equal(t, int64(0), bucket.RemainingTokens())
}

func TestTokenBucket_Refund(t *testing.T) {
bucket := NewTokenBucket(10, 600, nil)

bucket.ForceRetrieve(100)
bucket.Refund(100)
bucket.ForceRetrieve(10)
bucket.Refund(20)
assert.True(t, bucket.Retrieve(10))
assert.Equal(t, int64(0), bucket.RemainingTokens())
}

0 comments on commit 6ae585a

Please sign in to comment.