From c438e7d79aeccd450323a5f8eecb62940d1c8174 Mon Sep 17 00:00:00 2001 From: Jung-Sang Ahn Date: Fri, 6 Mar 2020 14:34:58 -0800 Subject: [PATCH] Fix to precommit/commit order inversion bug * Since `peer::set_free()` is called prior than response handler without acquiring `raft_server::lock_`, there can be an edge case that leader may send duplicate logs, and their last log index may not be greater than the last log index this server already has. We should always compare the target index with current precommit index, and take it only when it is greater than the previous one. --- src/handle_append_entries.cxx | 36 +++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/src/handle_append_entries.cxx b/src/handle_append_entries.cxx index 69a9c41d..60bc4f60 100644 --- a/src/handle_append_entries.cxx +++ b/src/handle_append_entries.cxx @@ -666,11 +666,39 @@ ptr raft_server::handle_append_entries(req_msg& req) // on next `append_entries()` call, due to racing // between BG commit thread and appending logs. // Hence, we always should take smaller one. - precommit_index_ = req.get_last_log_idx() + req.log_entries().size(); - commit( std::min( req.get_commit_idx(), - req.get_last_log_idx() + req.log_entries().size() ) ); + ulong target_precommit_index = req.get_last_log_idx() + req.log_entries().size(); - resp->accept(req.get_last_log_idx() + req.log_entries().size() + 1); + // WARNING: + // Since `peer::set_free()` is called prior than response handler + // without acquiring `raft_server::lock_`, there can be an edge case + // that leader may send duplicate logs, and their last log index may not + // be greater than the last log index this server already has. We should + // always compare the target index with current precommit index, and take + // it only when it is greater than the previous one. + const size_t MAX_ATTEMPTS = 10; + size_t num_attempts = 0; + ulong prev_precommit_index = precommit_index_; + while ( prev_precommit_index < target_precommit_index && + num_attempts < MAX_ATTEMPTS ) { + if ( precommit_index_.compare_exchange_strong( prev_precommit_index, + target_precommit_index ) ) { + break; + } + // Otherwise: retry until `precommit_index_` is equal to or greater than + // `target_precommit_index`. + num_attempts++; + } + if (num_attempts >= MAX_ATTEMPTS) { + // If updating `precommit_index_` failed, we SHOULD NOT update + // commit index as well. + p_er("updating precommit_index_ failed after %zu attempts, " + "last seen precommit_index_ %zu, target %zu", + num_attempts, prev_precommit_index, target_precommit_index); + } else { + commit( std::min( req.get_commit_idx(), target_precommit_index ) ); + } + + resp->accept(target_precommit_index + 1); int32 time_ms = tt.get_us() / 1000; if (time_ms >= ctx_->get_params()->heart_beat_interval_) {