Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(update_replication_factor#3): support to set the replication factor for all partitions of each table #1077

Merged
Merged
335 changes: 332 additions & 3 deletions src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3199,11 +3199,13 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc
auto &response = rpc.response();

int32_t app_id = 0;
int32_t partition_count = 0;
std::shared_ptr<app_state> app;

{
zauto_read_lock l(_lock);

auto app = get_app_and_check_exist(app_name, response);
app = get_app_and_check_exist(app_name, response);
if (app == nullptr) {
response.old_max_replica_count = 0;
dwarn_f("failed to set max_replica_count: app_name={}, error_code={}, hint_message={}",
Expand All @@ -3227,6 +3229,20 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc
}

response.old_max_replica_count = app->max_replica_count;

if (app->status != app_status::AS_AVAILABLE) {
response.err = ERR_INVALID_PARAMETERS;
response.hint_message = fmt::format("app({}) is not in available status", app_name);
derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, "
"hint_message={}",
app_name,
app_id,
response.err.to_string(),
response.hint_message);
return;
}

partition_count = app->partition_count;
}

auto level = _meta_svc->get_function_level();
Expand Down Expand Up @@ -3269,8 +3285,321 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc
response.old_max_replica_count,
new_max_replica_count);

// TODO: update partition-level and app-level max_replica_count
response.err = ERR_OK;
do_update_max_replica_count(app, app_id, partition_count, rpc);
}

// ThreadPool: THREAD_POOL_META_STATE
void server_state::do_update_max_replica_count(std::shared_ptr<app_state> &app,
int32_t app_id,
int32_t partition_count,
configuration_set_max_replica_count_rpc rpc)
hycdong marked this conversation as resolved.
Show resolved Hide resolved
{
std::shared_ptr<std::vector<error_code>> results(new std::vector<error_code>(partition_count));
std::shared_ptr<std::atomic_int> updates_in_progress(new std::atomic_int(partition_count));
hycdong marked this conversation as resolved.
Show resolved Hide resolved
auto on_partition_updated = [this, app_id, partition_count, rpc, results, updates_in_progress](
error_code ec, int32_t partition_index) {
const auto &app_name = rpc.request().app_name;
const auto new_max_replica_count = rpc.request().max_replica_count;

results->at(partition_index) = ec;

auto uncompleted = --(*updates_in_progress);
dassert_f(uncompleted >= 0,
"the uncompleted number should be >= 0 while updating partition-level"
"max_replica_count: uncompleted={}, app_name={}, app_id={}, "
"partition_index={}, partition_count={}, new_max_replica_count={}",
uncompleted,
app_name,
app_id,
partition_index,
partition_count,
new_max_replica_count);

if (uncompleted > 0) {
return;
}

for (int32_t i = 0; i < partition_count; ++i) {
if (results->at(i) == ERR_OK) {
continue;
}

dassert_f(false,
"An error that can't be handled occurs while updating partition-level"
"max_replica_count: error_code={}, app_name={}, app_id={}, "
"partition_index={}, partition_count={}, new_max_replica_count={}",
ec.to_string(),
app_name,
app_id,
i,
partition_count,
new_max_replica_count);
}

ddebug_f("all partitions have been changed to the new max_replica_count, ready to update "
"the app-level max_replica_count: app_name={}, app_id={}, partition_count={}, "
"new_max_replica_count={}",
app_name,
app_id,
partition_count,
new_max_replica_count);

// TODO: update app-level max_replica_count
auto &response = rpc.response();
response.err = ERR_OK;
};

const auto new_max_replica_count = rpc.request().max_replica_count;
update_all_partitions_max_replica_count(app, new_max_replica_count, on_partition_updated);
}

// ThreadPool: THREAD_POOL_META_STATE
void server_state::update_all_partitions_max_replica_count(std::shared_ptr<app_state> &app,
hycdong marked this conversation as resolved.
Show resolved Hide resolved
int32_t new_max_replica_count,
partition_callback on_partition_updated)
{
zauto_write_lock l(_lock);

for (int32_t i = 0; i < app->partition_count; ++i) {
update_partition_max_replica_count(app, i, new_max_replica_count, on_partition_updated);
}
}

// ThreadPool: THREAD_POOL_META_STATE
void server_state::update_partition_max_replica_count(std::shared_ptr<app_state> &app,
int32_t partition_index,
int32_t new_max_replica_count,
partition_callback on_partition_updated)
{
dassert_f(partition_index < app->partition_count,
"partition_index should be < partition_count: app_name={}, app_id={}, "
"partition_index={}, partition_count={}, new_max_replica_count={}",
app->app_name,
app->app_id,
partition_index,
app->partition_count,
new_max_replica_count);

const auto &old_partition_config = app->partitions[partition_index];
const auto old_max_replica_count = old_partition_config.max_replica_count;

if (new_max_replica_count == old_max_replica_count) {
dwarn_f("partition-level max_replica_count has been updated: app_name={}, "
"app_id={}, partition_index={}, new_max_replica_count={}",
app->app_name,
app->app_id,
partition_index,
new_max_replica_count);
return;
}

auto &context = app->helpers->contexts[partition_index];
if (context.stage == config_status::pending_remote_sync) {
ddebug_f("have to wait until another request which is syncing with remote storage "
"is finished, then process the current request of updating max_replica_count: "
"app_name={}, app_id={}, partition_index={}, new_max_replica_count={}",
app->app_name,
app->app_id,
partition_index,
new_max_replica_count);

tasking::enqueue(
LPC_META_STATE_HIGH,
tracker(),
[this, app, partition_index, new_max_replica_count, on_partition_updated]() mutable {
update_partition_max_replica_count(
app, partition_index, new_max_replica_count, on_partition_updated);
},
server_state::sStateHash,
std::chrono::milliseconds(100));
return;
}

dassert_f(context.stage == config_status::not_pending,
"invalid config status while updating max_replica_count: context.stage={}, "
"app_name={}, app_id={}, partition_index={}, new_max_replica_count={}",
enum_to_string(context.stage),
app->app_name,
app->app_id,
partition_index,
new_max_replica_count);

context.stage = config_status::pending_remote_sync;
context.pending_sync_request.reset();
context.msg = nullptr;

auto new_partition_config = old_partition_config;
new_partition_config.max_replica_count = new_max_replica_count;
++(new_partition_config.ballot);
context.pending_sync_task = update_partition_max_replica_count_on_remote(
app, new_partition_config, on_partition_updated);
}

// ThreadPool: THREAD_POOL_META_STATE
//
// Writes `new_partition_config` to remote storage and returns the task that represents
// the pending work. The reply is handled by
// on_update_partition_max_replica_count_on_remote_reply().
//
// While the meta function level is fl_blind or lower nothing is written; a delayed task
// is returned instead which, after one second, calls this function again under the write
// lock and stores the newly returned task as the partition's pending_sync_task.
task_ptr server_state::update_partition_max_replica_count_on_remote(
    std::shared_ptr<app_state> &app,
    const partition_configuration &new_partition_config,
    partition_callback on_partition_updated)
{
    const auto &pid = new_partition_config.pid;
    const auto index = pid.get_partition_index();
    const auto target_max_replica_count = new_partition_config.max_replica_count;
    const auto target_ballot = new_partition_config.ballot;

    const auto current_level = _meta_svc->get_function_level();
    if (current_level > meta_function_level::fl_blind) {
        // Normal path: send the new configuration to remote storage right away.
        ddebug_f("request for updating partition-level max_replica_count on remote storage: "
                 "app_name={}, app_id={}, partition_id={}, new_max_replica_count={}, new_ballot={}",
                 app->app_name,
                 app->app_id,
                 index,
                 target_max_replica_count,
                 target_ballot);

        auto remote_path = get_partition_path(pid);
        auto encoded_config =
            dsn::json::json_forwarder<partition_configuration>::encode(new_partition_config);
        return _meta_svc->get_remote_storage()->set_data(
            remote_path,
            encoded_config,
            LPC_META_STATE_HIGH,
            std::bind(&server_state::on_update_partition_max_replica_count_on_remote_reply,
                      this,
                      std::placeholders::_1,
                      app,
                      new_partition_config,
                      on_partition_updated),
            tracker());
    }

    dwarn_f("have to wait until meta level becomes more than fl_blind, then process the "
            "current request of updating max_replica_count: current_meta_level={}, "
            "app_name={}, app_id={}, partition_index={}, new_max_replica_count={}, "
            "new_ballot={}",
            _meta_function_level_VALUES_TO_NAMES.find(current_level)->second,
            app->app_name,
            app->app_id,
            index,
            target_max_replica_count,
            target_ballot);

    // NOTICE: the retry replaces pending_sync_task with the task of the next attempt
    return tasking::enqueue(LPC_META_STATE_HIGH,
                            tracker(),
                            [this, app, new_partition_config, on_partition_updated]() mutable {
                                const auto retry_index =
                                    new_partition_config.pid.get_partition_index();

                                zauto_write_lock l(_lock);

                                auto &retry_context = app->helpers->contexts[retry_index];
                                retry_context.pending_sync_task =
                                    update_partition_max_replica_count_on_remote(
                                        app, new_partition_config, on_partition_updated);
                            },
                            server_state::sStateHash,
                            std::chrono::seconds(1));
}

// ThreadPool: THREAD_POOL_META_STATE
//
// Handles the reply of the remote-storage write issued for one partition. Runs under
// the write lock `_lock`.
//
// - ERR_OK:       the new configuration is applied to the in-memory state, the
//                 partition's config context is reset to not_pending and the callback is
//                 invoked with ERR_OK.
// - ERR_TIMEOUT:  the write is issued again after one second; the callback is not
//                 invoked.
// - other errors: the callback is invoked with the error; the config context is left
//                 as it is.
void server_state::on_update_partition_max_replica_count_on_remote_reply(
    error_code ec,
    std::shared_ptr<app_state> &app,
    const partition_configuration &new_partition_config,
    partition_callback on_partition_updated)
{
    const auto index = new_partition_config.pid.get_partition_index();

    zauto_write_lock l(_lock);

    ddebug_f("reply for updating partition-level max_replica_count on remote storage: "
             "error_code={}, app_name={}, app_id={}, partition_id={}, new_max_replica_count={}, "
             "new_ballot={}",
             ec.to_string(),
             app->app_name,
             app->app_id,
             index,
             new_partition_config.max_replica_count,
             new_partition_config.ballot);

    auto &partition_context = app->helpers->contexts[index];

    if (ec == ERR_OK) {
        // Remote storage accepted the write: publish it locally and release the partition.
        update_partition_max_replica_count_locally(app, new_partition_config);

        partition_context.pending_sync_task = nullptr;
        partition_context.pending_sync_request.reset();
        partition_context.stage = config_status::not_pending;
        partition_context.msg = nullptr;

        on_partition_updated(ec, index);
        return;
    }

    if (ec != ERR_TIMEOUT) {
        // Anything but a timeout is passed on to the initiator as it is.
        on_partition_updated(ec, index);
        return;
    }

    // NOTICE: on timeout pending_sync_task is pointed at the delayed retry, which in
    // turn points it at the task of the next remote write
    partition_context.pending_sync_task =
        tasking::enqueue(LPC_META_STATE_HIGH,
                         tracker(),
                         [this, app, new_partition_config, on_partition_updated]() mutable {
                             const auto retry_index =
                                 new_partition_config.pid.get_partition_index();

                             zauto_write_lock l(_lock);

                             auto &retry_context = app->helpers->contexts[retry_index];
                             retry_context.pending_sync_task =
                                 update_partition_max_replica_count_on_remote(
                                     app, new_partition_config, on_partition_updated);
                         },
                         server_state::sStateHash,
                         std::chrono::seconds(1));
}

// ThreadPool: THREAD_POOL_META_STATE
//
// Replaces the in-memory configuration of one partition of `app` with
// `new_partition_config`.
//
// - app:                  the table that owns the partition.
// - new_partition_config: the configuration to install; its `pid` selects the partition
//                         and its ballot must be exactly the current ballot + 1.
//
// The only caller visible here invokes it while holding the write lock `_lock`; this
// function does not take the lock itself.
void server_state::update_partition_max_replica_count_locally(
    std::shared_ptr<app_state> &app, const partition_configuration &new_partition_config)
{
    const auto &gpid = new_partition_config.pid;
    const auto partition_index = gpid.get_partition_index();
    const auto new_max_replica_count = new_partition_config.max_replica_count;
    const auto new_ballot = new_partition_config.ballot;

    auto &old_partition_config = app->partitions[partition_index];
    const auto old_max_replica_count = old_partition_config.max_replica_count;
    const auto old_ballot = old_partition_config.ballot;

    dassert_f(old_ballot + 1 == new_ballot,
              "invalid ballot while updating local max_replica_count: app_name={}, app_id={}, "
              "partition_id={}, old_max_replica_count={}, new_max_replica_count={}, "
              "old_ballot={}, new_ballot={}",
              app->app_name,
              app->app_id,
              partition_index,
              old_max_replica_count,
              new_max_replica_count,
              old_ballot,
              new_ballot);

    // Render both configurations before the assignment below: `old_partition_config`
    // is a reference into `app->partitions` and is overwritten by it.
    std::string old_config_str(boost::lexical_cast<std::string>(old_partition_config));
    std::string new_config_str(boost::lexical_cast<std::string>(new_partition_config));

    old_partition_config = new_partition_config;

    // The literals below must form ONE format string (no comma between them);
    // otherwise the tail becomes an unused argument and no details are printed.
    ddebug_f("local partition-level max_replica_count has been changed successfully: "
             "app_name={}, app_id={}, partition_id={}, old_partition_config={}, "
             "new_partition_config={}",
             app->app_name,
             app->app_id,
             partition_index,
             old_config_str,
             new_config_str);
}

} // namespace replication
Expand Down
25 changes: 25 additions & 0 deletions src/meta/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,31 @@ class server_state
bool check_max_replica_count_consistent(const std::shared_ptr<app_state> &app,
Response &response) const;

// Completion callback of a partition-level update: receives the resulting error code
// and the index of the partition it belongs to.
using partition_callback = std::function<void(error_code, int32_t)>;
// Starts the partition-level update of max_replica_count for all partitions of `app`
// and sets `response.err` of `rpc` to ERR_OK once every partition has reported success.
void do_update_max_replica_count(std::shared_ptr<app_state> &app,
int32_t app_id,
int32_t partition_count,
configuration_set_max_replica_count_rpc rpc);
// Takes the write lock and starts the update for each partition of `app` in turn.
void update_all_partitions_max_replica_count(std::shared_ptr<app_state> &app,
int32_t new_max_replica_count,
partition_callback on_partition_updated);
// Starts the update of a single partition. No remote write is issued when the partition
// already has the requested value; the call is retried later while another request for
// the partition is still syncing with remote storage.
void update_partition_max_replica_count(std::shared_ptr<app_state> &app,
int32_t partition_index,
int32_t new_max_replica_count,
partition_callback on_partition_updated);
// Writes `new_partition_config` to remote storage (or schedules a delayed retry while
// the meta function level is fl_blind or lower) and returns the corresponding task.
task_ptr update_partition_max_replica_count_on_remote(
std::shared_ptr<app_state> &app,
const partition_configuration &new_partition_config,
partition_callback on_partition_updated);
// Reply handler of the remote write: retries on ERR_TIMEOUT, forwards any other error to
// the callback, and on success applies the configuration locally before invoking it.
void on_update_partition_max_replica_count_on_remote_reply(
error_code ec,
std::shared_ptr<app_state> &app,
const partition_configuration &new_partition_config,
partition_callback on_partition_updated);
// Replaces the in-memory configuration of the partition named by
// `new_partition_config.pid`; asserts that the ballot grows by exactly one.
void
update_partition_max_replica_count_locally(std::shared_ptr<app_state> &app,
const partition_configuration &new_partition_config);

// Used for `on_start_manual_compaction`
bool parse_compaction_envs(start_manual_compact_rpc rpc,
std::vector<std::string> &keys,
Expand Down
Loading