Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact: update rate limit report #11

Merged
merged 2 commits into from
Oct 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 32 additions & 10 deletions polaris/quota/quota_bucket_qps.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace polaris {

TokenBucket::TokenBucket()
: global_max_amount_(0), local_max_amount_(0), bucket_time_(0), bucket_stat_(0),
pending_bucket_time_(0), pending_bucket_stat_(0) {}
pending_bucket_time_(0), pending_bucket_stat_(0), last_use_up_time_(0) {}

TokenBucket::TokenBucket(const TokenBucket& other) {
global_max_amount_ = other.global_max_amount_.Load();
Expand All @@ -40,6 +40,7 @@ TokenBucket::TokenBucket(const TokenBucket& other) {
bucket_stat_ = other.bucket_stat_.Load();
pending_bucket_time_ = other.pending_bucket_time_;
pending_bucket_stat_ = other.pending_bucket_stat_;
last_use_up_time_ = other.last_use_up_time_;
}

void TokenBucket::Init(const RateLimitAmount& amount, uint64_t current_time,
Expand Down Expand Up @@ -95,7 +96,7 @@ void TokenBucket::ReturnToken(int64_t acquire_amount, bool use_remote_quota) {

uint64_t TokenBucket::RefreshToken(int64_t remote_left, int64_t ack_quota,
uint64_t current_bucket_time, bool remote_quota_expired,
uint64_t current_time) {
uint64_t time_in_bucket) {
int64_t last_token_remote_total = remote_quota_.remote_token_total_;
remote_quota_.remote_token_total_ = remote_left;
uint64_t next_report_time = Time::kMaxTime;
Expand Down Expand Up @@ -126,12 +127,23 @@ uint64_t TokenBucket::RefreshToken(int64_t remote_left, int64_t ack_quota,
if (remote_left > 0) {
int64_t remote_used = global_max_amount_ - new_remote_token_left;
if (remote_used > 0 && new_remote_token_left > 0) {
int64_t left_time = new_remote_token_left * current_time / remote_used;
if (left_time < 80) {
next_report_time = left_time / 2 + 1;
// 按当前周期已用配额计算剩余配额还需要多少时间用完
next_report_time = new_remote_token_left * time_in_bucket / remote_used / 2 + 1;
POLARIS_LOG(LOG_TRACE, "next report time by qps:%" PRIu64 "", next_report_time);
}
if (last_use_up_time_ < time_in_bucket) {
last_use_up_time_ = time_in_bucket; // 当前周期配额消耗比上周期慢
} else {
// 按上个周期的QPS计算,当前周期时间到配额用完的一半作为下次上报间隔
uint64_t use_up_report_time = (last_use_up_time_ - time_in_bucket) / 2 + 1;
if (use_up_report_time < next_report_time) {
next_report_time = use_up_report_time;
POLARIS_LOG(LOG_TRACE, "next report time last use up:%" PRIu64 "", next_report_time);
}
POLARIS_LOG(LOG_TRACE, "left time: %" PRId64 " report time:%" PRIu64 "", left_time,
next_report_time);
}
} else {
if (last_use_up_time_ > time_in_bucket) {
last_use_up_time_ = time_in_bucket; // 当前周期配额消耗比上周期快
}
}
}
Expand Down Expand Up @@ -265,12 +277,22 @@ uint64_t RemoteAwareQpsBucket::SetRemoteQuota(const RemoteQuotaResult& remote_qu
remote_quota_result.local_usage_->create_server_time_ / it->first == current_bucket_time) {
local_used = local_usage->quota_usage_[bucket_it->first].quota_allocated_;
}
uint64_t report_time = bucket.RefreshToken(remote_quota, local_used, current_bucket_time,
current_time >= last_remote_sync_time_ + it->first,
current_time % it->first);
uint64_t time_in_bucket = current_time % it->first;
uint64_t report_time =
bucket.RefreshToken(remote_quota, local_used, current_bucket_time,
current_time >= last_remote_sync_time_ + it->first, time_in_bucket);
if (report_time < next_report_time) {
next_report_time = report_time;
}

if (remote_quota < 0) {
// 配额已用完,如果下一次同步配额已经到了下一个周期,在配额用完之前至少同步一次
uint64_t next_time_befor_use_up = it->first - time_in_bucket + bucket.LastUseUpTime() / 2;
if (next_time_befor_use_up < next_report_time) {
next_report_time = next_time_befor_use_up;
POLARIS_LOG(LOG_TRACE, "next bucket report time:%" PRIu64 "", next_report_time);
}
}
}
last_remote_sync_time_.Store(current_time);
return next_report_time;
Expand Down
3 changes: 3 additions & 0 deletions polaris/quota/quota_bucket_qps.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class TokenBucket {
// 更新配额限额
void UpdateLimitAmount(const RateLimitAmount& limit_amount, int64_t local_max_amount);

uint64_t LastUseUpTime() const { return last_use_up_time_; }

private:
sync::Atomic<int64_t> global_max_amount_; // 规则中配置的周期分配配额总量
int64_t local_max_amount_; // 离线分配时配额数,local模式等于global_max_amount_
Expand All @@ -77,6 +79,7 @@ class TokenBucket {
uint64_t pending_bucket_time_; // 正在上报未应答的配额bucket时间
int64_t pending_bucket_stat_; // 正在上报未应答的配额bucket计数
RemoteQuotaInfo remote_quota_; // 远程配额信息
uint64_t last_use_up_time_; // 上个周期使用完配额的时间
};

// 记录本地配额使用信息和服务器同步的配额信息
Expand Down
4 changes: 2 additions & 2 deletions test/quota/quota_bucket_qps_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ TEST_F(TokenBucketTest, RefreshTokenWithLeft) {
uint64_t expect_bucket_time = current_time / 1000;
// 还剩10次,不需要加快上报
uint64_t report_time = token_bucket_.RefreshToken(10, 0, expect_bucket_time, false, 0);
ASSERT_EQ(report_time, Time::kMaxTime);
ASSERT_EQ(report_time, 1);
for (int i = 0; i < 20; ++i) {
int64_t left_quota;
bool result = token_bucket_.GetToken(acquire_amount_, expect_bucket_time, true, left_quota);
Expand All @@ -122,7 +122,7 @@ TEST_F(TokenBucketTest, RefreshTokenWithLeft) {
// 远端还剩6次,本地已上报2次,本地共使用3次,还剩5次
// 80ms消耗5次,剩余需要80ms,还无需加快上报
report_time = token_bucket_.RefreshToken(6, 2, expect_bucket_time, false, 80);
ASSERT_EQ(report_time, Time::kMaxTime);
ASSERT_EQ(report_time, 80 / 2 + 1);
}
if (i == 3) {
// 远端还剩4次,本地又上报1次,共使用4次,100ms消耗7次,需要42ms,需加快上报
Expand Down