Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not count to-be-removed server in quorum #92

Merged
merged 1 commit into from
Dec 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions include/libnuraft/async.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ limitations under the License.
namespace nuraft {

enum cmd_result_code {
OK = 0,
CANCELLED = -1,
TIMEOUT = -2,
NOT_LEADER = -3,
BAD_REQUEST = -4,
SERVER_ALREADY_EXISTS = -5,
CONFIG_CHANGING = -6,
SERVER_IS_JOINING = -7,
SERVER_NOT_FOUND = -8,
CANNOT_REMOVE_LEADER = -9,

FAILED = -32768,
OK = 0,
CANCELLED = -1,
TIMEOUT = -2,
NOT_LEADER = -3,
BAD_REQUEST = -4,
SERVER_ALREADY_EXISTS = -5,
CONFIG_CHANGING = -6,
SERVER_IS_JOINING = -7,
SERVER_NOT_FOUND = -8,
CANNOT_REMOVE_LEADER = -9,
SERVER_IS_LEAVING = -10,

FAILED = -32768,
};

template< typename T,
Expand Down
2 changes: 2 additions & 0 deletions include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ protected:
bool check_leadership_validity();
void update_rand_timeout();

bool is_regular_member(const ptr<peer>& p);
int32 get_num_voting_members();
int32 get_quorum_for_election();
int32 get_quorum_for_commit();
Expand Down Expand Up @@ -552,6 +553,7 @@ protected:
void reconnect_client(peer& p);
void become_leader();
void become_follower();
void check_srv_to_leave_timeout();
void enable_hb_for_peer(peer& p);
void stop_election_timer();
void handle_hb_timeout(int32 srv_id);
Expand Down
52 changes: 20 additions & 32 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,18 @@ bool raft_server::request_append_entries(ptr<peer> p) {

if ( srv_to_leave_ &&
srv_to_leave_->get_id() == p->get_id() &&
msg->get_commit_idx() >= srv_to_leave_target_idx_ ) {
msg->get_commit_idx() >= srv_to_leave_target_idx_ &&
!srv_to_leave_->is_stepping_down() ) {
// If this is the server to leave, AND
// current request's commit index includes
// the target log index number, step down and remove it
// as soon as we get the corresponding response.
srv_to_leave_->step_down();
p_in("srv_to_leave_ %d is safe to be erased from peer list, "
"log idx %zu commit idx %zu, set flag",
srv_to_leave_->get_id(),
msg->get_last_log_idx(),
msg->get_commit_idx());
}

p_tr("sent\n");
Expand Down Expand Up @@ -673,34 +679,18 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
return;
}

if (srv_to_leave_) {
ulong last_active_ms = srv_to_leave_->get_active_timer_us() / 1000;
bool reset_srv = false;
if ( last_active_ms >
(ulong)peer::RESPONSE_LIMIT *
ctx_->get_params()->heart_beat_interval_ ) {
// Timeout: remove peer.
p_wn("server to be removed %d, activity timeout %zu ms. "
"force remove now",
srv_to_leave_->get_id(),
last_active_ms);
reset_srv = true;

} else if ( srv_to_leave_->get_id() == resp.get_src() &&
srv_to_leave_->is_stepping_down() ) {
// Catch-up is done.
p_in("server to be removed %d fully caught up the "
"target config log %zu",
srv_to_leave_->get_id(),
srv_to_leave_target_idx_);
reset_srv = true;
}

if (reset_srv) {
remove_peer_from_peers(srv_to_leave_);
reset_srv_to_leave();
return;
}
check_srv_to_leave_timeout();
if ( srv_to_leave_ &&
srv_to_leave_->get_id() == resp.get_src() &&
srv_to_leave_->is_stepping_down() ) {
// Catch-up is done.
p_in("server to be removed %d fully caught up the "
"target config log %zu",
srv_to_leave_->get_id(),
srv_to_leave_target_idx_);
remove_peer_from_peers(srv_to_leave_);
reset_srv_to_leave();
return;
}

// If there are pending logs to be synced or commit index need to be advanced,
Expand Down Expand Up @@ -832,9 +822,7 @@ ulong raft_server::get_expected_committed_log_idx() {
matched_indexes.push_back( precommit_index_ );
for (auto& entry: peers_) {
ptr<peer>& p = entry.second;

// Skip learner.
if (p->is_learner()) continue;
if (!is_regular_member(p)) continue;

matched_indexes.push_back( p->get_matched_idx() );
}
Expand Down
6 changes: 2 additions & 4 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,8 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
// commit index, S3 will be just force removed.

if (role_ == srv_role::leader) {
srv_to_leave_ = pit->second;
srv_to_leave_target_idx_ = new_config->get_log_idx();
p_in("server %d will be removed from cluster, config %zu",
srv_removed, srv_to_leave_target_idx_);
// If leader, keep the to-be-removed server in peer list
// until 1) catch-up is done, or 2) timeout.
} else {
remove_peer_from_peers(pit->second);
sprintf(temp_buf, "remove peer %d\n", srv_removed);
Expand Down
28 changes: 25 additions & 3 deletions src/handle_join_leave.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,13 @@ ptr<resp_msg> raft_server::handle_rm_srv_req(req_msg& req) {
return resp;
}

check_srv_to_leave_timeout();
if (srv_to_leave_) {
p_wn("previous to-be-removed server has not left yet");
resp->set_result_code(cmd_result_code::SERVER_IS_LEAVING);
return resp;
}

if (config_changing_) {
// the previous config has not committed yet
p_wn("previous config has not committed yet");
Expand Down Expand Up @@ -444,6 +451,17 @@ void raft_server::rm_srv_from_cluster(int32 srv_id) {
new_conf_buf,
log_val_type::conf ) );
store_log_entry(entry);

auto p_entry = peers_.find(srv_id);
if (p_entry != peers_.end()) {
ptr<peer> pp = p_entry->second;
srv_to_leave_ = pp;
srv_to_leave_target_idx_ = new_conf->get_log_idx();
p_in("set srv_to_leave_, "
"server %d will be removed from cluster, config %zu",
srv_id, srv_to_leave_target_idx_);
}

request_append_entries();
}

Expand Down Expand Up @@ -473,10 +491,13 @@ void raft_server::handle_join_leave_rpc_err(msg_type t_msg, ptr<peer> p) {
pit->second->enable_hb(false);
peers_.erase(pit);
p_in("server %d is removed from cluster", p->get_id());
} else {
p_in("peer %d cannot be found, no action for removing",
p->get_id());
}
else {
p_in( "peer %d cannot be found, no action for removing",
p->get_id() );

if (srv_to_leave_) {
reset_srv_to_leave();
}
}

Expand Down Expand Up @@ -504,6 +525,7 @@ void raft_server::reset_srv_to_join() {
// Clears the pending "server to leave" state: releases the `srv_to_leave_`
// handle and zeroes the target log index that is tracked alongside it.
void raft_server::reset_srv_to_leave() {
srv_to_leave_.reset();
srv_to_leave_target_idx_ = 0;
// Emit a message through `p_in` so the clearing is visible in the log.
p_in("clearing srv_to_leave_");
}

} // namespace nuraft;
Expand Down
18 changes: 18 additions & 0 deletions src/handle_timeout.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,27 @@ void raft_server::enable_hb_for_peer(peer& p) {
schedule_task(p.get_hb_task(), p.get_current_hb_interval());
}

void raft_server::check_srv_to_leave_timeout() {
if (!srv_to_leave_) return;
ulong last_active_ms = srv_to_leave_->get_active_timer_us() / 1000;
if ( last_active_ms >
(ulong)peer::RESPONSE_LIMIT *
ctx_->get_params()->heart_beat_interval_ ) {
// Timeout: remove peer.
p_wn("server to be removed %d, activity timeout %zu ms. "
"force remove now",
srv_to_leave_->get_id(),
last_active_ms);
remove_peer_from_peers(srv_to_leave_);
reset_srv_to_leave();
}
}

void raft_server::handle_hb_timeout(int32 srv_id) {
recur_lock(lock_);

check_srv_to_leave_timeout();

if (write_paused_ && reelection_timer_.timeout()) {
p_in("resign by timeout, %zu us elapsed, resign now",
reelection_timer_.get_us());
Expand Down
6 changes: 3 additions & 3 deletions src/handle_vote.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void raft_server::request_prevote() {
ptr<cluster_config> c_config = get_config();
for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
ptr<peer> pp = it->second;
if (pp->is_learner()) continue;
if (!is_regular_member(pp)) continue;
ptr<srv_config> s_config = c_config->get_server( pp->get_id() );

if (s_config) {
Expand Down Expand Up @@ -94,7 +94,7 @@ void raft_server::request_prevote() {

for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
ptr<peer> pp = it->second;
if (pp->is_learner()) {
if (!is_regular_member(pp)) {
// Do not send voting request to learner or to-be-removed server.
continue;
}
Expand Down Expand Up @@ -152,7 +152,7 @@ void raft_server::request_vote(bool ignore_priority) {

for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
ptr<peer> pp = it->second;
if (pp->is_learner()) {
if (!is_regular_member(pp)) {
// Do not send voting request to learner or to-be-removed server.
continue;
}
Expand Down
15 changes: 12 additions & 3 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,22 @@ void raft_server::shutdown() {
}
}

// Tells whether peer `p` counts as a regular member.
//
// Returns false when `p` has the same ID as the server currently pending
// removal (`srv_to_leave_`), or when `p` is a learner; returns true otherwise.
bool raft_server::is_regular_member(const ptr<peer>& p) {
    // A to-be-removed server is excluded first; in that case the learner
    // flag is never consulted.
    const bool pending_removal =
        srv_to_leave_ && srv_to_leave_->get_id() == p->get_id();

    return !pending_removal && !p->is_learner();
}

// Number of nodes that are able to vote, including leader itself.
int32 raft_server::get_num_voting_members() {
int32 count = 0;
for (auto& entry: peers_) {
ptr<peer>& p = entry.second;
if (p->is_learner()) continue;
if (!is_regular_member(p)) continue;
count++;
}
if (!im_learner_) count++;
Expand Down Expand Up @@ -449,8 +459,7 @@ size_t raft_server::get_not_responding_peers() {
for (auto& entry: peers_) {
ptr<peer> p = entry.second;

// Skip learner.
if (p->is_learner()) continue;
if (!is_regular_member(p)) continue;

int32 resp_elapsed_ms = (int32)(p->get_resp_timer_us() / 1000);
if ( resp_elapsed_ms > expiry ) {
Expand Down
12 changes: 5 additions & 7 deletions tests/unit/failure_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -478,14 +478,7 @@ int removed_server_late_step_down_test() {
CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );

// Try to remove s3 from non leader, should return error.
ptr< cmd_result< ptr<buffer> > > ret =
s2.raftServer->remove_srv( s3.getTestMgr()->get_srv_config()->get_id() );
CHK_FALSE( ret->get_accepted() );
CHK_EQ( cmd_result_code::NOT_LEADER, ret->get_result_code() );

// Remove s3 from leader.
s1.dbgLog(" --- remove ---");
s1.raftServer->remove_srv( s3.getTestMgr()->get_srv_config()->get_id() );

// Leave req/resp.
Expand All @@ -512,6 +505,11 @@ int removed_server_late_step_down_test() {
}
}

// Removing server again should fail.
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->remove_srv( s3.getTestMgr()->get_srv_config()->get_id() );
CHK_FALSE(ret->get_accepted());

// More catch-up for to-be-removed server.
s1.fNet->execReqResp();
s1.fNet->execReqResp();
Expand Down
5 changes: 1 addition & 4 deletions tests/unit/raft_server_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -592,17 +592,14 @@ int remove_node_error_cases_test() {
// Leave req/resp.
s1.fNet->execReqResp();

// Now config change is in progress, add S3 to S1.
// Now config change is in progress, remove S3.
ptr<raft_result> ret = s1.raftServer->remove_srv(s3.myId);

// May fail (depends on commit thread wake-up timing).
size_t expected_cluster_size = 2;
if (ret->get_result_code() == cmd_result_code::OK) {
// If succeed, S3 is also removed.
expected_cluster_size = 1;
} else {
// If not, error code should be CONFIG_CHANGING.
CHK_EQ( cmd_result_code::CONFIG_CHANGING, ret->get_result_code() );
}

// Finish the task.
Expand Down