From f18711cc5c214f2559d83d169ac9715b91d033fd Mon Sep 17 00:00:00 2001 From: Jiashuo Date: Thu, 10 Mar 2022 11:13:09 +0800 Subject: [PATCH] fix(dup_enhancement#19): change checkpoint status after manual checkpoint completed (#1071) --- src/meta/duplication/duplication_info.h | 2 +- .../duplication/meta_duplication_service.cpp | 10 +++++-- src/replica/replica_chkpt.cpp | 28 ++++++++++++++++--- src/replica/test/replica_test.cpp | 4 ++- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index 4b3755f534..5b7b1bcb66 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -64,7 +64,7 @@ class duplication_info duplication_info() = default; - void start() { alter_status(duplication_status::DS_PREPARE); } + error_code start() { return alter_status(duplication_status::DS_PREPARE); } // error will be returned if this state transition is not allowed. error_code diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index e4a667c7ce..5096ebd18b 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -188,7 +188,11 @@ void meta_duplication_service::do_add_duplication(std::shared_ptr &ap duplication_info_s_ptr &dup, duplication_add_rpc &rpc) { - dup->start(); + const auto err = dup->start(); + if (dsn_unlikely(err != ERR_OK)) { + derror_f("start dup[{}({})] failed: err = {}", app->app_name, dup->id, err.to_string()); + return; + } blob value = dup->to_json_blob(); std::queue nodes({get_duplication_path(*app), std::to_string(dup->id)}); @@ -365,7 +369,7 @@ void meta_duplication_service::create_follower_app_for_duplication( } else { derror_f("created follower app[{}.{}] to trigger duplicate checkpoint failed: " "duplication_status = {}, create_err = {}, update_err = {}", - get_current_cluster_name(), + dup->follower_cluster_name, dup->app_name, duplication_status_to_string(dup->status()), create_err.to_string(), @@ -443,7 +447,7 @@ void meta_duplication_service::check_follower_app_if_create_completed( } else { derror_f("query follower app[{}.{}] replica configuration completed, result: " "duplication_status = {}, query_err = {}, update_err = {}", - get_current_cluster_name(), + dup->follower_cluster_name, dup->app_name, duplication_status_to_string(dup->status()), query_err.to_string(), diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp index c3e72c0f31..25543e506c 100644 --- a/src/replica/replica_chkpt.cpp +++ b/src/replica/replica_chkpt.cpp @@ -142,7 +142,7 @@ error_code replica::trigger_manual_emergency_checkpoint(decree old_decree) } if (old_decree <= _app->last_durable_decree()) { - ddebug_replica("checkpoint has been successful: old = {} vs latest = {}", + ddebug_replica("checkpoint has been completed: old = {} vs latest = {}", old_decree, _app->last_durable_decree()); _is_manual_emergency_checkpointing = false; @@ -166,8 +166,8 @@ error_code replica::trigger_manual_emergency_checkpoint(decree old_decree) return ERR_TRY_AGAIN; } - _is_manual_emergency_checkpointing = true; init_checkpoint(true); + _is_manual_emergency_checkpointing = true; return ERR_OK; } @@ -249,7 +249,18 @@ error_code replica::background_async_checkpoint(bool is_emergency) _app->last_durable_decree()); update_last_checkpoint_generate_time(); } - } else if (err == ERR_TRY_AGAIN) { + + if (_is_manual_emergency_checkpointing) { + _is_manual_emergency_checkpointing = false; + _stub->_manual_emergency_checkpointing_count == 0 + ? 0 + : (--_stub->_manual_emergency_checkpointing_count); + } + + return err; + } + + if (err == ERR_TRY_AGAIN) { // already triggered memory flushing on async_checkpoint(), then try again later. ddebug("%s: call app.async_checkpoint() returns ERR_TRY_AGAIN, time_used_ns = %" PRIu64 ", schedule later checkpoint after 10 seconds", @@ -260,7 +271,16 @@ error_code replica::background_async_checkpoint(bool is_emergency) [this] { init_checkpoint(false); }, get_gpid().thread_hash(), std::chrono::seconds(10)); - } else if (err == ERR_WRONG_TIMING) { + return err; + } + + if (_is_manual_emergency_checkpointing) { + _is_manual_emergency_checkpointing = false; + _stub->_manual_emergency_checkpointing_count == 0 + ? 0 + : (--_stub->_manual_emergency_checkpointing_count); + } + if (err == ERR_WRONG_TIMING) { // do nothing ddebug("%s: call app.async_checkpoint() returns ERR_WRONG_TIMING, time_used_ns = %" PRIu64 ", just ignore", diff --git a/src/replica/test/replica_test.cpp b/src/replica/test/replica_test.cpp index dc255e0221..b4c18103da 100644 --- a/src/replica/test/replica_test.cpp +++ b/src/replica/test/replica_test.cpp @@ -358,7 +358,9 @@ TEST_F(replica_test, test_trigger_manual_emergency_checkpoint) force_update_checkpointing(true); ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_BUSY); ASSERT_TRUE(is_checkpointing()); - force_update_checkpointing(false); + // test running task completed + _mock_replica->tracker()->wait_outstanding_tasks(); + ASSERT_FALSE(is_checkpointing()); // test exceed max concurrent count ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_OK);