Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz committed Oct 16, 2020
1 parent 20d4236 commit b827bb7
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 49 deletions.
95 changes: 52 additions & 43 deletions src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <dsn/utility/smart_pointers.h>
#include "base/pegasus_key_schema.h"
#include <dsn/dist/fmt_logging.h>

namespace pegasus {
namespace server {
Expand All @@ -29,27 +30,39 @@ hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_ty
: replica_base(r_base),
_state(hotkey_collector_state::STOPPED),
_hotkey_type(hotkey_type),
_internal_collector(dsn::make_unique<hotkey_empty_data_collector>())
_internal_collector(std::make_shared<hotkey_empty_data_collector>())
{
}

// TODO: (Tangyanzhao) implement these functions
void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req,
dsn::replication::detect_hotkey_response &resp)
{
if (req.action == dsn::replication::detect_action::START) {
std::string err_hint;
if (start_detect(err_hint)) {
resp.err = dsn::ERR_OK;
} else {
resp.err = dsn::ERR_SERVICE_ALREADY_EXIST;
resp.__set_err_hint(err_hint.c_str());
switch (req.action) {
case dsn::replication::detect_action::START:
if (is_detecting_now()) {
return_err_resp(resp,
fmt::format("still detecting {} hotkey, state is {}",
dsn::enum_to_string(_hotkey_type),
enum_to_string(_state.load())));
}
return;
} else {
stop_detect();
resp.err = dsn::ERR_OK;
return;
if (is_detecting_finished()) {
return_err_resp(resp,
fmt::format("still detecting {} hotkey, state is {}",
dsn::enum_to_string(_hotkey_type),
enum_to_string(_state.load())));
}
if (is_ready_to_detect()) {
_state.store(hotkey_collector_state::COARSE_DETECTING);
return_normal_resp(
resp,
fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type)));
}
case dsn::replication::detect_action::STOP:
_state.store(hotkey_collector_state::STOPPED);
_internal_collector.reset();
return_normal_resp(
resp,
fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type)));
}
}

Expand All @@ -68,39 +81,35 @@ void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weigh

void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); }

bool hotkey_collector::start_detect(std::string &err_hint)
bool hotkey_collector::is_detecting_now()
{
auto now_state = _state.load();
switch (_state.load()) {
case hotkey_collector_state::COARSE_DETECTING:
case hotkey_collector_state::FINE_DETECTING:
err_hint = fmt::format("still detecting {} hotkey, state is {}",
dsn::enum_to_string(_hotkey_type),
enum_to_string(now_state));
ddebug_replica(err_hint);
return false;
case hotkey_collector_state::FINISHED:
err_hint = fmt::format(
"{} hotkey result has been found, you can send a stop rpc to restart hotkey detection",
dsn::enum_to_string(_hotkey_type));
ddebug_replica(err_hint);
return false;
case hotkey_collector_state::STOPPED:
// TODO: (Tangyanzhao) start coarse detecting
_state.store(hotkey_collector_state::COARSE_DETECTING);
ddebug_replica("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type));
return true;
default:
err_hint = "invalid collector state";
ddebug_replica(err_hint);
return false;
}
return (_state.load() == hotkey_collector_state::COARSE_DETECTING ||
_state.load() == hotkey_collector_state::FINE_DETECTING);
}

bool hotkey_collector::is_detecting_finished()
{
return (_state.load() == hotkey_collector_state::FINISHED);
}

bool hotkey_collector::is_ready_to_detect()
{
return (_state.load() == hotkey_collector_state::STOPPED);
}

void hotkey_collector::return_err_resp(dsn::replication::detect_hotkey_response &resp,
std::string hint)
{
ddebug_replica(hint);
resp.err = dsn::ERR_SERVICE_ALREADY_EXIST;
resp.__set_err_hint(hint.c_str());
}

void hotkey_collector::stop_detect()
void hotkey_collector::return_normal_resp(dsn::replication::detect_hotkey_response &resp,
std::string hint)
{
_state.store(hotkey_collector_state::STOPPED);
derror_replica("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type));
dinfo_replica(hint);
resp.err = dsn::ERR_OK;
}

} // namespace server
Expand Down
13 changes: 7 additions & 6 deletions src/server/hotkey_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

#pragma once

#include <dsn/utility/string_view.h>
#include <dsn/dist/replication/replication_types.h>
#include <dsn/dist/replication/replica_base.h>
#include <dsn/dist/fmt_logging.h>
#include "hotkey_collector_state.h"
#include <dsn/dist/replication/replica_base.h>

namespace pegasus {
namespace server {
Expand Down Expand Up @@ -79,12 +77,15 @@ class hotkey_collector : public dsn::replication::replica_base
/*out*/ dsn::replication::detect_hotkey_response &resp);

private:
bool start_detect(/*out*/ std::string &err_hint);
void stop_detect();
bool is_detecting_now();
bool is_detecting_finished();
bool is_ready_to_detect();
void return_normal_resp(dsn::replication::detect_hotkey_response &resp, std::string hint);
void return_err_resp(dsn::replication::detect_hotkey_response &resp, std::string hint);

std::atomic<hotkey_collector_state> _state;
const dsn::replication::hotkey_type::type _hotkey_type;
std::unique_ptr<internal_collector_base> _internal_collector;
std::shared_ptr<internal_collector_base> _internal_collector;
};

class internal_collector_base
Expand Down

0 comments on commit b827bb7

Please sign in to comment.