Skip to content

Commit

Permalink
Do not count to-be-removed server in quorum (#92)
Browse files Browse the repository at this point in the history
* To-be-removed server should be treated the same as learner.

* Set `srv_to_leave_` earlier than commit, to reduce one RPC to remove
a server.
  • Loading branch information
greensky00 authored Dec 31, 2019
1 parent 7f4041d commit 1a76f48
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 68 deletions.
25 changes: 13 additions & 12 deletions include/libnuraft/async.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ limitations under the License.
namespace nuraft {

enum cmd_result_code {
OK = 0,
CANCELLED = -1,
TIMEOUT = -2,
NOT_LEADER = -3,
BAD_REQUEST = -4,
SERVER_ALREADY_EXISTS = -5,
CONFIG_CHANGING = -6,
SERVER_IS_JOINING = -7,
SERVER_NOT_FOUND = -8,
CANNOT_REMOVE_LEADER = -9,

FAILED = -32768,
OK = 0,
CANCELLED = -1,
TIMEOUT = -2,
NOT_LEADER = -3,
BAD_REQUEST = -4,
SERVER_ALREADY_EXISTS = -5,
CONFIG_CHANGING = -6,
SERVER_IS_JOINING = -7,
SERVER_NOT_FOUND = -8,
CANNOT_REMOVE_LEADER = -9,
SERVER_IS_LEAVING = -10,

FAILED = -32768,
};

template< typename T,
Expand Down
2 changes: 2 additions & 0 deletions include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ protected:
bool check_leadership_validity();
void update_rand_timeout();

bool is_regular_member(const ptr<peer>& p);
int32 get_num_voting_members();
int32 get_quorum_for_election();
int32 get_quorum_for_commit();
Expand Down Expand Up @@ -552,6 +553,7 @@ protected:
void reconnect_client(peer& p);
void become_leader();
void become_follower();
void check_srv_to_leave_timeout();
void enable_hb_for_peer(peer& p);
void stop_election_timer();
void handle_hb_timeout(int32 srv_id);
Expand Down
52 changes: 20 additions & 32 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,18 @@ bool raft_server::request_append_entries(ptr<peer> p) {

if ( srv_to_leave_ &&
srv_to_leave_->get_id() == p->get_id() &&
msg->get_commit_idx() >= srv_to_leave_target_idx_ ) {
msg->get_commit_idx() >= srv_to_leave_target_idx_ &&
!srv_to_leave_->is_stepping_down() ) {
// If this is the server to leave, AND
// current request's commit index includes
// the target log index number, step down and remove it
// as soon as we get the corresponding response.
srv_to_leave_->step_down();
p_in("srv_to_leave_ %d is safe to be erased from peer list, "
"log idx %zu commit idx %zu, set flag",
srv_to_leave_->get_id(),
msg->get_last_log_idx(),
msg->get_commit_idx());
}

p_tr("sent\n");
Expand Down Expand Up @@ -673,34 +679,18 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
return;
}

if (srv_to_leave_) {
ulong last_active_ms = srv_to_leave_->get_active_timer_us() / 1000;
bool reset_srv = false;
if ( last_active_ms >
(ulong)peer::RESPONSE_LIMIT *
ctx_->get_params()->heart_beat_interval_ ) {
// Timeout: remove peer.
p_wn("server to be removed %d, activity timeout %zu ms. "
"force remove now",
srv_to_leave_->get_id(),
last_active_ms);
reset_srv = true;

} else if ( srv_to_leave_->get_id() == resp.get_src() &&
srv_to_leave_->is_stepping_down() ) {
// Catch-up is done.
p_in("server to be removed %d fully caught up the "
"target config log %zu",
srv_to_leave_->get_id(),
srv_to_leave_target_idx_);
reset_srv = true;
}

if (reset_srv) {
remove_peer_from_peers(srv_to_leave_);
reset_srv_to_leave();
return;
}
check_srv_to_leave_timeout();
if ( srv_to_leave_ &&
srv_to_leave_->get_id() == resp.get_src() &&
srv_to_leave_->is_stepping_down() ) {
// Catch-up is done.
p_in("server to be removed %d fully caught up the "
"target config log %zu",
srv_to_leave_->get_id(),
srv_to_leave_target_idx_);
remove_peer_from_peers(srv_to_leave_);
reset_srv_to_leave();
return;
}

// If there are pending logs to be synced or commit index need to be advanced,
Expand Down Expand Up @@ -832,9 +822,7 @@ ulong raft_server::get_expected_committed_log_idx() {
matched_indexes.push_back( precommit_index_ );
for (auto& entry: peers_) {
ptr<peer>& p = entry.second;

// Skip learner.
if (p->is_learner()) continue;
if (!is_regular_member(p)) continue;

matched_indexes.push_back( p->get_matched_idx() );
}
Expand Down
6 changes: 2 additions & 4 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,8 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
// commit index, S3 will be just force removed.

if (role_ == srv_role::leader) {
srv_to_leave_ = pit->second;
srv_to_leave_target_idx_ = new_config->get_log_idx();
p_in("server %d will be removed from cluster, config %zu",
srv_removed, srv_to_leave_target_idx_);
// If leader, keep the to-be-removed server in peer list
// until 1) catch-up is done, or 2) timeout.
} else {
remove_peer_from_peers(pit->second);
sprintf(temp_buf, "remove peer %d\n", srv_removed);
Expand Down
28 changes: 25 additions & 3 deletions src/handle_join_leave.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,13 @@ ptr<resp_msg> raft_server::handle_rm_srv_req(req_msg& req) {
return resp;
}

check_srv_to_leave_timeout();
if (srv_to_leave_) {
p_wn("previous to-be-removed server has not left yet");
resp->set_result_code(cmd_result_code::SERVER_IS_LEAVING);
return resp;
}

if (config_changing_) {
// the previous config has not committed yet
p_wn("previous config has not committed yet");
Expand Down Expand Up @@ -444,6 +451,17 @@ void raft_server::rm_srv_from_cluster(int32 srv_id) {
new_conf_buf,
log_val_type::conf ) );
store_log_entry(entry);

auto p_entry = peers_.find(srv_id);
if (p_entry != peers_.end()) {
ptr<peer> pp = p_entry->second;
srv_to_leave_ = pp;
srv_to_leave_target_idx_ = new_conf->get_log_idx();
p_in("set srv_to_leave_, "
"server %d will be removed from cluster, config %zu",
srv_id, srv_to_leave_target_idx_);
}

request_append_entries();
}

Expand Down Expand Up @@ -473,10 +491,13 @@ void raft_server::handle_join_leave_rpc_err(msg_type t_msg, ptr<peer> p) {
pit->second->enable_hb(false);
peers_.erase(pit);
p_in("server %d is removed from cluster", p->get_id());
} else {
p_in("peer %d cannot be found, no action for removing",
p->get_id());
}
else {
p_in( "peer %d cannot be found, no action for removing",
p->get_id() );

if (srv_to_leave_) {
reset_srv_to_leave();
}
}

Expand Down Expand Up @@ -504,6 +525,7 @@ void raft_server::reset_srv_to_join() {
void raft_server::reset_srv_to_leave() {
srv_to_leave_.reset();
srv_to_leave_target_idx_ = 0;
p_in("clearing srv_to_leave_");
}

} // namespace nuraft;
Expand Down
18 changes: 18 additions & 0 deletions src/handle_timeout.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,27 @@ void raft_server::enable_hb_for_peer(peer& p) {
schedule_task(p.get_hb_task(), p.get_current_hb_interval());
}

void raft_server::check_srv_to_leave_timeout() {
if (!srv_to_leave_) return;
ulong last_active_ms = srv_to_leave_->get_active_timer_us() / 1000;
if ( last_active_ms >
(ulong)peer::RESPONSE_LIMIT *
ctx_->get_params()->heart_beat_interval_ ) {
// Timeout: remove peer.
p_wn("server to be removed %d, activity timeout %zu ms. "
"force remove now",
srv_to_leave_->get_id(),
last_active_ms);
remove_peer_from_peers(srv_to_leave_);
reset_srv_to_leave();
}
}

void raft_server::handle_hb_timeout(int32 srv_id) {
recur_lock(lock_);

check_srv_to_leave_timeout();

if (write_paused_ && reelection_timer_.timeout()) {
p_in("resign by timeout, %zu us elapsed, resign now",
reelection_timer_.get_us());
Expand Down
6 changes: 3 additions & 3 deletions src/handle_vote.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void raft_server::request_prevote() {
ptr<cluster_config> c_config = get_config();
for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
ptr<peer> pp = it->second;
if (pp->is_learner()) continue;
if (!is_regular_member(pp)) continue;
ptr<srv_config> s_config = c_config->get_server( pp->get_id() );

if (s_config) {
Expand Down Expand Up @@ -94,7 +94,7 @@ void raft_server::request_prevote() {

for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
ptr<peer> pp = it->second;
if (pp->is_learner()) {
if (!is_regular_member(pp)) {
// Do not send voting request to learner.
continue;
}
Expand Down Expand Up @@ -152,7 +152,7 @@ void raft_server::request_vote(bool ignore_priority) {

for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
ptr<peer> pp = it->second;
if (pp->is_learner()) {
if (!is_regular_member(pp)) {
// Do not send voting request to learner.
continue;
}
Expand Down
15 changes: 12 additions & 3 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,22 @@ void raft_server::shutdown() {
}
}

bool raft_server::is_regular_member(const ptr<peer>& p) {
// Skip to-be-removed server.
if (srv_to_leave_ && srv_to_leave_->get_id() == p->get_id()) return false;

// Skip learner.
if (p->is_learner()) return false;

return true;
}

// Number of nodes that are able to vote, including leader itself.
int32 raft_server::get_num_voting_members() {
int32 count = 0;
for (auto& entry: peers_) {
ptr<peer>& p = entry.second;
if (p->is_learner()) continue;
if (!is_regular_member(p)) continue;
count++;
}
if (!im_learner_) count++;
Expand Down Expand Up @@ -449,8 +459,7 @@ size_t raft_server::get_not_responding_peers() {
for (auto& entry: peers_) {
ptr<peer> p = entry.second;

// Skip learner.
if (p->is_learner()) continue;
if (!is_regular_member(p)) continue;

int32 resp_elapsed_ms = (int32)(p->get_resp_timer_us() / 1000);
if ( resp_elapsed_ms > expiry ) {
Expand Down
12 changes: 5 additions & 7 deletions tests/unit/failure_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -478,14 +478,7 @@ int removed_server_late_step_down_test() {
CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );

// Try to remove s3 from non leader, should return error.
ptr< cmd_result< ptr<buffer> > > ret =
s2.raftServer->remove_srv( s3.getTestMgr()->get_srv_config()->get_id() );
CHK_FALSE( ret->get_accepted() );
CHK_EQ( cmd_result_code::NOT_LEADER, ret->get_result_code() );

// Remove s3 from leader.
s1.dbgLog(" --- remove ---");
s1.raftServer->remove_srv( s3.getTestMgr()->get_srv_config()->get_id() );

// Leave req/resp.
Expand All @@ -512,6 +505,11 @@ int removed_server_late_step_down_test() {
}
}

// Removing server again should fail.
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->remove_srv( s3.getTestMgr()->get_srv_config()->get_id() );
CHK_FALSE(ret->get_accepted());

// More catch-up for to-be-removed server.
s1.fNet->execReqResp();
s1.fNet->execReqResp();
Expand Down
5 changes: 1 addition & 4 deletions tests/unit/raft_server_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -592,17 +592,14 @@ int remove_node_error_cases_test() {
// Leave req/resp.
s1.fNet->execReqResp();

// Now config change is in progress, add S3 to S1.
// Now config change is in progress, remove S3.
ptr<raft_result> ret = s1.raftServer->remove_srv(s3.myId);

// May fail (depends on commit thread wake-up timing).
size_t expected_cluster_size = 2;
if (ret->get_result_code() == cmd_result_code::OK) {
// If succeed, S3 is also removed.
expected_cluster_size = 1;
} else {
// If not, error code should be CONFIG_CHANGNING.
CHK_EQ( cmd_result_code::CONFIG_CHANGING, ret->get_result_code() );
}

// Finish the task.
Expand Down

0 comments on commit 1a76f48

Please sign in to comment.