diff --git a/.gitignore b/.gitignore index ea4b7c86..f61e1407 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .vscode/ +build debug/ Debug/ .vs/ diff --git a/CMakeLists.txt b/CMakeLists.txt index bd98f4d8..e9219fa2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,7 +34,7 @@ if (BOOST_INCLUDE_PATH AND BOOST_LIBRARY_PATH) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUSE_BOOST_ASIO") - set(ASIO_INCLUDE_DIR ${BOOST_INCLUDE_PATH}) + set(ASIO_INCLUDE_DIR ${BOOST_INCLUDE_PATH}/boost) set(LIBBOOST_SYSTEM "${BOOST_LIBRARY_PATH}/libboost_system.a") else () @@ -60,6 +60,10 @@ endif () # === Includes === include_directories(BEFORE ./) +if (BOOST_INCLUDE_PATH) + message(STATUS "Boost include path: " ${BOOST_INCLUDE_PATH}) + include_directories(BEFORE ${BOOST_INCLUDE_PATH}) +endif () include_directories(BEFORE ${ASIO_INCLUDE_DIR}) include_directories(BEFORE ${PROJECT_SOURCE_DIR}/include) include_directories(BEFORE ${PROJECT_SOURCE_DIR}/include/libnuraft) diff --git a/include/libnuraft/basic_types.hxx b/include/libnuraft/basic_types.hxx index 9a5b654e..9972431a 100644 --- a/include/libnuraft/basic_types.hxx +++ b/include/libnuraft/basic_types.hxx @@ -23,6 +23,8 @@ limitations under the License. #include +namespace nuraft { + typedef uint64_t ulong; typedef int64_t int64; typedef void* any_ptr; @@ -31,4 +33,6 @@ typedef uint16_t ushort; typedef uint32_t uint; typedef int32_t int32; +} + #endif // _BASIC_TYPES_HXX_ diff --git a/include/libnuraft/launcher.hxx b/include/libnuraft/launcher.hxx index 0b5f2ad4..43dc7091 100644 --- a/include/libnuraft/launcher.hxx +++ b/include/libnuraft/launcher.hxx @@ -37,6 +37,7 @@ public: * @param port_number Port number. * @param asio_options ASIO options. * @param params Raft parameters. + * @param raft_callback Callback function for hooking the operation. * @return Raft server instance. * `nullptr` on any errors. */ @@ -45,7 +46,8 @@ public: ptr lg, int port_number, const asio_service::options& asio_options, - const raft_params& params); + const raft_params& params, + cb_func::func_type raft_callback = nullptr); /** * Shutdown Raft server and ASIO service. diff --git a/include/libnuraft/raft_params.hxx b/include/libnuraft/raft_params.hxx index e83d0a11..97bc1ef6 100644 --- a/include/libnuraft/raft_params.hxx +++ b/include/libnuraft/raft_params.hxx @@ -89,6 +89,7 @@ struct raft_params { , auto_adjust_quorum_for_small_cluster_(false) , locking_method_type_(dual_mutex) , return_method_(blocking) + , skip_initial_election_timeout_(false) {} /** @@ -472,6 +473,17 @@ public: * To choose blocking call or asynchronous call. */ return_method_type return_method_; + + /** + * If `true`, the election timer will not be initiated + * automatically, so that this node will never trigger + * leader election until it gets the first heartbeat + * from any valid leader. + * + * Purpose: to avoid becoming leader when there is only one + * node in the cluster. + */ + bool skip_initial_election_timeout_; }; } diff --git a/src/handle_client_request.cxx b/src/handle_client_request.cxx index 969ec133..7d4534db 100644 --- a/src/handle_client_request.cxx +++ b/src/handle_client_request.cxx @@ -121,13 +121,16 @@ ptr raft_server::handle_cli_req(req_msg& req) { elem->idx_ = last_idx; elem->result_code_ = cmd_result_code::TIMEOUT; - { auto_lock(commit_ret_elems_lock_); - auto entry = commit_ret_elems_.find(last_idx); - if (entry != commit_ret_elems_.end()) { - // Commit thread was faster than this. - elem = entry->second; - } else { - commit_ret_elems_.insert( std::make_pair(last_idx, elem) ); + { + { + auto_lock(commit_ret_elems_lock_); + auto entry = commit_ret_elems_.find(last_idx); + if (entry != commit_ret_elems_.end()) { + // Commit thread was faster than this. + elem = entry->second; + } else { + commit_ret_elems_.insert( std::make_pair(last_idx, elem) ); + } } switch (ctx_->get_params()->return_method_) { diff --git a/src/handle_commit.cxx b/src/handle_commit.cxx index e372ab13..7a248705 100644 --- a/src/handle_commit.cxx +++ b/src/handle_commit.cxx @@ -269,12 +269,10 @@ void raft_server::commit_app_log(ulong idx_to_commit, if (need_to_handle_commit_elem) { std::unique_lock cre_lock(commit_ret_elems_lock_); bool match_found = false; - auto entry = commit_ret_elems_.begin(); - while (entry != commit_ret_elems_.end()) { + auto entry = commit_ret_elems_.find(sm_idx); + if (entry != commit_ret_elems_.end()) { ptr elem = entry->second; - if (elem->idx_ > sm_idx) { - break; - } else if (elem->idx_ == sm_idx) { + if (elem->idx_ == sm_idx) { elem->result_code_ = cmd_result_code::OK; elem->ret_value_ = ret_value; match_found = true; @@ -284,21 +282,23 @@ void raft_server::commit_app_log(ulong idx_to_commit, case raft_params::blocking: default: // Blocking mode: invoke waiting function. + cre_lock.unlock(); elem->awaiter_.invoke(); - entry++; break; case raft_params::async_handler: // Async handler: put into list. + commit_ret_elems_.erase(entry); + cre_lock.unlock(); async_elems.push_back(elem); - entry = commit_ret_elems_.erase(entry); break; } - - } else { - entry++; } } + if (cre_lock.owns_lock()) + { + cre_lock.unlock(); + } if (!match_found) { // If not found, commit thread is invoked earlier than user thread. @@ -311,7 +311,9 @@ void raft_server::commit_app_log(ulong idx_to_commit, case raft_params::blocking: default: elem->awaiter_.invoke(); // Callback will not sleep. + cre_lock.lock(); commit_ret_elems_.insert( std::make_pair(sm_idx, elem) ); + cre_lock.unlock(); break; case raft_params::async_handler: // Async handler: put into list. diff --git a/src/launcher.cxx b/src/launcher.cxx index db88e0ae..f349c751 100644 --- a/src/launcher.cxx +++ b/src/launcher.cxx @@ -32,7 +32,8 @@ ptr raft_launcher::init(ptr sm, ptr lg, int port_number, const asio_service::options& asio_options, - const raft_params& params_given) + const raft_params& params_given, + cb_func::func_type raft_callback) { asio_svc_ = cs_new(asio_options, lg); asio_listener_ = asio_svc_->create_rpc_listener(port_number, lg); @@ -41,6 +42,9 @@ ptr raft_launcher::init(ptr sm, ptr scheduler = asio_svc_; ptr rpc_cli_factory = asio_svc_; + raft_server::init_options opt; + opt.skip_initial_election_timeout_ = params_given.skip_initial_election_timeout_; + context* ctx = new context( smgr, sm, asio_listener_, @@ -48,7 +52,10 @@ ptr raft_launcher::init(ptr sm, rpc_cli_factory, scheduler, params_given ); - raft_instance_ = cs_new(ctx); + if (raft_callback) { + ctx->set_cb_func(raft_callback); + } + raft_instance_ = cs_new(ctx, opt); asio_listener_->listen( raft_instance_ ); return raft_instance_; }