Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): replica server handle pause and cancel status #681

Merged
merged 3 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions include/dsn/dist/replication/replication_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions src/common/replication_types.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/replica/replica_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ void replica::on_group_check_reply(error_code err,
req->config.status == partition_status::PS_POTENTIAL_SECONDARY) {
handle_learning_succeeded_on_primary(req->node, resp->learner_signature);
}
_split_mgr->primary_parent_handle_stop_split(req, resp);
}
}

Expand Down
18 changes: 11 additions & 7 deletions src/replica/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ void primary_context::cleanup(bool clean_pending_mutations)
// clean up checkpoint
CLEANUP_TASK_ALWAYS(checkpoint_task)

// clean up register child task
CLEANUP_TASK_ALWAYS(register_child_task)

// cleanup group bulk load
for (auto &kv : group_bulk_load_pending_replies) {
CLEANUP_TASK_ALWAYS(kv.second);
Expand All @@ -78,11 +75,9 @@ void primary_context::cleanup(bool clean_pending_mutations)

membership.ballot = 0;

caught_up_children.clear();

sync_send_write_request = false;

cleanup_bulk_load_states();

cleanup_split_states();
}

bool primary_context::is_cleaned()
Expand Down Expand Up @@ -170,6 +165,15 @@ void primary_context::cleanup_bulk_load_states()
ingestion_is_empty_prepare_sent = false;
}

void primary_context::cleanup_split_states()
{
CLEANUP_TASK_ALWAYS(register_child_task)

caught_up_children.clear();
sync_send_write_request = false;
split_stopped_secondary.clear();
}

bool secondary_context::cleanup(bool force)
{
CLEANUP_TASK(checkpoint_task, force)
Expand Down
6 changes: 6 additions & 0 deletions src/replica/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ class primary_context

void cleanup_bulk_load_states();

void cleanup_split_states();

public:
// membership mgr, including learners
partition_configuration membership;
Expand Down Expand Up @@ -152,6 +154,10 @@ class primary_context
// primary parent register child on meta_server task
dsn::task_ptr register_child_task;

// Used partition split
// secondary replica address who has paused or canceled split
std::unordered_set<rpc_address> split_stopped_secondary;

// Used for bulk load
// group bulk_load response tasks of RPC_GROUP_BULK_LOAD for each secondary replica
node_tasks group_bulk_load_pending_replies;
Expand Down
98 changes: 95 additions & 3 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ void replica_split_manager::parent_start_split(
return;
}

// TODO(heyuchen): if partition is primary, reset split related varieties

if (status() == partition_status::PS_PRIMARY) {
_replica->_primary_states.cleanup_split_states();
}
_partition_version.store(_replica->_app_info.partition_count - 1);

_split_status = split_status::SPLITTING;
Expand Down Expand Up @@ -1144,6 +1145,17 @@ void replica_split_manager::trigger_primary_parent_split(
return;
}

if (meta_split_status == split_status::PAUSING ||
meta_split_status == split_status::CANCELING) {
parent_stop_split(meta_split_status);
return;
}

if (meta_split_status == split_status::PAUSED) {
dwarn_replica("split has been paused, ignore it");
return;
}

// TODO(heyuchen): add other split_status check
}

Expand All @@ -1169,7 +1181,11 @@ void replica_split_manager::trigger_secondary_parent_split(
return;
}

// TODO(heyuchen): add other split_status check, response will be used in future
if (request.meta_split_status == split_status::PAUSING ||
request.meta_split_status == split_status::CANCELING) { // secondary pause or cancel split
parent_stop_split(request.meta_split_status);
response.__set_is_split_stopped(true);
}
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down Expand Up @@ -1257,5 +1273,81 @@ void replica_split_manager::on_copy_mutation_reply(error_code ec,
// TBD
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::parent_stop_split(
split_status::type meta_split_status) // on parent partition
{
dassert_replica(status() == partition_status::PS_PRIMARY ||
status() == partition_status::PS_SECONDARY,
"wrong partition_status({})",
enum_to_string(status()));
dassert_replica(_split_status == split_status::SPLITTING ||
_split_status == split_status::NOT_SPLIT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why allow stop under NOT_SPLIT?

Copy link
Contributor

@foreverneverer foreverneverer Jan 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, if we send cancel or pause by mistake, dassert will cause crash? is your expect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When learn happened, parent partition may stop split, set it _split_status as NOT_SPLIT, meta server won't know it, so it is possible when a pause or cancel split request sync to parent partition, its split_status is NOT_SPLIT.

Copy link
Contributor Author

@hycdong hycdong Jan 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, if we send cancel or pause by mistake, dassert will cause crash? is your expect?

Pause or cancel split request will send to meta server, meta server should check if this table is splitting, you can reference pr679. Besides, parent partition will not set split_status as PAUSING and CANCELING, when it receives pause or cancel request, it will set it NOT_SPLIT. I don't know if I explain it clearly, you can comment to me if you have any questions.

"wrong split_status({})",
enum_to_string(_split_status));

auto old_status = _split_status;
if (_split_status == split_status::SPLITTING) {
_stub->split_replica_error_handler(
_child_gpid,
std::bind(&replica_split_manager::child_handle_split_error,
std::placeholders::_1,
"stop partition split"));
parent_cleanup_split_context();
}
_partition_version.store(_replica->_app_info.partition_count - 1);

if (status() == partition_status::PS_PRIMARY) {
_replica->_primary_states.sync_send_write_request = false;
_replica->broadcast_group_check();
}
ddebug_replica(
"{} split succeed, status = {}, old split_status = {}, child partition_index = {}",
meta_split_status == split_status::PAUSING ? "pause" : "cancel",
enum_to_string(status()),
enum_to_string(old_status),
get_gpid().get_partition_index() + _replica->_app_info.partition_count);
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::primary_parent_handle_stop_split(
const std::shared_ptr<group_check_request> &req,
const std::shared_ptr<group_check_response> &resp) // on primary parent partition
{
if (!req->__isset.meta_split_status || (req->meta_split_status != split_status::PAUSING &&
req->meta_split_status != split_status::CANCELING)) {
// partition is not executing split or not stopping split
return;
}

if (!resp->__isset.is_split_stopped || !resp->is_split_stopped) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__isset.is_split_stopped and is_split_stopped defference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__isset.is_split_stopped = false means this group_check_response doesn't have any information about pause or cancel split including normal group_check or splitting group_check, when it is true, meaning this group_check include pause or cancel split information. resp->is_split_stopped = true means secondary parent partition pause or cancel split succeed.

// secondary has not stopped split
return;
}

_replica->_primary_states.split_stopped_secondary.insert(req->node);
auto count = 0;
for (auto &iter : _replica->_primary_states.statuses) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can count be stored as global variable, but not repeat compute when check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's safety to check it each time, because _primary_states.statuses may change during learn, and this case is not always happened, it's okay to check it.

if (iter.second == partition_status::PS_SECONDARY &&
_replica->_primary_states.split_stopped_secondary.find(iter.first) !=
_replica->_primary_states.split_stopped_secondary.end()) {
++count;
}
}
// all secondaries have already stop split succeed
if (count == _replica->_primary_states.membership.max_replica_count - 1) {
_replica->_primary_states.cleanup_split_states();
parent_send_notify_stop_request(req->meta_split_status);
}
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::parent_send_notify_stop_request(
split_status::type meta_split_status) // on primary parent
{
FAIL_POINT_INJECT_F("replica_parent_send_notify_stop_request", [](dsn::string_view) {});
// TODO(hyc): TBD
}

} // namespace replication
} // namespace dsn
9 changes: 9 additions & 0 deletions src/replica/split/replica_split_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ class replica_split_manager : replica_base
// when child copy mutation synchronously, parent replica handle child ack
void on_copy_mutation_reply(dsn::error_code ec, ballot b, decree d);

// parent partition pause or cancel split
void parent_stop_split(split_status::type meta_split_status);

// called by `on_group_check_reply` in `replica_check.cpp`
// if group all replica pause/cancel split, send notify request to meta server
void primary_parent_handle_stop_split(const std::shared_ptr<group_check_request> &req,
const std::shared_ptr<group_check_response> &resp);
void parent_send_notify_stop_request(split_status::type meta_split_status);

//
// helper functions
//
Expand Down
Loading