diff --git a/include/dsn/utils/token_bucket_throttling_controller.h b/include/dsn/utils/token_bucket_throttling_controller.h new file mode 100644 index 0000000000..363a4d72b3 --- /dev/null +++ b/include/dsn/utils/token_bucket_throttling_controller.h @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include +#include + +namespace folly { +template +class BasicDynamicTokenBucket; + +} // namespace folly + +namespace dsn { +namespace utils { + +using DynamicTokenBucket = folly::BasicDynamicTokenBucket; + +// token_bucket_throttling_controller ignores `delay` parameter +class token_bucket_throttling_controller +{ +private: + friend class token_bucket_throttling_controller_test; + + std::unique_ptr _token_bucket; + + bool _enabled; + std::string _env_value; + int32_t _partition_count = 0; + double _rate; + double _burstsize; + +public: + token_bucket_throttling_controller(); + + // return ture means you can get token + // return false means the bucket is already empty, but the token is borrowed from future. + // non-blocking + bool get_token(int32_t request_units); + + // reset to no throttling. + void reset(bool &changed, std::string &old_env_value); + + // return the current env value. + const std::string &env_value() const; + + // Configures throttling strategy dynamically from app-envs. + // + // Support two style format: + // 1. style: "20000*delay*100,20000*reject*100" + // example: 20000*delay*100,20000*reject*100 + // result: reject 20000 request_units, but never delay + // example: 20000*delay*100 + // result: never reject or delay + // example: 20000*reject*100 + // result: reject 20000 request_units + // 2. style: 20/"20K"/"20M" + // example: 20K + // result: reject 20000 request_units + // + // return true if parse succeed. + // return false if parse failed for the reason of invalid env_value. + // if return false, the original value will not be changed. + // 'parse_error' is set when return false. + // 'changed' is set when return true. + // 'old_env_value' is set when 'changed' is set to true. + bool parse_from_env(const std::string &env_value, + int32_t partition_count, + std::string &parse_error, + bool &changed, + std::string &old_env_value); + + static bool string_to_value(std::string str, int64_t &value); + + // wrapper of transform_env_string, check if the env string is validated. + static bool validate(const std::string &env, std::string &hint_message); + + static bool transform_env_string(const std::string &env, + int64_t &reject_size_value, + bool &enabled, + std::string &hint_message); +}; + +} // namespace utils +} // namespace dsn diff --git a/src/utils/test/token_bucket_throttling_controller_test.cpp b/src/utils/test/token_bucket_throttling_controller_test.cpp new file mode 100644 index 0000000000..1b20715225 --- /dev/null +++ b/src/utils/test/token_bucket_throttling_controller_test.cpp @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include + +namespace dsn { +namespace utils { + +#define INVALIDATE_SITUATION_CHECK(env) \ + do { \ + std::string old_value, parse_err; \ + bool env_changed_result = false; \ + ASSERT_FALSE(cntl.parse_from_env(env, 4, parse_err, env_changed_result, old_value)); \ + ASSERT_EQ(env_changed_result, false); \ + ASSERT_EQ(parse_err, "wrong format, you can set like 20000 or 20K"); \ + ASSERT_EQ(cntl._enabled, true); \ + ASSERT_EQ(old_value, old_env); \ + ASSERT_EQ(cntl._env_value, old_env); \ + } while (0) + +#define VALIDATE_SITUATION_CHECK( \ + env, partition_count, throttle_size, enabled, env_changed, old_env) \ + do { \ + bool env_changed_result = false; \ + std::string old_value, parse_err; \ + int32_t partitioned_throttle_size = throttle_size / partition_count; \ + ASSERT_TRUE( \ + cntl.parse_from_env(env, partition_count, parse_err, env_changed_result, old_value)); \ + ASSERT_EQ(cntl._env_value, env); \ + ASSERT_EQ(cntl._partition_count, partition_count); \ + ASSERT_EQ(cntl._burstsize, partitioned_throttle_size); \ + ASSERT_EQ(cntl._rate, partitioned_throttle_size); \ + ASSERT_EQ(cntl._enabled, enabled); \ + ASSERT_EQ(env_changed_result, env_changed); \ + ASSERT_EQ(old_value, old_env); \ + ASSERT_EQ(parse_err, ""); \ + } while (0) + +class token_bucket_throttling_controller_test : public ::testing::Test +{ +public: + void test_parse_env_basic_token_bucket_throttling() + { + token_bucket_throttling_controller cntl; + + // token_bucket_throttling_controller doesn't support delay only + VALIDATE_SITUATION_CHECK("20000*delay*100", 4, 0, false, true, ""); + VALIDATE_SITUATION_CHECK("200K", 4, 200000, true, true, "20000*delay*100"); + VALIDATE_SITUATION_CHECK("20000*delay*100,20000*reject*100", 4, 20000, true, true, "200K"); + VALIDATE_SITUATION_CHECK("20K*delay*100,20K*reject*100", + 4, + 20000, + true, + true, + "20000*delay*100,20000*reject*100"); + VALIDATE_SITUATION_CHECK( + "20000*reject*100", 4, 20000, true, true, "20K*delay*100,20K*reject*100"); + + // invalid argument] + std::string old_env = "20000*reject*100"; + INVALIDATE_SITUATION_CHECK("*deldday*100"); + INVALIDATE_SITUATION_CHECK(""); + INVALIDATE_SITUATION_CHECK("*reject"); + INVALIDATE_SITUATION_CHECK("*reject*"); + INVALIDATE_SITUATION_CHECK("reject*"); + INVALIDATE_SITUATION_CHECK("reject"); + INVALIDATE_SITUATION_CHECK("200g"); + INVALIDATE_SITUATION_CHECK("200G"); + INVALIDATE_SITUATION_CHECK("M"); + INVALIDATE_SITUATION_CHECK("K"); + INVALIDATE_SITUATION_CHECK("-1K"); + INVALIDATE_SITUATION_CHECK("1aK"); + INVALIDATE_SITUATION_CHECK("pegNo1"); + INVALIDATE_SITUATION_CHECK("-20"); + INVALIDATE_SITUATION_CHECK("12KM"); + INVALIDATE_SITUATION_CHECK("1K2M"); + INVALIDATE_SITUATION_CHECK("2000K0*reject*100"); + } + + void throttle_test() + { + auto cntl = std::make_unique(); + std::string parse_err; + bool env_changed = false; + std::string old_value; + const int partition_count = 4; + + int throttle_limit = 200000; + cntl->parse_from_env( + std::to_string(throttle_limit), partition_count, parse_err, env_changed, old_value); + + auto token_bucket = std::make_unique(); + int fail_count = 0; + for (int i = 0; i < 100000; i++) { + token_bucket->consumeWithBorrowAndWait( + 1, throttle_limit / partition_count * 0.8, throttle_limit / partition_count * 1.0); + if (!cntl->get_token(1)) { + fail_count++; + } + } + ASSERT_EQ(fail_count, 0); + + sleep(1); + + fail_count = 0; + for (int i = 0; i < 100000; i++) { + token_bucket->consumeWithBorrowAndWait( + 1, throttle_limit / partition_count * 1.2, throttle_limit / partition_count * 1.5); + if (!cntl->get_token(1)) { + fail_count++; + } + } + ASSERT_GT(fail_count, 10000); + + sleep(1); + + fail_count = 0; + int fail_count1 = 0; + for (int i = 0; i < 200000; i++) { + if (i < 100000) { + token_bucket->consumeWithBorrowAndWait(1, + throttle_limit / partition_count * 1.2, + throttle_limit / partition_count * 1.5); + fail_count1 = fail_count; + } else { + token_bucket->consumeWithBorrowAndWait(1, + throttle_limit / partition_count * 0.2, + throttle_limit / partition_count * 0.3); + } + if (!cntl->get_token(1)) { + fail_count++; + } + } + ASSERT_GT(fail_count1, 10000); + ASSERT_LE(fail_count, fail_count1 * 1.2); + } +}; + +TEST_F(token_bucket_throttling_controller_test, test_parse_env_basic_token_bucket_throttling) +{ + test_parse_env_basic_token_bucket_throttling(); +} + +TEST_F(token_bucket_throttling_controller_test, throttle_test) { throttle_test(); } +} // namespace utils +} // namespace dsn diff --git a/src/utils/token_bucket_throttling_controller.cpp b/src/utils/token_bucket_throttling_controller.cpp new file mode 100644 index 0000000000..4d8714ff1b --- /dev/null +++ b/src/utils/token_bucket_throttling_controller.cpp @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +namespace dsn { +namespace utils { + +token_bucket_throttling_controller::token_bucket_throttling_controller() + : _enabled(false), _partition_count(0), _rate(0), _burstsize(0) +{ + _token_bucket = std::make_unique(); +} + +bool token_bucket_throttling_controller::get_token(int32_t request_units = 1) +{ + if (!_enabled) { + return true; + } + auto res = + _token_bucket->consumeWithBorrowNonBlocking((double)request_units, _rate, _burstsize); + + return (res.get_value_or(0) == 0); +} + +void token_bucket_throttling_controller::reset(bool &changed, std::string &old_env_value) +{ + if (_enabled) { + changed = true; + old_env_value = _env_value; + _enabled = false; + _env_value.clear(); + _partition_count = 0; + _rate = 0; + _burstsize = 0; + } else { + changed = false; + } +} + +// return the current env value. +const std::string &token_bucket_throttling_controller::env_value() const { return _env_value; } + +bool token_bucket_throttling_controller::parse_from_env(const std::string &env_value, + int32_t partition_count, + std::string &parse_error, + bool &changed, + std::string &old_env_value) +{ + old_env_value = _env_value; + changed = false; + + if (_enabled && dsn_likely(env_value == _env_value) && + dsn_likely(partition_count == _partition_count)) { + return true; + } + + int64_t reject_size_value; + bool enabled; + if (!transform_env_string(env_value, reject_size_value, enabled, parse_error)) { + return false; + } + + changed = true; + + _enabled = enabled; + _env_value = env_value; + _partition_count = partition_count; + _rate = reject_size_value / std::max(partition_count, 1); + _burstsize = _rate; + return true; +} + +bool token_bucket_throttling_controller::string_to_value(std::string str, int64_t &value) +{ + int64_t unit_multiplier = 1; + if (*str.rbegin() == 'M') { + unit_multiplier = 1000 * 1000; + } else if (*str.rbegin() == 'K') { + unit_multiplier = 1000; + } + if (unit_multiplier != 1) { + str.pop_back(); + } + if (!buf2int64(str, value) || value < 0) { + return false; + } + value *= unit_multiplier; + return true; +} + +bool token_bucket_throttling_controller::validate(const std::string &env, std::string &hint_message) +{ + int64_t temp; + bool temp_bool; + bool validated = transform_env_string(env, temp, temp_bool, hint_message); + return validated; +}; + +bool token_bucket_throttling_controller::transform_env_string(const std::string &env, + int64_t &reject_size_value, + bool &enabled, + std::string &hint_message) +{ + enabled = true; + + if (buf2int64(env, reject_size_value) && reject_size_value >= 0) { + return true; + } + + // format like "200K" + if (string_to_value(env, reject_size_value)) { + return true; + } + + // format like "20000*delay*100" + if (env.find("delay") != -1 && env.find("reject") == -1) { + reject_size_value = 0; + enabled = false; + + dinfo("token_bucket_throttling_controller doesn't support delay method, so throttling " + "controller is disabled now"); + return true; + } + + // format like "20000*delay*100,20000*reject*100" + auto comma_index = env.find(","); + auto star_index = env.find("*reject", comma_index + 1); + if (star_index < 0) { + hint_message = "wrong format, you can set like 20000 or 20K"; + return false; + } + auto reject_size = env.substr(comma_index + 1, star_index - comma_index - 1); + + if (string_to_value(reject_size, reject_size_value)) { + return true; + } + + hint_message = "wrong format, you can set like 20000 or 20K"; + return false; +} + +} // namespace utils +} // namespace dsn