Skip to content

Commit

Permalink
fix(dup_enhancement#20): put `async_duplicate_checkpoint_from_master_…
Browse files Browse the repository at this point in the history
…replica` into `DEFAULT_POOL` to avoid thread lock (apache#1076)
  • Loading branch information
foreverneverer committed Mar 29, 2022
1 parent f18711c commit 516ff0c
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 27 deletions.
4 changes: 3 additions & 1 deletion include/dsn/tool-api/task_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ class task_tracker
// return not finished task count
int cancel_but_not_wait_outstanding_tasks();

void set_success() { _all_tasks_success = true; }
void set_tasks_success() { _all_tasks_success = true; }

void clear_tasks_state() { _all_tasks_success = false; }

bool all_tasks_success() const { return _all_tasks_success; }

Expand Down
18 changes: 13 additions & 5 deletions src/meta/duplication/duplication_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/cpp/json_helper.h>
#include <dsn/tool-api/zlocks.h>
#include <dsn/dist/fmt_logging.h>

#include <utility>
#include <fmt/format.h>
Expand Down Expand Up @@ -144,11 +145,18 @@ class duplication_info

bool all_checkpoint_has_prepared()
{
return std::all_of(_progress.begin(),
_progress.end(),
[](std::pair<int, partition_progress> item) -> bool {
return item.second.checkpoint_prepared;
});
int prepared = 0;
bool completed =
std::all_of(_progress.begin(),
_progress.end(),
[&](std::pair<int, partition_progress> item) -> bool {
prepared = item.second.checkpoint_prepared ? prepared + 1 : prepared;
return item.second.checkpoint_prepared;
});
if (!completed) {
dwarn_f("replica checkpoint still running: {}/{}", prepared, _progress.size());
}
return completed;
}

void report_progress_if_time_up();
Expand Down
5 changes: 5 additions & 0 deletions src/meta/duplication/meta_duplication_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ void meta_duplication_service::check_follower_app_if_create_completed(
break;
}

if (partition.secondaries.empty()) {
query_err = ERR_NOT_ENOUGH_MEMBER;
break;
}

for (const auto &secondary : partition.secondaries) {
if (secondary.is_invalid()) {
query_err = ERR_INACTIVE_STATE;
Expand Down
5 changes: 3 additions & 2 deletions src/replica/duplication/replica_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ replica_duplicator::replica_duplicator(const duplication_entry &ent, replica *r)
void replica_duplicator::prepare_dup()
{
ddebug_replica("start prepare checkpoint to catch up with latest durable decree: "
"start_point_decree({}) vs last_durable_decree({})",
"start_point_decree({}) < last_durable_decree({}) = {}",
_start_point_decree,
_replica->last_durable_decree());
_replica->last_durable_decree(),
_start_point_decree < _replica->last_durable_decree());

tasking::enqueue(
LPC_REPLICATION_COMMON,
Expand Down
41 changes: 25 additions & 16 deletions src/replica/duplication/replica_follower.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,27 @@ void replica_follower::init_master_info()
// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica_follower::duplicate_checkpoint()
{
zauto_lock l(_lock);
if (_duplicating_checkpoint) {
dwarn_replica("duplicate master[{}] checkpoint is running", master_replica_name());
return ERR_BUSY;
}

ddebug_replica("start duplicate master[{}] checkpoint", master_replica_name());
zauto_lock l(_lock);
_duplicating_checkpoint = true;
async_duplicate_checkpoint_from_master_replica();
tasking::enqueue(LPC_DUPLICATE_CHECKPOINT, &_tracker, [=]() mutable {
async_duplicate_checkpoint_from_master_replica();
});
_tracker.wait_outstanding_tasks();
_duplicating_checkpoint = false;
return _tracker.all_tasks_success() ? ERR_OK : ERR_CORRUPTION;
if (_tracker.all_tasks_success()) {
_tracker.clear_tasks_state();
return ERR_OK;
}
return ERR_TRY_AGAIN;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
// ThreadPool: THREAD_POOL_DEFAULT
void replica_follower::async_duplicate_checkpoint_from_master_replica()
{
rpc_address meta_servers;
Expand All @@ -100,18 +106,16 @@ void replica_follower::async_duplicate_checkpoint_from_master_replica()
msg,
&_tracker,
[&](error_code err, configuration_query_by_index_response &&resp) mutable {
tasking::enqueue(LPC_DUPLICATE_CHECKPOINT, &_tracker, [=]() mutable {
FAIL_POINT_INJECT_F("duplicate_checkpoint_ok", [&](string_view s) -> void {
_tracker.set_success();
return;
});

FAIL_POINT_INJECT_F("duplicate_checkpoint_failed",
[&](string_view s) -> void { return; });
if (update_master_replica_config(err, std::move(resp)) == ERR_OK) {
copy_master_replica_checkpoint();
}
FAIL_POINT_INJECT_F("duplicate_checkpoint_ok", [&](string_view s) -> void {
_tracker.set_tasks_success();
return;
});

FAIL_POINT_INJECT_F("duplicate_checkpoint_failed",
[&](string_view s) -> void { return; });
if (update_master_replica_config(err, std::move(resp)) == ERR_OK) {
copy_master_replica_checkpoint();
}
});
}

Expand Down Expand Up @@ -158,6 +162,11 @@ replica_follower::update_master_replica_config(error_code err,

// since the request just specify one partition, the result size is single
_master_replica_config = resp.partitions[0];
ddebug_replica(
"query master[{}] config successfully and update local config: remote={}, gpid={}",
master_replica_name(),
_master_replica_config.primary.to_string(),
_master_replica_config.pid.to_string());
return ERR_OK;
}

Expand Down Expand Up @@ -241,7 +250,7 @@ void replica_follower::nfs_copy_remote_files(const rpc_address &remote_node,
master_replica_name(),
remote_dir,
size);
_tracker.set_success();
_tracker.set_tasks_success();
});
}

Expand Down
4 changes: 2 additions & 2 deletions src/replica/duplication/test/replica_follower_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class replica_follower_test : public duplication_test_base

void mark_tracker_tasks_success(replica_follower *follower)
{
follower->_tracker.set_success();
follower->_tracker.set_tasks_success();
}

error_code update_master_replica_config(replica_follower *follower,
Expand Down Expand Up @@ -137,7 +137,7 @@ TEST_F(replica_follower_test, test_duplicate_checkpoint)

auto follower = _mock_replica->get_replica_follower();

ASSERT_EQ(follower->duplicate_checkpoint(), ERR_CORRUPTION);
ASSERT_EQ(follower->duplicate_checkpoint(), ERR_TRY_AGAIN);
ASSERT_FALSE(get_duplicating(follower));

mark_tracker_tasks_success(follower);
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ error_code replica::initialize_on_new()
// clear work on failure
utils::filesystem::remove_path(path);
stub->_fs_manager.remove_replica(pid);
return rep;
return nullptr;
}

error_code replica::initialize_on_load()
Expand Down

0 comments on commit 516ff0c

Please sign in to comment.