
fix(network): use multi io_services in asio #1016

Merged (12 commits) on Jan 26, 2022
Changes from 3 commits
49 changes: 36 additions & 13 deletions src/runtime/rpc/asio_net_provider.cpp
@@ -37,14 +37,27 @@ asio_network_provider::asio_network_provider(rpc_engine *srv, network *inner_pro
: connection_oriented_network(srv, inner_provider)
{
_acceptor = nullptr;
+_service_count =
+    (int)dsn_config_get_value_uint64("network",
+                                     "io_service_worker_count",
+                                     1,
+                                     "thread number for io service (timer and boost network)");

+for (std::size_t i = 0; i < _service_count; i++) {
+    io_context_ptr io_context(new boost::asio::io_context(1));
+    _io_services.push_back(io_context);
+}
}

asio_network_provider::~asio_network_provider()
{
if (_acceptor) {
_acceptor->close();
}
-_io_service.stop();
+for (const auto &io_service : _io_services) {
+    io_service->stop();
+}

for (auto &w : _workers) {
w->join();
}
@@ -55,17 +68,11 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
if (_acceptor != nullptr)
return ERR_SERVICE_ALREADY_RUNNING;

-int io_service_worker_count =
-    (int)dsn_config_get_value_uint64("network",
-                                     "io_service_worker_count",
-                                     1,
-                                     "thread number for io service (timer and boost network)");

// get connection threshold from config, default value 0 means no threshold
_cfg_conn_threshold_per_ip = (uint32_t)dsn_config_get_value_uint64(
"network", "conn_threshold_per_ip", 0, "max connection count to each server per ip");

-for (int i = 0; i < io_service_worker_count; i++) {
+for (int i = 0; i < _service_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
task::set_tls_dsn_context(node(), nullptr);

@@ -74,9 +81,9 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
sprintf(buffer, "%s.asio.%d", name, i);
task_worker::set_name(buffer);

-boost::asio::io_service::work work(_io_service);
+boost::asio::io_service::work work(*_io_services[i]);
boost::system::error_code ec;
-_io_service.run(ec);
+_io_services[i]->run(ec);
if (ec) {
dassert(false, "boost::asio::io_service run failed: err(%s)", ec.message().data());
}
@@ -95,7 +102,7 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
auto v4_addr = boost::asio::ip::address_v4::any(); //(ntohl(_address.ip));
::boost::asio::ip::tcp::endpoint endpoint(v4_addr, _address.port());
boost::system::error_code ec;
-_acceptor.reset(new boost::asio::ip::tcp::acceptor(_io_service));
+_acceptor.reset(new boost::asio::ip::tcp::acceptor(get_io_context()));
_acceptor->open(endpoint.protocol(), ec);
if (ec) {
derror("asio tcp acceptor open failed, error = %s", ec.message().c_str());
@@ -126,14 +133,14 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie

rpc_session_ptr asio_network_provider::create_client_session(::dsn::rpc_address server_addr)
{
-auto sock = std::make_shared<boost::asio::ip::tcp::socket>(_io_service);
+auto sock = std::make_shared<boost::asio::ip::tcp::socket>(get_io_context());
message_parser_ptr parser(new_message_parser(_client_hdr_format));
return rpc_session_ptr(new asio_rpc_session(*this, server_addr, sock, parser, true));
}

void asio_network_provider::do_accept()
{
-auto socket = std::make_shared<boost::asio::ip::tcp::socket>(_io_service);
+auto socket = std::make_shared<boost::asio::ip::tcp::socket>(get_io_context());

_acceptor->async_accept(*socket, [this, socket](boost::system::error_code ec) {
if (!ec) {
@@ -394,5 +401,21 @@ error_code asio_udp_provider::start(rpc_channel channel, int port, bool client_o

return ERR_OK;
}

+boost::asio::io_context &asio_network_provider::get_io_context()
+{
+    // Use a round-robin scheme to choose the next io_context to use.
+    int tmp = _next_io_context;
+    if (tmp >= _service_count) {
+        tmp = 0;
+    }
+    boost::asio::io_context &io_context = *_io_services[tmp];
+    ++_next_io_context;
+    if (_next_io_context >= _service_count) {
+        _next_io_context = 0;
+    }
+    return io_context;
+}

} // namespace tools
} // namespace dsn
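
For context, below is a minimal, self-contained sketch of the pattern this diff adopts: several boost::asio::io_context objects, each constructed with a concurrency hint of 1 and driven by exactly one worker thread, handed out round-robin to new sockets. The io_context_pool class, its member names, and main() are illustrative assumptions, not code from this repository.

// Illustrative sketch only, not project code. It mirrors the structure added
// above: N io_contexts, each run by a single thread, selected round-robin.
#include <boost/asio.hpp>
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

class io_context_pool {
public:
    explicit io_context_pool(std::size_t n)
    {
        for (std::size_t i = 0; i < n; ++i) {
            // Concurrency hint 1: each io_context is serviced by one thread only.
            _contexts.push_back(std::make_shared<boost::asio::io_context>(1));
            _threads.emplace_back([ctx = _contexts.back()]() {
                // The work guard keeps run() alive until stop() is called,
                // like the io_service::work object in the diff above.
                auto guard = boost::asio::make_work_guard(*ctx);
                ctx->run();
            });
        }
    }

    ~io_context_pool()
    {
        for (auto &c : _contexts) {
            c->stop();
        }
        for (auto &t : _threads) {
            t.join();
        }
    }

    // Round-robin selection, analogous to get_io_context() above.
    boost::asio::io_context &next()
    {
        boost::asio::io_context &ctx = *_contexts[_index];
        _index = (_index + 1) % _contexts.size();
        return ctx;
    }

private:
    std::size_t _index = 0;
    std::vector<std::shared_ptr<boost::asio::io_context>> _contexts;
    std::vector<std::thread> _threads;
};

int main()
{
    io_context_pool pool(4);
    // Each new socket is bound to whichever io_context the pool hands out next,
    // so its completion handlers all run on that context's single thread.
    boost::asio::ip::tcp::socket sock(pool.next());
    (void)sock;
    return 0;
}

The usual motivation for this layout is to stop funneling every socket's events through a single io_context and its one dispatch queue; each connection's handlers stay on the thread of the io_context it was created on.
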
6 changes: 5 additions & 1 deletion src/runtime/rpc/asio_net_provider.h
@@ -45,13 +45,17 @@ class asio_network_provider : public connection_oriented_network

private:
void do_accept();
+boost::asio::io_context &get_io_context();

private:
friend class asio_rpc_session;
friend class asio_network_provider_test;

std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor;
-boost::asio::io_service _io_service;
+int _next_io_context = 0;
+int _service_count;
+typedef std::shared_ptr<boost::asio::io_context> io_context_ptr;
+std::vector<io_context_ptr> _io_services;
std::vector<std::shared_ptr<std::thread>> _workers;
::dsn::rpc_address _address;
};
5 changes: 0 additions & 5 deletions src/runtime/rpc/asio_rpc_session.cpp
@@ -31,7 +31,6 @@ namespace tools {

void asio_rpc_session::set_options()
{
-utils::auto_write_lock socket_guard(_socket_lock);

if (_socket->is_open()) {
boost::system::error_code ec;
@@ -83,8 +82,6 @@ void asio_rpc_session::do_read(int read_next)
void *ptr = _reader.read_buffer_ptr(read_next);
int remaining = _reader.read_buffer_capacity();

-utils::auto_read_lock socket_guard(_socket_lock);

_socket->async_read_some(
boost::asio::buffer(ptr, remaining),
[this](boost::system::error_code ec, std::size_t length) {
@@ -142,7 +139,6 @@ void asio_rpc_session::send(uint64_t signature)

add_ref();

-utils::auto_read_lock socket_guard(_socket_lock);
boost::asio::async_write(
*_socket, asio_wbufs, [this, signature](boost::system::error_code ec, std::size_t length) {
if (ec) {
@@ -169,7 +165,6 @@ asio_rpc_session::asio_rpc_session(asio_network_provider &net,

void asio_rpc_session::close()
{
-utils::auto_write_lock socket_guard(_socket_lock);

boost::system::error_code ec;
_socket->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec);
1 change: 0 additions & 1 deletion src/runtime/rpc/asio_rpc_session.h
@@ -68,7 +68,6 @@ class asio_rpc_session : public rpc_session
// boost::asio::socket is thread-unsafe, must use lock to prevent a
// reading/writing socket being modified or closed concurrently.
std::shared_ptr<boost::asio::ip::tcp::socket> _socket;
-::dsn::utils::rw_lock_nr _socket_lock;
};

} // namespace tools
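
The lock removals in asio_rpc_session appear to rely on the threading model introduced in asio_net_provider.cpp: every socket is created on an io_context that has a concurrency hint of 1 and is run by exactly one worker thread, so all completion handlers for a given socket execute on that single thread. The sketch below illustrates that serialization guarantee; it is not project code, and the counter and handler names are assumptions.

// Illustrative only: handlers posted to an io_context that is run by a single
// thread never execute concurrently, so shared state they touch needs no lock.
#include <boost/asio.hpp>
#include <iostream>
#include <thread>

int main()
{
    boost::asio::io_context ctx(1); // concurrency hint 1, one service thread
    int counter = 0;                // modified only from handlers, no mutex

    for (int i = 0; i < 1000; ++i) {
        boost::asio::post(ctx, [&counter]() { ++counter; });
    }

    // The single thread below is the only one that ever runs the handlers.
    std::thread worker([&ctx]() { ctx.run(); });
    worker.join();

    std::cout << counter << std::endl; // prints 1000
    return 0;
}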