diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp index 839092d019..84a14e77c7 100644 --- a/src/server/hotspot_partition_calculator.cpp +++ b/src/server/hotspot_partition_calculator.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -41,67 +42,91 @@ DSN_DEFINE_int64("pegasus.collector", "eliminate outdated historical " "data"); -void hotspot_partition_calculator::data_aggregate(const std::vector &partitions) +void hotspot_partition_calculator::data_aggregate(const std::vector &partition_stats) { - while (_partition_stat_histories.size() > FLAGS_max_hotspot_store_size - 1) { - _partition_stat_histories.pop(); + while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) { + _partitions_stat_histories.pop_front(); } - std::vector temp(partitions.size()); - // TODO refactor the data structure - for (int i = 0; i < partitions.size(); i++) { - temp[i] = std::move(hotspot_partition_data(partitions[i])); + std::vector temp; + for (const auto &partition_stat : partition_stats) { + temp.emplace_back(hotspot_partition_stat(partition_stat)); } - _partition_stat_histories.emplace(temp); + _partitions_stat_histories.emplace_back(temp); } void hotspot_partition_calculator::init_perf_counter(int partition_count) { - std::string counter_name; - std::string counter_desc; - for (int i = 0; i < partition_count; i++) { - string partition_desc = _app_name + '.' + std::to_string(i); - counter_name = fmt::format("app.stat.hotspots@{}", partition_desc); - counter_desc = fmt::format("statistic the hotspots of app {}", partition_desc); - _hot_points[i].init_app_counter( - "app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str()); + for (int data_type = 0; data_type <= 1; data_type++) { + for (int i = 0; i < partition_count; i++) { + string partition_desc = + _app_name + '.' + + (data_type == partition_qps_type::WRITE_HOTSPOT_DATA ? "write." : "read.") + + std::to_string(i); + std::string counter_name = fmt::format("app.stat.hotspots@{}", partition_desc); + std::string counter_desc = + fmt::format("statistic the hotspots of app {}", partition_desc); + _hot_points[i][data_type].init_app_counter( + "app.pegasus", counter_name.c_str(), COUNTER_TYPE_NUMBER, counter_desc.c_str()); + } } } -void hotspot_partition_calculator::data_analyse() +void hotspot_partition_calculator::stat_histories_analyse(int data_type, + std::vector &hot_points) { - dassert(_partition_stat_histories.back().size() == _hot_points.size(), - "partition counts error, please check"); - std::vector data_samples; - data_samples.reserve(_partition_stat_histories.size() * _hot_points.size()); - auto temp_data = _partition_stat_histories; double table_qps_sum = 0, standard_deviation = 0, table_qps_avg = 0; int sample_count = 0; - while (!temp_data.empty()) { - for (const auto &partition_data : temp_data.front()) { - if (partition_data.total_qps - 1.00 > 0) { - data_samples.push_back(partition_data.total_qps); - table_qps_sum += partition_data.total_qps; - sample_count++; - } + for (const auto &one_partition_stat_histories : _partitions_stat_histories) { + for (const auto &partition_stat : one_partition_stat_histories) { + table_qps_sum += partition_stat.total_qps[data_type]; + sample_count++; } - temp_data.pop(); } - if (sample_count == 0) { - ddebug("_partition_stat_histories size == 0"); + if (sample_count <= 1) { + ddebug("_partitions_stat_histories size <= 1, not enough data for calculation"); return; } table_qps_avg = table_qps_sum / sample_count; - for (const auto &data_sample : data_samples) { - standard_deviation += pow((data_sample - table_qps_avg), 2); + for (const auto &one_partition_stat_histories : _partitions_stat_histories) { + for (const auto &partition_stat : one_partition_stat_histories) { + standard_deviation += pow((partition_stat.total_qps[data_type] - table_qps_avg), 2); + } } - standard_deviation = sqrt(standard_deviation / sample_count); - const auto &anly_data = _partition_stat_histories.back(); - for (int i = 0; i < _hot_points.size(); i++) { - double hot_point = (anly_data[i].total_qps - table_qps_avg) / standard_deviation; - // perf_counter->set can only be unsigned __int64 + standard_deviation = sqrt(standard_deviation / (sample_count - 1)); + const auto &anly_data = _partitions_stat_histories.back(); + int hot_point_size = _hot_points.size(); + hot_points.resize(hot_point_size); + for (int i = 0; i < hot_point_size; i++) { + double hot_point = 0; + if (standard_deviation != 0) { + hot_point = (anly_data[i].total_qps[data_type] - table_qps_avg) / standard_deviation; + } + // perf_counter->set can only be unsigned uint64_t // use ceil to guarantee conversion results - hot_point = ceil(std::max(hot_point, double(0))); - _hot_points[i]->set(hot_point); + hot_points[i] = ceil(std::max(hot_point, double(0))); + } +} + +void hotspot_partition_calculator::update_hot_point(int data_type, std::vector &hot_points) +{ + dcheck_eq(_hot_points.size(), hot_points.size()); + int size = hot_points.size(); + for (int i = 0; i < size; i++) { + _hot_points[i][data_type].get()->set(hot_points[i]); + } +} + +void hotspot_partition_calculator::data_analyse() +{ + dassert(_partitions_stat_histories.back().size() == _hot_points.size(), + "The number of partitions in this table has changed, and hotspot analysis cannot be " + "performed,in %s", + _app_name.c_str()); + for (int data_type = 0; data_type <= 1; data_type++) { + // data_type 0: READ_HOTSPOT_DATA; 1: WRITE_HOTSPOT_DATA + std::vector hot_points; + stat_histories_analyse(data_type, hot_points); + update_hot_point(data_type, hot_points); } } diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h index 8b13718b7a..2b23ee92c3 100644 --- a/src/server/hotspot_partition_calculator.h +++ b/src/server/hotspot_partition_calculator.h @@ -17,13 +17,20 @@ #pragma once -#include "hotspot_partition_data.h" +#include "hotspot_partition_stat.h" #include #include namespace pegasus { namespace server { +// stores the whole histories of all partitions in one table +typedef std::list> stat_histories; +// hot_partition_counters c[index_of_partitions][type_of_read(0)/write(1)_stat] +// so if we have n partitions, we will get 2*n hot_partition_counters, to demonstrate both +// read/write hotspot value +typedef std::vector> hot_partition_counters; + // hotspot_partition_calculator is used to find the hot partition in a table. class hotspot_partition_calculator { @@ -43,15 +50,20 @@ class hotspot_partition_calculator const dsn::apps::hotkey_detect_action::type action); private: - const std::string _app_name; + // empirical rule to calculate hot point of each partition + // ref: https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule + void stat_histories_analyse(int data_type, std::vector &hot_points); + // set hot_point to corresponding perf_counter + void update_hot_point(int data_type, std::vector &hot_points); + const std::string _app_name; void init_perf_counter(int perf_counter_count); // usually a partition with "hot-point value" >= 3 can be considered as a hotspot partition. - std::vector _hot_points; + hot_partition_counters _hot_points; // saving historical data can improve accuracy - std::queue> _partition_stat_histories; + stat_histories _partitions_stat_histories; - FRIEND_TEST(hotspot_partition_calculator, hotspot_partition_policy); + friend class hotspot_partition_test; }; } // namespace server diff --git a/src/server/hotspot_partition_data.h b/src/server/hotspot_partition_data.h deleted file mode 100644 index b41b1b1acb..0000000000 --- a/src/server/hotspot_partition_data.h +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) 2017, Xiaomi, Inc. All rights reserved. -// This source code is licensed under the Apache License Version 2.0, which -// can be found in the LICENSE file in the root directory of this source tree. - -#pragma once - -#include "shell/command_helper.h" - -namespace pegasus { -namespace server { - -struct hotspot_partition_data -{ - hotspot_partition_data(const row_data &row) : total_qps(row.get_total_qps()){}; - hotspot_partition_data() {} - double total_qps; -}; - -} // namespace server -} // namespace pegasus diff --git a/src/server/hotspot_partition_stat.h b/src/server/hotspot_partition_stat.h new file mode 100644 index 0000000000..d0a0efe805 --- /dev/null +++ b/src/server/hotspot_partition_stat.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "shell/command_helper.h" + +namespace pegasus { +namespace server { + +enum partition_qps_type +{ + READ_HOTSPOT_DATA = 0, + WRITE_HOTSPOT_DATA +}; + +struct hotspot_partition_stat +{ + hotspot_partition_stat(const row_data &row) + { + total_qps[READ_HOTSPOT_DATA] = row.get_total_read_qps(); + total_qps[WRITE_HOTSPOT_DATA] = row.get_total_write_qps(); + } + hotspot_partition_stat() {} + double total_qps[2]; +}; + +} // namespace server +} // namespace pegasus diff --git a/src/server/test/hotspot_partition_test.cpp b/src/server/test/hotspot_partition_test.cpp index 732a129b63..f05a91fbdb 100644 --- a/src/server/test/hotspot_partition_test.cpp +++ b/src/server/test/hotspot_partition_test.cpp @@ -17,32 +17,81 @@ #include "server/hotspot_partition_calculator.h" +#include "pegasus_server_test_base.h" #include namespace pegasus { namespace server { -TEST(hotspot_partition_calculator, hotspot_partition_policy) +class hotspot_partition_test : public pegasus_server_test_base { - // TODO: refactor the unit test - std::vector test_rows(8); - test_rows[0].get_qps = 1000.0; - test_rows[1].get_qps = 1000.0; - test_rows[2].get_qps = 1000.0; - test_rows[3].get_qps = 1000.0; - test_rows[4].get_qps = 1000.0; - test_rows[5].get_qps = 1000.0; - test_rows[6].get_qps = 1000.0; - test_rows[7].get_qps = 5000.0; - hotspot_partition_calculator test_hotspot_calculator("TEST", 8); - test_hotspot_calculator.data_aggregate(test_rows); - test_hotspot_calculator.data_analyse(); - std::vector result(8); - for (int i = 0; i < test_hotspot_calculator._hot_points.size(); i++) { - result[i] = test_hotspot_calculator._hot_points[i]->get_value(); +public: + hotspot_partition_test() : calculator("TEST", 8){}; + hotspot_partition_calculator calculator; + std::vector generate_row_data() + { + std::vector test_rows; + test_rows.resize(8); + for (int i = 0; i < 8; i++) { + test_rows[i].get_qps = 1000.0; + test_rows[i].put_qps = 1000.0; + } + return test_rows; } - std::vector expect_vector{0, 0, 0, 0, 0, 0, 0, 3}; - ASSERT_EQ(expect_vector, result); + std::vector> get_calculator_result(const hot_partition_counters &counters) + { + std::vector> result; + result.resize(2); + for (int i = 0; i < counters.size(); i++) { + result[READ_HOTSPOT_DATA].push_back(counters[i][READ_HOTSPOT_DATA].get()->get_value()); + result[WRITE_HOTSPOT_DATA].push_back( + counters[i][WRITE_HOTSPOT_DATA].get()->get_value()); + } + return result; + } + void test_policy_in_scenarios(std::vector scenario, + std::vector> &expect_result, + hotspot_partition_calculator &calculator) + { + calculator.data_aggregate(std::move(scenario)); + calculator.data_analyse(); + std::vector> result = get_calculator_result(calculator._hot_points); + ASSERT_EQ(result, expect_result); + } +}; + +TEST_F(hotspot_partition_test, hotspot_partition_policy) +{ + // Insert normal scenario data to test + std::vector test_rows = generate_row_data(); + std::vector> expect_vector = {{0, 0, 0, 0, 0, 0, 0, 0}, + {0, 0, 0, 0, 0, 0, 0, 0}}; + test_policy_in_scenarios(test_rows, expect_vector, calculator); + + // Insert hotspot scenario_0 data to test + test_rows = generate_row_data(); + const int HOT_SCENARIO_0_READ_HOT_PARTITION = 7; + const int HOT_SCENARIO_0_WRITE_HOT_PARTITION = 0; + test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0; + test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0; + expect_vector = {{0, 0, 0, 0, 0, 0, 0, 4}, {4, 0, 0, 0, 0, 0, 0, 0}}; + test_policy_in_scenarios(test_rows, expect_vector, calculator); + + // Insert hotspot scenario_0 data to test again + test_rows = generate_row_data(); + test_rows[HOT_SCENARIO_0_READ_HOT_PARTITION].get_qps = 5000.0; + test_rows[HOT_SCENARIO_0_WRITE_HOT_PARTITION].put_qps = 5000.0; + expect_vector = {{0, 0, 0, 0, 0, 0, 0, 4}, {4, 0, 0, 0, 0, 0, 0, 0}}; + test_policy_in_scenarios(test_rows, expect_vector, calculator); + + // Insert hotspot scenario_1 data to test again + test_rows = generate_row_data(); + const int HOT_SCENARIO_1_READ_HOT_PARTITION = 3; + const int HOT_SCENARIO_1_WRITE_HOT_PARTITION = 2; + test_rows[HOT_SCENARIO_1_READ_HOT_PARTITION].get_qps = 5000.0; + test_rows[HOT_SCENARIO_1_WRITE_HOT_PARTITION].put_qps = 5000.0; + expect_vector = {{0, 0, 0, 4, 0, 0, 0, 0}, {0, 0, 4, 0, 0, 0, 0, 0}}; + test_policy_in_scenarios(test_rows, expect_vector, calculator); } } // namespace server diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 8f2f5ec66c..4f7c280d28 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -549,6 +549,14 @@ struct row_data double get_total_cu() const { return recent_read_cu + recent_write_cu; } + double get_total_read_qps() const { return get_qps + multi_get_qps + scan_qps; } + + double get_total_write_qps() const + { + return put_qps + remove_qps + multi_put_qps + multi_remove_qps + check_and_set_qps + + check_and_mutate_qps; + } + std::string row_name; int32_t app_id = 0; int32_t partition_count = 0;