Skip to content

Commit

Permalink
feat(hotkey): collector can be terminated by timeout (#625)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored Oct 22, 2020
1 parent 8e8510b commit e17d5c7
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 6 deletions.
44 changes: 40 additions & 4 deletions src/server/hotkey_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@
#include <dsn/utility/smart_pointers.h>
#include "base/pegasus_key_schema.h"
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>

namespace pegasus {
namespace server {

DSN_DEFINE_int32(
"pegasus.server",
max_seconds_to_detect_hotkey,
150,
"the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");

hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base)
: replica_base(r_base),
_state(hotkey_collector_state::STOPPED),
_hotkey_type(hotkey_type),
_internal_collector(std::make_shared<hotkey_empty_data_collector>())
_internal_collector(std::make_shared<hotkey_empty_data_collector>()),
_collector_start_time_second(0)
{
}

Expand Down Expand Up @@ -65,7 +73,18 @@ void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weigh
_internal_collector->capture_data(hash_key, weight);
}

void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); }
void hotkey_collector::analyse_data()
{
switch (_state.load()) {
case hotkey_collector_state::COARSE_DETECTING:
if (!terminate_if_timeout()) {
_internal_collector->analyse_data();
}
return;
default:
return;
}
}

void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response &resp)
{
Expand All @@ -88,6 +107,7 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
dwarn_replica(hint);
return;
case hotkey_collector_state::STOPPED:
_collector_start_time_second = dsn_now_s();
// TODO: (Tangyanzhao) start coarse detecting
_state.store(hotkey_collector_state::COARSE_DETECTING);
resp.err = dsn::ERR_OK;
Expand All @@ -105,13 +125,29 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response

void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
{
_state.store(hotkey_collector_state::STOPPED);
_internal_collector.reset();
terminate();
resp.err = dsn::ERR_OK;
std::string hint =
fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type));
ddebug_replica(hint);
}

void hotkey_collector::terminate()
{
_state.store(hotkey_collector_state::STOPPED);
_internal_collector.reset();
_collector_start_time_second = 0;
}

bool hotkey_collector::terminate_if_timeout()
{
if (dsn_now_s() >= _collector_start_time_second + FLAGS_max_seconds_to_detect_hotkey) {
ddebug_replica("hotkey collector work time is exhausted but no hotkey has been found");
terminate();
return true;
}
return false;
}

} // namespace server
} // namespace pegasus
4 changes: 4 additions & 0 deletions src/server/hotkey_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,13 @@ class hotkey_collector : public dsn::replication::replica_base
private:
void on_start_detect(dsn::replication::detect_hotkey_response &resp);
void on_stop_detect(dsn::replication::detect_hotkey_response &resp);
void terminate();
bool terminate_if_timeout();

std::atomic<hotkey_collector_state> _state;
const dsn::replication::hotkey_type::type _hotkey_type;
std::shared_ptr<internal_collector_base> _internal_collector;
uint64_t _collector_start_time_second;
};

class internal_collector_base
Expand Down
4 changes: 2 additions & 2 deletions src/server/test/capacity_unit_calculator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class mock_capacity_unit_calculator : public capacity_unit_calculator
explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
: capacity_unit_calculator(
r,
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, this),
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this))
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, r),
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, r))
{
}

Expand Down

0 comments on commit e17d5c7

Please sign in to comment.