Skip to content

Commit

Permalink
refactor: split read/write hotspot of hotspot_partition_calculator (#592
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Smityz authored Sep 17, 2020
1 parent 412492d commit 8697976
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 85 deletions.
107 changes: 66 additions & 41 deletions src/server/hotspot_partition_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <rrdb/rrdb_types.h>
#include <dsn/utility/flags.h>
#include <dsn/tool-api/rpc_address.h>
#include <dsn/tool-api/group_address.h>
Expand All @@ -41,67 +42,91 @@ DSN_DEFINE_int64("pegasus.collector",
"eliminate outdated historical "
"data");

void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partitions)
void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
{
while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 1) {
_partition_stat_histories.pop();
while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) {
_partitions_stat_histories.pop_front();
}
std::vector<hotspot_partition_data> temp(partitions.size());
// TODO refactor the data structure
for (int i = 0; i < partitions.size(); i++) {
temp[i] = std::move(hotspot_partition_data(partitions[i]));
std::vector<hotspot_partition_stat> temp;
for (const auto &partition_stat : partition_stats) {
temp.emplace_back(hotspot_partition_stat(partition_stat));
}
_partition_stat_histories.emplace(temp);
_partitions_stat_histories.emplace_back(temp);
}

void hotspot_partition_calculator::init_perf_counter(int partition_count)
{
std::string counter_name;
std::string counter_desc;
for (int i = 0; i < partition_count; i++) {
string partition_desc = _app_name + '.' + std::to_string(i);
counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
counter_desc = fmt::format("statistic the hotspots of app {}", partition_desc);
_hot_points[i].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
for (int data_type = 0; data_type <= 1; data_type++) {
for (int i = 0; i < partition_count; i++) {
string partition_desc =
_app_name + '.' +
(data_type == partition_qps_type::WRITE_HOTSPOT_DATA ? "write." : "read.") +
std::to_string(i);
std::string counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
std::string counter_desc =
fmt::format("statistic the hotspots of app {}", partition_desc);
_hot_points[i][data_type].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
}
}
}

void hotspot_partition_calculator::data_analyse()
void hotspot_partition_calculator::stat_histories_analyse(int data_type,
std::vector<int> &hot_points)
{
dassert(_partition_stat_histories.back().size() == _hot_points.size(),
"partition counts error, please check");
std::vector<double> data_samples;
data_samples.reserve(_partition_stat_histories.size() * _hot_points.size());
auto temp_data = _partition_stat_histories;
double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
int sample_count = 0;
while (!temp_data.empty()) {
for (const auto &partition_data : temp_data.front()) {
if (partition_data.total_qps - 1.00 > 0) {
data_samples.push_back(partition_data.total_qps);
table_qps_sum += partition_data.total_qps;
sample_count++;
}
for (const auto &one_partition_stat_histories : _partitions_stat_histories) {
for (const auto &partition_stat : one_partition_stat_histories) {
table_qps_sum += partition_stat.total_qps[data_type];
sample_count++;
}
temp_data.pop();
}
if (sample_count == 0) {
ddebug("_partition_stat_histories size == 0");
if (sample_count <= 1) {
ddebug("_partitions_stat_histories size <= 1, not enough data for calculation");
return;
}
table_qps_avg = table_qps_sum / sample_count;
for (const auto &data_sample : data_samples) {
standard_deviation += pow((data_sample - table_qps_avg), 2);
for (const auto &one_partition_stat_histories : _partitions_stat_histories) {
for (const auto &partition_stat : one_partition_stat_histories) {
standard_deviation += pow((partition_stat.total_qps[data_type] - table_qps_avg), 2);
}
}
standard_deviation = sqrt(standard_deviation / sample_count);
const auto &anly_data = _partition_stat_histories.back();
for (int i = 0; i < _hot_points.size(); i++) {
double hot_point = (anly_data[i].total_qps - table_qps_avg) / standard_deviation;
// perf_counter->set can only be unsigned __int64
standard_deviation = sqrt(standard_deviation / (sample_count - 1));
const auto &anly_data = _partitions_stat_histories.back();
int hot_point_size = _hot_points.size();
hot_points.resize(hot_point_size);
for (int i = 0; i < hot_point_size; i++) {
double hot_point = 0;
if (standard_deviation != 0) {
hot_point = (anly_data[i].total_qps[data_type] - table_qps_avg) / standard_deviation;
}
// perf_counter->set can only be unsigned uint64_t
// use ceil to guarantee conversion results
hot_point = ceil(std::max(hot_point, double(0)));
_hot_points[i]->set(hot_point);
hot_points[i] = ceil(std::max(hot_point, double(0)));
}
}

void hotspot_partition_calculator::update_hot_point(int data_type, std::vector<int> &hot_points)
{
dcheck_eq(_hot_points.size(), hot_points.size());
int size = hot_points.size();
for (int i = 0; i < size; i++) {
_hot_points[i][data_type].get()->set(hot_points[i]);
}
}

void hotspot_partition_calculator::data_analyse()
{
dassert(_partitions_stat_histories.back().size() == _hot_points.size(),
"The number of partitions in this table has changed, and hotspot analysis cannot be "
"performed,in %s",
_app_name.c_str());
for (int data_type = 0; data_type <= 1; data_type++) {
// data_type 0: READ_HOTSPOT_DATA; 1: WRITE_HOTSPOT_DATA
std::vector<int> hot_points;
stat_histories_analyse(data_type, hot_points);
update_hot_point(data_type, hot_points);
}
}

Expand Down
22 changes: 17 additions & 5 deletions src/server/hotspot_partition_calculator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@

#pragma once

#include "hotspot_partition_data.h"
#include "hotspot_partition_stat.h"
#include <gtest/gtest_prod.h>
#include <dsn/perf_counter/perf_counter.h>

namespace pegasus {
namespace server {

// stores the whole histories of all partitions in one table
typedef std::list<std::vector<hotspot_partition_stat>> stat_histories;
// hot_partition_counters c[index_of_partitions][type_of_read(0)/write(1)_stat]
// so if we have n partitions, we will get 2*n hot_partition_counters, to demonstrate both
// read/write hotspot value
typedef std::vector<std::array<dsn::perf_counter_wrapper, 2>> hot_partition_counters;

// hotspot_partition_calculator is used to find the hot partition in a table.
class hotspot_partition_calculator
{
Expand All @@ -43,15 +50,20 @@ class hotspot_partition_calculator
const dsn::apps::hotkey_detect_action::type action);

private:
const std::string _app_name;
// empirical rule to calculate hot point of each partition
// ref: https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule
void stat_histories_analyse(int data_type, std::vector<int> &hot_points);
// set hot_point to corresponding perf_counter
void update_hot_point(int data_type, std::vector<int> &hot_points);

const std::string _app_name;
void init_perf_counter(int perf_counter_count);
// usually a partition with "hot-point value" >= 3 can be considered as a hotspot partition.
std::vector<dsn::perf_counter_wrapper> _hot_points;
hot_partition_counters _hot_points;
// saving historical data can improve accuracy
std::queue<std::vector<hotspot_partition_data>> _partition_stat_histories;
stat_histories _partitions_stat_histories;

FRIEND_TEST(hotspot_partition_calculator, hotspot_partition_policy);
friend class hotspot_partition_test;
};

} // namespace server
Expand Down
20 changes: 0 additions & 20 deletions src/server/hotspot_partition_data.h

This file was deleted.

43 changes: 43 additions & 0 deletions src/server/hotspot_partition_stat.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "shell/command_helper.h"

namespace pegasus {
namespace server {

// Index of each QPS category inside hotspot_partition_stat::total_qps and
// inside the per-partition counter arrays: slot 0 aggregates read traffic,
// slot 1 aggregates write traffic. Keep READ_HOTSPOT_DATA == 0 — callers
// iterate `for (int data_type = 0; data_type <= 1; data_type++)` and use the
// enumerators directly as array subscripts.
enum partition_qps_type
{
READ_HOTSPOT_DATA = 0,
WRITE_HOTSPOT_DATA
};

// Per-partition QPS snapshot consumed by hotspot_partition_calculator.
// total_qps[READ_HOTSPOT_DATA] holds the partition's aggregated read QPS,
// total_qps[WRITE_HOTSPOT_DATA] its aggregated write QPS.
struct hotspot_partition_stat
{
    // Builds a snapshot from one row of collected table statistics.
    hotspot_partition_stat(const row_data &row)
    {
        total_qps[READ_HOTSPOT_DATA] = row.get_total_read_qps();
        total_qps[WRITE_HOTSPOT_DATA] = row.get_total_write_qps();
    }
    hotspot_partition_stat() = default;
    // Zero-initialized by default: the previous empty constructor left the
    // array indeterminate, so reading a default-constructed stat was UB.
    double total_qps[2] = {0.0, 0.0};
};

} // namespace server
} // namespace pegasus
87 changes: 68 additions & 19 deletions src/server/test/hotspot_partition_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,81 @@

#include "server/hotspot_partition_calculator.h"

#include "pegasus_server_test_base.h"
#include <gtest/gtest.h>

namespace pegasus {
namespace server {

TEST(hotspot_partition_calculator, hotspot_partition_policy)
class hotspot_partition_test : public pegasus_server_test_base
{
// TODO: refactor the unit test
std::vector<row_data> test_rows(8);
test_rows[0].get_qps = 1000.0;
test_rows[1].get_qps = 1000.0;
test_rows[2].get_qps = 1000.0;
test_rows[3].get_qps = 1000.0;
test_rows[4].get_qps = 1000.0;
test_rows[5].get_qps = 1000.0;
test_rows[6].get_qps = 1000.0;
test_rows[7].get_qps = 5000.0;
hotspot_partition_calculator test_hotspot_calculator("TEST", 8);
test_hotspot_calculator.data_aggregate(test_rows);
test_hotspot_calculator.data_analyse();
std::vector<double> result(8);
for (int i = 0; i < test_hotspot_calculator._hot_points.size(); i++) {
result[i] = test_hotspot_calculator._hot_points[i]->get_value();
public:
hotspot_partition_test() : calculator("TEST", 8){};
hotspot_partition_calculator calculator;
std::vector<row_data> generate_row_data()
{
std::vector<row_data> test_rows;
test_rows.resize(8);
for (int i = 0; i < 8; i++) {
test_rows[i].get_qps = 1000.0;
test_rows[i].put_qps = 1000.0;
}
return test_rows;
}
std::vector<double> expect_vector{0, 0, 0, 0, 0, 0, 0, 3};
ASSERT_EQ(expect_vector, result);
std::vector<std::vector<double>> get_calculator_result(const hot_partition_counters &counters)
{
std::vector<std::vector<double>> result;
result.resize(2);
for (int i = 0; i < counters.size(); i++) {
result[READ_HOTSPOT_DATA].push_back(counters[i][READ_HOTSPOT_DATA].get()->get_value());
result[WRITE_HOTSPOT_DATA].push_back(
counters[i][WRITE_HOTSPOT_DATA].get()->get_value());
}
return result;
}
void test_policy_in_scenarios(std::vector<row_data> scenario,
std::vector<std::vector<double>> &expect_result,
hotspot_partition_calculator &calculator)
{
calculator.data_aggregate(std::move(scenario));
calculator.data_analyse();
std::vector<std::vector<double>> result = get_calculator_result(calculator._hot_points);
ASSERT_EQ(result, expect_result);
}
};

TEST_F(hotspot_partition_test, hotspot_partition_policy)
{
// Insert normal scenario data to test
std::vector<row_data> test_rows = generate_row_data();
std::vector<std::vector<double>> expect_vector = {{0, 0, 0, 0, 0, 0, 0, 0},
{0, 0, 0, 0, 0, 0, 0, 0}};
test_policy_in_scenarios(test_rows, expect_vector, calculator);

// Insert hotspot scenario_0 data to test
test_rows = generate_row_data();
const int HOT_SCENARIO_0_READ_HOT_PARTITION = 7;
const int HOT_SCENARIO_0_WRITE_HOT_PARTITION = 0;
test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0;
expect_vector = {{0, 0, 0, 0, 0, 0, 0, 4}, {4, 0, 0, 0, 0, 0, 0, 0}};
test_policy_in_scenarios(test_rows, expect_vector, calculator);

// Insert hotspot scenario_0 data to test again
test_rows = generate_row_data();
test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0;
expect_vector = {{0, 0, 0, 0, 0, 0, 0, 4}, {4, 0, 0, 0, 0, 0, 0, 0}};
test_policy_in_scenarios(test_rows, expect_vector, calculator);

// Insert hotspot scenario_1 data to test again
test_rows = generate_row_data();
const int HOT_SCENARIO_1_READ_HOT_PARTITION = 3;
const int HOT_SCENARIO_1_WRITE_HOT_PARTITION = 2;
test_rows[HOT_SCENARIO_1_READ_HOT_PARTITION].get_qps = 5000.0;
test_rows[HOT_SCENARIO_1_WRITE_HOT_PARTITION].put_qps = 5000.0;
expect_vector = {{0, 0, 0, 4, 0, 0, 0, 0}, {0, 0, 4, 0, 0, 0, 0, 0}};
test_policy_in_scenarios(test_rows, expect_vector, calculator);
}

} // namespace server
Expand Down
8 changes: 8 additions & 0 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,14 @@ struct row_data

double get_total_cu() const { return recent_read_cu + recent_write_cu; }

// Aggregate read-path QPS of this row: single get, multi-get and scan
// operations summed together (operand order kept stable for exact
// floating-point reproducibility).
double get_total_read_qps() const
{
    double read_qps = get_qps + multi_get_qps + scan_qps;
    return read_qps;
}

// Aggregate write-path QPS of this row: every mutating operation type
// (put, remove, multi-put, multi-remove, check-and-set, check-and-mutate)
// summed together. The addition order matches the original left-to-right
// sequence so the floating-point result is bit-identical.
double get_total_write_qps() const
{
    double write_qps = put_qps + remove_qps + multi_put_qps + multi_remove_qps +
                       check_and_set_qps + check_and_mutate_qps;
    return write_qps;
}

std::string row_name;
int32_t app_id = 0;
int32_t partition_count = 0;
Expand Down

0 comments on commit 8697976

Please sign in to comment.