Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: split read/write hotspot of hotspot_partition_calculator #592

Merged
merged 18 commits into from
Sep 17, 2020
Merged
108 changes: 63 additions & 45 deletions src/server/hotspot_partition_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <rrdb/rrdb_types.h>
#include <dsn/utility/flags.h>

namespace pegasus {
Expand All @@ -34,67 +35,84 @@ DSN_DEFINE_int64("pegasus.collector",
"eliminate outdated historical "
"data");

void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partitions)
void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
{
while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 1) {
Smityz marked this conversation as resolved.
Show resolved Hide resolved
_partition_stat_histories.pop();
_partition_stat_histories.pop_front();
}
std::vector<hotspot_partition_data> temp(partitions.size());
// TODO refactor the data structure
for (int i = 0; i < partitions.size(); i++) {
temp[i] = std::move(hotspot_partition_data(partitions[i]));
std::vector<hotspot_partition_data> temp;
for (const auto &partition_stat : partition_stats) {
temp.emplace_back(hotspot_partition_data(partition_stat));
}
_partition_stat_histories.emplace(temp);
_partition_stat_histories.emplace_back(temp);
}

void hotspot_partition_calculator::init_perf_counter(int partition_count)
{
std::string counter_name;
std::string counter_desc;
std::string read_counter_name, write_counter_name;
std::string read_counter_desc, write_counter_desc;
for (int i = 0; i < partition_count; i++) {
string partition_desc = _app_name + '.' + std::to_string(i);
counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
counter_desc = fmt::format("statistic the hotspots of app {}", partition_desc);
_hot_points[i].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
string read_partition_desc = _app_name + '.' + "read." + std::to_string(i);
Smityz marked this conversation as resolved.
Show resolved Hide resolved
read_counter_name = fmt::format("app.stat.hotspots@{}", read_partition_desc);
read_counter_desc = fmt::format("statistic the hotspots of app {}", read_partition_desc);
_hot_points[i].emplace_back(std::make_unique<dsn::perf_counter_wrapper>());
_hot_points[i][READ_HOTSPOT_DATA]->init_app_counter("app.pegasus",
read_counter_name.c_str(),
COUNTER_TYPE_NUMBER,
read_counter_desc.c_str());
string write_partition_desc = _app_name + '.' + "write." + std::to_string(i);
write_counter_name = fmt::format("app.stat.hotspots@{}", write_partition_desc);
write_counter_desc = fmt::format("statistic the hotspots of app {}", write_partition_desc);
_hot_points[i].emplace_back(std::make_unique<dsn::perf_counter_wrapper>());
_hot_points[i][WRITE_HOTSPOT_DATA]->init_app_counter("app.pegasus",
write_counter_name.c_str(),
COUNTER_TYPE_NUMBER,
write_counter_desc.c_str());
}
}

void hotspot_partition_calculator::data_analyse()
{
dassert(_partition_stat_histories.back().size() == _hot_points.size(),
"partition counts error, please check");
std::vector<double> data_samples;
data_samples.reserve(_partition_stat_histories.size() * _hot_points.size());
auto temp_data = _partition_stat_histories;
double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
int sample_count = 0;
while (!temp_data.empty()) {
for (const auto &partition_data : temp_data.front()) {
if (partition_data.total_qps - 1.00 > 0) {
data_samples.push_back(partition_data.total_qps);
table_qps_sum += partition_data.total_qps;
sample_count++;
dcheck_eq(_partition_stat_histories.back().size(), _hot_points.size());
for (int data_type = 0; data_type <= 1; data_type++) {
// 0: READ_HOTSPOT_DATA; 1: WRITE_HOTSPOT_DATA
Smityz marked this conversation as resolved.
Show resolved Hide resolved
double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
int sample_count = 0;
for (const auto &partition_datas : _partition_stat_histories) {
Smityz marked this conversation as resolved.
Show resolved Hide resolved
for (const auto &partition_data : partition_datas) {
if (partition_data.total_qps[data_type] > 1.00) {
table_qps_sum += partition_data.total_qps[data_type];
sample_count++;
}
}
}
temp_data.pop();
}
if (sample_count == 0) {
ddebug("_partition_stat_histories size == 0");
return;
}
table_qps_avg = table_qps_sum / sample_count;
for (const auto &data_sample : data_samples) {
standard_deviation += pow((data_sample - table_qps_avg), 2);
}
standard_deviation = sqrt(standard_deviation / sample_count);
const auto &anly_data = _partition_stat_histories.back();
for (int i = 0; i < _hot_points.size(); i++) {
double hot_point = (anly_data[i].total_qps - table_qps_avg) / standard_deviation;
// perf_counter->set can only be unsigned __int64
// use ceil to guarantee conversion results
hot_point = ceil(std::max(hot_point, double(0)));
_hot_points[i]->set(hot_point);

if (sample_count <= 1) {
ddebug("_partition_stat_histories size <= 1");
return;
}
table_qps_avg = table_qps_sum / sample_count;
for (const auto &partition_datas : _partition_stat_histories) {
for (const auto &partition_data : partition_datas) {
if (partition_data.total_qps[data_type] > 1.00) {
standard_deviation +=
pow((partition_data.total_qps[data_type] - table_qps_avg), 2);
}
}
}
standard_deviation = sqrt(standard_deviation / (sample_count - 1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
standard_deviation = sqrt(standard_deviation / (sample_count - 1));
standard_deviation = sqrt(standard_deviation / (sample_count - 1));

(sample_count - 1) Why sub 1?

Copy link
Contributor Author

@Smityz Smityz Sep 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to https://en.wikipedia.org/wiki/Standard_deviation, the denominator in the sample standard deviation formula is N − 1.

const auto &anly_data = _partition_stat_histories.back();
for (int i = 0; i < _hot_points.size(); i++) {
double hot_point = 0;
if (standard_deviation != 0) {
hot_point =
(anly_data[i].total_qps[data_type] - table_qps_avg) / standard_deviation;
}
// perf_counter->set can only be unsigned __int64
Smityz marked this conversation as resolved.
Show resolved Hide resolved
// use ceil to guarantee conversion results
hot_point = ceil(std::max(hot_point, double(0)));
_hot_points[i][data_type]->get()->set(hot_point);
}
Smityz marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/server/hotspot_partition_calculator.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
namespace pegasus {
namespace server {

typedef std::list<std::vector<hotspot_partition_data>> statistical_histories;
typedef std::vector<std::vector<std::unique_ptr<dsn::perf_counter_wrapper>>> hot_partition_counters;
Smityz marked this conversation as resolved.
Show resolved Hide resolved
Smityz marked this conversation as resolved.
Show resolved Hide resolved

// hotspot_partition_calculator is used to find the hot partition in a table.
class hotspot_partition_calculator
{
Expand All @@ -40,12 +43,11 @@ class hotspot_partition_calculator

private:
const std::string _app_name;

void init_perf_counter(int perf_counter_count);
// usually a partition with "hot-point value" >= 3 can be considered as a hotspot partition.
hycdong marked this conversation as resolved.
Show resolved Hide resolved
std::vector<dsn::perf_counter_wrapper> _hot_points;
hot_partition_counters _hot_points;
// saving historical data can improve accuracy
std::queue<std::vector<hotspot_partition_data>> _partition_stat_histories;
statistical_histories _partition_stat_histories;

FRIEND_TEST(hotspot_partition_calculator, hotspot_partition_policy);
};
Expand Down
14 changes: 12 additions & 2 deletions src/server/hotspot_partition_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@
namespace pegasus {
namespace server {

enum partition_qps_type
Smityz marked this conversation as resolved.
Show resolved Hide resolved
{
READ_HOTSPOT_DATA = 0,
WRITE_HOTSPOT_DATA
};

struct hotspot_partition_data
{
hotspot_partition_data(const row_data &row) : total_qps(row.get_total_qps()){};
hotspot_partition_data(const row_data &row)
{
total_qps[READ_HOTSPOT_DATA] = row.get_total_read_qps();
total_qps[WRITE_HOTSPOT_DATA] = row.get_total_write_qps();
}
hotspot_partition_data() {}
double total_qps;
double total_qps[2];
};

} // namespace server
Expand Down
117 changes: 99 additions & 18 deletions src/server/test/hotspot_partition_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,108 @@
namespace pegasus {
namespace server {

const int HOT_SCENARIO_0_READ_HOT_PARTITION = 7;
const int HOT_SCENARIO_0_WRITE_HOT_PARTITION = 0;
std::vector<row_data> generate_hot_scenario_0()
{
std::vector<row_data> test_rows;
test_rows.resize(8);
for (int i = 0; i < 8; i++) {
test_rows[i].get_qps = 1000.0;
test_rows[i].put_qps = 1000.0;
}
test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0;
return test_rows;
}

const int HOT_SCENARIO_1_READ_HOT_PARTITION = 3;
const int HOT_SCENARIO_1_WRITE_HOT_PARTITION = 2;
std::vector<row_data> generate_hot_scenario_1()
{
std::vector<row_data> test_rows;
test_rows.resize(8);
for (int i = 0; i < 8; i++) {
test_rows[i].get_qps = 1000.0;
test_rows[i].put_qps = 1000.0;
}
test_rows[HOT_SCENARIO_1_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_1_WRITE_HOT_PARTITION].put_qps = 5000.0;
return test_rows;
}

std::vector<row_data> generate_normal_scenario_0()
{
std::vector<row_data> test_rows;
test_rows.resize(8);
for (int i = 0; i < 8; i++) {
test_rows[i].get_qps = 1000.0;
test_rows[i].put_qps = 1000.0;
}
return test_rows;
}

std::vector<std::vector<double>> get_calculator_result(const hot_partition_counters &counters)
{
std::vector<std::vector<double>> result;
result.resize(2);
for (int i = 0; i < counters.size(); i++) {
result[READ_HOTSPOT_DATA].push_back(counters[i][READ_HOTSPOT_DATA]->get()->get_value());
result[WRITE_HOTSPOT_DATA].push_back(counters[i][WRITE_HOTSPOT_DATA]->get()->get_value());
}
return result;
}

TEST(hotspot_partition_calculator, hotspot_partition_policy)
{
// TODO: refactor the unit test
std::vector<row_data> test_rows(8);
test_rows[0].get_qps = 1000.0;
test_rows[1].get_qps = 1000.0;
test_rows[2].get_qps = 1000.0;
test_rows[3].get_qps = 1000.0;
test_rows[4].get_qps = 1000.0;
test_rows[5].get_qps = 1000.0;
test_rows[6].get_qps = 1000.0;
test_rows[7].get_qps = 5000.0;
hotspot_partition_calculator test_hotspot_calculator("TEST", 8);
test_hotspot_calculator.data_aggregate(test_rows);
test_hotspot_calculator.data_analyse();
std::vector<double> result(8);
for (int i = 0; i < test_hotspot_calculator._hot_points.size(); i++) {
result[i] = test_hotspot_calculator._hot_points[i]->get_value();
}
std::vector<double> expect_vector{0, 0, 0, 0, 0, 0, 0, 3};
ASSERT_EQ(expect_vector, result);
{
// Insert normal scenario data to test
test_hotspot_calculator.data_aggregate(std::move(generate_normal_scenario_0()));
test_hotspot_calculator.data_analyse();
std::vector<std::vector<double>> result =
get_calculator_result(test_hotspot_calculator._hot_points);
std::vector<double> read_expect_vector{0, 0, 0, 0, 0, 0, 0, 0};
std::vector<double> write_expect_vector{0, 0, 0, 0, 0, 0, 0, 0};
ASSERT_EQ(result[READ_HOTSPOT_DATA], read_expect_vector);
ASSERT_EQ(result[WRITE_HOTSPOT_DATA], write_expect_vector);
}

{
// Insert hot scenario 0 data to test
test_hotspot_calculator.data_aggregate(std::move(generate_hot_scenario_0()));
test_hotspot_calculator.data_analyse();
std::vector<std::vector<double>> result =
get_calculator_result(test_hotspot_calculator._hot_points);
std::vector<double> read_expect_vector{0, 0, 0, 0, 0, 0, 0, 4};
std::vector<double> write_expect_vector{4, 0, 0, 0, 0, 0, 0, 0};
ASSERT_EQ(result[READ_HOTSPOT_DATA], read_expect_vector);
ASSERT_EQ(result[WRITE_HOTSPOT_DATA], write_expect_vector);
}

{
// Insert hot scenario 0 data to test again
test_hotspot_calculator.data_aggregate(std::move(generate_hot_scenario_0()));
test_hotspot_calculator.data_analyse();
std::vector<std::vector<double>> result =
get_calculator_result(test_hotspot_calculator._hot_points);
std::vector<double> read_expect_vector{0, 0, 0, 0, 0, 0, 0, 4};
std::vector<double> write_expect_vector{4, 0, 0, 0, 0, 0, 0, 0};
ASSERT_EQ(result[READ_HOTSPOT_DATA], read_expect_vector);
ASSERT_EQ(result[WRITE_HOTSPOT_DATA], write_expect_vector);
}

{
// Insert hot scenario 1 data to test
test_hotspot_calculator.data_aggregate(std::move(generate_hot_scenario_1()));
test_hotspot_calculator.data_analyse();
std::vector<std::vector<double>> result =
get_calculator_result(test_hotspot_calculator._hot_points);
std::vector<double> read_expect_vector{0, 0, 0, 4, 0, 0, 0, 0};
std::vector<double> write_expect_vector{0, 0, 4, 0, 0, 0, 0, 0};
ASSERT_EQ(result[READ_HOTSPOT_DATA], read_expect_vector);
ASSERT_EQ(result[WRITE_HOTSPOT_DATA], write_expect_vector);
}
Smityz marked this conversation as resolved.
Show resolved Hide resolved
}

} // namespace server
Expand Down
8 changes: 8 additions & 0 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,14 @@ struct row_data

double get_total_cu() const { return recent_read_cu + recent_write_cu; }

double get_total_read_qps() const { return get_qps + multi_get_qps + scan_qps; }

double get_total_write_qps() const
hycdong marked this conversation as resolved.
Show resolved Hide resolved
{
return put_qps + remove_qps + multi_put_qps + multi_remove_qps + check_and_set_qps +
check_and_mutate_qps;
}

std::string row_name;
int32_t app_id = 0;
int32_t partition_count = 0;
Expand Down