diff --git a/include/dsn/tool-api/task_tracker.h b/include/dsn/tool-api/task_tracker.h index c6c21293ec..06b26cab2c 100644 --- a/include/dsn/tool-api/task_tracker.h +++ b/include/dsn/tool-api/task_tracker.h @@ -167,7 +167,9 @@ class task_tracker // return not finished task count int cancel_but_not_wait_outstanding_tasks(); - void set_success() { _all_tasks_success = true; } + void set_tasks_success() { _all_tasks_success = true; } + + void clear_tasks_state() { _all_tasks_success = false; } bool all_tasks_success() const { return _all_tasks_success; } diff --git a/src/meta/duplication/duplication_info.h b/src/meta/duplication/duplication_info.h index 5b7b1bcb66..0c5ab9ddf0 100644 --- a/src/meta/duplication/duplication_info.h +++ b/src/meta/duplication/duplication_info.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -144,11 +145,18 @@ class duplication_info bool all_checkpoint_has_prepared() { - return std::all_of(_progress.begin(), - _progress.end(), - [](std::pair item) -> bool { - return item.second.checkpoint_prepared; - }); + int prepared = 0; + bool completed = + std::all_of(_progress.begin(), + _progress.end(), + [&](std::pair item) -> bool { + prepared = item.second.checkpoint_prepared ? prepared + 1 : prepared; + return item.second.checkpoint_prepared; + }); + if (!completed) { + dwarn_f("replica checkpoint still running: {}/{}", prepared, _progress.size()); + } + return completed; } void report_progress_if_time_up(); diff --git a/src/meta/duplication/meta_duplication_service.cpp b/src/meta/duplication/meta_duplication_service.cpp index 5096ebd18b..7059ffa5e0 100644 --- a/src/meta/duplication/meta_duplication_service.cpp +++ b/src/meta/duplication/meta_duplication_service.cpp @@ -419,6 +419,11 @@ void meta_duplication_service::check_follower_app_if_create_completed( break; } + if (partition.secondaries.empty()) { + query_err = ERR_NOT_ENOUGH_MEMBER; + break; + } + for (const auto &secondary : partition.secondaries) { if (secondary.is_invalid()) { query_err = ERR_INACTIVE_STATE; diff --git a/src/replica/duplication/replica_duplicator.cpp b/src/replica/duplication/replica_duplicator.cpp index 263c86549a..2f22a82d86 100644 --- a/src/replica/duplication/replica_duplicator.cpp +++ b/src/replica/duplication/replica_duplicator.cpp @@ -55,9 +55,10 @@ replica_duplicator::replica_duplicator(const duplication_entry &ent, replica *r) void replica_duplicator::prepare_dup() { ddebug_replica("start prepare checkpoint to catch up with latest durable decree: " - "start_point_decree({}) vs last_durable_decree({})", + "start_point_decree({}) < last_durable_decree({}) = {}", _start_point_decree, - _replica->last_durable_decree()); + _replica->last_durable_decree(), + _start_point_decree < _replica->last_durable_decree()); tasking::enqueue( LPC_REPLICATION_COMMON, diff --git a/src/replica/duplication/replica_follower.cpp b/src/replica/duplication/replica_follower.cpp index 22a80e06e6..26eda8a347 100644 --- a/src/replica/duplication/replica_follower.cpp +++ b/src/replica/duplication/replica_follower.cpp @@ -66,21 +66,27 @@ void replica_follower::init_master_info() // ThreadPool: THREAD_POOL_REPLICATION_LONG error_code replica_follower::duplicate_checkpoint() { + zauto_lock l(_lock); if (_duplicating_checkpoint) { dwarn_replica("duplicate master[{}] checkpoint is running", master_replica_name()); return ERR_BUSY; } ddebug_replica("start duplicate master[{}] checkpoint", master_replica_name()); - zauto_lock l(_lock); _duplicating_checkpoint = true; - async_duplicate_checkpoint_from_master_replica(); + tasking::enqueue(LPC_DUPLICATE_CHECKPOINT, &_tracker, [=]() mutable { + async_duplicate_checkpoint_from_master_replica(); + }); _tracker.wait_outstanding_tasks(); _duplicating_checkpoint = false; - return _tracker.all_tasks_success() ? ERR_OK : ERR_CORRUPTION; + if (_tracker.all_tasks_success()) { + _tracker.clear_tasks_state(); + return ERR_OK; + } + return ERR_TRY_AGAIN; } -// ThreadPool: THREAD_POOL_REPLICATION_LONG +// ThreadPool: THREAD_POOL_DEFAULT void replica_follower::async_duplicate_checkpoint_from_master_replica() { rpc_address meta_servers; @@ -100,18 +106,16 @@ void replica_follower::async_duplicate_checkpoint_from_master_replica() msg, &_tracker, [&](error_code err, configuration_query_by_index_response &&resp) mutable { - tasking::enqueue(LPC_DUPLICATE_CHECKPOINT, &_tracker, [=]() mutable { - FAIL_POINT_INJECT_F("duplicate_checkpoint_ok", [&](string_view s) -> void { - _tracker.set_success(); - return; - }); - - FAIL_POINT_INJECT_F("duplicate_checkpoint_failed", - [&](string_view s) -> void { return; }); - if (update_master_replica_config(err, std::move(resp)) == ERR_OK) { - copy_master_replica_checkpoint(); - } + FAIL_POINT_INJECT_F("duplicate_checkpoint_ok", [&](string_view s) -> void { + _tracker.set_tasks_success(); + return; }); + + FAIL_POINT_INJECT_F("duplicate_checkpoint_failed", + [&](string_view s) -> void { return; }); + if (update_master_replica_config(err, std::move(resp)) == ERR_OK) { + copy_master_replica_checkpoint(); + } }); } @@ -158,6 +162,11 @@ replica_follower::update_master_replica_config(error_code err, // since the request just specify one partition, the result size is single _master_replica_config = resp.partitions[0]; + ddebug_replica( + "query master[{}] config successfully and update local config: remote={}, gpid={}", + master_replica_name(), + _master_replica_config.primary.to_string(), + _master_replica_config.pid.to_string()); return ERR_OK; } @@ -241,7 +250,7 @@ void replica_follower::nfs_copy_remote_files(const rpc_address &remote_node, master_replica_name(), remote_dir, size); - _tracker.set_success(); + _tracker.set_tasks_success(); }); } diff --git a/src/replica/duplication/test/replica_follower_test.cpp b/src/replica/duplication/test/replica_follower_test.cpp index e22ee35d86..96de3f5a15 100644 --- a/src/replica/duplication/test/replica_follower_test.cpp +++ b/src/replica/duplication/test/replica_follower_test.cpp @@ -74,7 +74,7 @@ class replica_follower_test : public duplication_test_base void mark_tracker_tasks_success(replica_follower *follower) { - follower->_tracker.set_success(); + follower->_tracker.set_tasks_success(); } error_code update_master_replica_config(replica_follower *follower, @@ -137,7 +137,7 @@ TEST_F(replica_follower_test, test_duplicate_checkpoint) auto follower = _mock_replica->get_replica_follower(); - ASSERT_EQ(follower->duplicate_checkpoint(), ERR_CORRUPTION); + ASSERT_EQ(follower->duplicate_checkpoint(), ERR_TRY_AGAIN); ASSERT_FALSE(get_duplicating(follower)); mark_tracker_tasks_success(follower); diff --git a/src/replica/replica_init.cpp b/src/replica/replica_init.cpp index b2ae006a20..56957e2136 100644 --- a/src/replica/replica_init.cpp +++ b/src/replica/replica_init.cpp @@ -118,7 +118,7 @@ error_code replica::initialize_on_new() // clear work on failure utils::filesystem::remove_path(path); stub->_fs_manager.remove_replica(pid); - return rep; + return nullptr; } error_code replica::initialize_on_load()