Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(update_replication_factor#2): add RPC for setting the replicatio…
Browse files Browse the repository at this point in the history
…n factor of each table (#1072)
  • Loading branch information
empiredan authored and acelyc111 committed Jun 1, 2022
1 parent 3d01356 commit 9c8a988
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 23 deletions.
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BACKUP_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_MANUAL_COMPACT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_MANUAL_COMPACT_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_GET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_SET_MAX_REPLICA_COUNT, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

#define CURRENT_THREAD_POOL THREAD_POOL_META_STATE
Expand Down
3 changes: 3 additions & 0 deletions include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ class replication_ddl_client
error_with<configuration_get_max_replica_count_response>
get_max_replica_count(const std::string &app_name);

error_with<configuration_set_max_replica_count_response>
set_max_replica_count(const std::string &app_name, int32_t max_replica_count);

private:
bool static valid_app_char(int c);

Expand Down
11 changes: 11 additions & 0 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1729,5 +1729,16 @@ replication_ddl_client::get_max_replica_count(const std::string &app_name)
configuration_get_max_replica_count_rpc(std::move(req), RPC_CM_GET_MAX_REPLICA_COUNT));
}

// Synchronously asks the meta server to change the max replica count of the table
// named `app_name` to `max_replica_count`, via RPC_CM_SET_MAX_REPLICA_COUNT.
// Returns whatever call_rpc_sync() yields for the request: an error, or the
// configuration_set_max_replica_count_response sent back.
error_with<configuration_set_max_replica_count_response>
replication_ddl_client::set_max_replica_count(const std::string &app_name,
                                              int32_t max_replica_count)
{
    auto request = make_unique<configuration_set_max_replica_count_request>();
    request->__set_app_name(app_name);
    request->__set_max_replica_count(max_replica_count);

    // The holder takes ownership of the request and is handed over to the sync call.
    configuration_set_max_replica_count_rpc rpc(std::move(request), RPC_CM_SET_MAX_REPLICA_COUNT);
    return call_rpc_sync(std::move(rpc));
}

} // namespace replication
} // namespace dsn
13 changes: 13 additions & 0 deletions src/common/meta_admin.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,16 @@ struct configuration_get_max_replica_count_response
2:i32 max_replica_count;
3:string hint_message;
}

// Request carried by RPC_CM_SET_MAX_REPLICA_COUNT: asks the meta server to set
// the max replica count of one app (table).
struct configuration_set_max_replica_count_request
{
// Name of the app whose max replica count should be changed.
1:string app_name;
// The new (target) max replica count requested for the app.
2:i32 max_replica_count;
}

// Response of RPC_CM_SET_MAX_REPLICA_COUNT.
struct configuration_set_max_replica_count_response
{
// Result of the request (e.g. ERR_OK, ERR_APP_NOT_EXIST, ERR_INCONSISTENT_STATE,
// ERR_STATE_FREEZED or ERR_INVALID_PARAMETERS as set by the meta server).
1:dsn.error_code err;
// The app's max replica count as read by the server before any update; the server
// sets it to 0 when the app does not exist or its partitions are inconsistent.
2:i32 old_max_replica_count;
// Human-readable detail; filled in on failures and when the requested value
// equals the current one.
3:string hint_message;
}
4 changes: 4 additions & 0 deletions src/meta/meta_rpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,9 @@ typedef rpc_holder<query_backup_status_request, query_backup_status_response>
typedef rpc_holder<configuration_get_max_replica_count_request,
configuration_get_max_replica_count_response>
configuration_get_max_replica_count_rpc;
// RPC holder type that pairs the set-max-replica-count request with its response.
using configuration_set_max_replica_count_rpc =
    rpc_holder<configuration_set_max_replica_count_request,
               configuration_set_max_replica_count_response>;

} // namespace replication
} // namespace dsn
16 changes: 16 additions & 0 deletions src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,9 @@ void meta_service::register_rpc_handlers()
register_rpc_handler_with_rpc_holder(RPC_CM_GET_MAX_REPLICA_COUNT,
"get_max_replica_count",
&meta_service::on_get_max_replica_count);
register_rpc_handler_with_rpc_holder(RPC_CM_SET_MAX_REPLICA_COUNT,
"set_max_replica_count",
&meta_service::on_set_max_replica_count);
}

int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address)
Expand Down Expand Up @@ -1288,5 +1291,18 @@ void meta_service::on_get_max_replica_count(configuration_get_max_replica_count_
server_state::sStateHash);
}

// ThreadPool: THREAD_POOL_META_SERVER
//
// RPC entry point for RPC_CM_SET_MAX_REPLICA_COUNT. When check_status() rejects the
// request nothing further is done here; otherwise the actual work is handed to
// server_state::set_max_replica_count() as an LPC_META_STATE_NORMAL task hashed by
// server_state::sStateHash.
void meta_service::on_set_max_replica_count(configuration_set_max_replica_count_rpc rpc)
{
    if (!check_status(rpc)) {
        return;
    }

    // The task owns a copy of the rpc holder and a plain pointer to the server state.
    server_state *state = _state.get();
    tasking::enqueue(LPC_META_STATE_NORMAL,
                     tracker(),
                     [state, rpc]() { state->set_max_replica_count(rpc); },
                     server_state::sStateHash);
}

} // namespace replication
} // namespace dsn
1 change: 1 addition & 0 deletions src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ class meta_service : public serverlet<meta_service>

// get/set max_replica_count of an app
void on_get_max_replica_count(configuration_get_max_replica_count_rpc rpc);
void on_set_max_replica_count(configuration_set_max_replica_count_rpc rpc);

// common routines
// ret:
Expand Down
152 changes: 133 additions & 19 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ void server_state::create_app(dsn::message_ex *msg)

auto level = _meta_svc->get_function_level();
if (level <= meta_function_level::fl_freezed) {
derror_f("current meta function level is freezed since there are too few alive nodes");
derror("current meta function level is freezed, since there are too few alive nodes");
response.err = ERR_STATE_FREEZED;
will_create_app = false;
} else if (request.options.partition_count <= 0 ||
Expand Down Expand Up @@ -2941,13 +2941,19 @@ bool validate_target_max_replica_count_internal(int32_t max_replica_count,

} // anonymous namespace

bool server_state::validate_target_max_replica_count(int32_t max_replica_count)
bool server_state::validate_target_max_replica_count(int32_t max_replica_count,
std::string &hint_message) const
{
auto alive_node_count = static_cast<int32_t>(_meta_svc->get_alive_node_count());
const auto alive_node_count = static_cast<int32_t>(_meta_svc->get_alive_node_count());

std::string hint_message;
bool valid = validate_target_max_replica_count_internal(
return validate_target_max_replica_count_internal(
max_replica_count, alive_node_count, hint_message);
}

bool server_state::validate_target_max_replica_count(int32_t max_replica_count) const
{
std::string hint_message;
const auto valid = validate_target_max_replica_count(max_replica_count, hint_message);
if (!valid) {
derror_f("target max replica count is invalid: message={}", hint_message);
}
Expand Down Expand Up @@ -3123,53 +3129,161 @@ void server_state::on_query_manual_compact_status(query_manual_compact_rpc rpc)
response.__set_progress(total_progress);
}

// ThreadPool: THREAD_POOL_META_STATE
void server_state::get_max_replica_count(configuration_get_max_replica_count_rpc rpc) const
// Looks up the app named `app_name` through get_app() and returns it. When no such
// app is found, `response.err` is set to ERR_APP_NOT_EXIST and
// `response.hint_message` names the missing app; the (null) pointer is still what
// the caller receives in that case.
//
// `Response` only needs the members written below.
//
// NOTE(review): this span looks like it was rendered from a diff view, with removed
// and added lines interleaved -- e.g. the bare `return;` cannot belong to a function
// returning std::shared_ptr, and `response.max_replica_count` is not a field of the
// set-max-replica-count response. Confirm the real body against the actual file.
template <typename Response>
std::shared_ptr<app_state> server_state::get_app_and_check_exist(const std::string &app_name,
Response &response) const
{
const auto &app_name = rpc.request().app_name;
auto &response = rpc.response();

zauto_read_lock l(_lock);

auto app = get_app(app_name);
if (app == nullptr) {
response.err = ERR_APP_NOT_EXIST;
response.max_replica_count = 0;
response.hint_message = fmt::format("app({}) does not exist", app_name);
dwarn_f("failed to get max_replica_count: app_name={}, error_code={}",
app_name,
response.err.to_string());
return;
}

return app;
}

// Verifies that every partition of `app` records the same max_replica_count as the
// app itself. Returns true when all partitions agree. On the first mismatch it sets
// `response.err` to ERR_INCONSISTENT_STATE, writes a `response.hint_message` that
// contains both counts and the partition index, and returns false.
//
// NOTE(review): this span looks like it was rendered from a diff view, with removed
// and added lines interleaved -- the `derror_f(` line below is left unterminated and
// `response.max_replica_count` is not a field of the set-max-replica-count response.
// Confirm the real body against the actual file.
template <typename Response>
bool server_state::check_max_replica_count_consistent(const std::shared_ptr<app_state> &app,
Response &response) const
{
for (int i = 0; i < static_cast<int>(app->partitions.size()); ++i) {
const auto &partition_config = app->partitions[i];
// Partitions that agree with the app-level value need no further attention.
if (partition_config.max_replica_count == app->max_replica_count) {
continue;
}

response.err = ERR_INCONSISTENT_STATE;
response.max_replica_count = 0;
response.hint_message = fmt::format("partition_max_replica_count({}) != "
"app_max_replica_count({}) for partition {}",
partition_config.max_replica_count,
app->max_replica_count,
i);
derror_f("failed to get max_replica_count: app_name={}, error_code={}, hint_message={}",
return false;
}

return true;
}

// ThreadPool: THREAD_POOL_META_STATE
//
// Answers a query for the max_replica_count of the app named in the request.
// While holding `_lock` for reading, it resolves the app and checks that all of its
// partitions agree with the app-level value. On either failure the response reports
// a max_replica_count of 0 together with the error and hint filled in by the
// helper; on success it reports ERR_OK and the app-level max_replica_count.
void server_state::get_max_replica_count(configuration_get_max_replica_count_rpc rpc) const
{
    const auto &name = rpc.request().app_name;
    auto &resp = rpc.response();

    zauto_read_lock guard(_lock);

    const auto target_app = get_app_and_check_exist(name, resp);
    if (!target_app) {
        resp.max_replica_count = 0;
        dwarn_f("failed to get max_replica_count: app_name={}, error_code={}, hint_message={}",
                name,
                resp.err.to_string(),
                resp.hint_message);
        return;
    }

    const bool consistent = check_max_replica_count_consistent(target_app, resp);
    if (!consistent) {
        resp.max_replica_count = 0;
        derror_f("failed to get max_replica_count: app_name={}, app_id={}, error_code={}, "
                 "hint_message={}",
                 name,
                 target_app->app_id,
                 resp.err.to_string(),
                 resp.hint_message);
        return;
    }

    resp.err = ERR_OK;
    resp.max_replica_count = target_app->max_replica_count;

    ddebug_f("get max_replica_count successfully: app_name={}, app_id={}, "
             "max_replica_count={}",
             name,
             target_app->app_id,
             resp.max_replica_count);
}

// ThreadPool: THREAD_POOL_META_STATE
//
// Handles a request to change the max_replica_count of an app. The checks run in
// this order, and the first one that fails ends the request:
//   1. the app must exist (otherwise the error set by get_app_and_check_exist());
//   2. all partitions must agree with the app-level value (otherwise the error set
//      by check_max_replica_count_consistent());
//   3. the meta function level must be above fl_freezed (ERR_STATE_FREEZED);
//   4. the requested value must pass validate_target_max_replica_count()
//      (ERR_INVALID_PARAMETERS);
//   5. a value equal to the current one is answered with ERR_OK and a hint.
// `old_max_replica_count` in the response is 0 when step 1 or 2 fails, and the
// app-level value read under the lock otherwise.
void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc rpc)
{
    const auto &request = rpc.request();
    const auto &name = request.app_name;
    const auto target_count = request.max_replica_count;
    auto &resp = rpc.response();

    int32_t id = 0;

    {
        // The guard lives only for this scope, i.e. for the lookup and the
        // consistency check; the remaining checks run after it is destroyed.
        zauto_read_lock guard(_lock);

        const auto target_app = get_app_and_check_exist(name, resp);
        if (!target_app) {
            resp.old_max_replica_count = 0;
            dwarn_f("failed to set max_replica_count: app_name={}, error_code={}, hint_message={}",
                    name,
                    resp.err.to_string(),
                    resp.hint_message);
            return;
        }

        id = target_app->app_id;

        const bool consistent = check_max_replica_count_consistent(target_app, resp);
        if (!consistent) {
            resp.old_max_replica_count = 0;
            derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, "
                     "hint_message={}",
                     name,
                     id,
                     resp.err.to_string(),
                     resp.hint_message);
            return;
        }

        resp.old_max_replica_count = target_app->max_replica_count;
    }

    if (_meta_svc->get_function_level() <= meta_function_level::fl_freezed) {
        resp.err = ERR_STATE_FREEZED;
        resp.hint_message =
            "current meta function level is freezed, since there are too few alive nodes";
        derror_f(
            "failed to set max_replica_count: app_name={}, app_id={}, error_code={}, message={}",
            name,
            id,
            resp.err.to_string(),
            resp.hint_message);
        return;
    }

    // The validator writes its explanation straight into the response hint.
    const bool target_valid = validate_target_max_replica_count(target_count, resp.hint_message);
    if (!target_valid) {
        resp.err = ERR_INVALID_PARAMETERS;
        dwarn_f(
            "failed to set max_replica_count: app_name={}, app_id={}, error_code={}, message={}",
            name,
            id,
            resp.err.to_string(),
            resp.hint_message);
        return;
    }

    if (target_count == resp.old_max_replica_count) {
        resp.err = ERR_OK;
        resp.hint_message = "no need to update max_replica_count since it's not changed";
        dwarn_f("{}: app_name={}, app_id={}", resp.hint_message, name, id);
        return;
    }

    // The two counts differ at this point, so the change is either up or down.
    const char *direction = "decreasing";
    if (target_count > resp.old_max_replica_count) {
        direction = "increasing";
    }

    ddebug_f("request for {} max_replica_count: app_name={}, app_id={}, "
             "old_max_replica_count={}, new_max_replica_count={}",
             direction,
             name,
             id,
             resp.old_max_replica_count,
             target_count);

    // TODO: update partition-level and app-level max_replica_count
    resp.err = ERR_OK;
}

} // namespace replication
} // namespace dsn
13 changes: 12 additions & 1 deletion src/meta/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class server_state

// get/set max_replica_count of an app
void get_max_replica_count(configuration_get_max_replica_count_rpc rpc) const;
void set_max_replica_count(configuration_set_max_replica_count_rpc rpc);

// return true if no need to do any actions
bool check_all_partitions();
Expand Down Expand Up @@ -300,7 +301,17 @@ class server_state
void transition_staging_state(std::shared_ptr<app_state> &app);

// check whether a max replica count is valid especially for a new app
bool validate_target_max_replica_count(int32_t max_replica_count);
bool validate_target_max_replica_count(int32_t max_replica_count,
std::string &hint_message) const;
bool validate_target_max_replica_count(int32_t max_replica_count) const;

template <typename Response>
std::shared_ptr<app_state> get_app_and_check_exist(const std::string &app_name,
Response &response) const;

template <typename Response>
bool check_max_replica_count_consistent(const std::shared_ptr<app_state> &app,
Response &response) const;

// Used for `on_start_manual_compaction`
bool parse_compaction_envs(start_manual_compact_rpc rpc,
Expand Down
Loading

0 comments on commit 9c8a988

Please sign in to comment.