Skip to content

Commit

Permalink
transfer: optimize rate limit's algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
Chilledheart committed Jul 30, 2024
1 parent b84a8d8 commit 812d992
Showing 1 changed file with 104 additions and 27 deletions.
131 changes: 104 additions & 27 deletions src/net/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,46 @@

namespace net {

/* check rate limits within this many recent milliseconds, at minimum. */
#define MIN_RATE_LIMIT_PERIOD 3000

// modified from curl's lib/multi.c
inline int64_t pgrsLimitWaitTime(int64_t cursize, int64_t startsize, int64_t limit, absl::Time start, absl::Time now) {
int64_t size = cursize - startsize;
int64_t minimum;
int64_t actual;

if (!limit || !size)
return 0;

/*
* 'minimum' is the number of milliseconds 'size' should take to download to
* stay below 'limit'.
*/
if (size < INT64_MAX / 1000)
minimum = (int64_t)(int64_t(1000) * size / limit);
else {
minimum = (int64_t)(size / limit);
if (minimum < INT64_MAX / 1000)
minimum *= 1000;
else
minimum = INT64_MAX;
}

/*
* 'actual' is the time in milliseconds it took to actually download the
* last 'size' bytes.
*/
actual = absl::ToInt64Milliseconds(now - start);
if (actual < minimum) {
/* if it downloaded the data faster than the limit, make it wait the
difference */
return (minimum - actual);
}

return 0;
}

/// the class to describe the traffic between given node (endpoint)
class stream : public RefCountedThreadSafe<stream> {
public:
Expand Down Expand Up @@ -171,17 +211,15 @@ class stream : public RefCountedThreadSafe<stream> {
scoped_refptr<stream> self(this);

if (limit_rate_) {
auto delta = absl::ToInt64Seconds(absl::Now() - read_start_);
int64_t clicks = delta + 1;
int64_t estimated_transferred;
if (UNLIKELY(INT64_MAX / (int64_t)limit_rate_ <= clicks)) {
estimated_transferred = INT64_MAX;
} else {
estimated_transferred = limit_rate_ * clicks;
}
int64_t limit = estimated_transferred - rbytes_transferred_;
if (limit <= 0) {
read_delay_timer_.expires_after(std::chrono::milliseconds(-limit * 1000 / limit_rate_ + 1));
auto recv_timeout_ms =
pgrsLimitWaitTime(rbytes_transferred_, dl_limit_size_, limit_rate_, dl_limit_start_, absl::Now());
if (recv_timeout_ms) {
if (!ul_limit_state_ && !dl_limit_state_) {
// entering ratelimit state
ratelimit(absl::Now());
}
dl_limit_state_ = true;
read_delay_timer_.expires_after(std::chrono::milliseconds(recv_timeout_ms));
read_delay_timer_.async_wait([this, self](asio::error_code ec) {
if (UNLIKELY(ec == asio::error::operation_aborted)) {
return;
Expand All @@ -196,6 +234,12 @@ class stream : public RefCountedThreadSafe<stream> {
}
}

if (dl_limit_state_ && !ul_limit_state_) {
// leaving ratelimit state
ratelimit(absl::Now());
}
dl_limit_state_ = false;

if (yield) {
read_yield_timer_.expires_after(std::chrono::microseconds(kReadYieldIntervalUs));
read_yield_timer_.async_wait([this, self](asio::error_code ec) {
Expand Down Expand Up @@ -256,18 +300,16 @@ class stream : public RefCountedThreadSafe<stream> {
}

if (limit_rate_) {
auto delta = absl::ToInt64Seconds(absl::Now() - write_start_);
int64_t clicks = delta + 1;
int64_t estimated_transferred;
if (UNLIKELY(INT64_MAX / (int64_t)limit_rate_ <= clicks)) {
estimated_transferred = INT64_MAX;
} else {
estimated_transferred = limit_rate_ * clicks;
}
int64_t limit = estimated_transferred - wbytes_transferred_;
if (limit <= 0) {
auto send_timeout_ms =
pgrsLimitWaitTime(wbytes_transferred_, ul_limit_size_, limit_rate_, ul_limit_start_, absl::Now());
if (send_timeout_ms) {
if (!ul_limit_state_ && !dl_limit_state_) {
// entering ratelimit state
ratelimit(absl::Now());
}
ul_limit_state_ = true;
scoped_refptr<stream> self(this);
write_delay_timer_.expires_after(std::chrono::milliseconds(-limit * 1000 / limit_rate_ + 1));
write_delay_timer_.expires_after(std::chrono::milliseconds(send_timeout_ms));
wait_write_callback_ = std::move(callback);
DCHECK(!wait_write_callback_);
write_delay_timer_.async_wait([this, self](asio::error_code ec) {
Expand All @@ -284,6 +326,11 @@ class stream : public RefCountedThreadSafe<stream> {
write_inprogress_ = true;
wait_write_callback_ = std::move(callback);
scoped_refptr<stream> self(this);
if (ul_limit_state_ && !dl_limit_state_) {
// leaving ratelimit state
ratelimit(absl::Now());
}
ul_limit_state_ = false;
s_wait_write([this, self](asio::error_code ec) {
// Cancelled, safe to ignore
if (UNLIKELY(ec == asio::error::bad_descriptor || ec == asio::error::operation_aborted)) {
Expand Down Expand Up @@ -409,7 +456,12 @@ class stream : public RefCountedThreadSafe<stream> {
SetTCPCongestion(socket_.native_handle(), ec);
SetTCPKeepAlive(socket_.native_handle(), ec);
SetSocketTcpNoDelay(&socket_, ec);
write_start_ = read_start_ = absl::Now();

auto start = absl::Now();
ul_limit_size_ = dl_limit_size_ = 0;
ul_limit_start_ = dl_limit_start_ = start;
ul_limit_state_ = dl_limit_state_ = false;
ratelimit(start);
on_async_connect_callback(asio::error_code());
}

Expand Down Expand Up @@ -475,6 +527,26 @@ class stream : public RefCountedThreadSafe<stream> {
size_t rbytes_transferred() const { return rbytes_transferred_; }
size_t wbytes_transferred() const { return wbytes_transferred_; }

private:
/*
* Update the timestamp and sizestamp to use for rate limit calculations.
*/
void ratelimit(absl::Time now) {
/* do not set a new stamp unless the time since last update is long enough */
if (limit_rate_) {
if (absl::ToInt64Milliseconds(now - dl_limit_start_) >= MIN_RATE_LIMIT_PERIOD) {
dl_limit_start_ = now;
dl_limit_size_ = rbytes_transferred_;
}
}
if (limit_rate_) {
if (absl::ToInt64Milliseconds(now - ul_limit_start_) >= MIN_RATE_LIMIT_PERIOD) {
ul_limit_start_ = now;
ul_limit_size_ = wbytes_transferred_;
}
}
}

private:
/// used to resolve local and remote endpoint
net::Resolver resolver_;
Expand Down Expand Up @@ -502,20 +574,25 @@ class stream : public RefCountedThreadSafe<stream> {
handle_t wait_write_callback_;

// statistics
size_t rbytes_transferred_ = 0;
size_t wbytes_transferred_ = 0;
int64_t rbytes_transferred_ = 0;
int64_t wbytes_transferred_ = 0;

// post yield
asio::steady_timer read_yield_timer_;
static constexpr const uint64_t kReadYieldIntervalUs = 10;

// rate limiter
const uint64_t limit_rate_;
absl::Time read_start_;

asio::steady_timer read_delay_timer_;
absl::Time dl_limit_start_;
int64_t dl_limit_size_;
bool dl_limit_state_;

absl::Time write_start_;
asio::steady_timer write_delay_timer_;
absl::Time ul_limit_start_;
int64_t ul_limit_size_;
bool ul_limit_state_;
};

} // namespace net
Expand Down

0 comments on commit 812d992

Please sign in to comment.