From ec3d3970a402802ac4a7dd6db58271775de05972 Mon Sep 17 00:00:00 2001 From: Dan Wang Date: Fri, 25 Mar 2022 15:45:23 +0800 Subject: [PATCH] feat(update_replication_factor#3): support to set the replication factor for all partitions of each table (#1077) --- src/meta/server_state.cpp | 330 +++++++++++++++++++++- src/meta/server_state.h | 20 ++ src/meta/test/meta_app_operation_test.cpp | 112 +++++++- 3 files changed, 457 insertions(+), 5 deletions(-) diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 0d0d33981a..9fad2a2b66 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -3199,11 +3199,12 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc auto &response = rpc.response(); int32_t app_id = 0; + std::shared_ptr app; { zauto_read_lock l(_lock); - auto app = get_app_and_check_exist(app_name, response); + app = get_app_and_check_exist(app_name, response); if (app == nullptr) { response.old_max_replica_count = 0; dwarn_f("failed to set max_replica_count: app_name={}, error_code={}, hint_message={}", @@ -3227,6 +3228,18 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc } response.old_max_replica_count = app->max_replica_count; + + if (app->status != app_status::AS_AVAILABLE) { + response.err = ERR_INVALID_PARAMETERS; + response.hint_message = fmt::format("app({}) is not in available status", app_name); + derror_f("failed to set max_replica_count: app_name={}, app_id={}, error_code={}, " + "hint_message={}", + app_name, + app_id, + response.err.to_string(), + response.hint_message); + return; + } } auto level = _meta_svc->get_function_level(); @@ -3269,8 +3282,319 @@ void server_state::set_max_replica_count(configuration_set_max_replica_count_rpc response.old_max_replica_count, new_max_replica_count); - // TODO: update partition-level and app-level max_replica_count - response.err = ERR_OK; + do_update_max_replica_count(app, rpc); +} + +// ThreadPool: 
THREAD_POOL_META_STATE +void server_state::do_update_max_replica_count(std::shared_ptr &app, + configuration_set_max_replica_count_rpc rpc) +{ + std::shared_ptr> results; + { + zauto_write_lock l(_lock); + + results.reset(new std::vector(app->partition_count)); + app->helpers->partitions_in_progress.store(app->partition_count); + } + + auto on_partition_updated = [this, app, rpc, results](error_code ec, + int32_t partition_index) mutable { + const auto &app_name = rpc.request().app_name; + const auto new_max_replica_count = rpc.request().max_replica_count; + + results->at(partition_index) = ec; + + auto uncompleted = --app->helpers->partitions_in_progress; + dassert_f(uncompleted >= 0, + "the uncompleted number should be >= 0 while updating partition-level " + "max_replica_count: uncompleted={}, app_name={}, app_id={}, " + "partition_index={}, partition_count={}, new_max_replica_count={}", + uncompleted, + app_name, + app->app_id, + partition_index, + app->partition_count, + new_max_replica_count); + + if (uncompleted > 0) { + return; + } + + for (int32_t i = 0; i < app->partition_count; ++i) { + if (results->at(i) == ERR_OK) { + continue; + } + + dassert_f(false, + "An error that can't be handled occurs while updating partition-level " + "max_replica_count: error_code={}, app_name={}, app_id={}, " + "partition_index={}, partition_count={}, new_max_replica_count={}", + results->at(i).to_string(), + app_name, + app->app_id, + i, + app->partition_count, + new_max_replica_count); + } + + ddebug_f("all partitions have been changed to the new max_replica_count, ready to update " + "the app-level max_replica_count: app_name={}, app_id={}, partition_count={}, " + "new_max_replica_count={}", + app_name, + app->app_id, + app->partition_count, + new_max_replica_count); + + // TODO: update app-level max_replica_count + auto &response = rpc.response(); + response.err = ERR_OK; + }; + + { + const auto new_max_replica_count = rpc.request().max_replica_count; + + zauto_write_lock l(_lock); + 
for (int32_t i = 0; i < app->partition_count; ++i) { + update_partition_max_replica_count(app, i, new_max_replica_count, on_partition_updated); + } + } +} + +// ThreadPool: THREAD_POOL_META_STATE +void server_state::update_partition_max_replica_count(std::shared_ptr &app, + int32_t partition_index, + int32_t new_max_replica_count, + partition_callback on_partition_updated) +{ + dassert_f(partition_index < app->partition_count, + "partition_index should be < partition_count: app_name={}, app_id={}, " + "partition_index={}, partition_count={}, new_max_replica_count={}", + app->app_name, + app->app_id, + partition_index, + app->partition_count, + new_max_replica_count); + + const auto &old_partition_config = app->partitions[partition_index]; + const auto old_max_replica_count = old_partition_config.max_replica_count; + + if (new_max_replica_count == old_max_replica_count) { + dwarn_f("partition-level max_replica_count has been updated: app_name={}, " + "app_id={}, partition_index={}, new_max_replica_count={}", + app->app_name, + app->app_id, + partition_index, + new_max_replica_count); + return; + } + + auto &context = app->helpers->contexts[partition_index]; + if (context.stage == config_status::pending_remote_sync) { + ddebug_f("have to wait until another request which is syncing with remote storage " + "is finished, then process the current request of updating max_replica_count: " + "app_name={}, app_id={}, partition_index={}, new_max_replica_count={}", + app->app_name, + app->app_id, + partition_index, + new_max_replica_count); + + tasking::enqueue( + LPC_META_STATE_HIGH, + tracker(), + [this, app, partition_index, new_max_replica_count, on_partition_updated]() mutable { + update_partition_max_replica_count( + app, partition_index, new_max_replica_count, on_partition_updated); + }, + server_state::sStateHash, + std::chrono::milliseconds(100)); + return; + } + + dassert_f(context.stage == config_status::not_pending, + "invalid config status while updating 
max_replica_count: context.stage={}, " + "app_name={}, app_id={}, partition_index={}, new_max_replica_count={}", + enum_to_string(context.stage), + app->app_name, + app->app_id, + partition_index, + new_max_replica_count); + + context.stage = config_status::pending_remote_sync; + context.pending_sync_request.reset(); + context.msg = nullptr; + + auto new_partition_config = old_partition_config; + new_partition_config.max_replica_count = new_max_replica_count; + ++(new_partition_config.ballot); + context.pending_sync_task = update_partition_max_replica_count_on_remote( + app, new_partition_config, on_partition_updated); +} + +// ThreadPool: THREAD_POOL_META_STATE +task_ptr server_state::update_partition_max_replica_count_on_remote( + std::shared_ptr &app, + const partition_configuration &new_partition_config, + partition_callback on_partition_updated) +{ + const auto &gpid = new_partition_config.pid; + const auto partition_index = gpid.get_partition_index(); + const auto new_max_replica_count = new_partition_config.max_replica_count; + const auto new_ballot = new_partition_config.ballot; + + const auto level = _meta_svc->get_function_level(); + if (level <= meta_function_level::fl_blind) { + dwarn_f("have to wait until meta level becomes more than fl_blind, then process the " + "current request of updating max_replica_count: current_meta_level={}, " + "app_name={}, app_id={}, partition_index={}, new_max_replica_count={}, " + "new_ballot={}", + _meta_function_level_VALUES_TO_NAMES.find(level)->second, + app->app_name, + app->app_id, + partition_index, + new_max_replica_count, + new_ballot); + + // NOTICE: pending_sync_task should be reassigned + return tasking::enqueue(LPC_META_STATE_HIGH, + tracker(), + [this, app, new_partition_config, on_partition_updated]() mutable { + const auto &gpid = new_partition_config.pid; + const auto partition_index = gpid.get_partition_index(); + + zauto_write_lock l(_lock); + + auto &context = app->helpers->contexts[partition_index]; + 
context.pending_sync_task = + update_partition_max_replica_count_on_remote( + app, new_partition_config, on_partition_updated); + }, + server_state::sStateHash, + std::chrono::seconds(1)); + } + + ddebug_f("request for updating partition-level max_replica_count on remote storage: " + "app_name={}, app_id={}, partition_id={}, new_max_replica_count={}, new_ballot={}", + app->app_name, + app->app_id, + partition_index, + new_max_replica_count, + new_ballot); + + auto partition_path = get_partition_path(gpid); + auto json_config = + dsn::json::json_forwarder::encode(new_partition_config); + return _meta_svc->get_remote_storage()->set_data( + partition_path, + json_config, + LPC_META_STATE_HIGH, + std::bind(&server_state::on_update_partition_max_replica_count_on_remote_reply, + this, + std::placeholders::_1, + app, + new_partition_config, + on_partition_updated), + tracker()); +} + +// ThreadPool: THREAD_POOL_META_STATE +void server_state::on_update_partition_max_replica_count_on_remote_reply( + error_code ec, + std::shared_ptr &app, + const partition_configuration &new_partition_config, + partition_callback on_partition_updated) +{ + const auto &gpid = new_partition_config.pid; + const auto partition_index = gpid.get_partition_index(); + const auto new_max_replica_count = new_partition_config.max_replica_count; + const auto new_ballot = new_partition_config.ballot; + + zauto_write_lock l(_lock); + + ddebug_f("reply for updating partition-level max_replica_count on remote storage: " + "error_code={}, app_name={}, app_id={}, partition_id={}, new_max_replica_count={}, " + "new_ballot={}", + ec.to_string(), + app->app_name, + app->app_id, + partition_index, + new_max_replica_count, + new_ballot); + + auto &context = app->helpers->contexts[partition_index]; + if (ec == ERR_TIMEOUT) { + // NOTICE: pending_sync_task need to be reassigned + context.pending_sync_task = + tasking::enqueue(LPC_META_STATE_HIGH, + tracker(), + [this, app, new_partition_config, 
on_partition_updated]() mutable { + const auto &gpid = new_partition_config.pid; + const auto partition_index = gpid.get_partition_index(); + + zauto_write_lock l(_lock); + + auto &context = app->helpers->contexts[partition_index]; + context.pending_sync_task = + update_partition_max_replica_count_on_remote( + app, new_partition_config, on_partition_updated); + }, + server_state::sStateHash, + std::chrono::seconds(1)); + return; + } + + if (ec != ERR_OK) { + on_partition_updated(ec, partition_index); + return; + } + + update_partition_max_replica_count_locally(app, new_partition_config); + + context.pending_sync_task = nullptr; + context.pending_sync_request.reset(); + context.stage = config_status::not_pending; + context.msg = nullptr; + + on_partition_updated(ec, partition_index); +} + +// ThreadPool: THREAD_POOL_META_STATE +void server_state::update_partition_max_replica_count_locally( + std::shared_ptr &app, const partition_configuration &new_partition_config) +{ + const auto &gpid = new_partition_config.pid; + const auto partition_index = gpid.get_partition_index(); + const auto new_max_replica_count = new_partition_config.max_replica_count; + const auto new_ballot = new_partition_config.ballot; + + auto &old_partition_config = app->partitions[gpid.get_partition_index()]; + const auto old_max_replica_count = old_partition_config.max_replica_count; + const auto old_ballot = old_partition_config.ballot; + + dassert_f(old_ballot + 1 == new_ballot, + "invalid ballot while updating local max_replica_count: app_name={}, app_id={}, " + "partition_id={}, old_max_replica_count={}, new_max_replica_count={}, " + "old_ballot={}, new_ballot={}", + app->app_name, + app->app_id, + partition_index, + old_max_replica_count, + new_max_replica_count, + old_ballot, + new_ballot); + + std::string old_config_str(boost::lexical_cast(old_partition_config)); + std::string new_config_str(boost::lexical_cast(new_partition_config)); + + old_partition_config = new_partition_config; + + 
ddebug_f("local partition-level max_replica_count has been changed successfully: " + "app_name={}, app_id={}, partition_id={}, old_partition_config={}, " + "new_partition_config={}", + app->app_name, + app->app_id, + partition_index, + old_config_str, + new_config_str); } } // namespace replication diff --git a/src/meta/server_state.h b/src/meta/server_state.h index 63f8d472dc..dcec5d787b 100644 --- a/src/meta/server_state.h +++ b/src/meta/server_state.h @@ -313,6 +313,26 @@ class server_state bool check_max_replica_count_consistent(const std::shared_ptr &app, Response &response) const; + using partition_callback = std::function; + void do_update_max_replica_count(std::shared_ptr &app, + configuration_set_max_replica_count_rpc rpc); + void update_partition_max_replica_count(std::shared_ptr &app, + int32_t partition_index, + int32_t new_max_replica_count, + partition_callback on_partition_updated); + task_ptr update_partition_max_replica_count_on_remote( + std::shared_ptr &app, + const partition_configuration &new_partition_config, + partition_callback on_partition_updated); + void on_update_partition_max_replica_count_on_remote_reply( + error_code ec, + std::shared_ptr &app, + const partition_configuration &new_partition_config, + partition_callback on_partition_updated); + void + update_partition_max_replica_count_locally(std::shared_ptr &app, + const partition_configuration &new_partition_config); + // Used for `on_start_manual_compaction` bool parse_compaction_envs(start_manual_compact_rpc rpc, std::vector &keys, diff --git a/src/meta/test/meta_app_operation_test.cpp b/src/meta/test/meta_app_operation_test.cpp index 450759cc2e..c9546515b4 100644 --- a/src/meta/test/meta_app_operation_test.cpp +++ b/src/meta/test/meta_app_operation_test.cpp @@ -151,10 +151,71 @@ class meta_app_operation_test : public meta_test_base auto partition_size = static_cast(app->partitions.size()); for (int i = 0; i < partition_size; ++i) { + // set local max_replica_count of each 
partition auto &partition_config = app->partitions[i]; partition_config.max_replica_count = max_replica_count; + + // set remote max_replica_count of each partition + auto partition_path = _ss->get_partition_path(partition_config.pid); + auto json_config = + dsn::json::json_forwarder::encode(partition_config); + dsn::task_tracker tracker; + _ms->get_remote_storage()->set_data(partition_path, + json_config, + LPC_META_STATE_HIGH, + [](dsn::error_code ec) { ASSERT_EQ(ec, ERR_OK); }, + &tracker); + tracker.wait_outstanding_tasks(); } + + // set local max_replica_count of app app->max_replica_count = max_replica_count; + + // set remote max_replica_count of app + auto app_path = _ss->get_app_path(*app); + auto ainfo = *(reinterpret_cast(app.get())); + auto json_config = dsn::json::json_forwarder::encode(ainfo); + dsn::task_tracker tracker; + _ms->get_remote_storage()->set_data(app_path, + json_config, + LPC_META_STATE_HIGH, + [](dsn::error_code ec) { ASSERT_EQ(ec, ERR_OK); }, + &tracker); + tracker.wait_outstanding_tasks(); + } + + void verify_all_partitions_max_replica_count(const std::string &app_name, + int32_t expected_max_replica_count) + { + auto app = find_app(app_name); + dassert_f(app != nullptr, "app({}) does not exist", app_name); + + auto partition_size = static_cast(app->partitions.size()); + for (int i = 0; i < partition_size; ++i) { + // verify local max_replica_count of each partition + auto &partition_config = app->partitions[i]; + ASSERT_EQ(partition_config.max_replica_count, expected_max_replica_count); + + // verify remote max_replica_count of each partition + auto partition_path = _ss->get_partition_path(partition_config.pid); + dsn::task_tracker tracker; + _ms->get_remote_storage()->get_data( + partition_path, + LPC_META_CALLBACK, + [ this, expected_pid = partition_config.pid, expected_max_replica_count ]( + error_code ec, const blob &value) { + ASSERT_EQ(ec, ERR_OK); + + partition_configuration partition_config; + 
dsn::json::json_forwarder::decode(value, + partition_config); + + ASSERT_EQ(partition_config.pid, expected_pid); + ASSERT_EQ(partition_config.max_replica_count, expected_max_replica_count); + }, + &tracker); + tracker.wait_outstanding_tasks(); + } } const std::string APP_NAME = "app_operation_test"; @@ -498,6 +559,28 @@ TEST_F(meta_app_operation_test, set_max_replica_count) // - cluster is freezed (alive_node_count = 0) // - cluster is freezed (alive_node_count = 1 < min_live_node_count_for_unfreeze) // - cluster is freezed (alive_node_count = 2 < min_live_node_count_for_unfreeze) + // - increase with valid max_replica_count (= max_allowed_replica_count, and = alive_node_count) + // - decrease with valid max_replica_count (= max_allowed_replica_count, and = alive_node_count) + // - unchanged valid max_replica_count (= max_allowed_replica_count, and = alive_node_count) + // - increase with valid max_replica_count (= max_allowed_replica_count, and < alive_node_count) + // - decrease with valid max_replica_count (= max_allowed_replica_count, and < alive_node_count) + // - unchanged valid max_replica_count (= max_allowed_replica_count, and < alive_node_count) + // - increase with valid max_replica_count (< max_allowed_replica_count, and = alive_node_count) + // - decrease with valid max_replica_count (< max_allowed_replica_count, and = alive_node_count) + // - unchanged valid max_replica_count (< max_allowed_replica_count, and = alive_node_count) + // - decrease with valid max_replica_count (< max_allowed_replica_count < alive_node_count) + // - unchanged valid max_replica_count (< max_allowed_replica_count < alive_node_count) + // - decrease with valid max_replica_count (< alive_node_count < max_allowed_replica_count) + // - unchanged valid max_replica_count (< alive_node_count < max_allowed_replica_count) + // - increase with valid max_replica_count (< alive_node_count = max_allowed_replica_count) + // - decrease with valid max_replica_count (< alive_node_count = 
max_allowed_replica_count) + // - unchanged valid max_replica_count (< alive_node_count = max_allowed_replica_count) + // - increase with valid max_replica_count (= min_allowed_replica_count, and < alive_node_count) + // - decrease with valid max_replica_count (= min_allowed_replica_count, and < alive_node_count) + // - unchanged valid max_replica_count (= min_allowed_replica_count, and < alive_node_count) + // - increase max_replica_count from 2 to 3 + // - increase max_replica_count from 1 to 3 + // - decrease max_replica_count from 3 to 1 struct test_case { std::string app_name; @@ -528,7 +611,29 @@ TEST_F(meta_app_operation_test, set_max_replica_count) {APP_NAME, 2, 2, 2, 1, 1, 2, 3, ERR_INVALID_PARAMETERS}, {APP_NAME, 1, 1, 2, 1, 0, 1, 3, ERR_STATE_FREEZED}, {APP_NAME, 1, 1, 2, 2, 1, 1, 3, ERR_STATE_FREEZED}, - {APP_NAME, 1, 1, 2, 3, 2, 1, 3, ERR_STATE_FREEZED}}; + {APP_NAME, 1, 1, 2, 3, 2, 1, 3, ERR_STATE_FREEZED}, + {APP_NAME, 1, 1, 2, 1, 2, 1, 2, ERR_OK}, + {APP_NAME, 2, 2, 1, 1, 1, 1, 1, ERR_OK}, + {APP_NAME, 2, 2, 2, 1, 2, 1, 2, ERR_OK}, + {APP_NAME, 1, 1, 2, 1, 3, 1, 2, ERR_OK}, + {APP_NAME, 2, 2, 1, 1, 2, 1, 1, ERR_OK}, + {APP_NAME, 2, 2, 2, 1, 3, 1, 2, ERR_OK}, + {APP_NAME, 1, 1, 2, 1, 2, 1, 3, ERR_OK}, + {APP_NAME, 3, 3, 2, 1, 2, 1, 3, ERR_OK}, + {APP_NAME, 2, 2, 2, 1, 2, 1, 3, ERR_OK}, + {APP_NAME, 2, 2, 1, 1, 3, 1, 2, ERR_OK}, + {APP_NAME, 1, 1, 1, 1, 3, 1, 2, ERR_OK}, + {APP_NAME, 2, 2, 1, 1, 2, 1, 3, ERR_OK}, + {APP_NAME, 1, 1, 1, 1, 2, 1, 3, ERR_OK}, + {APP_NAME, 1, 1, 2, 1, 3, 1, 3, ERR_OK}, + {APP_NAME, 3, 3, 2, 1, 3, 1, 3, ERR_OK}, + {APP_NAME, 2, 2, 2, 1, 3, 1, 3, ERR_OK}, + {APP_NAME, 1, 1, 2, 1, 3, 2, 3, ERR_OK}, + {APP_NAME, 3, 3, 2, 1, 3, 2, 3, ERR_OK}, + {APP_NAME, 2, 2, 2, 1, 3, 2, 3, ERR_OK}, + {APP_NAME, 2, 2, 3, 2, 3, 2, 3, ERR_OK}, + {APP_NAME, 1, 1, 3, 2, 3, 1, 3, ERR_OK}, + {APP_NAME, 3, 3, 1, 2, 3, 1, 3, ERR_OK}}; const int32_t total_node_count = 3; auto nodes = ensure_enough_alive_nodes(total_node_count); @@ -595,11 +700,14 @@ 
TEST_F(meta_app_operation_test, set_max_replica_count) const auto set_resp = set_max_replica_count(test.app_name, test.new_max_replica_count); ASSERT_EQ(set_resp.err, test.expected_err); ASSERT_EQ(set_resp.old_max_replica_count, test.expected_old_max_replica_count); + if (test.expected_err == ERR_OK) { + verify_all_partitions_max_replica_count(test.app_name, test.new_max_replica_count); + } const auto get_resp = get_max_replica_count(test.app_name); if (test.expected_err == ERR_APP_NOT_EXIST || test.expected_err == ERR_INCONSISTENT_STATE) { ASSERT_EQ(get_resp.err, test.expected_err); - } else { + } else if (test.expected_err != ERR_OK) { ASSERT_EQ(get_resp.err, ERR_OK); }