From 09580586b230e3a488e502c18dc1146ac2e1a935 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 16 Mar 2022 17:55:46 +0800 Subject: [PATCH 1/9] feat(update_replication_factor#3): support to set the replication factor for all partitions of each table --- src/meta/server_state.cpp | 359 +++++++++++++++++++++++++++++++++++++- src/meta/server_state.h | 17 ++ 2 files changed, 374 insertions(+), 2 deletions(-) diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 0d0d33981a..afd43ae1fc 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -3269,8 +3269,363 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc response.old_max_replica_count, new_max_replica_count); - // TODO: update partition-level and app-level max_replica_count - response.err = ERR_OK; + // update max_replica_count of each partition from index 0 + tasking::enqueue(LPC_META_STATE_HIGH, + tracker(), + [this, rpc]() { update_partition_max_replica_count(0, rpc); }, + server_state::sStateHash); +} + +// ThreadPool: THREAD_POOL_META_STATE +void server_state::update_partition_max_replica_count(int32_t partition_index, + configuration_set_max_replica_count_rpc rpc) +{ + const auto &app_name = rpc.request().app_name; + auto &response = rpc.response(); + + zauto_write_lock l(_lock); + + auto app = get_app_and_check_exist(app_name, response); + if (app == nullptr) { + response.hint_message += fmt::format(" while updating max_replica_count of partition {}", partition_index); + derror_f("failed to set max_replica_count: app_name={}, error_code={}, hint_message={}", + app_name, + response.err.to_string(), + response.hint_message); + return; + } + + const auto app_id = app->app_id; + + dassert_f(app->is_stateful, + "don't support stateless apps currently: app_name={}, app_id={}", + app_name, + app_id); + + const auto new_max_replica_count = rpc.request().max_replica_count; + + const auto partition_count = static_cast(app->partitions.size()); + if (partition_index >= partition_count) { + dwarn_f("partition-level max_replica_count might have been decreased, " + "try to update app-level max_replica_count: app_name={}, app_id={}, " + "partition_index={}, partition_count={}, new_max_replica_count={}", + app_name, + app_id, + partition_index, + partition_count, + new_max_replica_count); + + // TODO: update app-level max_replica_count + response.err = ERR_OK; + return; + } + + const auto &old_partition_config = app->partitions[partition_index]; + const auto old_max_replica_count = old_partition_config.max_replica_count; + + if (new_max_replica_count == old_max_replica_count) { + dwarn_f("partition-level max_replica_count has been updated, continue to update the next " + "partition: app_name={}, app_id={}, partition_index={}, new_max_replica_count={}", + app_name, + app_id, + partition_index, + new_max_replica_count); + + update_next_partition_max_replica_count(*app, partition_index, rpc); + return; + } + + auto &context = app->helpers->contexts[partition_index]; + if (context.stage == config_status::pending_remote_sync) { + ddebug_f("have to wait until another request which is syncing with remote storage " + "is finished, then process the current request of updating max_replica_count: " + "app_name={}, app_id={}, partition_index={}, new_max_replica_count={}", + app_name, + app_id, + partition_index, + new_max_replica_count); + + tasking::enqueue(LPC_META_STATE_HIGH, + tracker(), + [this, rpc, partition_index]() { + update_partition_max_replica_count(partition_index, rpc); + }, + server_state::sStateHash, + std::chrono::milliseconds(100)); + return; + } + + dassert_f(context.stage == config_status::not_pending, + "invalid config status while updating max_replica_count: context.stage={}, " + "app_name={}, app_id={}, partition_index={}, new_max_replica_count={}", + enum_to_string(context.stage), + app_name, + app_id, + partition_index, + new_max_replica_count); + + context.stage = config_status::pending_remote_sync; + context.pending_sync_request.reset(); + context.msg = nullptr; + + auto new_partition_config = old_partition_config; + new_partition_config.max_replica_count = new_max_replica_count; + ++(new_partition_config.ballot); + context.pending_sync_task = + update_partition_max_replica_count_on_remote(new_partition_config, rpc); +} + +// ThreadPool: THREAD_POOL_META_STATE +task_ptr server_state::update_partition_max_replica_count_on_remote( + const partition_configuration &new_partition_config, + configuration_set_max_replica_count_rpc rpc) +{ + const auto &gpid = new_partition_config.pid; + const auto app_id = gpid.get_app_id(); + const auto partition_index = gpid.get_partition_index(); + const auto new_max_replica_count = new_partition_config.max_replica_count; + const auto new_ballot = new_partition_config.ballot; + const auto &app_name = rpc.request().app_name; + + auto level = _meta_svc->get_function_level(); + if (level <= meta_function_level::fl_blind) { + dwarn_f("have to wait until meta level becomes more than fl_blind, then process the " + "current request of updating max_replica_count: current_meta_level={}, " + "app_name={}, app_id={}, partition_index={}, new_max_replica_count={}, " + "new_ballot={}", + _meta_function_level_VALUES_TO_NAMES.find(level)->second, + app_name, + app_id, + partition_index, + new_max_replica_count, + new_ballot); + + // NOTICE: pending_sync_task need to be reassigned + return tasking::enqueue(LPC_META_STATE_HIGH, + tracker(), + [this, new_partition_config, rpc]() { + const auto &gpid = new_partition_config.pid; + const auto partition_index = gpid.get_partition_index(); + + const auto &app_name = rpc.request().app_name; + auto &response = rpc.response(); + + zauto_write_lock l(_lock); + + auto app = get_app_and_check_exist(app_name, response); + if (app == nullptr) { + const auto app_id = gpid.get_app_id(); + response.hint_message += fmt::format(" while ready to update max_replica_count of partition {} on remote", partition_index); + derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, hint_message={}", + app_name, + app_id, + response.err.to_string(), + response.hint_message); + return; + } + + auto &context = app->helpers->contexts[partition_index]; + context.pending_sync_task = + update_partition_max_replica_count_on_remote( + new_partition_config, rpc); + }, + server_state::sStateHash, + std::chrono::seconds(1)); + } + + ddebug_f("request for updating partition-level max_replica_count on remote storage: " + "app_name={}, app_id={}, partition_id={}, new_max_replica_count={}, new_ballot={}", + app_name, + app_id, + partition_index, + new_max_replica_count, + new_ballot); + + auto partition_path = get_partition_path(gpid); + auto json_config = + dsn::json::json_forwarder::encode(new_partition_config); + return _meta_svc->get_remote_storage()->set_data( + partition_path, + json_config, + LPC_META_STATE_HIGH, + std::bind(&server_state::on_update_partition_max_replica_count_on_remote_reply, + this, + std::placeholders::_1, + new_partition_config, + rpc), + tracker()); +} + +// ThreadPool: THREAD_POOL_META_STATE +void server_state::on_update_partition_max_replica_count_on_remote_reply( + error_code ec, + const partition_configuration &new_partition_config, + configuration_set_max_replica_count_rpc rpc) +{ + const auto &gpid = new_partition_config.pid; + const auto app_id = gpid.get_app_id(); + const auto partition_index = gpid.get_partition_index(); + const auto new_max_replica_count = new_partition_config.max_replica_count; + const auto new_ballot = new_partition_config.ballot; + + const auto &app_name = rpc.request().app_name; + auto &response = rpc.response(); + + ddebug_f("reply for updating partition-level max_replica_count on remote storage: " + "error_code={}, app_name={}, app_id={}, partition_id={}, new_max_replica_count={}, " + "new_ballot={}", + ec.to_string(), + app_name, + app_id, + partition_index, + new_max_replica_count, + new_ballot); + + zauto_write_lock l(_lock); + + auto app = get_app_and_check_exist(app_name, response); + if (app == nullptr) { + response.hint_message += fmt::format(" while replying for updating max_replica_count of partition {} on remote", partition_index); + derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, hint_message={}", + app_name, + app_id, + response.err.to_string(), + response.hint_message); + return; + } + + // if multiple threads exist in the thread pool, the check may failed + dassert_f(app->status == app_status::AS_AVAILABLE || app->status == app_status::AS_DROPPING, + "invalid app_status (if app removed, this task should be cancelled): app_status={}", + ::dsn::enum_to_string(app->status)); + + auto &context = app->helpers->contexts[partition_index]; + if (ec == ERR_TIMEOUT) { + // NOTICE: pending_sync_task need to be reassigned + context.pending_sync_task = tasking::enqueue( + LPC_META_STATE_HIGH, + tracker(), + [this, new_partition_config, rpc]() { + const auto &gpid = new_partition_config.pid; + const auto partition_index = gpid.get_partition_index(); + + const auto &app_name = rpc.request().app_name; + auto &response = rpc.response(); + + zauto_write_lock l(_lock); + + auto app = get_app_and_check_exist(app_name, response); + if (app == nullptr) { + const auto app_id = gpid.get_app_id(); + response.hint_message += fmt::format(" while timeout for updating max_replica_count of partition {} on remote", partition_index); + derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, hint_message={}", + app_name, + app_id, + response.err.to_string(), + response.hint_message); + return; + } + + auto &context = app->helpers->contexts[partition_index]; + context.pending_sync_task = + update_partition_max_replica_count_on_remote(new_partition_config, rpc); + }, + server_state::sStateHash, + std::chrono::seconds(1)); + return; + } + + if (ec != ERR_OK) { + dassert_f(false, "we can't handle this right now: error_code={}", ec.to_string()); + return; + } + + update_partition_max_replica_count_locally(new_partition_config, *app); + + context.pending_sync_task = nullptr; + context.pending_sync_request.reset(); + context.stage = config_status::not_pending; + context.msg = nullptr; + + update_next_partition_max_replica_count(*app, partition_index, rpc); +} + +// ThreadPool: THREAD_POOL_META_STATE +void server_state::update_partition_max_replica_count_locally( + const partition_configuration &new_partition_config, app_state &app) +{ + const auto &gpid = new_partition_config.pid; + const auto app_id = gpid.get_app_id(); + const auto partition_index = gpid.get_partition_index(); + + const auto &app_name = app.app_name; + auto &old_partition_config = app.partitions[gpid.get_partition_index()]; + + if (app.is_stateful) { + const auto new_max_replica_count = new_partition_config.max_replica_count; + const auto new_ballot = new_partition_config.ballot; + + const auto old_max_replica_count = old_partition_config.max_replica_count; + const auto old_ballot = old_partition_config.ballot; + + dassert_f(old_ballot + 1 == new_ballot, + "invalid ballot while updating local max_replica_count: app_name={}, app_id={}, " + "partition_id={}, old_max_replica_count={}, new_max_replica_count={}, " + "old_ballot={}, new_ballot={}", + app_name, + app_id, + partition_index, + old_max_replica_count, + new_max_replica_count, + old_ballot, + new_ballot); + } + + std::string old_config_str(boost::lexical_cast(old_partition_config)); + std::string new_config_str(boost::lexical_cast(new_partition_config)); + + old_partition_config = new_partition_config; + + ddebug_f("local partition-level max_replica_count has been changed successfully: ", + "app_name={}, app_id={}, partition_id={}, old_partition_config={}, " + "new_partition_config={}", + app_name, + app_id, + partition_index, + old_config_str, + new_config_str); +} + +// ThreadPool: THREAD_POOL_META_STATE +void server_state::update_next_partition_max_replica_count( + const app_state &app, int32_t partition_index, configuration_set_max_replica_count_rpc rpc) +{ + const auto next_partition_index = partition_index + 1; + const auto partition_count = static_cast(app.partitions.size()); + + if (next_partition_index >= partition_count) { + const auto new_max_replica_count = rpc.request().max_replica_count; + ddebug_f("all partitions have been changed to the new max_replica_count, ready to update " + "the app-level max_replica_count: app_name={}, app_id={}, partition_id={}, " + "partition_count={}, old_max_replica_count={}, new_max_replica_count={}", + app.app_name, + app.app_id, + partition_index, + partition_count, + app.max_replica_count, + new_max_replica_count); + + // TODO: update app-level max_replica_count + response.err = ERR_OK; + return; + } + + tasking::enqueue(LPC_META_STATE_HIGH, + tracker(), + [this, rpc, next_partition_index]() { + update_partition_max_replica_count(next_partition_index, rpc); + }, + server_state::sStateHash); } } // namespace replication diff --git a/src/meta/server_state.h b/src/meta/server_state.h index 63f8d472dc..9e5c311b6a 100644 --- a/src/meta/server_state.h +++ b/src/meta/server_state.h @@ -313,6 +313,23 @@ class server_state bool check_max_replica_count_consistent(const std::shared_ptr &app, Response &response) const; + void update_partition_max_replica_count(int partition_index, + configuration_set_max_replica_count_rpc rpc); + + task_ptr update_partition_max_replica_count_on_remote( + const partition_configuration &new_partition_config, + configuration_set_max_replica_count_rpc rpc); + void on_update_partition_max_replica_count_on_remote_reply( + error_code ec, + const partition_configuration &new_partition_config, + configuration_set_max_replica_count_rpc rpc); + void + update_partition_max_replica_count_locally(const partition_configuration &new_partition_config, + app_state &app); + void update_next_partition_max_replica_count(const app_state &app, + int32_t partition_index, + configuration_set_max_replica_count_rpc rpc); + // Used for `on_start_manual_compaction` bool parse_compaction_envs(start_manual_compact_rpc rpc, std::vector &keys, From c6d4379c65906fb81ce5c0e7082bbb0df9545856 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 16 Mar 2022 18:25:21 +0800 Subject: [PATCH 2/9] feat(update_replication_factor#3): support to set the replication factor for all partitions of each table --- src/meta/server_state.cpp | 116 +++++++++++++++++++++----------------- 1 file changed, 63 insertions(+), 53 deletions(-) diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index afd43ae1fc..72e42cccba 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -3287,11 +3287,12 @@ void server_state::update_partition_max_replica_count(int32_t partition_index, auto app = get_app_and_check_exist(app_name, response); if (app == nullptr) { - response.hint_message += fmt::format(" while updating max_replica_count of partition {}", partition_index); + response.hint_message += + fmt::format(" while updating max_replica_count of partition {}", partition_index); derror_f("failed to set max_replica_count: app_name={}, error_code={}, hint_message={}", - app_name, - response.err.to_string(), - response.hint_message); + app_name, + response.err.to_string(), + response.hint_message); return; } @@ -3302,9 +3303,8 @@ void server_state::update_partition_max_replica_count(int32_t partition_index, app_name, app_id); - const auto new_max_replica_count = rpc.request().max_replica_count; - const auto partition_count = static_cast(app->partitions.size()); + const auto new_max_replica_count = rpc.request().max_replica_count; if (partition_index >= partition_count) { dwarn_f("partition-level max_replica_count might have been decreased, " "try to update app-level max_replica_count: app_name={}, app_id={}, " @@ -3400,37 +3400,39 @@ task_ptr server_state::update_partition_max_replica_count_on_remote( new_max_replica_count, new_ballot); - // NOTICE: pending_sync_task need to be reassigned - return tasking::enqueue(LPC_META_STATE_HIGH, - tracker(), - [this, new_partition_config, rpc]() { - const auto &gpid = new_partition_config.pid; - const auto partition_index = gpid.get_partition_index(); - - const auto &app_name = rpc.request().app_name; - auto &response = rpc.response(); - - zauto_write_lock l(_lock); - - auto app = get_app_and_check_exist(app_name, response); - if (app == nullptr) { - const auto app_id = gpid.get_app_id(); - response.hint_message += fmt::format(" while ready to update max_replica_count of partition {} on remote", partition_index); - derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, hint_message={}", - app_name, - app_id, - response.err.to_string(), - response.hint_message); - return; - } - - auto &context = app->helpers->contexts[partition_index]; - context.pending_sync_task = - update_partition_max_replica_count_on_remote( - new_partition_config, rpc); - }, - server_state::sStateHash, - std::chrono::seconds(1)); + // NOTICE: pending_sync_task should be reassigned + return tasking::enqueue( + LPC_META_STATE_HIGH, + tracker(), + [this, new_partition_config, rpc]() { + const auto &gpid = new_partition_config.pid; + const auto partition_index = gpid.get_partition_index(); + + const auto &app_name = rpc.request().app_name; + auto &response = rpc.response(); + + zauto_write_lock l(_lock); + + auto app = get_app_and_check_exist(app_name, response); + if (app == nullptr) { + response.hint_message += fmt::format( + " while ready to update max_replica_count of partition {} on remote", + partition_index); + derror_f("failed to set max_replica_count: app_name={}, app_id={}, " + "error_code={}, hint_message={}", + app_name, + gpid.get_app_id(), + response.err.to_string(), + response.hint_message); + return; + } + + auto &context = app->helpers->contexts[partition_index]; + context.pending_sync_task = + update_partition_max_replica_count_on_remote(new_partition_config, rpc); + }, + server_state::sStateHash, + std::chrono::seconds(1)); } ddebug_f("request for updating partition-level max_replica_count on remote storage: " @@ -3485,19 +3487,23 @@ void server_state::on_update_partition_max_replica_count_on_remote_reply( auto app = get_app_and_check_exist(app_name, response); if (app == nullptr) { - response.hint_message += fmt::format(" while replying for updating max_replica_count of partition {} on remote", partition_index); - derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, hint_message={}", - app_name, - app_id, - response.err.to_string(), - response.hint_message); + response.hint_message += + fmt::format(" while replying for updating max_replica_count of partition {} on remote", + partition_index); + derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, " + "hint_message={}", + app_name, + app_id, + response.err.to_string(), + response.hint_message); return; } // if multiple threads exist in the thread pool, the check may failed - dassert_f(app->status == app_status::AS_AVAILABLE || app->status == app_status::AS_DROPPING, - "invalid app_status (if app removed, this task should be cancelled): app_status={}", - ::dsn::enum_to_string(app->status)); + dassert_f( + app->status == app_status::AS_AVAILABLE || app->status == app_status::AS_DROPPING, + "invalid app_status (if app is removed, this task should be cancelled): app_status={}", + ::dsn::enum_to_string(app->status)); auto &context = app->helpers->contexts[partition_index]; if (ec == ERR_TIMEOUT) { @@ -3516,13 +3522,15 @@ void server_state::on_update_partition_max_replica_count_on_remote_reply( auto app = get_app_and_check_exist(app_name, response); if (app == nullptr) { - const auto app_id = gpid.get_app_id(); - response.hint_message += fmt::format(" while timeout for updating max_replica_count of partition {} on remote", partition_index); - derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, hint_message={}", - app_name, - app_id, - response.err.to_string(), - response.hint_message); + response.hint_message += fmt::format( + " while timeout for updating max_replica_count of partition {} on remote", + partition_index); + derror_f("failed to set max_replica_count: app_name={}, app_id={}, " + "error_code={}, hint_message={}", + app_name, + gpid.get_app_id(), + response.err.to_string(), + response.hint_message); return; } @@ -3605,6 +3613,7 @@ void server_state::update_next_partition_max_replica_count( if (next_partition_index >= partition_count) { const auto new_max_replica_count = rpc.request().max_replica_count; + ddebug_f("all partitions have been changed to the new max_replica_count, ready to update " "the app-level max_replica_count: app_name={}, app_id={}, partition_id={}, " "partition_count={}, old_max_replica_count={}, new_max_replica_count={}", @@ -3616,6 +3625,7 @@ void server_state::update_next_partition_max_replica_count( new_max_replica_count); // TODO: update app-level max_replica_count + auto &response = rpc.response(); response.err = ERR_OK; return; } From 01acd9194cfea05624c41bf28aab7f931ef8ba61 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 16 Mar 2022 21:19:30 +0800 Subject: [PATCH 3/9] feat(update_replication_factor#3): support to set the replication factor for all partitions of each table --- src/meta/test/meta_app_operation_test.cpp | 51 ++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/src/meta/test/meta_app_operation_test.cpp b/src/meta/test/meta_app_operation_test.cpp index 450759cc2e..9101543295 100644 --- a/src/meta/test/meta_app_operation_test.cpp +++ b/src/meta/test/meta_app_operation_test.cpp @@ -157,6 +157,40 @@ class meta_app_operation_test : public meta_test_base app->max_replica_count = max_replica_count; } + void verify_all_partitions_max_replica_count(const std::string &app_name, + int32_t expected_max_replica_count) + { + auto app = find_app(app_name); + dassert_f(app != nullptr, "app({}) does not exist", app_name); + + auto partition_size = static_cast(app->partitions.size()); + for (int i = 0; i < partition_size; ++i) { + // verify local max_replica_count of each partition + auto &partition_config = app->partitions[i]; + ASSERT_EQ(partition_config.max_replica_count, expected_max_replica_count); + + // verify remote max_replica_count of each partition + auto partition_path = _ss->get_partition_path(partition_config.pid); + auto storage = _ms->get_remote_storage(); + dsn::task_tracker tracker; + storage->get_data( + partition_path, + LPC_META_CALLBACK, + [this, expected_pid = partition_config.pid, expected_max_replica_count](error_code ec, + const blob &value) { + ASSERT_EQ(ec, ERR_OK); + + partition_configuration partition_config; + dsn::json::json_forwarder::decode(value, partition_config); + + ASSERT_EQ(partition_config.pid, expected_pid); + ASSERT_EQ(partition_config.max_replica_count, expected_max_replica_count); + }, + &tracker); + tracker.wait_outstanding_tasks(); + } + } + const std::string APP_NAME = "app_operation_test"; const std::string OLD_APP_NAME = "old_app_operation"; }; @@ -498,6 +532,12 @@ TEST_F(meta_app_operation_test, set_max_replica_count) // - cluster is freezed (alive_node_count = 0) // - cluster is freezed (alive_node_count = 1 < min_live_node_count_for_unfreeze) // - cluster is freezed (alive_node_count = 2 < min_live_node_count_for_unfreeze) + // - increase with valid max_replica_count (= max_allowed_replica_count, and = alive_node_count) + // - decrease with valid max_replica_count (= max_allowed_replica_count, and = alive_node_count) + // - increase with valid max_replica_count (= max_allowed_replica_count, and < alive_node_count) + // - decrease with valid max_replica_count (= max_allowed_replica_count, and < alive_node_count) + // - increase with valid max_replica_count (< max_allowed_replica_count, and = alive_node_count) + // - decrease with valid max_replica_count (< max_allowed_replica_count, and = alive_node_count) struct test_case { std::string app_name; @@ -528,7 +568,13 @@ TEST_F(meta_app_operation_test, set_max_replica_count) {APP_NAME, 2, 2, 2, 1, 1, 2, 3, ERR_INVALID_PARAMETERS}, {APP_NAME, 1, 1, 2, 1, 0, 1, 3, ERR_STATE_FREEZED}, {APP_NAME, 1, 1, 2, 2, 1, 1, 3, ERR_STATE_FREEZED}, - {APP_NAME, 1, 1, 2, 3, 2, 1, 3, ERR_STATE_FREEZED}}; + {APP_NAME, 1, 1, 2, 3, 2, 1, 3, ERR_STATE_FREEZED}, + {APP_NAME, 1, 1, 2, 1, 2, 1, 2, ERR_OK}, + {APP_NAME, 2, 2, 1, 1, 1, 1, 1, ERR_OK}, + {APP_NAME, 1, 1, 2, 1, 3, 1, 2, ERR_OK}, + {APP_NAME, 2, 2, 1, 1, 2, 1, 1, ERR_OK}, + {APP_NAME, 1, 1, 2, 1, 2, 1, 3, ERR_OK}, + {APP_NAME, 3, 3, 2, 1, 2, 1, 3, ERR_OK}}; const int32_t total_node_count = 3; auto nodes = ensure_enough_alive_nodes(total_node_count); @@ -595,6 +641,9 @@ TEST_F(meta_app_operation_test, set_max_replica_count) const auto set_resp = set_max_replica_count(test.app_name, test.new_max_replica_count); ASSERT_EQ(set_resp.err, test.expected_err); ASSERT_EQ(set_resp.old_max_replica_count, test.expected_old_max_replica_count); + if (test.expected_err == ERR_OK) { + verify_all_partitions_max_replica_count(test.app_name, test.new_max_replica_count); + } const auto get_resp = get_max_replica_count(test.app_name); if (test.expected_err == ERR_APP_NOT_EXIST || test.expected_err == ERR_INCONSISTENT_STATE) { From e45d08a802f7025b362c644c5db7e9cfcf6c515b Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 16 Mar 2022 21:32:16 +0800 Subject: [PATCH 4/9] feat(update_replication_factor#3): support to set the replication factor for all partitions of each table --- src/meta/test/meta_app_operation_test.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/meta/test/meta_app_operation_test.cpp b/src/meta/test/meta_app_operation_test.cpp index 9101543295..62d626c354 100644 --- a/src/meta/test/meta_app_operation_test.cpp +++ b/src/meta/test/meta_app_operation_test.cpp @@ -174,19 +174,20 @@ class meta_app_operation_test : public meta_test_base auto storage = _ms->get_remote_storage(); dsn::task_tracker tracker; storage->get_data( - partition_path, - LPC_META_CALLBACK, - [this, expected_pid = partition_config.pid, expected_max_replica_count](error_code ec, - const blob &value) { + partition_path, + LPC_META_CALLBACK, + [ this, expected_pid = partition_config.pid, expected_max_replica_count ]( + error_code ec, const blob &value) { ASSERT_EQ(ec, ERR_OK); partition_configuration partition_config; - dsn::json::json_forwarder::decode(value, partition_config); + dsn::json::json_forwarder::decode(value, + partition_config); ASSERT_EQ(partition_config.pid, expected_pid); ASSERT_EQ(partition_config.max_replica_count, expected_max_replica_count); - }, - &tracker); + }, + &tracker); tracker.wait_outstanding_tasks(); } } @@ -648,7 +649,7 @@ TEST_F(meta_app_operation_test, set_max_replica_count) const auto get_resp = get_max_replica_count(test.app_name); if (test.expected_err == ERR_APP_NOT_EXIST || test.expected_err == ERR_INCONSISTENT_STATE) { ASSERT_EQ(get_resp.err, test.expected_err); - } else { + } else if (test.expected_err != ERR_OK) { ASSERT_EQ(get_resp.err, ERR_OK); } From d0abb074fb2b6080895d224976dda2a33c9d9e8d Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 17 Mar 2022 01:10:10 +0800 Subject: [PATCH 5/9] feat(update_replication_factor#3): support to set the replication factor for all partitions of each table --- src/meta/test/meta_app_operation_test.cpp | 64 +++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/src/meta/test/meta_app_operation_test.cpp b/src/meta/test/meta_app_operation_test.cpp index 62d626c354..c9546515b4 100644 --- a/src/meta/test/meta_app_operation_test.cpp +++ b/src/meta/test/meta_app_operation_test.cpp @@ -151,10 +151,37 @@ class meta_app_operation_test : public meta_test_base auto partition_size = static_cast(app->partitions.size()); for (int i = 0; i < partition_size; ++i) { + // set local max_replica_count of each partition auto &partition_config = app->partitions[i]; partition_config.max_replica_count = max_replica_count; + + // set remote max_replica_count of each partition + auto partition_path = _ss->get_partition_path(partition_config.pid); + auto json_config = + dsn::json::json_forwarder::encode(partition_config); + dsn::task_tracker tracker; + _ms->get_remote_storage()->set_data(partition_path, + json_config, + LPC_META_STATE_HIGH, + [](dsn::error_code ec) { ASSERT_EQ(ec, ERR_OK); }, + &tracker); + tracker.wait_outstanding_tasks(); } + + // set local max_replica_count of app app->max_replica_count = max_replica_count; + + // set remote max_replica_count of app + auto app_path = _ss->get_app_path(*app); + auto ainfo = *(reinterpret_cast(app.get())); + auto json_config = dsn::json::json_forwarder::encode(ainfo); + dsn::task_tracker tracker; + _ms->get_remote_storage()->set_data(app_path, + json_config, + LPC_META_STATE_HIGH, + [](dsn::error_code ec) { ASSERT_EQ(ec, ERR_OK); }, + &tracker); + tracker.wait_outstanding_tasks(); } void verify_all_partitions_max_replica_count(const std::string &app_name, @@ -171,9 +198,8 @@ class meta_app_operation_test : public meta_test_base // verify remote max_replica_count of each partition auto partition_path = _ss->get_partition_path(partition_config.pid); - auto storage = _ms->get_remote_storage(); dsn::task_tracker tracker; - storage->get_data( + _ms->get_remote_storage()->get_data( partition_path, LPC_META_CALLBACK, [ this, expected_pid = partition_config.pid, expected_max_replica_count ]( @@ -535,10 +561,26 @@ TEST_F(meta_app_operation_test, set_max_replica_count) // - cluster is freezed (alive_node_count = 2 < min_live_node_count_for_unfreeze) // - increase with valid max_replica_count (= max_allowed_replica_count, and = alive_node_count) // - decrease with valid max_replica_count (= max_allowed_replica_count, and = alive_node_count) + // - unchanged valid max_replica_count (= max_allowed_replica_count, and = alive_node_count) // - increase with valid max_replica_count (= max_allowed_replica_count, and < alive_node_count) // - decrease with valid max_replica_count (= max_allowed_replica_count, and < alive_node_count) + // - unchanged valid max_replica_count (= max_allowed_replica_count, and < alive_node_count) // - increase with valid max_replica_count (< max_allowed_replica_count, and = alive_node_count) // - decrease with valid max_replica_count (< max_allowed_replica_count, and = alive_node_count) + // - unchanged valid max_replica_count (< max_allowed_replica_count, and = alive_node_count) + // - decrease with valid max_replica_count (< max_allowed_replica_count < alive_node_count) + // - unchanged valid max_replica_count (< max_allowed_replica_count < alive_node_count) + // - decrease with valid max_replica_count (< alive_node_count < max_allowed_replica_count) + // - unchanged valid max_replica_count (< alive_node_count < max_allowed_replica_count) + // - increase with valid max_replica_count (< alive_node_count = max_allowed_replica_count) + // - decrease with valid max_replica_count (< alive_node_count = max_allowed_replica_count) + // - unchanged valid max_replica_count (< alive_node_count = max_allowed_replica_count) + // - increase with valid max_replica_count (= min_allowed_replica_count, and < alive_node_count) + // - decrease with valid max_replica_count (= min_allowed_replica_count, and < alive_node_count) + // - unchanged valid max_replica_count (= min_allowed_replica_count, and < alive_node_count) + // - increase max_replica_count from 2 to 3 + // - increase max_replica_count from 1 to 3 + // - decrease max_replica_count from 3 to 1 struct test_case { std::string app_name; @@ -572,10 +614,26 @@ TEST_F(meta_app_operation_test, set_max_replica_count) {APP_NAME, 1, 1, 2, 3, 2, 1, 3, ERR_STATE_FREEZED}, {APP_NAME, 1, 1, 2, 1, 2, 1, 2, ERR_OK}, {APP_NAME, 2, 2, 1, 1, 1, 1, 1, ERR_OK}, + {APP_NAME, 2, 2, 2, 1, 2, 1, 2, ERR_OK}, {APP_NAME, 1, 1, 2, 1, 3, 1, 2, ERR_OK}, {APP_NAME, 2, 2, 1, 1, 2, 1, 1, ERR_OK}, + {APP_NAME, 2, 2, 2, 1, 3, 1, 2, ERR_OK}, {APP_NAME, 1, 1, 2, 1, 2, 1, 3, ERR_OK}, - {APP_NAME, 3, 3, 2, 1, 2, 1, 3, ERR_OK}}; + {APP_NAME, 3, 3, 2, 1, 2, 1, 3, ERR_OK}, + {APP_NAME, 2, 2, 2, 1, 2, 1, 3, ERR_OK}, + {APP_NAME, 2, 2, 1, 1, 3, 1, 2, ERR_OK}, + {APP_NAME, 1, 1, 1, 1, 3, 1, 2, ERR_OK}, + {APP_NAME, 2, 2, 1, 1, 2, 1, 3, ERR_OK}, + {APP_NAME, 1, 1, 1, 1, 2, 1, 3, ERR_OK}, + {APP_NAME, 1, 1, 2, 1, 3, 1, 3, ERR_OK}, + {APP_NAME, 3, 3, 2, 1, 3, 1, 3, ERR_OK}, + {APP_NAME, 2, 2, 2, 1, 3, 1, 3, ERR_OK}, + {APP_NAME, 1, 1, 2, 1, 3, 2, 3, ERR_OK}, + {APP_NAME, 3, 3, 2, 1, 3, 2, 3, ERR_OK}, + {APP_NAME, 2, 2, 2, 1, 3, 2, 3, ERR_OK}, + {APP_NAME, 2, 2, 3, 2, 3, 2, 3, ERR_OK}, + {APP_NAME, 1, 1, 3, 2, 3, 1, 3, ERR_OK}, + {APP_NAME, 3, 3, 1, 2, 3, 1, 3, ERR_OK}}; const int32_t total_node_count = 3; auto nodes = ensure_enough_alive_nodes(total_node_count); From 0fdf0a03a5e50a4643a0da261ec2d6a929946d20 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 23 Mar 2022 16:52:35 +0800 Subject: [PATCH 6/9] feat(update_replication_factor#3): support to set the replication factor for all partitions of each table --- src/meta/server_state.cpp | 399 +++++++++++++++++--------------------- src/meta/server_state.h | 28 ++- 2 files changed, 200 insertions(+), 227 deletions(-) diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 72e42cccba..bfb53dcb5c 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -3199,11 +3199,13 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc auto &response = rpc.response(); int32_t app_id = 0; + int32_t partition_count = 0; + std::shared_ptr app; { zauto_read_lock l(_lock); - auto app = get_app_and_check_exist(app_name, response); + app = get_app_and_check_exist(app_name, response); if (app == nullptr) { response.old_max_replica_count = 0; dwarn_f("failed to set max_replica_count: app_name={}, error_code={}, hint_message={}", @@ -3217,6 +3219,7 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc if (!check_max_replica_count_consistent(app, response)) { response.old_max_replica_count = 0; + response.hint_message = fmt::format("app({}) does not exist", app_name); derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, " "hint_message={}", app_name, @@ -3227,6 +3230,20 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc } response.old_max_replica_count = app->max_replica_count; + + if (app->status != app_status::AS_AVAILABLE) { + response.err = ERR_INVALID_PARAMETERS; + response.hint_message = fmt::format("app({}) is not in available status", app_name); + derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, " + "hint_message={}", + app_name, + app_id, + response.err.to_string(), + response.hint_message); + return; + } + + partition_count = app->partition_count; } auto level = _meta_svc->get_function_level(); @@ -3269,69 +3286,111 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc response.old_max_replica_count, new_max_replica_count); - // update max_replica_count of each partition from index 0 - tasking::enqueue(LPC_META_STATE_HIGH, - tracker(), - [this, rpc]() { update_partition_max_replica_count(0, rpc); }, - server_state::sStateHash); + do_update_max_replica_count(app, app_id, partition_count, rpc); } // ThreadPool: THREAD_POOL_META_STATE -void server_state::update_partition_max_replica_count(int32_t partition_index, - configuration_set_max_replica_count_rpc rpc) -{ - const auto &app_name = rpc.request().app_name; - auto &response = rpc.response(); +void server_state::do_update_max_replica_count(std::shared_ptr &app, + int32_t app_id, + int32_t partition_count, + configuration_set_max_replica_count_rpc rpc) +{ + std::shared_ptr> results(new std::vector(partition_count)); + std::shared_ptr updates_in_progress(new std::atomic_int(partition_count)); + auto on_partition_updated = [this, app_id, partition_count, rpc, results, updates_in_progress]( + error_code ec, int32_t partition_index) { + const auto &app_name = rpc.request().app_name; + const auto new_max_replica_count = rpc.request().max_replica_count; - zauto_write_lock l(_lock); + results->at(partition_index) = ec; - auto app = get_app_and_check_exist(app_name, response); - if (app == nullptr) { - response.hint_message += - fmt::format(" while updating max_replica_count of partition {}", partition_index); - derror_f("failed to set max_replica_count: app_name={}, error_code={}, hint_message={}", - app_name, - response.err.to_string(), - response.hint_message); - return; - } + auto uncompleted = --(*updates_in_progress); + dassert_f(uncompleted >= 0, + "the uncompleted number should be >= 0 while updating partition-level" + "max_replica_count: uncompleted={}, app_name={}, app_id={}, " + "partition_index={}, partition_count={}, new_max_replica_count={}", + uncompleted, + app_name, + app_id, + partition_index, + partition_count, + new_max_replica_count); - const auto app_id = app->app_id; + if (uncompleted > 0) { + return; + } - dassert_f(app->is_stateful, - "don't support stateless apps currently: app_name={}, app_id={}", - app_name, - app_id); + for (int32_t i = 0; i < partition_count; ++i) { + if (results->at(i) == ERR_OK) { + continue; + } - const auto partition_count = static_cast(app->partitions.size()); - const auto new_max_replica_count = rpc.request().max_replica_count; - if (partition_index >= partition_count) { - dwarn_f("partition-level max_replica_count might have been decreased, " - "try to update app-level max_replica_count: app_name={}, app_id={}, " - "partition_index={}, partition_count={}, new_max_replica_count={}", - app_name, - app_id, - partition_index, - partition_count, - new_max_replica_count); + dassert_f(false, + "An error that can't be handled occurs while updating partition-level" + "max_replica_count: error_code={}, app_name={}, app_id={}, " + "partition_index={}, partition_count={}, new_max_replica_count={}", + ec.to_string(), + app_name, + app_id, + i, + partition_count, + new_max_replica_count); + } + + ddebug_f("all partitions have been changed to the new max_replica_count, ready to update " + "the app-level max_replica_count: app_name={}, app_id={}, partition_count={}, " + "new_max_replica_count={}", + app_name, + app_id, + partition_count, + new_max_replica_count); // TODO: update app-level max_replica_count + auto &response = rpc.response(); response.err = ERR_OK; - return; + }; + + const auto new_max_replica_count = rpc.request().max_replica_count; + update_all_partitions_max_replica_count(app, new_max_replica_count, on_partition_updated); +} + +// ThreadPool: THREAD_POOL_META_STATE +void server_state::update_all_partitions_max_replica_count(std::shared_ptr &app, + int32_t new_max_replica_count, + partition_callback on_partition_updated) +{ + zauto_write_lock l(_lock); + + for (int32_t i = 0; i < app->partition_count; ++i) { + update_partition_max_replica_count(app, i, new_max_replica_count, on_partition_updated); } +} + +// ThreadPool: THREAD_POOL_META_STATE +void server_state::update_partition_max_replica_count(std::shared_ptr app, + int32_t partition_index, + int32_t new_max_replica_count, + partition_callback on_partition_updated) +{ + dassert_f(partition_index < app->partition_count, + "partition_index should be < partition_count: app_name={}, app_id={}, " + "partition_index={}, partition_count={}, new_max_replica_count={}", + app->app_name, + app->app_id, + partition_index, + app->partition_count, + new_max_replica_count); const auto &old_partition_config = app->partitions[partition_index]; const auto old_max_replica_count = old_partition_config.max_replica_count; if (new_max_replica_count == old_max_replica_count) { - dwarn_f("partition-level max_replica_count has been updated, continue to update the next " - "partition: app_name={}, app_id={}, partition_index={}, new_max_replica_count={}", - app_name, - app_id, + dwarn_f("partition-level max_replica_count has been updated: app_name={}, " + "app_id={}, partition_index={}, new_max_replica_count={}", + app->app_name, + app->app_id, partition_index, new_max_replica_count); - - update_next_partition_max_replica_count(*app, partition_index, rpc); return; } @@ -3340,18 +3399,20 @@ void server_state::update_partition_max_replica_count(int32_t partition_index, ddebug_f("have to wait until another request which is syncing with remote storage " "is finished, then process the current request of updating max_replica_count: " "app_name={}, app_id={}, partition_index={}, new_max_replica_count={}", - app_name, - app_id, + app->app_name, + app->app_id, partition_index, new_max_replica_count); - tasking::enqueue(LPC_META_STATE_HIGH, - tracker(), - [this, rpc, partition_index]() { - update_partition_max_replica_count(partition_index, rpc); - }, - server_state::sStateHash, - std::chrono::milliseconds(100)); + tasking::enqueue( + LPC_META_STATE_HIGH, + tracker(), + [this, app, partition_index, new_max_replica_count, on_partition_updated]() { + update_partition_max_replica_count( + app, partition_index, new_max_replica_count, on_partition_updated); + }, + server_state::sStateHash, + std::chrono::milliseconds(100)); return; } @@ -3359,8 +3420,8 @@ void server_state::update_partition_max_replica_count(int32_t partition_index, "invalid config status while updating max_replica_count: context.stage={}, " "app_name={}, app_id={}, partition_index={}, new_max_replica_count={}", enum_to_string(context.stage), - app_name, - app_id, + app->app_name, + app->app_id, partition_index, new_max_replica_count); @@ -3371,74 +3432,56 @@ void server_state::update_partition_max_replica_count(int32_t partition_index, auto new_partition_config = old_partition_config; new_partition_config.max_replica_count = new_max_replica_count; ++(new_partition_config.ballot); - context.pending_sync_task = - update_partition_max_replica_count_on_remote(new_partition_config, rpc); + context.pending_sync_task = update_partition_max_replica_count_on_remote( + app, new_partition_config, on_partition_updated); } // ThreadPool: THREAD_POOL_META_STATE task_ptr server_state::update_partition_max_replica_count_on_remote( + std::shared_ptr app, const partition_configuration &new_partition_config, - configuration_set_max_replica_count_rpc rpc) + partition_callback on_partition_updated) { const auto &gpid = new_partition_config.pid; - const auto app_id = gpid.get_app_id(); const auto partition_index = gpid.get_partition_index(); const auto new_max_replica_count = new_partition_config.max_replica_count; const auto new_ballot = new_partition_config.ballot; - const auto &app_name = rpc.request().app_name; - auto level = _meta_svc->get_function_level(); + const auto level = _meta_svc->get_function_level(); if (level <= meta_function_level::fl_blind) { dwarn_f("have to wait until meta level becomes more than fl_blind, then process the " "current request of updating max_replica_count: current_meta_level={}, " "app_name={}, app_id={}, partition_index={}, new_max_replica_count={}, " "new_ballot={}", _meta_function_level_VALUES_TO_NAMES.find(level)->second, - app_name, - app_id, + app->app_name, + app->app_id, partition_index, new_max_replica_count, new_ballot); // NOTICE: pending_sync_task should be reassigned - return tasking::enqueue( - LPC_META_STATE_HIGH, - tracker(), - [this, new_partition_config, rpc]() { - const auto &gpid = new_partition_config.pid; - const auto partition_index = gpid.get_partition_index(); + return tasking::enqueue(LPC_META_STATE_HIGH, + tracker(), + [this, app, new_partition_config, on_partition_updated]() { + const auto &gpid = new_partition_config.pid; + const auto partition_index = gpid.get_partition_index(); - const auto &app_name = rpc.request().app_name; - auto &response = rpc.response(); + zauto_write_lock l(_lock); - zauto_write_lock l(_lock); - - auto app = get_app_and_check_exist(app_name, response); - if (app == nullptr) { - response.hint_message += fmt::format( - " while ready to update max_replica_count of partition {} on remote", - partition_index); - derror_f("failed to set max_replica_count: app_name={}, app_id={}, " - "error_code={}, hint_message={}", - app_name, - gpid.get_app_id(), - response.err.to_string(), - response.hint_message); - return; - } - - auto &context = app->helpers->contexts[partition_index]; - context.pending_sync_task = - update_partition_max_replica_count_on_remote(new_partition_config, rpc); - }, - server_state::sStateHash, - std::chrono::seconds(1)); + auto &context = app->helpers->contexts[partition_index]; + context.pending_sync_task = + update_partition_max_replica_count_on_remote( + app, new_partition_config, on_partition_updated); + }, + server_state::sStateHash, + std::chrono::seconds(1)); } ddebug_f("request for updating partition-level max_replica_count on remote storage: " "app_name={}, app_id={}, partition_id={}, new_max_replica_count={}, new_ballot={}", - app_name, - app_id, + app->app_name, + app->app_id, partition_index, new_max_replica_count, new_ballot); @@ -3453,141 +3496,97 @@ task_ptr server_state::update_partition_max_replica_count_on_remote( std::bind(&server_state::on_update_partition_max_replica_count_on_remote_reply, this, std::placeholders::_1, + app, new_partition_config, - rpc), + on_partition_updated), tracker()); } // ThreadPool: THREAD_POOL_META_STATE void server_state::on_update_partition_max_replica_count_on_remote_reply( error_code ec, + std::shared_ptr &app, const partition_configuration &new_partition_config, - configuration_set_max_replica_count_rpc rpc) + partition_callback on_partition_updated) { const auto &gpid = new_partition_config.pid; - const auto app_id = gpid.get_app_id(); const auto partition_index = gpid.get_partition_index(); const auto new_max_replica_count = new_partition_config.max_replica_count; const auto new_ballot = new_partition_config.ballot; - const auto &app_name = rpc.request().app_name; - auto &response = rpc.response(); + zauto_write_lock l(_lock); ddebug_f("reply for updating partition-level max_replica_count on remote storage: " "error_code={}, app_name={}, app_id={}, partition_id={}, new_max_replica_count={}, " "new_ballot={}", ec.to_string(), - app_name, - app_id, + app->app_name, + app->app_id, partition_index, new_max_replica_count, new_ballot); - zauto_write_lock l(_lock); - - auto app = get_app_and_check_exist(app_name, response); - if (app == nullptr) { - response.hint_message += - fmt::format(" while replying for updating max_replica_count of partition {} on remote", - partition_index); - derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, " - "hint_message={}", - app_name, - app_id, - response.err.to_string(), - response.hint_message); - return; - } - - // if multiple threads exist in the thread pool, the check may failed - dassert_f( - app->status == app_status::AS_AVAILABLE || app->status == app_status::AS_DROPPING, - "invalid app_status (if app is removed, this task should be cancelled): app_status={}", - ::dsn::enum_to_string(app->status)); - auto &context = app->helpers->contexts[partition_index]; if (ec == ERR_TIMEOUT) { // NOTICE: pending_sync_task need to be reassigned - context.pending_sync_task = tasking::enqueue( - LPC_META_STATE_HIGH, - tracker(), - [this, new_partition_config, rpc]() { - const auto &gpid = new_partition_config.pid; - const auto partition_index = gpid.get_partition_index(); - - const auto &app_name = rpc.request().app_name; - auto &response = rpc.response(); - - zauto_write_lock l(_lock); + context.pending_sync_task = + tasking::enqueue(LPC_META_STATE_HIGH, + tracker(), + [this, app, new_partition_config, on_partition_updated]() { + const auto &gpid = new_partition_config.pid; + const auto partition_index = gpid.get_partition_index(); - auto app = get_app_and_check_exist(app_name, response); - if (app == nullptr) { - response.hint_message += fmt::format( - " while timeout for updating max_replica_count of partition {} on remote", - partition_index); - derror_f("failed to set max_replica_count: app_name={}, app_id={}, " - "error_code={}, hint_message={}", - app_name, - gpid.get_app_id(), - response.err.to_string(), - response.hint_message); - return; - } + zauto_write_lock l(_lock); - auto &context = app->helpers->contexts[partition_index]; - context.pending_sync_task = - update_partition_max_replica_count_on_remote(new_partition_config, rpc); - }, - server_state::sStateHash, - std::chrono::seconds(1)); + auto &context = app->helpers->contexts[partition_index]; + context.pending_sync_task = + update_partition_max_replica_count_on_remote( + app, new_partition_config, on_partition_updated); + }, + server_state::sStateHash, + std::chrono::seconds(1)); return; } if (ec != ERR_OK) { - dassert_f(false, "we can't handle this right now: error_code={}", ec.to_string()); + on_partition_updated(ec, partition_index); return; } - update_partition_max_replica_count_locally(new_partition_config, *app); + update_partition_max_replica_count_locally(app, new_partition_config); context.pending_sync_task = nullptr; context.pending_sync_request.reset(); context.stage = config_status::not_pending; context.msg = nullptr; - update_next_partition_max_replica_count(*app, partition_index, rpc); + on_partition_updated(ec, partition_index); } // ThreadPool: THREAD_POOL_META_STATE void server_state::update_partition_max_replica_count_locally( - const partition_configuration &new_partition_config, app_state &app) + std::shared_ptr &app, const partition_configuration &new_partition_config) { const auto &gpid = new_partition_config.pid; - const auto app_id = gpid.get_app_id(); const auto partition_index = gpid.get_partition_index(); + const auto new_max_replica_count = new_partition_config.max_replica_count; + const auto new_ballot = new_partition_config.ballot; - const auto &app_name = app.app_name; - auto &old_partition_config = app.partitions[gpid.get_partition_index()]; - - if (app.is_stateful) { - const auto new_max_replica_count = new_partition_config.max_replica_count; - const auto new_ballot = new_partition_config.ballot; - - const auto old_max_replica_count = old_partition_config.max_replica_count; - const auto old_ballot = old_partition_config.ballot; + auto &old_partition_config = app->partitions[gpid.get_partition_index()]; + const auto old_max_replica_count = old_partition_config.max_replica_count; + const auto old_ballot = old_partition_config.ballot; - dassert_f(old_ballot + 1 == new_ballot, - "invalid ballot while updating local max_replica_count: app_name={}, app_id={}, " - "partition_id={}, old_max_replica_count={}, new_max_replica_count={}, " - "old_ballot={}, new_ballot={}", - app_name, - app_id, - partition_index, - old_max_replica_count, - new_max_replica_count, - old_ballot, - new_ballot); - } + dassert_f(old_ballot + 1 == new_ballot, + "invalid ballot while updating local max_replica_count: app_name={}, app_id={}, " + "partition_id={}, old_max_replica_count={}, new_max_replica_count={}, " + "old_ballot={}, new_ballot={}", + app->app_name, + app->app_id, + partition_index, + old_max_replica_count, + new_max_replica_count, + old_ballot, + new_ballot); std::string old_config_str(boost::lexical_cast(old_partition_config)); std::string new_config_str(boost::lexical_cast(new_partition_config)); @@ -3597,46 +3596,12 @@ void server_state::update_partition_max_replica_count_locally( ddebug_f("local partition-level max_replica_count has been changed successfully: ", "app_name={}, app_id={}, partition_id={}, old_partition_config={}, " "new_partition_config={}", - app_name, - app_id, + app->app_name, + app->app_id, partition_index, old_config_str, new_config_str); } -// ThreadPool: THREAD_POOL_META_STATE -void server_state::update_next_partition_max_replica_count( - const app_state &app, int32_t partition_index, configuration_set_max_replica_count_rpc rpc) -{ - const auto next_partition_index = partition_index + 1; - const auto partition_count = static_cast(app.partitions.size()); - - if (next_partition_index >= partition_count) { - const auto new_max_replica_count = rpc.request().max_replica_count; - - ddebug_f("all partitions have been changed to the new max_replica_count, ready to update " - "the app-level max_replica_count: app_name={}, app_id={}, partition_id={}, " - "partition_count={}, old_max_replica_count={}, new_max_replica_count={}", - app.app_name, - app.app_id, - partition_index, - partition_count, - app.max_replica_count, - new_max_replica_count); - - // TODO: update app-level max_replica_count - auto &response = rpc.response(); - response.err = ERR_OK; - return; - } - - tasking::enqueue(LPC_META_STATE_HIGH, - tracker(), - [this, rpc, next_partition_index]() { - update_partition_max_replica_count(next_partition_index, rpc); - }, - server_state::sStateHash); -} - } // namespace replication } // namespace dsn diff --git a/src/meta/server_state.h b/src/meta/server_state.h index 9e5c311b6a..ed82cc4885 100644 --- a/src/meta/server_state.h +++ b/src/meta/server_state.h @@ -313,22 +313,30 @@ class server_state bool check_max_replica_count_consistent(const std::shared_ptr &app, Response &response) const; - void update_partition_max_replica_count(int partition_index, - configuration_set_max_replica_count_rpc rpc); - + using partition_callback = std::function; + void do_update_max_replica_count(std::shared_ptr &app, + int32_t app_id, + int32_t partition_count, + configuration_set_max_replica_count_rpc rpc); + void update_all_partitions_max_replica_count(std::shared_ptr &app, + int32_t new_max_replica_count, + partition_callback on_partition_updated); + void update_partition_max_replica_count(std::shared_ptr app, + int32_t partition_index, + int32_t new_max_replica_count, + partition_callback on_partition_updated); task_ptr update_partition_max_replica_count_on_remote( + std::shared_ptr app, const partition_configuration &new_partition_config, - configuration_set_max_replica_count_rpc rpc); + partition_callback on_partition_updated); void on_update_partition_max_replica_count_on_remote_reply( error_code ec, + std::shared_ptr &app, const partition_configuration &new_partition_config, - configuration_set_max_replica_count_rpc rpc); + partition_callback on_partition_updated); void - update_partition_max_replica_count_locally(const partition_configuration &new_partition_config, - app_state &app); - void update_next_partition_max_replica_count(const app_state &app, - int32_t partition_index, - configuration_set_max_replica_count_rpc rpc); + update_partition_max_replica_count_locally(std::shared_ptr &app, + const partition_configuration &new_partition_config); // Used for `on_start_manual_compaction` bool parse_compaction_envs(start_manual_compact_rpc rpc, From 66d56ae990f6758dce5fc0adbe6a9a0ef6714d77 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 23 Mar 2022 17:03:38 +0800 Subject: [PATCH 7/9] feat(update_replication_factor#3): support to set the replication factor for all partitions of each table --- src/meta/server_state.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index bfb53dcb5c..77a18a66b8 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -3219,7 +3219,6 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc if (!check_max_replica_count_consistent(app, response)) { response.old_max_replica_count = 0; - response.hint_message = fmt::format("app({}) does not exist", app_name); derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, " "hint_message={}", app_name, From 942cac31e74408d3a48dd7161d803cd32deed19e Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Wed, 23 Mar 2022 18:15:03 +0800 Subject: [PATCH 8/9] feat(update_replication_factor#3): support to set the replication factor for all partitions of each table --- src/meta/server_state.cpp | 10 +++++----- src/meta/server_state.h | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 77a18a66b8..2664e8be71 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -3366,7 +3366,7 @@ void server_state::update_all_partitions_max_replica_count(std::shared_ptr app, +void server_state::update_partition_max_replica_count(std::shared_ptr &app, int32_t partition_index, int32_t new_max_replica_count, partition_callback on_partition_updated) @@ -3406,7 +3406,7 @@ void server_state::update_partition_max_replica_count(std::shared_ptr tasking::enqueue( LPC_META_STATE_HIGH, tracker(), - [this, app, partition_index, new_max_replica_count, on_partition_updated]() { + [this, app, partition_index, new_max_replica_count, on_partition_updated]() mutable { update_partition_max_replica_count( app, partition_index, new_max_replica_count, on_partition_updated); }, @@ -3437,7 +3437,7 @@ void server_state::update_partition_max_replica_count(std::shared_ptr // ThreadPool: THREAD_POOL_META_STATE task_ptr server_state::update_partition_max_replica_count_on_remote( - std::shared_ptr app, + std::shared_ptr &app, const partition_configuration &new_partition_config, partition_callback on_partition_updated) { @@ -3462,7 +3462,7 @@ task_ptr server_state::update_partition_max_replica_count_on_remote( // NOTICE: pending_sync_task should be reassigned return tasking::enqueue(LPC_META_STATE_HIGH, tracker(), - [this, app, new_partition_config, on_partition_updated]() { + [this, app, new_partition_config, on_partition_updated]() mutable { const auto &gpid = new_partition_config.pid; const auto partition_index = gpid.get_partition_index(); @@ -3531,7 +3531,7 @@ void server_state::on_update_partition_max_replica_count_on_remote_reply( context.pending_sync_task = tasking::enqueue(LPC_META_STATE_HIGH, tracker(), - [this, app, new_partition_config, on_partition_updated]() { + [this, app, new_partition_config, on_partition_updated]() mutable { const auto &gpid = new_partition_config.pid; const auto partition_index = gpid.get_partition_index(); diff --git a/src/meta/server_state.h b/src/meta/server_state.h index ed82cc4885..e7ba5ec5d4 100644 --- a/src/meta/server_state.h +++ b/src/meta/server_state.h @@ -321,12 +321,12 @@ class server_state void update_all_partitions_max_replica_count(std::shared_ptr &app, int32_t new_max_replica_count, partition_callback on_partition_updated); - void update_partition_max_replica_count(std::shared_ptr app, + void update_partition_max_replica_count(std::shared_ptr &app, int32_t partition_index, int32_t new_max_replica_count, partition_callback on_partition_updated); task_ptr update_partition_max_replica_count_on_remote( - std::shared_ptr app, + std::shared_ptr &app, const partition_configuration &new_partition_config, partition_callback on_partition_updated); void on_update_partition_max_replica_count_on_remote_reply( From 6d789d98b378f3228dbaf1ee1acdf21216cfcd69 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Thu, 24 Mar 2022 20:34:06 +0800 Subject: [PATCH 9/9] feat(update_replication_factor#3): support to set the replication factor for all partitions of each table --- src/meta/server_state.cpp | 55 ++++++++++++++++++--------------------- src/meta/server_state.h | 5 ---- 2 files changed, 25 insertions(+), 35 deletions(-) diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 2664e8be71..9fad2a2b66 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -3199,7 +3199,6 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc auto &response = rpc.response(); int32_t app_id = 0; - int32_t partition_count = 0; std::shared_ptr app; { @@ -3241,8 +3240,6 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc response.hint_message); return; } - - partition_count = app->partition_count; } auto level = _meta_svc->get_function_level(); @@ -3285,41 +3282,45 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc response.old_max_replica_count, new_max_replica_count); - do_update_max_replica_count(app, app_id, partition_count, rpc); + do_update_max_replica_count(app, rpc); } // ThreadPool: THREAD_POOL_META_STATE void server_state::do_update_max_replica_count(std::shared_ptr &app, - int32_t app_id, - int32_t partition_count, configuration_set_max_replica_count_rpc rpc) { - std::shared_ptr> results(new std::vector(partition_count)); - std::shared_ptr updates_in_progress(new std::atomic_int(partition_count)); - auto on_partition_updated = [this, app_id, partition_count, rpc, results, updates_in_progress]( - error_code ec, int32_t partition_index) { + std::shared_ptr> results; + { + zauto_write_lock l(_lock); + + results.reset(new std::vector(app->partition_count)); + app->helpers->partitions_in_progress.store(app->partition_count); + } + + auto on_partition_updated = [this, app, rpc, results](error_code ec, + int32_t partition_index) mutable { const auto &app_name = rpc.request().app_name; const auto new_max_replica_count = rpc.request().max_replica_count; results->at(partition_index) = ec; - auto uncompleted = --(*updates_in_progress); + auto uncompleted = --app->helpers->partitions_in_progress; dassert_f(uncompleted >= 0, "the uncompleted number should be >= 0 while updating partition-level" "max_replica_count: uncompleted={}, app_name={}, app_id={}, " "partition_index={}, partition_count={}, new_max_replica_count={}", uncompleted, app_name, - app_id, + app->app_id, partition_index, - partition_count, + app->partition_count, new_max_replica_count); if (uncompleted > 0) { return; } - for (int32_t i = 0; i < partition_count; ++i) { + for (int32_t i = 0; i < app->partition_count; ++i) { if (results->at(i) == ERR_OK) { continue; } @@ -3330,9 +3331,9 @@ void server_state::do_update_max_replica_count(std::shared_ptr &app, "partition_index={}, partition_count={}, new_max_replica_count={}", ec.to_string(), app_name, - app_id, + app->app_id, i, - partition_count, + app->partition_count, new_max_replica_count); } @@ -3340,8 +3341,8 @@ void server_state::do_update_max_replica_count(std::shared_ptr &app, "the app-level max_replica_count: app_name={}, app_id={}, partition_count={}, " "new_max_replica_count={}", app_name, - app_id, - partition_count, + app->app_id, + app->partition_count, new_max_replica_count); // TODO: update app-level max_replica_count @@ -3349,19 +3350,13 @@ void server_state::do_update_max_replica_count(std::shared_ptr &app, response.err = ERR_OK; }; - const auto new_max_replica_count = rpc.request().max_replica_count; - update_all_partitions_max_replica_count(app, new_max_replica_count, on_partition_updated); -} - -// ThreadPool: THREAD_POOL_META_STATE -void server_state::update_all_partitions_max_replica_count(std::shared_ptr &app, - int32_t new_max_replica_count, - partition_callback on_partition_updated) -{ - zauto_write_lock l(_lock); + { + const auto new_max_replica_count = rpc.request().max_replica_count; - for (int32_t i = 0; i < app->partition_count; ++i) { - update_partition_max_replica_count(app, i, new_max_replica_count, on_partition_updated); + zauto_write_lock l(_lock); + for (int32_t i = 0; i < app->partition_count; ++i) { + update_partition_max_replica_count(app, i, new_max_replica_count, on_partition_updated); + } } } diff --git a/src/meta/server_state.h b/src/meta/server_state.h index e7ba5ec5d4..dcec5d787b 100644 --- a/src/meta/server_state.h +++ b/src/meta/server_state.h @@ -315,12 +315,7 @@ class server_state using partition_callback = std::function; void do_update_max_replica_count(std::shared_ptr &app, - int32_t app_id, - int32_t partition_count, configuration_set_max_replica_count_rpc rpc); - void update_all_partitions_max_replica_count(std::shared_ptr &app, - int32_t new_max_replica_count, - partition_callback on_partition_updated); void update_partition_max_replica_count(std::shared_ptr &app, int32_t partition_index, int32_t new_max_replica_count,