Skip to content

Commit

Permalink
feat(split): child copy mutation asynchronously (#676)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Dec 3, 2020
1 parent 65edf61 commit d9926a9
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include "bulk_load/replica_bulk_loader.h"
#include "split/replica_split_manager.h"
#include "runtime/security/access_controller.h"
#include <dsn/utils/latency_tracer.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -238,6 +239,10 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
}
mu->set_left_potential_secondary_ack_count(count);

if (_split_mgr->is_splitting()) {
_split_mgr->copy_mutation(mu);
}

if (mu->is_logged()) {
do_possible_commit_on_primary(mu);
} else {
Expand Down Expand Up @@ -480,6 +485,10 @@ void replica::on_prepare(dsn::message_ex *request)
return;
}

if (_split_mgr->is_splitting()) {
_split_mgr->copy_mutation(mu);
}

dassert(mu->log_task() == nullptr, "");
mu->log_task() = _stub->_log->append(mu,
LPC_WRITE_REPLICATION_LOG,
Expand Down
85 changes: 85 additions & 0 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1172,5 +1172,90 @@ void replica_split_manager::trigger_secondary_parent_split(
// TODO(heyuchen): add other split_status check, response will be used in future
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::copy_mutation(mutation_ptr &mu) // on parent partition
{
dassert_replica(_child_gpid.get_app_id() > 0, "child_gpid({}) is invalid", _child_gpid);

// TODO(hyc): if copy mutation synchronously, add flags

mutation_ptr new_mu = mutation::copy_no_reply(mu);
error_code ec = _stub->split_replica_exec(
LPC_PARTITION_SPLIT,
_child_gpid,
std::bind(&replica_split_manager::on_copy_mutation, std::placeholders::_1, new_mu));
if (ec != ERR_OK) {
parent_cleanup_split_context();
}
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::on_copy_mutation(mutation_ptr &mu) // on child partition
{
if (status() != partition_status::PS_PARTITION_SPLIT) {
derror_replica(
"wrong status({}), ignore this mutation({})", enum_to_string(status()), mu->name());
_stub->split_replica_error_handler(
_replica->_split_states.parent_gpid, [mu](replica_split_manager *split_mgr) {
split_mgr->parent_cleanup_split_context();
split_mgr->on_copy_mutation_reply(
ERR_OK, mu->data.header.ballot, mu->data.header.decree);
});
return;
}

// It is possible for child has not copied parent prepare list, because parent and child may
// execute in different thread. In this case, child should ignore this mutation.
if (!_replica->_split_states.is_prepare_list_copied) {
return;
}

if (mu->data.header.ballot > get_ballot()) {
derror_replica("ballot changed, mutation ballot({}) vs local ballot({}), ignore copy this "
"mutation({})",
mu->data.header.ballot,
get_ballot(),
mu->name());
_stub->split_replica_error_handler(
_replica->_split_states.parent_gpid, [mu](replica_split_manager *split_mgr) {
split_mgr->parent_cleanup_split_context();
split_mgr->on_copy_mutation_reply(
ERR_OK, mu->data.header.ballot, mu->data.header.decree);
});
child_handle_split_error("on_copy_mutation failed because ballot changed");
return;
}

mu->data.header.pid = get_gpid();
_replica->_prepare_list->prepare(mu, partition_status::PS_SECONDARY);
if (!mu->is_sync_to_child()) { // child copy mutation asynchronously
if (!mu->is_logged()) {
mu->set_logged();
}
mu->log_task() = _stub->_log->append(
mu, LPC_WRITE_REPLICATION_LOG, tracker(), nullptr, get_gpid().thread_hash());
_replica->_private_log->append(
mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr, get_gpid().thread_hash());
} else {
// TODO(heyuchen): child copy mutation synchronously
}
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::ack_parent(error_code ec, mutation_ptr &mu) // on child partition
{
// TODO(heyuchen): when child copy mutation synchronously, child replica send ack to its parent
// TBD
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::on_copy_mutation_reply(error_code ec,
ballot b,
decree d) // on parent partition
{
// TODO(heyuchen): when child copy mutation synchronously, parent replica handle child ack
// TBD
}

} // namespace replication
} // namespace dsn
13 changes: 13 additions & 0 deletions src/replica/split/replica_split_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ class replica_split_manager : replica_base
void trigger_secondary_parent_split(const group_check_request &request,
/*out*/ group_check_response &response);

// parent copy mutations to child during partition split
void copy_mutation(mutation_ptr &mu);

// child add mutation into prepare list and private log
// after child copy prepare list, before child replica become active
void on_copy_mutation(mutation_ptr &mu);

// when child copy mutation synchronously, child replica send ack to its parent
void ack_parent(dsn::error_code ec, mutation_ptr &mu);

// when child copy mutation synchronously, parent replica handle child ack
void on_copy_mutation_reply(dsn::error_code ec, ballot b, decree d);

//
// helper functions
//
Expand Down

0 comments on commit d9926a9

Please sign in to comment.