Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

commit_ret_elems_ performance improvement and enclose basic types with namespace #142

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.vscode/
build
debug/
Debug/
.vs/
Expand Down
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ if (BOOST_INCLUDE_PATH AND BOOST_LIBRARY_PATH)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUSE_BOOST_ASIO")

set(ASIO_INCLUDE_DIR ${BOOST_INCLUDE_PATH})
set(ASIO_INCLUDE_DIR ${BOOST_INCLUDE_PATH}/boost)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is unnecessary, you can set the variable as

-DBOOST_INCLUDE_PATH=<your library include path>/boost

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As said this is my first PR, actually this cmake file change is for my own use only and I actually have no intention to make PR for this. Sorry for bother you.

set(LIBBOOST_SYSTEM "${BOOST_LIBRARY_PATH}/libboost_system.a")

else ()
Expand All @@ -60,6 +60,10 @@ endif ()

# === Includes ===
include_directories(BEFORE ./)
if (BOOST_INCLUDE_PATH)
message(STATUS "Boost include path: " ${BOOST_INCLUDE_PATH})
include_directories(BEFORE ${BOOST_INCLUDE_PATH})
endif ()
Comment on lines +63 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boost include path is already printed out at line 32 above. And also include_directories of boost include path is not required, as what we need is ASIO_INCLUDE_DIR only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As said this is my first PR, actually this cmake file change is for my own use only and I actually have no intention to make PR for this. Sorry for bother you.

include_directories(BEFORE ${ASIO_INCLUDE_DIR})
include_directories(BEFORE ${PROJECT_SOURCE_DIR}/include)
include_directories(BEFORE ${PROJECT_SOURCE_DIR}/include/libnuraft)
Expand Down
4 changes: 4 additions & 0 deletions include/libnuraft/basic_types.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ limitations under the License.

#include <cstdint>

namespace nuraft {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please make this change as a separate PR? Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


typedef uint64_t ulong;
typedef int64_t int64;
typedef void* any_ptr;
Expand All @@ -31,4 +33,6 @@ typedef uint16_t ushort;
typedef uint32_t uint;
typedef int32_t int32;

}

#endif // _BASIC_TYPES_HXX_
4 changes: 3 additions & 1 deletion include/libnuraft/launcher.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public:
* @param port_number Port number.
* @param asio_options ASIO options.
* @param params Raft parameters.
* @param raft_callback Callback function for hooking the operation.
* @return Raft server instance.
* `nullptr` on any errors.
*/
Expand All @@ -45,7 +46,8 @@ public:
ptr<logger> lg,
int port_number,
const asio_service::options& asio_options,
const raft_params& params);
const raft_params& params,
cb_func::func_type raft_callback = nullptr);
Copy link
Contributor

@greensky00 greensky00 Oct 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in the other comment, can you please replace this line with raft_server::init_options, and move cb_func::func_type raft_callback into raft_server::init_options? Please make this change as a separate commit too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

understood and noted


/**
* Shutdown Raft server and ASIO service.
Expand Down
12 changes: 12 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ struct raft_params {
, auto_adjust_quorum_for_small_cluster_(false)
, locking_method_type_(dual_mutex)
, return_method_(blocking)
, skip_initial_election_timeout_(false)
{}

/**
Expand Down Expand Up @@ -472,6 +473,17 @@ public:
* To choose blocking call or asynchronous call.
*/
return_method_type return_method_;

/**
* If `true`, the election timer will not be initiated
* automatically, so that this node will never trigger
* leader election until it gets the first heartbeat
* from any valid leader.
*
* Purpose: to avoid becoming leader when there is only one
* node in the cluster.
*/
bool skip_initial_election_timeout_;
Comment on lines +476 to +486
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raft_params includes parameters repeatedly used throughout the lifetime of a Raft server, while skip_initial_election_timeout_ is just a one-time parameter for the initialization only. If the reason why you added it here is merely for passing it to launcher, please add raft_server::init_options as a parameter of the constructor of launcher.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

understood and noted

};

}
Expand Down
17 changes: 10 additions & 7 deletions src/handle_client_request.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,16 @@ ptr<resp_msg> raft_server::handle_cli_req(req_msg& req) {
elem->idx_ = last_idx;
elem->result_code_ = cmd_result_code::TIMEOUT;

{ auto_lock(commit_ret_elems_lock_);
auto entry = commit_ret_elems_.find(last_idx);
if (entry != commit_ret_elems_.end()) {
// Commit thread was faster than this.
elem = entry->second;
} else {
commit_ret_elems_.insert( std::make_pair(last_idx, elem) );
{
{
auto_lock(commit_ret_elems_lock_);
auto entry = commit_ret_elems_.find(last_idx);
if (entry != commit_ret_elems_.end()) {
// Commit thread was faster than this.
elem = entry->second;
} else {
commit_ret_elems_.insert( std::make_pair(last_idx, elem) );
}
}

switch (ctx_->get_params()->return_method_) {
Expand Down
22 changes: 12 additions & 10 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,10 @@ void raft_server::commit_app_log(ulong idx_to_commit,
if (need_to_handle_commit_elem) {
std::unique_lock<std::mutex> cre_lock(commit_ret_elems_lock_);
bool match_found = false;
auto entry = commit_ret_elems_.begin();
while (entry != commit_ret_elems_.end()) {
auto entry = commit_ret_elems_.find(sm_idx);
if (entry != commit_ret_elems_.end()) {
ptr<commit_ret_elem> elem = entry->second;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes regarding commit_ret_elems_lock_ will not be acceptable because of the following situation. Let's say T1 is the user thread (calling append_entries) and T2 is the commit thread.

  1. T1: appends log and calls pre_commit, but thread scheduling happens before dealing with commit_ret_elem.
  2. Replication is done.
  3. T2: calls commit_app_log, grabs the lock, but cannot find corresponding commit_ret_elem, and releases the lock.
  4. T1: grabs the lock, adds commit_ret_elem (let's say elem1) as there is no corresponding commit_ret_elem, and releases the lock.
  5. T2: grabs the lock, adds commit_ret_elem (let's say elem2), and invokes elem2.
  6. T1: sleeps using elem1.

Due to the racing between user and commit threads, commit_ret_elems_lock_ should protect the entire process of 1) finding commit_ret_elem and 2) inserting commit_ret_elem if not exists. We can't release the lock in the middle.

Please note that, in most cases, there won't be notable performance degradation caused by commit_ret_elems_lock_, as Raft commit rate is around a few thousand operations per second due to network latency and disk I/O (by log store and state machine), while the commit_ret_elem logic within the lock is a pure in-memory operation without I/O: faster than a few hundred thousand executions per second I believe. The impact will be marginal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the changes regarding commit_ret_elems_lock_ have been made a few months ago so I have already forgotten some details here. But my main purpose is to avoid locking unnecessary scope to reduce lock contention.

And for commit_app_log(), I want to avoid elem->awaiter_.invoke() inside commit_ret_elems_lock_ as it would wait other thread to gain lock inside.

Besides the lock, for commit_app_log() function, I have removed the while loop for linear search and instead I use the commit_ret_elems_.find() for getting the required entry. Please review if it's ok.

if (elem->idx_ > sm_idx) {
break;
} else if (elem->idx_ == sm_idx) {
if (elem->idx_ == sm_idx) {
elem->result_code_ = cmd_result_code::OK;
elem->ret_value_ = ret_value;
match_found = true;
Expand All @@ -284,21 +282,23 @@ void raft_server::commit_app_log(ulong idx_to_commit,
case raft_params::blocking:
default:
// Blocking mode: invoke waiting function.
cre_lock.unlock();
elem->awaiter_.invoke();
entry++;
break;

case raft_params::async_handler:
// Async handler: put into list.
commit_ret_elems_.erase(entry);
cre_lock.unlock();
async_elems.push_back(elem);
entry = commit_ret_elems_.erase(entry);
break;
}

} else {
entry++;
}
}
if (cre_lock.owns_lock())
{
cre_lock.unlock();
}

if (!match_found) {
// If not found, commit thread is invoked earlier than user thread.
Expand All @@ -311,7 +311,9 @@ void raft_server::commit_app_log(ulong idx_to_commit,
case raft_params::blocking:
default:
elem->awaiter_.invoke(); // Callback will not sleep.
cre_lock.lock();
commit_ret_elems_.insert( std::make_pair(sm_idx, elem) );
cre_lock.unlock();
break;
case raft_params::async_handler:
// Async handler: put into list.
Expand Down
11 changes: 9 additions & 2 deletions src/launcher.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ ptr<raft_server> raft_launcher::init(ptr<state_machine> sm,
ptr<logger> lg,
int port_number,
const asio_service::options& asio_options,
const raft_params& params_given)
const raft_params& params_given,
cb_func::func_type raft_callback)
{
asio_svc_ = cs_new<asio_service>(asio_options, lg);
asio_listener_ = asio_svc_->create_rpc_listener(port_number, lg);
Expand All @@ -41,14 +42,20 @@ ptr<raft_server> raft_launcher::init(ptr<state_machine> sm,
ptr<delayed_task_scheduler> scheduler = asio_svc_;
ptr<rpc_client_factory> rpc_cli_factory = asio_svc_;

raft_server::init_options opt;
opt.skip_initial_election_timeout_ = params_given.skip_initial_election_timeout_;

context* ctx = new context( smgr,
sm,
asio_listener_,
lg,
rpc_cli_factory,
scheduler,
params_given );
raft_instance_ = cs_new<raft_server>(ctx);
if (raft_callback) {
ctx->set_cb_func(raft_callback);
}
raft_instance_ = cs_new<raft_server>(ctx, opt);
asio_listener_->listen( raft_instance_ );
return raft_instance_;
}
Expand Down