Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hotkey): add an interface of hotkey capture #615

Merged
merged 5 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions src/server/capacity_unit_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,25 @@
// can be found in the LICENSE file in the root directory of this source tree.

#include "capacity_unit_calculator.h"

#include <dsn/utility/config_api.h>
#include <rocksdb/status.h>
#include "hotkey_collector.h"

namespace pegasus {
namespace server {

capacity_unit_calculator::capacity_unit_calculator(replica_base *r) : replica_base(r)
capacity_unit_calculator::capacity_unit_calculator(
replica_base *r,
std::shared_ptr<hotkey_collector> read_hotkey_collector,
std::shared_ptr<hotkey_collector> write_hotkey_collector)
: replica_base(r),
_read_hotkey_collector(read_hotkey_collector),
_write_hotkey_collector(write_hotkey_collector)
{
dassert(_read_hotkey_collector != nullptr, "read hotkey collector is a nullptr");
dassert(_write_hotkey_collector != nullptr, "write hotkey collector is a nullptr");

_read_capacity_unit_size =
dsn_config_get_value_uint64("pegasus.server",
"perf_counter_read_capacity_unit_size",
Expand Down Expand Up @@ -99,9 +110,11 @@ void capacity_unit_calculator::add_get_cu(int32_t status,

if (status == rocksdb::Status::kNotFound) {
add_read_cu(1);
_read_hotkey_collector->capture_raw_key(key, 1);
return;
}
add_read_cu(key.size() + value.size());
_read_hotkey_collector->capture_raw_key(key, 1);
}

void capacity_unit_calculator::add_multi_get_cu(int32_t status,
Expand All @@ -121,11 +134,14 @@ void capacity_unit_calculator::add_multi_get_cu(int32_t status,
return;
}

uint64_t key_count = kvs.size();
if (status == rocksdb::Status::kNotFound) {
add_read_cu(1);
_read_hotkey_collector->capture_hash_key(hash_key, key_count);
return;
}
add_read_cu(data_size);
_read_hotkey_collector->capture_hash_key(hash_key, key_count);
}

void capacity_unit_calculator::add_scan_cu(int32_t status,
Expand All @@ -141,6 +157,7 @@ void capacity_unit_calculator::add_scan_cu(int32_t status,
return;
}

// TODO: (Tangyanzhao) hotkey detect in scan
int64_t data_size = 0;
for (const auto &kv : kvs) {
data_size += kv.key.size() + kv.value.size();
Expand All @@ -149,20 +166,22 @@ void capacity_unit_calculator::add_scan_cu(int32_t status,
_pfc_scan_bytes->add(data_size);
}

void capacity_unit_calculator::add_sortkey_count_cu(int32_t status)
void capacity_unit_calculator::add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key)
{
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
return;
}
add_read_cu(1);
_read_hotkey_collector->capture_hash_key(hash_key, 1);
}

void capacity_unit_calculator::add_ttl_cu(int32_t status)
void capacity_unit_calculator::add_ttl_cu(int32_t status, const dsn::blob &key)
{
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kNotFound) {
return;
}
add_read_cu(1);
_read_hotkey_collector->capture_raw_key(key, 1);
Smityz marked this conversation as resolved.
Show resolved Hide resolved
}

void capacity_unit_calculator::add_put_cu(int32_t status,
Expand All @@ -174,6 +193,7 @@ void capacity_unit_calculator::add_put_cu(int32_t status,
return;
}
add_write_cu(key.size() + value.size());
_write_hotkey_collector->capture_raw_key(key, 1);
}

void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &key)
Expand All @@ -182,6 +202,7 @@ void capacity_unit_calculator::add_remove_cu(int32_t status, const dsn::blob &ke
return;
}
add_write_cu(key.size());
_write_hotkey_collector->capture_raw_key(key, 1);
}

void capacity_unit_calculator::add_multi_put_cu(int32_t status,
Expand All @@ -195,6 +216,8 @@ void capacity_unit_calculator::add_multi_put_cu(int32_t status,
data_size += hash_key.size() + kv.key.size() + kv.value.size();
}
_pfc_multi_put_bytes->add(hash_key.size() + multi_put_bytes);
uint64_t key_count = kvs.size();
_write_hotkey_collector->capture_raw_key(hash_key, key_count);

if (status != rocksdb::Status::kOk) {
return;
Expand All @@ -214,18 +237,22 @@ void capacity_unit_calculator::add_multi_remove_cu(int32_t status,
for (const auto &sort_key : sort_keys) {
data_size += hash_key.size() + sort_key.size();
}
uint64_t key_count = sort_keys.size();
_write_hotkey_collector->capture_hash_key(hash_key, key_count);
add_write_cu(data_size);
}

void capacity_unit_calculator::add_incr_cu(int32_t status)
void capacity_unit_calculator::add_incr_cu(int32_t status, const dsn::blob &key)
{
if (status != rocksdb::Status::kOk && status != rocksdb::Status::kInvalidArgument) {
return;
}
if (status == rocksdb::Status::kOk) {
add_write_cu(1);
_write_hotkey_collector->capture_raw_key(key, 1);
}
add_read_cu(1);
_read_hotkey_collector->capture_raw_key(key, 1);
}

void capacity_unit_calculator::add_check_and_set_cu(int32_t status,
Expand All @@ -244,8 +271,10 @@ void capacity_unit_calculator::add_check_and_set_cu(int32_t status,

if (status == rocksdb::Status::kOk) {
add_write_cu(hash_key.size() + set_sort_key.size() + value.size());
_write_hotkey_collector->capture_hash_key(hash_key, 1);
}
add_read_cu(hash_key.size() + check_sort_key.size());
_read_hotkey_collector->capture_hash_key(hash_key, 1);
}

void capacity_unit_calculator::add_check_and_mutate_cu(
Expand All @@ -267,11 +296,13 @@ void capacity_unit_calculator::add_check_and_mutate_cu(
status != rocksdb::Status::kTryAgain) {
return;
}

uint64_t key_count = mutate_list.size();
if (status == rocksdb::Status::kOk) {
add_write_cu(data_size);
_write_hotkey_collector->capture_hash_key(hash_key, key_count);
}
add_read_cu(hash_key.size() + check_sort_key.size());
_read_hotkey_collector->capture_hash_key(hash_key, 1);
}

} // namespace server
Expand Down
33 changes: 29 additions & 4 deletions src/server/capacity_unit_calculator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@
namespace pegasus {
namespace server {

class hotkey_collector;

class capacity_unit_calculator : public dsn::replication::replica_base
{
public:
explicit capacity_unit_calculator(replica_base *r);
capacity_unit_calculator(replica_base *r,
std::shared_ptr<hotkey_collector> read_hotkey_collector,
std::shared_ptr<hotkey_collector> write_hotkey_collector);

void add_get_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
void add_multi_get_cu(int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::apps::key_value> &kvs);
void add_scan_cu(int32_t status, const std::vector<::dsn::apps::key_value> &kvs);
void add_sortkey_count_cu(int32_t status);
void add_ttl_cu(int32_t status);
void add_sortkey_count_cu(int32_t status, const dsn::blob &hash_key);
void add_ttl_cu(int32_t status, const dsn::blob &key);

void add_put_cu(int32_t status, const dsn::blob &key, const dsn::blob &value);
void add_remove_cu(int32_t status, const dsn::blob &key);
Expand All @@ -32,7 +36,7 @@ class capacity_unit_calculator : public dsn::replication::replica_base
void add_multi_remove_cu(int32_t status,
const dsn::blob &hash_key,
const std::vector<::dsn::blob> &sort_keys);
void add_incr_cu(int32_t status);
void add_incr_cu(int32_t status, const dsn::blob &key);
void add_check_and_set_cu(int32_t status,
const dsn::blob &hash_key,
const dsn::blob &check_sort_key,
Expand Down Expand Up @@ -70,6 +74,27 @@ class capacity_unit_calculator : public dsn::replication::replica_base
::dsn::perf_counter_wrapper _pfc_multi_put_bytes;
::dsn::perf_counter_wrapper _pfc_check_and_set_bytes;
::dsn::perf_counter_wrapper _pfc_check_and_mutate_bytes;

/*
hotkey capturing weight rules:
add_get_cu: whether find the key or not, weight = 1(read_collector),
add_multi_get_cu: weight = returned sortkey count(read_collector),
add_scan_cu : not capture now,
add_sortkey_count_cu: weight = 1(read_collector),
add_ttl_cu: weight = 1(read_collector),
add_put_cu: weight = 1(write_collector),
add_remove_cu: weight = 1(write_collector),
add_multi_put_cu: weight = returned sortkey count(write_collector),
add_multi_remove_cu: weight = returned sortkey count(write_collector),
add_incr_cu: if find the key, weight = 1(write_collector),
else weight = 1(read_collector)
add_check_and_set_cu: if find the key, weight = 1(write_collector),
else weight = 1(read_collector)
add_check_and_mutate_cu: if find the key, weight = mutate_list size
else weight = 1
*/
std::shared_ptr<hotkey_collector> _read_hotkey_collector;
std::shared_ptr<hotkey_collector> _write_hotkey_collector;
};

} // namespace server
Expand Down
7 changes: 4 additions & 3 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ void pegasus_server_impl::on_sortkey_count(sortkey_count_rpc rpc)
resp.count = -1;
}

_cu_calculator->add_sortkey_count_cu(resp.error);
_cu_calculator->add_sortkey_count_cu(resp.error, hash_key);
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}

Expand Down Expand Up @@ -857,7 +857,7 @@ void pegasus_server_impl::on_ttl(ttl_rpc rpc)
}
}

_cu_calculator->add_ttl_cu(resp.error);
_cu_calculator->add_ttl_cu(resp.error, key);
}

void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
Expand Down Expand Up @@ -1486,7 +1486,8 @@ ::dsn::error_code pegasus_server_impl::start(int argc, char **argv)
});

// initialize cu calculator and write service after server being initialized.
_cu_calculator = dsn::make_unique<capacity_unit_calculator>(this);
_cu_calculator = dsn::make_unique<capacity_unit_calculator>(
this, _read_hotkey_collector, _write_hotkey_collector);
_server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log);

return ::dsn::ERR_OK;
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ int pegasus_write_service::incr(int64_t decree,
int err = _impl->incr(decree, update, resp);

if (_server->is_primary()) {
_cu_calculator->add_incr_cu(resp.error);
_cu_calculator->add_incr_cu(resp.error, update.key);
}

_pfc_incr_latency->set(dsn_now_ns() - start_time);
Expand Down
10 changes: 6 additions & 4 deletions src/server/test/capacity_unit_calculator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "server/capacity_unit_calculator.h"

#include <dsn/dist/replication/replica_base.h>
#include "server/hotkey_collector.h"

namespace pegasus {
namespace server {
Expand All @@ -26,7 +27,8 @@ class mock_capacity_unit_calculator : public capacity_unit_calculator
}

explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
: capacity_unit_calculator(r)
: capacity_unit_calculator(
r, std::make_shared<hotkey_collector>(), std::make_shared<hotkey_collector>())
{
}

Expand Down Expand Up @@ -200,7 +202,7 @@ TEST_F(capacity_unit_calculator_test, scan)
TEST_F(capacity_unit_calculator_test, sortkey_count)
{
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
_cal->add_sortkey_count_cu(i);
_cal->add_sortkey_count_cu(i, key);
if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) {
ASSERT_EQ(_cal->read_cu, 1);
} else {
Expand All @@ -214,7 +216,7 @@ TEST_F(capacity_unit_calculator_test, sortkey_count)
TEST_F(capacity_unit_calculator_test, ttl)
{
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
_cal->add_ttl_cu(i);
_cal->add_ttl_cu(i, key);
if (i == rocksdb::Status::kOk || i == rocksdb::Status::kNotFound) {
ASSERT_EQ(_cal->read_cu, 1);
} else {
Expand Down Expand Up @@ -300,7 +302,7 @@ TEST_F(capacity_unit_calculator_test, multi_remove)
TEST_F(capacity_unit_calculator_test, incr)
{
for (int i = 0; i < MAX_ROCKSDB_STATUS_CODE; i++) {
_cal->add_incr_cu(i);
_cal->add_incr_cu(i, key);
if (i == rocksdb::Status::kOk) {
ASSERT_EQ(_cal->read_cu, 1);
ASSERT_EQ(_cal->write_cu, 1);
Expand Down