diff --git a/src/net/stream.hpp b/src/net/stream.hpp index 622e25327..f66077fe7 100644 --- a/src/net/stream.hpp +++ b/src/net/stream.hpp @@ -29,6 +29,46 @@ namespace net { +/* check rate limits within this many recent milliseconds, at minimum. */ +#define MIN_RATE_LIMIT_PERIOD 3000 + +// modified from curl's lib/multi.c +inline int64_t pgrsLimitWaitTime(int64_t cursize, int64_t startsize, int64_t limit, absl::Time start, absl::Time now) { + int64_t size = cursize - startsize; + int64_t minimum; + int64_t actual; + + if (!limit || !size) + return 0; + + /* + * 'minimum' is the number of milliseconds 'size' should take to download to + * stay below 'limit'. + */ + if (size < INT64_MAX / 1000) + minimum = (int64_t)(int64_t(1000) * size / limit); + else { + minimum = (int64_t)(size / limit); + if (minimum < INT64_MAX / 1000) + minimum *= 1000; + else + minimum = INT64_MAX; + } + + /* + * 'actual' is the time in milliseconds it took to actually download the + * last 'size' bytes. + */ + actual = absl::ToInt64Milliseconds(now - start); + if (actual < minimum) { + /* if it downloaded the data faster than the limit, make it wait the + difference */ + return (minimum - actual); + } + + return 0; +} + /// the class to describe the traffic between given node (endpoint) class stream : public RefCountedThreadSafe { public: @@ -171,17 +211,15 @@ class stream : public RefCountedThreadSafe { scoped_refptr self(this); if (limit_rate_) { - auto delta = absl::ToInt64Seconds(absl::Now() - read_start_); - int64_t clicks = delta + 1; - int64_t estimated_transferred; - if (UNLIKELY(INT64_MAX / (int64_t)limit_rate_ <= clicks)) { - estimated_transferred = INT64_MAX; - } else { - estimated_transferred = limit_rate_ * clicks; - } - int64_t limit = estimated_transferred - rbytes_transferred_; - if (limit <= 0) { - read_delay_timer_.expires_after(std::chrono::milliseconds(-limit * 1000 / limit_rate_ + 1)); + auto recv_timeout_ms = + pgrsLimitWaitTime(rbytes_transferred_, dl_limit_size_, limit_rate_, dl_limit_start_, absl::Now()); + if (recv_timeout_ms) { + if (!ul_limit_state_ && !dl_limit_state_) { + // entering ratelimit state + ratelimit(absl::Now()); + } + dl_limit_state_ = true; + read_delay_timer_.expires_after(std::chrono::milliseconds(recv_timeout_ms)); read_delay_timer_.async_wait([this, self](asio::error_code ec) { if (UNLIKELY(ec == asio::error::operation_aborted)) { return; @@ -196,6 +234,12 @@ class stream : public RefCountedThreadSafe { } } + if (dl_limit_state_ && !ul_limit_state_) { + // leaving ratelimit state + ratelimit(absl::Now()); + } + dl_limit_state_ = false; + if (yield) { read_yield_timer_.expires_after(std::chrono::microseconds(kReadYieldIntervalUs)); read_yield_timer_.async_wait([this, self](asio::error_code ec) { @@ -256,18 +300,16 @@ class stream : public RefCountedThreadSafe { } if (limit_rate_) { - auto delta = absl::ToInt64Seconds(absl::Now() - write_start_); - int64_t clicks = delta + 1; - int64_t estimated_transferred; - if (UNLIKELY(INT64_MAX / (int64_t)limit_rate_ <= clicks)) { - estimated_transferred = INT64_MAX; - } else { - estimated_transferred = limit_rate_ * clicks; - } - int64_t limit = estimated_transferred - wbytes_transferred_; - if (limit <= 0) { + auto send_timeout_ms = + pgrsLimitWaitTime(wbytes_transferred_, ul_limit_size_, limit_rate_, ul_limit_start_, absl::Now()); + if (send_timeout_ms) { + if (!ul_limit_state_ && !dl_limit_state_) { + // entering ratelimit state + ratelimit(absl::Now()); + } + ul_limit_state_ = true; scoped_refptr self(this); - write_delay_timer_.expires_after(std::chrono::milliseconds(-limit * 1000 / limit_rate_ + 1)); + write_delay_timer_.expires_after(std::chrono::milliseconds(send_timeout_ms)); wait_write_callback_ = std::move(callback); DCHECK(!wait_write_callback_); write_delay_timer_.async_wait([this, self](asio::error_code ec) { @@ -284,6 +326,11 @@ class stream : public RefCountedThreadSafe { write_inprogress_ = true; wait_write_callback_ = std::move(callback); scoped_refptr self(this); + if (ul_limit_state_ && !dl_limit_state_) { + // leaving ratelimit state + ratelimit(absl::Now()); + } + ul_limit_state_ = false; s_wait_write([this, self](asio::error_code ec) { // Cancelled, safe to ignore if (UNLIKELY(ec == asio::error::bad_descriptor || ec == asio::error::operation_aborted)) { @@ -409,7 +456,12 @@ class stream : public RefCountedThreadSafe { SetTCPCongestion(socket_.native_handle(), ec); SetTCPKeepAlive(socket_.native_handle(), ec); SetSocketTcpNoDelay(&socket_, ec); - write_start_ = read_start_ = absl::Now(); + + auto start = absl::Now(); + ul_limit_size_ = dl_limit_size_ = 0; + ul_limit_start_ = dl_limit_start_ = start; + ul_limit_state_ = dl_limit_state_ = false; + ratelimit(start); on_async_connect_callback(asio::error_code()); } @@ -475,6 +527,26 @@ class stream : public RefCountedThreadSafe { size_t rbytes_transferred() const { return rbytes_transferred_; } size_t wbytes_transferred() const { return wbytes_transferred_; } + private: + /* + * Update the timestamp and sizestamp to use for rate limit calculations. + */ + void ratelimit(absl::Time now) { + /* do not set a new stamp unless the time since last update is long enough */ + if (limit_rate_) { + if (absl::ToInt64Milliseconds(now - dl_limit_start_) >= MIN_RATE_LIMIT_PERIOD) { + dl_limit_start_ = now; + dl_limit_size_ = rbytes_transferred_; + } + } + if (limit_rate_) { + if (absl::ToInt64Milliseconds(now - ul_limit_start_) >= MIN_RATE_LIMIT_PERIOD) { + ul_limit_start_ = now; + ul_limit_size_ = wbytes_transferred_; + } + } + } + private: /// used to resolve local and remote endpoint net::Resolver resolver_; @@ -502,8 +574,8 @@ class stream : public RefCountedThreadSafe { handle_t wait_write_callback_; // statistics - size_t rbytes_transferred_ = 0; - size_t wbytes_transferred_ = 0; + int64_t rbytes_transferred_ = 0; + int64_t wbytes_transferred_ = 0; // post yield asio::steady_timer read_yield_timer_; @@ -511,11 +583,16 @@ class stream : public RefCountedThreadSafe { // rate limiter const uint64_t limit_rate_; - absl::Time read_start_; + asio::steady_timer read_delay_timer_; + absl::Time dl_limit_start_; + int64_t dl_limit_size_; + bool dl_limit_state_; - absl::Time write_start_; asio::steady_timer write_delay_timer_; + absl::Time ul_limit_start_; + int64_t ul_limit_size_; + bool ul_limit_state_; }; } // namespace net