Skip to content

Commit

Permalink
feat(hotkey): implement the RPC handler in hotkey_collector (#621)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored Oct 19, 2020
1 parent 1461dc5 commit 34455e3
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 10 deletions.
70 changes: 67 additions & 3 deletions src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,39 @@

#include "hotkey_collector.h"

#include <dsn/dist/replication/replication_enums.h>
#include <dsn/utility/smart_pointers.h>
#include "base/pegasus_key_schema.h"
#include <dsn/dist/fmt_logging.h>

namespace pegasus {
namespace server {

hotkey_collector::hotkey_collector()
: _internal_collector(dsn::make_unique<hotkey_empty_data_collector>())
hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base)
: replica_base(r_base),
_state(hotkey_collector_state::STOPPED),
_hotkey_type(hotkey_type),
_internal_collector(std::make_shared<hotkey_empty_data_collector>())
{
}

// TODO: (Tangyanzhao) implement these functions
void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
{
switch (req.action) {
case dsn::replication::detect_action::START:
on_start_detect(resp);
return;
case dsn::replication::detect_action::STOP:
on_stop_detect(resp);
return;
default:
std::string hint = fmt::format("{}: can't find this detect action", req.action);
resp.err = dsn::ERR_INVALID_STATE;
resp.__set_err_hint(hint);
derror_replica(hint);
}
}

void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t weight)
Expand All @@ -49,5 +67,51 @@ void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weigh

void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); }

void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response &resp)
{
auto now_state = _state.load();
std::string hint;
switch (now_state) {
case hotkey_collector_state::COARSE_DETECTING:
case hotkey_collector_state::FINE_DETECTING:
resp.err = dsn::ERR_INVALID_STATE;
hint = fmt::format("still detecting {} hotkey, state is {}",
dsn::enum_to_string(_hotkey_type),
enum_to_string(now_state));
dwarn_replica(hint);
return;
case hotkey_collector_state::FINISHED:
resp.err = dsn::ERR_INVALID_STATE;
hint = fmt::format(
"{} hotkey result has been found, you can send a stop rpc to restart hotkey detection",
dsn::enum_to_string(_hotkey_type));
dwarn_replica(hint);
return;
case hotkey_collector_state::STOPPED:
// TODO: (Tangyanzhao) start coarse detecting
_state.store(hotkey_collector_state::COARSE_DETECTING);
resp.err = dsn::ERR_OK;
hint = fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type));
ddebug_replica(hint);
return;
default:
hint = "invalid collector state";
resp.err = dsn::ERR_INVALID_STATE;
resp.__set_err_hint(hint);
derror_replica(hint);
dassert(false, "invalid collector state");
}
}

void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
{
_state.store(hotkey_collector_state::STOPPED);
_internal_collector.reset();
resp.err = dsn::ERR_OK;
std::string hint =
fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type));
ddebug_replica(hint);
}

} // namespace server
} // namespace pegasus
12 changes: 8 additions & 4 deletions src/server/hotkey_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

#pragma once

#include <dsn/utility/string_view.h>
#include <dsn/dist/replication/replication_types.h>
#include "hotkey_collector_state.h"
#include <dsn/dist/replication/replica_base.h>

namespace pegasus {
namespace server {
Expand Down Expand Up @@ -63,10 +63,11 @@ class internal_collector_base;
// | | | Hotkey |
// +--------------------+ +----------------------------------------------------+

class hotkey_collector
class hotkey_collector : public dsn::replication::replica_base
{
public:
hotkey_collector();
hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base);
// TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot detection
// weight: calculate the weight according to the specific situation
void capture_raw_key(const dsn::blob &raw_key, int64_t weight);
Expand All @@ -76,8 +77,11 @@ class hotkey_collector
/*out*/ dsn::replication::detect_hotkey_response &resp);

private:
std::unique_ptr<internal_collector_base> _internal_collector;
void on_start_detect(dsn::replication::detect_hotkey_response &resp);
void on_stop_detect(dsn::replication::detect_hotkey_response &resp);
std::atomic<hotkey_collector_state> _state;
const dsn::replication::hotkey_type::type _hotkey_type;
std::shared_ptr<internal_collector_base> _internal_collector;
};

class internal_collector_base
Expand Down
6 changes: 4 additions & 2 deletions src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
_primary_address = dsn::rpc_address(dsn_primary_address()).to_string();
_gpid = get_gpid();

_read_hotkey_collector = std::make_shared<hotkey_collector>();
_write_hotkey_collector = std::make_shared<hotkey_collector>();
_read_hotkey_collector =
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, this);
_write_hotkey_collector =
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this);

_verbose_log = dsn_config_get_value_bool("pegasus.server",
"rocksdb_verbose_log",
Expand Down
4 changes: 3 additions & 1 deletion src/server/test/capacity_unit_calculator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ class mock_capacity_unit_calculator : public capacity_unit_calculator

explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
: capacity_unit_calculator(
r, std::make_shared<hotkey_collector>(), std::make_shared<hotkey_collector>())
r,
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, this),
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this))
{
}

Expand Down

0 comments on commit 34455e3

Please sign in to comment.