From 51578db98da9f1c7278148d6df08b5d4c220b553 Mon Sep 17 00:00:00 2001 From: Smilencer <527646889@qq.com> Date: Mon, 28 Dec 2020 13:28:44 +0800 Subject: [PATCH] feat(hotkey): add an interface to query hotkey on the specified partition (#662) --- rdsn | 2 +- src/server/hotkey_collector.cpp | 18 ++++++++++++++++++ src/server/hotkey_collector.h | 1 + src/server/pegasus_server_impl.cpp | 3 ++- src/server/test/hotkey_collector_test.cpp | 6 ++++++ 5 files changed, 28 insertions(+), 2 deletions(-) diff --git a/rdsn b/rdsn index 67e06d2c50..29c6ccca84 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 67e06d2c5022e82984b0c7e6651876b5b0e7410d +Subproject commit 29c6ccca84873f4735c690dc8a616b8ef93377eb diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp index 60263f6a6f..23aa21a433 100644 --- a/src/server/hotkey_collector.cpp +++ b/src/server/hotkey_collector.cpp @@ -168,6 +168,7 @@ inline void hotkey_collector::change_state_by_result() case hotkey_collector_state::FINE_DETECTING: if (!_result.hot_hash_key.empty()) { change_state_to_finished(); + derror_replica("Find the hotkey: {}", _result.hot_hash_key); } break; default: @@ -185,6 +186,9 @@ void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request case dsn::replication::detect_action::STOP: on_stop_detect(resp); return; + case dsn::replication::detect_action::QUERY: + query_result(resp); + return; default: std::string hint = fmt::format("{}: can't find this detect action", req.action); resp.err = dsn::ERR_INVALID_STATE; @@ -272,6 +276,20 @@ void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response & ddebug_replica(hint); } +void hotkey_collector::query_result(dsn::replication::detect_hotkey_response &resp) +{ + if (_state != hotkey_collector_state::FINISHED) { + resp.err = dsn::ERR_BUSY; + std::string hint = fmt::format("hotkey is detecting now, now state: {}", + dsn::enum_to_string(_hotkey_type)); + resp.__set_err_hint(hint); + ddebug_replica(hint); + } else { + resp.err = dsn::ERR_OK; + resp.__set_hotkey_result(_result.hot_hash_key); + } +} + bool hotkey_collector::terminate_if_timeout() { if (dsn_now_s() >= _collector_start_time_second.load() + FLAGS_max_seconds_to_detect_hotkey) { diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h index a54f01941f..bdc7fcbd67 100644 --- a/src/server/hotkey_collector.h +++ b/src/server/hotkey_collector.h @@ -98,6 +98,7 @@ class hotkey_collector : public dsn::replication::replica_base private: void on_start_detect(dsn::replication::detect_hotkey_response &resp); void on_stop_detect(dsn::replication::detect_hotkey_response &resp); + void query_result(dsn::replication::detect_hotkey_response &resp); void change_state_to_stopped(); void change_state_to_coarse_detecting(); diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 949eead7e6..5c3d5c718d 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -2841,7 +2841,8 @@ void pegasus_server_impl::on_detect_hotkey(const dsn::replication::detect_hotkey { if (dsn_unlikely(req.action != dsn::replication::detect_action::START && - req.action != dsn::replication::detect_action::STOP)) { + req.action != dsn::replication::detect_action::STOP && + req.action != dsn::replication::detect_action::QUERY)) { resp.err = dsn::ERR_INVALID_PARAMETERS; resp.__set_err_hint("invalid detect_action"); return; diff --git a/src/server/test/hotkey_collector_test.cpp b/src/server/test/hotkey_collector_test.cpp index 5eea0d8aad..99913a45fe 100644 --- a/src/server/test/hotkey_collector_test.cpp +++ b/src/server/test/hotkey_collector_test.cpp @@ -304,6 +304,12 @@ TEST_F(hotkey_collector_test, state_transform) ASSERT_TRUE(result->if_find_result); ASSERT_EQ(result->hot_hash_key, "ThisisahotkeyThisisahotkey"); + on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ, + dsn::replication::detect_action::QUERY), + resp); + ASSERT_EQ(resp.err, dsn::ERR_OK); + ASSERT_EQ(resp.hotkey_result, "ThisisahotkeyThisisahotkey"); + on_detect_hotkey(generate_control_rpc(dsn::replication::hotkey_type::READ, dsn::replication::detect_action::STOP), resp);