Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(collector): sort out the structure of partition hotspot detection #597

Merged
merged 40 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
14f5f52
refactor(collector): sort out the structure of partition hotspot dete…
Smityz Sep 7, 2020
f65aa72
add cpp
Smityz Sep 7, 2020
785be3f
license
Smityz Sep 7, 2020
98bd7f3
del policy
Smityz Sep 8, 2020
54135fb
header
Smityz Sep 8, 2020
76ff93b
Merge branch 'master' into del_old_alg
Smityz Sep 8, 2020
3c8a516
update
Smityz Sep 8, 2020
dfa3091
fix
Smityz Sep 8, 2020
f31953a
Update src/server/info_collector.cpp
Smityz Sep 8, 2020
6575e40
Merge branch 'master' into del_old_alg
Smityz Sep 8, 2020
6d62ad2
rename
Smityz Sep 8, 2020
8a421bf
Merge branch 'del_old_alg' of github.com:Smityz/pegasus into del_old_alg
Smityz Sep 8, 2020
e74ee75
merge
Smityz Sep 8, 2020
ce085fc
rename
Smityz Sep 8, 2020
6a63987
rename
Smityz Sep 8, 2020
1143c74
format
Smityz Sep 9, 2020
8dfeeb7
Update hotspot_partition_calculator.h
Smityz Sep 9, 2020
2cee641
fix
Smityz Sep 9, 2020
b6c5694
Update src/server/hotspot_partition_calculator.h
Smityz Sep 9, 2020
e75f44d
Update src/server/hotspot_partition_calculator.h
Smityz Sep 9, 2020
7c012be
Update src/server/hotspot_partition_calculator.cpp
Smityz Sep 9, 2020
1eb187f
Update src/server/hotspot_partition_calculator.h
Smityz Sep 9, 2020
70e083d
Update src/server/hotspot_partition_calculator.h
Smityz Sep 9, 2020
6804702
update
Smityz Sep 9, 2020
4c50397
Merge branch 'del_old_alg' of github.com:Smityz/pegasus into del_old_alg
Smityz Sep 9, 2020
99954d7
format
Smityz Sep 9, 2020
e44e08c
Update src/server/hotspot_partition_calculator.h
Smityz Sep 9, 2020
b2b97e8
Update src/server/hotspot_partition_calculator.h
Smityz Sep 9, 2020
7e9a76b
Update src/server/hotspot_partition_calculator.cpp
Smityz Sep 9, 2020
0134f50
Update src/server/hotspot_partition_calculator.cpp
Smityz Sep 9, 2020
8ac6320
update
Smityz Sep 9, 2020
5b1a36a
Merge branch 'del_old_alg' of github.com:Smityz/pegasus into del_old_alg
Smityz Sep 9, 2020
120dab2
format
Smityz Sep 9, 2020
cec568f
format
Smityz Sep 9, 2020
012ce98
Update hotspot_partition_calculator.cpp
Smityz Sep 9, 2020
d27d7a2
Update src/server/hotspot_partition_calculator.cpp
Smityz Sep 9, 2020
734f290
format
Smityz Sep 9, 2020
88de731
update
Smityz Sep 10, 2020
05dc899
todo
Smityz Sep 10, 2020
6ff0822
Merge branch 'master' into del_old_alg
acelyc111 Sep 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions src/server/hotspot_partition_calculator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "hotspot_partition_calculator.h"

#include <algorithm>
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>

namespace pegasus {
namespace server {

DSN_DEFINE_int64("pegasus.collector",
max_hotspot_store_size,
100,
"the max count of historical data "
"stored in calculator, The FIFO "
"queue design is used to "
"eliminate outdated historical "
"data");

void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partitions)
{
while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 1) {
_partition_stat_histories.pop();
}
std::vector<hotspot_partition_data> temp(partitions.size());
// TODO refactor the data structure
for (int i = 0; i < partitions.size(); i++) {
temp[i] = std::move(hotspot_partition_data(partitions[i]));
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
}
_partition_stat_histories.emplace(temp);
}

void hotspot_partition_calculator::init_perf_counter(int partition_count)
{
std::string counter_name;
std::string counter_desc;
for (int i = 0; i < partition_count; i++) {
string partition_desc = _app_name + '.' + std::to_string(i);
counter_name = fmt::format("app.stat.hotspots@{}", partition_desc);
counter_desc = fmt::format("statistic the hotspots of app {}", partition_desc);
_hot_points[i].init_app_counter(
"app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str());
}
}

void hotspot_partition_calculator::data_analyse()
{
dassert(_partition_stat_histories.back().size() == _hot_points.size(),
"partition counts error, please check");
std::vector<double> data_samples;
data_samples.reserve(_partition_stat_histories.size() * _hot_points.size());
auto temp_data = _partition_stat_histories;
double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0;
int sample_count = 0;
while (!temp_data.empty()) {
for (const auto &partition_data : temp_data.front()) {
if (partition_data.total_qps - 1.00 > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need such prerequiste? What if a partition qps is 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to prevent too many 0 elements in the queue from interfering with the accuracy of the historical data, we choose to ignore some small values

data_samples.push_back(partition_data.total_qps);
table_qps_sum += partition_data.total_qps;
sample_count++;
}
}
temp_data.pop();
}
if (sample_count == 0) {
ddebug("_partition_stat_histories size == 0");
return;
}
table_qps_avg = table_qps_sum / sample_count;
for (const auto &data_sample : data_samples) {
standard_deviation += pow((data_sample - table_qps_avg), 2);
}
standard_deviation = sqrt(standard_deviation / sample_count);
const auto &anly_data = _partition_stat_histories.back();
for (int i = 0; i < _hot_points.size(); i++) {
double hot_point = (anly_data[i].total_qps - table_qps_avg) / standard_deviation;
// perf_counter->set can only be unsigned __int64
// use ceil to guarantee conversion results
hot_point = ceil(std::max(hot_point, double(0)));
_hot_points[i]->set(hot_point);
}
}

} // namespace server
} // namespace pegasus
54 changes: 54 additions & 0 deletions src/server/hotspot_partition_calculator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "hotspot_partition_data.h"
#include <gtest/gtest_prod.h>
#include <dsn/perf_counter/perf_counter.h>

namespace pegasus {
namespace server {

// hotspot_partition_calculator is used to find the hot partition in a table.
class hotspot_partition_calculator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class hotspot_partition_calculator
class hotspot_partition_calculator : public replica_base

replica_base has a app_name() method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hotspot_partition_calculator stores the statistical data of the correspond table, _app_name is for that table, not itself.

{
public:
hotspot_partition_calculator(const std::string &app_name, int partition_count)
: _app_name(app_name), _hot_points(partition_count)
{
init_perf_counter(partition_count);
}
// aggregate related data of hotspot detection
void data_aggregate(const std::vector<row_data> &partitions);
// analyse the saved data to find hotspot partition
void data_analyse();

private:
const std::string _app_name;

void init_perf_counter(int perf_counter_count);
// usually a partition with "hot-point value" >= 3 can be considered as a hotspot partition.
std::vector<dsn::perf_counter_wrapper> _hot_points;
// saving historical data can improve accuracy
std::queue<std::vector<hotspot_partition_data>> _partition_stat_histories;

FRIEND_TEST(hotspot_partition_calculator, hotspot_partition_policy);
};

} // namespace server
} // namespace pegasus
9 changes: 2 additions & 7 deletions src/server/hotspot_partition_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,16 @@

#pragma once

#include "shell/commands.h"
#include "shell/command_helper.h"

namespace pegasus {
namespace server {

struct hotspot_partition_data
{
hotspot_partition_data(const row_data &row)
: total_qps(row.get_total_qps()),
total_cu(row.get_total_cu()),
partition_name(row.row_name){};
hotspot_partition_data(const row_data &row) : total_qps(row.get_total_qps()){};
hotspot_partition_data() {}
double total_qps;
double total_cu;
std::string partition_name;
};

} // namespace server
Expand Down
39 changes: 10 additions & 29 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "base/pegasus_const.h"
#include "result_writer.h"
#include "hotspot_partition_calculator.h"

using namespace ::dsn;
using namespace ::dsn::replication;
Expand Down Expand Up @@ -78,10 +79,6 @@ info_collector::info_collector()
"storage_size_fetch_interval_seconds",
3600, // default value 1h
"storage size fetch interval seconds");
_hotspot_detect_algorithm = dsn_config_get_value_string("pegasus.collector",
"hotspot_detect_algorithm",
"hotspot_algo_qps_variance",
"hotspot_detect_algorithm");
// _storage_size_retry_wait_seconds is in range of [1, 60]
_storage_size_retry_wait_seconds =
std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10));
Expand All @@ -96,9 +93,6 @@ info_collector::~info_collector()
for (auto kv : _app_stat_counters) {
delete kv.second;
}
for (auto store : _hotspot_calculator_store) {
delete store.second;
}
}

void info_collector::start()
Expand Down Expand Up @@ -150,15 +144,11 @@ void info_collector::on_app_stat()
// get row data statistics for all of the apps
all_stats.merge(app_stats);

// hotspot_calculator is to detect hotspots
hotspot_calculator *hotspot_calculator =
// hotspot_partition_calculator is used for detecting hotspots
auto hotspot_partition_calculator =
get_hotspot_calculator(app_rows.first, app_rows.second.size());
if (!hotspot_calculator) {
continue;
}
hotspot_calculator->aggregate(app_rows.second);
// new policy can be designed by strategy pattern in hotspot_partition_data.h
hotspot_calculator->start_alg();
hotspot_partition_calculator->data_aggregate(app_rows.second);
hotspot_partition_calculator->data_analyse();
}
get_app_counters(all_stats.app_name)->set(all_stats);

Expand Down Expand Up @@ -302,25 +292,16 @@ void info_collector::on_storage_size_stat(int remaining_retry_count)
_result_writer->set_result(st_stat.timestamp, "ss", st_stat.dump_to_json());
}

hotspot_calculator *info_collector::get_hotspot_calculator(const std::string &app_name,
const int partition_num)
std::shared_ptr<hotspot_partition_calculator>
info_collector::get_hotspot_calculator(const std::string &app_name, const int partition_count)
{
// use appname+partition_num as a key can prevent the impact of dynamic partition changes
std::string app_name_pcount = fmt::format("{}.{}", app_name, partition_num);
// use app_name+partition_count as a key can prevent the impact of dynamic partition changes
std::string app_name_pcount = fmt::format("{}.{}", app_name, partition_count);
auto iter = _hotspot_calculator_store.find(app_name_pcount);
if (iter != _hotspot_calculator_store.end()) {
return iter->second;
}
std::unique_ptr<hotspot_policy> policy;
if (_hotspot_detect_algorithm == "hotspot_algo_qps_variance") {
policy.reset(new hotspot_algo_qps_variance());
} else {
dwarn("hotspot detection is disabled");
_hotspot_calculator_store[app_name_pcount] = nullptr;
return nullptr;
}
hotspot_calculator *calculator =
new hotspot_calculator(app_name, partition_num, std::move(policy));
auto calculator = std::make_shared<hotspot_partition_calculator>(app_name, partition_count);
_hotspot_calculator_store[app_name_pcount] = calculator;
return calculator;
}
Expand Down
13 changes: 7 additions & 6 deletions src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

#include "../shell/commands.h"
#include "table_stats.h"
#include "table_hotspot_policy.h"

namespace pegasus {
namespace server {

class result_writer;
class hotspot_partition_calculator;

class info_collector
{
Expand Down Expand Up @@ -177,15 +177,16 @@ class info_collector
uint32_t _storage_size_fetch_interval_seconds;
uint32_t _storage_size_retry_wait_seconds;
uint32_t _storage_size_retry_max_count;
std::string _hotspot_detect_algorithm;
::dsn::task_ptr _storage_size_stat_timer_task;
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
std::map<std::string, string> _capacity_unit_update_info;
std::map<std::string, hotspot_calculator *> _hotspot_calculator_store;

hotspot_calculator *get_hotspot_calculator(const std::string &app_name,
const int partition_num);
// _hotspot_calculator_store is to save hotspot_partition_calculator for each table, a
// hotspot_partition_calculator saves historical hotspot data and alert perf_counters of
// corresponding table
std::map<std::string, std::shared_ptr<hotspot_partition_calculator>> _hotspot_calculator_store;
Smityz marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<hotspot_partition_calculator>
get_hotspot_calculator(const std::string &app_name, const int partition_count);
};

} // namespace server
Expand Down
40 changes: 0 additions & 40 deletions src/server/table_hotspot_policy.cpp

This file was deleted.

Loading