diff --git a/pkg/mcs/resourcemanager/server/token_buckets.go b/pkg/mcs/resourcemanager/server/token_buckets.go index f0b733a14ca..9c4fd86eaee 100644 --- a/pkg/mcs/resourcemanager/server/token_buckets.go +++ b/pkg/mcs/resourcemanager/server/token_buckets.go @@ -409,6 +409,11 @@ func (ts *TokenSlot) assignSlotTokens(requiredToken float64, targetPeriodMs uint // | // grant_rate 0 ------------------------------------------------------------------------------------ // loan *** k*period_token (k+k-1)*period_token *** (k+k+1...+1)*period_token + + // loadCoefficient is relative to the capacity of load RUs. + // It's like a buffer to slow down the client consumption. the buffer capacity is `(1 + 2 ... +loanCoefficient) * fillRate * targetPeriodTimeSec`. + // Details see test case `TestGroupTokenBucketRequestLoop`. + p := make([]float64, loanCoefficient) p[0] = float64(loanCoefficient) * float64(fillRate) * targetPeriodTimeSec for i := 1; i < loanCoefficient; i++ { diff --git a/pkg/mcs/resourcemanager/server/token_buckets_test.go b/pkg/mcs/resourcemanager/server/token_buckets_test.go index b04a535fc74..4824bebdf03 100644 --- a/pkg/mcs/resourcemanager/server/token_buckets_test.go +++ b/pkg/mcs/resourcemanager/server/token_buckets_test.go @@ -15,6 +15,7 @@ package server import ( + "fmt" "math" "testing" "time" @@ -123,3 +124,65 @@ func TestGroupTokenBucketRequest(t *testing.T) { re.LessOrEqual(math.Abs(tb.Tokens-20000), 1e-7) re.Equal(int64(time.Second)*10/int64(time.Millisecond), trickle) } + +func TestGroupTokenBucketRequestLoop(t *testing.T) { + re := require.New(t) + tbSetting := &rmpb.TokenBucket{ + Tokens: 50000, + Settings: &rmpb.TokenLimitSettings{ + FillRate: 2000, + BurstLimit: 200000, + }, + } + + gtb := NewGroupTokenBucket(tbSetting) + clientUniqueID := uint64(0) + initialTime := time.Now() + + // Initialize the token bucket + gtb.init(initialTime, clientUniqueID) + gtb.Tokens = 50000 + + const timeIncrement = 5 * time.Second + const defaultTrickleDuration = int64(time.Second) * 5 / int64(time.Millisecond) + + // Define the test cases in a table + testCases := []struct { + requestTokens float64 + assignedTokens float64 + currentGlobalBucketTokens float64 + expectedTrickle int64 + }{ + /* requestTokens, assignedTokens, currentGlobalBucketTokens, trickleTime */ + {50000, 50000, 0, 0}, + {50000, 30000, -20000, defaultTrickleDuration}, + {30000, 15000, -25000, defaultTrickleDuration}, + {15000, 12500, -27500, defaultTrickleDuration}, + {12500, 11250, -28750, defaultTrickleDuration}, + {11250, 10625, -29375, defaultTrickleDuration}, + // PR_PER_SEC is close to 2000, RU_PER_SEC = assignedTokens / TrickleTime. + {10625, 10312.5, -29687.5, defaultTrickleDuration}, + {10312.5, 10156.25, -29843.75, defaultTrickleDuration}, + {10156.25, 10078.125, -29921.875, defaultTrickleDuration}, + {10078.125, 10039.0625, -29960.9375, defaultTrickleDuration}, + {10039.0625, 10019.53125, -29980.46875, defaultTrickleDuration}, + {10019.53125, 10009.765625, -29990.234375, defaultTrickleDuration}, + {10009.765625, 10004.8828125, -29995.1171875, defaultTrickleDuration}, + {10004.8828125, 10002.44140625, -29997.55859375, defaultTrickleDuration}, + {10002.44140625, 10001.220703125, -29998.779296875, defaultTrickleDuration}, + {10001.220703125, 10000.6103515625, -29999.3896484375, defaultTrickleDuration}, + {10000.6103515625, 10000.30517578125, -29999.69482421875, defaultTrickleDuration}, + {10000.30517578125, 10000.152587890625, -29999.847412109375, defaultTrickleDuration}, + {10000.152587890625, 10000.0762939453125, -29999.9237060546875, defaultTrickleDuration}, + {10000.0762939453125, 10000.038146972656, -29999.961853027343, defaultTrickleDuration}, + } + + currentTime := initialTime + for i, tc := range testCases { + tb, trickle := gtb.request(currentTime, tc.requestTokens, uint64(time.Second)*5/uint64(time.Millisecond), clientUniqueID) + re.Equal(tc.currentGlobalBucketTokens, gtb.GetTokenBucket().Tokens, fmt.Sprintf("Test case %d failed: expected bucket tokens %f, got %f", i, tc.currentGlobalBucketTokens, gtb.GetTokenBucket().Tokens)) + re.LessOrEqual(math.Abs(tb.Tokens-tc.assignedTokens), 1e-7, fmt.Sprintf("Test case %d failed: expected tokens %f, got %f", i, tc.assignedTokens, tb.Tokens)) + re.Equal(tc.expectedTrickle, trickle, fmt.Sprintf("Test case %d failed: expected trickle %d, got %d", i, tc.expectedTrickle, trickle)) + currentTime = currentTime.Add(timeIncrement) + } +}