Skip to content

Commit

Permalink
different method can use different TimeoutConcurrencyConf
Browse files Browse the repository at this point in the history
  • Loading branch information
yanglimingcn committed Feb 13, 2023
1 parent 4eee48c commit b149783
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 6 deletions.
4 changes: 3 additions & 1 deletion docs/cn/timeout_concurrency_limiter.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
在服务正常运营过程中,流量的增减、请求体的大小变化,磁盘的顺序、随机读写,这些都会影响请求的延迟,用户一般情况下不希望请求延迟的波动造成错误,即使会有一些请求的排队造成请求延迟增加,因此,一般用户设置的请求超时时间都会是服务平均延迟的3至4倍。基于请求超时时间的限流是根据统计服务平均延迟和请求设置的超时时间相比较,来估算请求是否能够在设置的超时时间内完成处理,如果能够能完成则接受请求,如果不能完成则拒绝请求。由于统计服务平均延迟和当前请求的实际延迟会有一定的时间差,因此需要设置一个比较宽泛的最大并发度,保证服务不会因为突然的慢请求造成短时间内服务堆积过多的请求。

## 开启方法
目前只有method级别支持基于超时的限流。如果要为某个method开启基于超时的限流,只需要将它的最大并发设置为"timeout"即可,如果客户端没有开启FLAGS_baidu_std_protocol_deliver_timeout_ms,可以设置FLAGS_timeout_cl_default_timeout_ms来调整一个默认的请求超时时间,可以设置FLAGS_timeout_cl_max_concurrency来调整最大并发度。
目前只有method级别支持基于超时的限流。如果要为某个method开启基于超时的限流,只需要将它的最大并发设置为"timeout"即可,如果客户端没有开启FLAGS_baidu_std_protocol_deliver_timeout_ms,可以设置FLAGS_timeout_cl_default_timeout_ms来调整一个默认的请求超时时间,可以设置FLAGS_timeout_cl_max_concurrency来调整最大并发度。也可以通过设置brpc::TimeoutConcurrencyConf为每个method指定不同的配置。

```c++
// Set timeout concurrency limiter for all methods
brpc::ServerOptions options;
options.method_max_concurrency = "timeout";
options.method_max_concurrency = brpc::TimeoutConcurrencyConf{1, 100};

// Set timeout concurrency limiter for specific method
server.MaxConcurrencyOf("example.EchoService.Echo") = "timeout";
server.MaxConcurrencyOf("example.EchoService.Echo") = brpc::TimeoutConcurrencyConf{1, 100};
```
10 changes: 10 additions & 0 deletions src/brpc/adaptive_max_concurrency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(int max_concurrency)
}
}

AdaptiveMaxConcurrency::AdaptiveMaxConcurrency(
const TimeoutConcurrencyConf& value)
: _value("timeout"), _max_concurrency(-1), _timeout_conf(value) {}

inline bool CompareStringPieceWithoutCase(
const butil::StringPiece& s1, const char* s2) {
DCHECK(s2 != NULL);
Expand Down Expand Up @@ -80,6 +84,12 @@ void AdaptiveMaxConcurrency::operator=(int max_concurrency) {
}
}

void AdaptiveMaxConcurrency::operator=(const TimeoutConcurrencyConf& value) {
_value = "timeout";
_max_concurrency = -1;
_timeout_conf = value;
}

const std::string& AdaptiveMaxConcurrency::type() const {
if (_max_concurrency > 0) {
return CONSTANT();
Expand Down
11 changes: 11 additions & 0 deletions src/brpc/adaptive_max_concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@

namespace brpc {

// timeout concurrency limiter config
struct TimeoutConcurrencyConf {
int64_t timeout_ms;
int max_concurrency;
};

class AdaptiveMaxConcurrency{
public:
explicit AdaptiveMaxConcurrency();
explicit AdaptiveMaxConcurrency(int max_concurrency);
explicit AdaptiveMaxConcurrency(const butil::StringPiece& value);
explicit AdaptiveMaxConcurrency(const TimeoutConcurrencyConf& value);

// Non-trivial destructor to prevent AdaptiveMaxConcurrency from being
// passed to variadic arguments without explicit type conversion.
Expand All @@ -41,11 +48,13 @@ class AdaptiveMaxConcurrency{

void operator=(int max_concurrency);
void operator=(const butil::StringPiece& value);
void operator=(const TimeoutConcurrencyConf& value);

// 0 for type="unlimited"
// >0 for type="constant"
// <0 for type="user-defined"
operator int() const { return _max_concurrency; }
operator TimeoutConcurrencyConf() const { return _timeout_conf; }

// "unlimited" for type="unlimited"
// "10" "20" "30" for type="constant"
Expand All @@ -62,6 +71,8 @@ class AdaptiveMaxConcurrency{
private:
std::string _value;
int _max_concurrency;
TimeoutConcurrencyConf
_timeout_conf; // TODO std::varient for different type
};

inline std::ostream& operator<<(std::ostream& os, const AdaptiveMaxConcurrency& amc) {
Expand Down
20 changes: 15 additions & 5 deletions src/brpc/policy/timeout_concurrency_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,34 @@ DEFINE_int32(timeout_cl_max_concurrency, 100,

TimeoutConcurrencyLimiter::TimeoutConcurrencyLimiter()
: _avg_latency_us(FLAGS_timeout_cl_initial_avg_latency_us),
_last_sampling_time_us(0) {}
_last_sampling_time_us(0),
_timeout_ms(FLAGS_timeout_cl_default_timeout_ms),
_max_concurrency(FLAGS_timeout_cl_max_concurrency) {}

TimeoutConcurrencyLimiter::TimeoutConcurrencyLimiter(
const TimeoutConcurrencyConf &conf)
: _avg_latency_us(FLAGS_timeout_cl_initial_avg_latency_us),
_last_sampling_time_us(0),
_timeout_ms(conf.timeout_ms),
_max_concurrency(conf.max_concurrency) {}

TimeoutConcurrencyLimiter *TimeoutConcurrencyLimiter::New(
const AdaptiveMaxConcurrency &) const {
return new (std::nothrow) TimeoutConcurrencyLimiter;
const AdaptiveMaxConcurrency &amc) const {
return new (std::nothrow)
TimeoutConcurrencyLimiter(static_cast<TimeoutConcurrencyConf>(amc));
}

bool TimeoutConcurrencyLimiter::OnRequested(int current_concurrency,
Controller *cntl) {
auto timeout_ms = FLAGS_timeout_cl_default_timeout_ms;
auto timeout_ms = _timeout_ms;
if (cntl != nullptr && cntl->timeout_ms() != UNSET_MAGIC_NUM) {
timeout_ms = cntl->timeout_ms();
}
// In extreme cases, the average latency may be greater than requested
// timeout, allow currency_concurrency is 1 ensures the average latency can
// be obtained renew.
return current_concurrency == 1 ||
(current_concurrency <= FLAGS_timeout_cl_max_concurrency &&
(current_concurrency <= _max_concurrency &&
_avg_latency_us < timeout_ms * 1000);
}

Expand Down
3 changes: 3 additions & 0 deletions src/brpc/policy/timeout_concurrency_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace policy {
class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
public:
TimeoutConcurrencyLimiter();
explicit TimeoutConcurrencyLimiter(const TimeoutConcurrencyConf& conf);

bool OnRequested(int current_concurrency, Controller* cntl) override;

Expand Down Expand Up @@ -66,6 +67,8 @@ class TimeoutConcurrencyLimiter : public ConcurrencyLimiter {
BAIDU_CACHELINE_ALIGNMENT butil::atomic<int64_t> _last_sampling_time_us;
butil::Mutex _sw_mutex;
SampleWindow _sw;
int64_t _timeout_ms;
int _max_concurrency;
};

} // namespace policy
Expand Down
24 changes: 24 additions & 0 deletions test/brpc_timeout_concurrency_limiter_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,27 @@ TEST(TimeoutConcurrencyLimiterTest, OnResponded) {
ASSERT_EQ(limiter._sw.succ_count, 2);
ASSERT_EQ(limiter._sw.failed_count, 0);
}

TEST(TimeoutConcurrencyLimiterTest, AdaptiveMaxConcurrencyTest) {
{
brpc::AdaptiveMaxConcurrency concurrency(
brpc::TimeoutConcurrencyConf{100, 100});
ASSERT_EQ(concurrency.type(), "timeout");
ASSERT_EQ(concurrency.value(), "timeout");
}
{
brpc::AdaptiveMaxConcurrency concurrency;
concurrency = "timeout";
ASSERT_EQ(concurrency.type(), "timeout");
ASSERT_EQ(concurrency.value(), "timeout");
}
{
brpc::AdaptiveMaxConcurrency concurrency;
concurrency = brpc::TimeoutConcurrencyConf{50, 100};
ASSERT_EQ(concurrency.type(), "timeout");
ASSERT_EQ(concurrency.value(), "timeout");
auto time_conf = static_cast<brpc::TimeoutConcurrencyConf>(concurrency);
ASSERT_EQ(time_conf.timeout_ms, 50);
ASSERT_EQ(time_conf.max_concurrency, 100);
}
}

0 comments on commit b149783

Please sign in to comment.