diff --git a/src/meta/duplication/duplication_info.cpp b/src/meta/duplication/duplication_info.cpp index d399978950..f85c6f4f74 100644 --- a/src/meta/duplication/duplication_info.cpp +++ b/src/meta/duplication/duplication_info.cpp @@ -144,7 +144,7 @@ bool duplication_info::alter_progress(int partition_index, } if (confirm_entry.__isset.last_committed_decree) { - p.last_committed_decree = confirm_entry.last_committed_decree; + p.last_committed_decree = confirm_entry.last_committed_decree; } p.checkpoint_prepared = confirm_entry.checkpoint_prepared; @@ -153,17 +153,17 @@ bool duplication_info::alter_progress(int partition_index, } if (p.volatile_decree == p.stored_decree) { - return false; + return false; } - // Progress update is not supposed to be too frequent. - if (dsn_now_ms() < p.last_progress_update_ms + FLAGS_dup_progress_min_update_period_ms) { - return false; - } + // Progress update is not supposed to be too frequent. + if (dsn_now_ms() < p.last_progress_update_ms + FLAGS_dup_progress_min_update_period_ms) { + return false; + } - p.is_altering = true; - p.last_progress_update_ms = dsn_now_ms(); - return true; + p.is_altering = true; + p.last_progress_update_ms = dsn_now_ms(); + return true; } void duplication_info::persist_progress(int partition_index) @@ -182,14 +182,14 @@ void duplication_info::persist_status() if (!_is_altering) { LOG_ERROR_PREFIX("the status of this duplication is not being altered: status={}, " - "next_status={}, master_app_id={}, master_app_name={}, " - "follower_cluster_name={}, follower_app_name={}" - duplication_status_to_string(_status), - duplication_status_to_string(_next_status), - app_id, - app_name, - remote_cluster_name, - remote_app_name); + "next_status={}, master_app_id={}, master_app_name={}, " + "follower_cluster_name={}, follower_app_name={}", + duplication_status_to_string(_status), + duplication_status_to_string(_next_status), + app_id, + app_name, + remote_cluster_name, + remote_app_name); return; } @@ -233,8 +233,8 @@ void duplication_info::report_progress_if_time_up() return; } - _last_progress_report_ms = dsn_now_ms(); - LOG_INFO("duplication report: {}", to_string()); + _last_progress_report_ms = dsn_now_ms(); + LOG_INFO("duplication report: {}", to_string()); } duplication_info_s_ptr duplication_info::decode_from_blob(dupid_t dup_id, diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index 2dccec4380..962d9f54f0 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -221,7 +221,7 @@ class duplication_info int64_t volatile_decree{invalid_decree}; int64_t stored_decree{invalid_decree}; - // + // int64_t last_committed_decree{invalid_decree}; bool is_altering{false}; diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index a021658b0b..f723ece5f1 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -795,26 +795,28 @@ void meta_duplication_service::do_update_partition_confirmed( return; } - const auto &path = get_partition_path(dup, std::to_string(partition_idx)); - const auto &value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); + const auto &path = get_partition_path(dup, std::to_string(partition_idx)); + + _meta_svc->get_meta_storage()->get_data( + path, [dup, rpc, partition_idx, confirm_entry, path, this](const blob &data) mutable { + auto value = blob::create_from_bytes(std::to_string(confirm_entry.confirmed_decree)); - _meta_svc->get_meta_storage()->get_data(path, [=](const blob &data) mutable { if (data.empty()) { _meta_svc->get_meta_storage()->create_node( - path, std::move(value), [=]() mutable { + path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable { dup->persist_progress(partition_idx); rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = confirm_entry.confirmed_decree; }); return; - } + } - _meta_svc->get_meta_storage()->set_data( - path, std::move(value), [=]() mutable { - dup->persist_progress(partition_idx); - rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = - confirm_entry.confirmed_decree; - }); + _meta_svc->get_meta_storage()->set_data( + path, std::move(value), [dup, rpc, partition_idx, confirm_entry]() mutable { + dup->persist_progress(partition_idx); + rpc.response().dup_map[dup->app_id][dup->id].progress[partition_idx] = + confirm_entry.confirmed_decree; + }); // duplication_sync_rpc will finally be replied when confirmed points // of all partitions are stored. diff --git a/src/meta/meta_state_service_utils.h b/src/meta/meta_state_service_utils.h index 1676637be5..a4760918a0 100644 --- a/src/meta/meta_state_service_utils.h +++ b/src/meta/meta_state_service_utils.h @@ -69,7 +69,7 @@ struct meta_storage /// Will fatal if node doesn't exists. void set_data(std::string &&node, blob &&value, std::function &&cb); - void set_data(const std::string &node, blob &&value, std::function &&cb); + void set_data(const std::string &node, blob &&value, std::function &&cb) { set_data(std::string(node), std::move(value), std::move(cb)); } diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp index 59a5e0c74a..6a352473f1 100644 --- a/src/meta/server_state.cpp +++ b/src/meta/server_state.cpp @@ -1130,7 +1130,8 @@ void server_state::create_app(dsn::message_ex *msg) duplicating ? fmt::format("master_cluster_name={}, master_app_name={}", master_cluster->second, - gutil::FindWithDefault(request.options.envs, duplication_constants::kEnvMasterAppNameKey)) + gutil::FindWithDefault(request.options.envs, + duplication_constants::kEnvMasterAppNameKey)) : "false"); auto option_match_check = [](const create_app_options &opt, const app_state &exist_app) { diff --git a/src/meta/test/duplication_info_test.cpp b/src/meta/test/duplication_info_test.cpp index 7dfa02f214..84e029c368 100644 --- a/src/meta/test/duplication_info_test.cpp +++ b/src/meta/test/duplication_info_test.cpp @@ -31,7 +31,7 @@ #include "gtest/gtest.h" #include "runtime/app_model.h" -DSN_DECLARE_uint64(dup_progress_min_update_period_ms) +DSN_DECLARE_uint64(dup_progress_min_update_period_ms); namespace dsn { namespace replication { @@ -96,8 +96,7 @@ class duplication_info_test : public testing::Test ASSERT_FALSE(dup.alter_progress(1, entry)); ASSERT_FALSE(dup._progress[1].is_altering); - dup._progress[1].last_progress_update_ms -= - FLAGS_dup_progress_min_update_period_ms + 100; + dup._progress[1].last_progress_update_ms -= FLAGS_dup_progress_min_update_period_ms + 100; entry.confirmed_decree = 15; entry.checkpoint_prepared = true;