Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add a token_bucket_throttling_controller #941

Merged
merged 15 commits into from
Oct 27, 2021
99 changes: 99 additions & 0 deletions include/dsn/utils/token_bucket_throttling_controller.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <chrono>
#include <memory>

namespace folly {
template <typename Clock>
class BasicDynamicTokenBucket;

} // namespace folly

namespace dsn {
namespace utils {

using DynamicTokenBucket = folly::BasicDynamicTokenBucket<std::chrono::steady_clock>;

// token_bucket_throttling_controller ignores `delay` parameter
class token_bucket_throttling_controller
Smityz marked this conversation as resolved.
Show resolved Hide resolved
{
private:
friend class token_bucket_throttling_controller_test;

std::unique_ptr<DynamicTokenBucket> _token_bucket;

bool _enabled;
std::string _env_value;
int32_t _partition_count = 0;
Smityz marked this conversation as resolved.
Show resolved Hide resolved
double _rate;
double _burstsize;

public:
token_bucket_throttling_controller();

// return ture means you can get token
// return false means the bucket is already empty, but the token is borrowed from future.
// non-blocking
bool get_token(int32_t request_units);

// reset to no throttling.
void reset(bool &changed, std::string &old_env_value);

// return the current env value.
const std::string &env_value() const;

// Configures throttling strategy dynamically from app-envs.
//
// Support two style format:
// 1. style: "20000*delay*100,20000*reject*100"
// example: 20000*delay*100,20000*reject*100
// result: reject 20000 request_units, but never delay
// example: 20000*delay*100
// result: never reject or delay
// example: 20000*reject*100
// result: reject 20000 request_units
// 2. style: 20/"20K"/"20M"
// example: 20K
// result: reject 20000 request_units
//
// return true if parse succeed.
// return false if parse failed for the reason of invalid env_value.
// if return false, the original value will not be changed.
// 'parse_error' is set when return false.
// 'changed' is set when return true.
// 'old_env_value' is set when 'changed' is set to true.
bool parse_from_env(const std::string &env_value,
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
int32_t partition_count,
std::string &parse_error,
bool &changed,
std::string &old_env_value);

static bool string_to_value(std::string str, int64_t &value);

// wrapper of transform_env_string, check if the env string is validated.
static bool validate(const std::string &env, std::string &hint_message);

static bool transform_env_string(const std::string &env,
int64_t &reject_size_value,
bool &enabled,
std::string &hint_message);
};

} // namespace utils
} // namespace dsn
163 changes: 163 additions & 0 deletions src/utils/test/token_bucket_throttling_controller_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/utils/token_bucket_throttling_controller.h>
#include <dsn/utility/TokenBucket.h>

#include <gtest/gtest.h>

namespace dsn {
namespace utils {

#define INVALIDATE_SITUATION_CHECK(env) \
do { \
std::string old_value, parse_err; \
bool env_changed_result = false; \
ASSERT_FALSE(cntl.parse_from_env(env, 4, parse_err, env_changed_result, old_value)); \
ASSERT_EQ(env_changed_result, false); \
ASSERT_EQ(parse_err, "wrong format, you can set like 20000 or 20K"); \
ASSERT_EQ(cntl._enabled, true); \
ASSERT_EQ(old_value, old_env); \
ASSERT_EQ(cntl._env_value, old_env); \
} while (0)

#define VALIDATE_SITUATION_CHECK( \
env, partition_count, throttle_size, enabled, env_changed, old_env) \
do { \
bool env_changed_result = false; \
std::string old_value, parse_err; \
int32_t partitioned_throttle_size = throttle_size / partition_count; \
ASSERT_TRUE( \
cntl.parse_from_env(env, partition_count, parse_err, env_changed_result, old_value)); \
ASSERT_EQ(cntl._env_value, env); \
ASSERT_EQ(cntl._partition_count, partition_count); \
ASSERT_EQ(cntl._burstsize, partitioned_throttle_size); \
ASSERT_EQ(cntl._rate, partitioned_throttle_size); \
ASSERT_EQ(cntl._enabled, enabled); \
ASSERT_EQ(env_changed_result, env_changed); \
ASSERT_EQ(old_value, old_env); \
ASSERT_EQ(parse_err, ""); \
} while (0)

class token_bucket_throttling_controller_test : public ::testing::Test
{
public:
void test_parse_env_basic_token_bucket_throttling()
{
token_bucket_throttling_controller cntl;

// token_bucket_throttling_controller doesn't support delay only
VALIDATE_SITUATION_CHECK("20000*delay*100", 4, 0, false, true, "");
VALIDATE_SITUATION_CHECK("200K", 4, 200000, true, true, "20000*delay*100");
VALIDATE_SITUATION_CHECK("20000*delay*100,20000*reject*100", 4, 20000, true, true, "200K");
VALIDATE_SITUATION_CHECK("20K*delay*100,20K*reject*100",
4,
20000,
true,
true,
"20000*delay*100,20000*reject*100");
VALIDATE_SITUATION_CHECK(
"20000*reject*100", 4, 20000, true, true, "20K*delay*100,20K*reject*100");

// invalid argument]
std::string old_env = "20000*reject*100";
INVALIDATE_SITUATION_CHECK("*deldday*100");
INVALIDATE_SITUATION_CHECK("");
INVALIDATE_SITUATION_CHECK("*reject");
INVALIDATE_SITUATION_CHECK("*reject*");
INVALIDATE_SITUATION_CHECK("reject*");
INVALIDATE_SITUATION_CHECK("reject");
INVALIDATE_SITUATION_CHECK("200g");
INVALIDATE_SITUATION_CHECK("200G");
INVALIDATE_SITUATION_CHECK("M");
INVALIDATE_SITUATION_CHECK("K");
INVALIDATE_SITUATION_CHECK("-1K");
INVALIDATE_SITUATION_CHECK("1aK");
INVALIDATE_SITUATION_CHECK("pegNo1");
INVALIDATE_SITUATION_CHECK("-20");
INVALIDATE_SITUATION_CHECK("12KM");
INVALIDATE_SITUATION_CHECK("1K2M");
INVALIDATE_SITUATION_CHECK("2000K0*reject*100");
}

void throttle_test()
{
auto cntl = std::make_unique<token_bucket_throttling_controller>();
std::string parse_err;
bool env_changed = false;
std::string old_value;
const int partition_count = 4;

int throttle_limit = 200000;
cntl->parse_from_env(
std::to_string(throttle_limit), partition_count, parse_err, env_changed, old_value);

auto token_bucket = std::make_unique<DynamicTokenBucket>();
int fail_count = 0;
for (int i = 0; i < 100000; i++) {
token_bucket->consumeWithBorrowAndWait(
1, throttle_limit / partition_count * 0.8, throttle_limit / partition_count * 1.0);
if (!cntl->get_token(1)) {
fail_count++;
}
}
ASSERT_EQ(fail_count, 0);

sleep(1);

fail_count = 0;
for (int i = 0; i < 100000; i++) {
token_bucket->consumeWithBorrowAndWait(
1, throttle_limit / partition_count * 1.2, throttle_limit / partition_count * 1.5);
if (!cntl->get_token(1)) {
fail_count++;
}
}
ASSERT_GT(fail_count, 10000);

sleep(1);

fail_count = 0;
int fail_count1 = 0;
for (int i = 0; i < 200000; i++) {
if (i < 100000) {
token_bucket->consumeWithBorrowAndWait(1,
throttle_limit / partition_count * 1.2,
throttle_limit / partition_count * 1.5);
fail_count1 = fail_count;
} else {
token_bucket->consumeWithBorrowAndWait(1,
throttle_limit / partition_count * 0.2,
throttle_limit / partition_count * 0.3);
}
if (!cntl->get_token(1)) {
fail_count++;
}
}
ASSERT_GT(fail_count1, 10000);
ASSERT_LE(fail_count, fail_count1 * 1.2);
}
};

TEST_F(token_bucket_throttling_controller_test, test_parse_env_basic_token_bucket_throttling)
{
test_parse_env_basic_token_bucket_throttling();
}

TEST_F(token_bucket_throttling_controller_test, throttle_test) { throttle_test(); }
} // namespace utils
} // namespace dsn
Loading