Skip to content

Commit

Permalink
fix(dup_enhancement#19): change checkpoint status after manual checkpoint completed (apache#1071)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer committed Mar 29, 2022
1 parent 7752c70 commit f18711c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/meta/duplication/duplication_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class duplication_info

duplication_info() = default;

void start() { alter_status(duplication_status::DS_PREPARE); }
error_code start() { return alter_status(duplication_status::DS_PREPARE); }

// error will be returned if this state transition is not allowed.
error_code
Expand Down
10 changes: 7 additions & 3 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,11 @@ void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
duplication_info_s_ptr &dup,
duplication_add_rpc &rpc)
{
dup->start();
const auto err = dup->start();
if (dsn_unlikely(err != ERR_OK)) {
derror_f("start dup[{}({})] failed: err = {}", app->app_name, dup->id, err.to_string());
return;
}
blob value = dup->to_json_blob();

std::queue<std::string> nodes({get_duplication_path(*app), std::to_string(dup->id)});
Expand Down Expand Up @@ -365,7 +369,7 @@ void meta_duplication_service::create_follower_app_for_duplication(
} else {
derror_f("created follower app[{}.{}] to trigger duplicate checkpoint failed: "
"duplication_status = {}, create_err = {}, update_err = {}",
get_current_cluster_name(),
dup->follower_cluster_name,
dup->app_name,
duplication_status_to_string(dup->status()),
create_err.to_string(),
Expand Down Expand Up @@ -443,7 +447,7 @@ void meta_duplication_service::check_follower_app_if_create_completed(
} else {
derror_f("query follower app[{}.{}] replica configuration completed, result: "
"duplication_status = {}, query_err = {}, update_err = {}",
get_current_cluster_name(),
dup->follower_cluster_name,
dup->app_name,
duplication_status_to_string(dup->status()),
query_err.to_string(),
Expand Down
28 changes: 24 additions & 4 deletions src/replica/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ error_code replica::trigger_manual_emergency_checkpoint(decree old_decree)
}

if (old_decree <= _app->last_durable_decree()) {
ddebug_replica("checkpoint has been successful: old = {} vs latest = {}",
ddebug_replica("checkpoint has been completed: old = {} vs latest = {}",
old_decree,
_app->last_durable_decree());
_is_manual_emergency_checkpointing = false;
Expand All @@ -166,8 +166,8 @@ error_code replica::trigger_manual_emergency_checkpoint(decree old_decree)
return ERR_TRY_AGAIN;
}

_is_manual_emergency_checkpointing = true;
init_checkpoint(true);
_is_manual_emergency_checkpointing = true;
return ERR_OK;
}

Expand Down Expand Up @@ -249,7 +249,18 @@ error_code replica::background_async_checkpoint(bool is_emergency)
_app->last_durable_decree());
update_last_checkpoint_generate_time();
}
} else if (err == ERR_TRY_AGAIN) {

if (_is_manual_emergency_checkpointing) {
_is_manual_emergency_checkpointing = false;
_stub->_manual_emergency_checkpointing_count == 0
? 0
: (--_stub->_manual_emergency_checkpointing_count);
}

return err;
}

if (err == ERR_TRY_AGAIN) {
// already triggered memory flushing on async_checkpoint(), then try again later.
ddebug("%s: call app.async_checkpoint() returns ERR_TRY_AGAIN, time_used_ns = %" PRIu64
", schedule later checkpoint after 10 seconds",
Expand All @@ -260,7 +271,16 @@ error_code replica::background_async_checkpoint(bool is_emergency)
[this] { init_checkpoint(false); },
get_gpid().thread_hash(),
std::chrono::seconds(10));
} else if (err == ERR_WRONG_TIMING) {
return err;
}

if (_is_manual_emergency_checkpointing) {
_is_manual_emergency_checkpointing = false;
_stub->_manual_emergency_checkpointing_count == 0
? 0
: (--_stub->_manual_emergency_checkpointing_count);
}
if (err == ERR_WRONG_TIMING) {
// do nothing
ddebug("%s: call app.async_checkpoint() returns ERR_WRONG_TIMING, time_used_ns = %" PRIu64
", just ignore",
Expand Down
4 changes: 3 additions & 1 deletion src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ TEST_F(replica_test, test_trigger_manual_emergency_checkpoint)
force_update_checkpointing(true);
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_BUSY);
ASSERT_TRUE(is_checkpointing());
force_update_checkpointing(false);
// test running task completed
_mock_replica->tracker()->wait_outstanding_tasks();
ASSERT_FALSE(is_checkpointing());

// test exceed max concurrent count
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(101), ERR_OK);
Expand Down

0 comments on commit f18711c

Please sign in to comment.