Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
fix(network): use multi io_services in asio (#1016)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored Jan 26, 2022
1 parent e805bf4 commit 582c8bb
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 22 deletions.
47 changes: 32 additions & 15 deletions src/runtime/rpc/asio_net_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,38 @@

#include "asio_net_provider.h"
#include "asio_rpc_session.h"
#include <dsn/utility/flags.h>

namespace dsn {
namespace tools {

DSN_DEFINE_uint32("network",
io_service_worker_count,
1,
"thread number for io service (timer and boost network)");

const int threads_per_event_loop = 1;

asio_network_provider::asio_network_provider(rpc_engine *srv, network *inner_provider)
: connection_oriented_network(srv, inner_provider)
: connection_oriented_network(srv, inner_provider), _acceptor(nullptr)
{
_acceptor = nullptr;
for (auto i = 0; i < FLAGS_io_service_worker_count; i++) {
// Using thread-local operation queues in single-threaded use cases (i.e. when
// concurrency_hint is 1) to eliminate a lock/unlock pair.
_io_services.emplace_back(
std::make_unique<boost::asio::io_service>(threads_per_event_loop));
}
}

asio_network_provider::~asio_network_provider()
{
if (_acceptor) {
_acceptor->close();
}
_io_service.stop();
for (auto &io_service : _io_services) {
io_service->stop();
}

for (auto &w : _workers) {
w->join();
}
Expand All @@ -55,17 +71,11 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
if (_acceptor != nullptr)
return ERR_SERVICE_ALREADY_RUNNING;

int io_service_worker_count =
(int)dsn_config_get_value_uint64("network",
"io_service_worker_count",
1,
"thread number for io service (timer and boost network)");

// get connection threshold from config, default value 0 means no threshold
_cfg_conn_threshold_per_ip = (uint32_t)dsn_config_get_value_uint64(
"network", "conn_threshold_per_ip", 0, "max connection count to each server per ip");

for (int i = 0; i < io_service_worker_count; i++) {
for (int i = 0; i < FLAGS_io_service_worker_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
task::set_tls_dsn_context(node(), nullptr);

Expand All @@ -74,9 +84,9 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
sprintf(buffer, "%s.asio.%d", name, i);
task_worker::set_name(buffer);

boost::asio::io_service::work work(_io_service);
boost::asio::io_service::work work(*_io_services[i]);
boost::system::error_code ec;
_io_service.run(ec);
_io_services[i]->run(ec);
if (ec) {
dassert(false, "boost::asio::io_service run failed: err(%s)", ec.message().data());
}
Expand All @@ -95,7 +105,7 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
auto v4_addr = boost::asio::ip::address_v4::any(); //(ntohl(_address.ip));
::boost::asio::ip::tcp::endpoint endpoint(v4_addr, _address.port());
boost::system::error_code ec;
_acceptor.reset(new boost::asio::ip::tcp::acceptor(_io_service));
_acceptor.reset(new boost::asio::ip::tcp::acceptor(get_io_service()));
_acceptor->open(endpoint.protocol(), ec);
if (ec) {
derror("asio tcp acceptor open failed, error = %s", ec.message().c_str());
Expand Down Expand Up @@ -126,14 +136,14 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie

rpc_session_ptr asio_network_provider::create_client_session(::dsn::rpc_address server_addr)
{
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(_io_service);
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(get_io_service());
message_parser_ptr parser(new_message_parser(_client_hdr_format));
return rpc_session_ptr(new asio_rpc_session(*this, server_addr, sock, parser, true));
}

void asio_network_provider::do_accept()
{
auto socket = std::make_shared<boost::asio::ip::tcp::socket>(_io_service);
auto socket = std::make_shared<boost::asio::ip::tcp::socket>(get_io_service());

_acceptor->async_accept(*socket, [this, socket](boost::system::error_code ec) {
if (!ec) {
Expand Down Expand Up @@ -394,5 +404,12 @@ error_code asio_udp_provider::start(rpc_channel channel, int port, bool client_o

return ERR_OK;
}

// use a round-robin scheme to choose the next io_service to use.
boost::asio::io_service &asio_network_provider::get_io_service()
{
return *_io_services[rand::next_u32(0, FLAGS_io_service_worker_count - 1)];
}

} // namespace tools
} // namespace dsn
27 changes: 26 additions & 1 deletion src/runtime/rpc/asio_net_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,29 @@
namespace dsn {
namespace tools {

/// asio_network_provider is a wrapper of Asio library for rDSN to accept a connection and create
/// sockets. Each io_service only allows one thread polling, so the operations of the single socket
/// are always done in a single thread. we create many io_service instances to take advantage of the
/// multi-core capabilities of the processor, and use the round-robin scheme to decide which
/// io_service for socket to choose.
///
/// +-----------------------------------------------+
/// |Linux kernel |
/// | +-----------+ +-----------+ +-----------+ |
/// | | Epoll1 | | Epoll2 | | Epoll3 | |
/// | | | | | | | |
/// | | rfd 1,2,3 | | rfd 4,5,6 | | rfd 7,8,9 | |
/// | | | | | | | |
/// | +-----^-----+ +-----^-----+ +-----^-----+ |
/// +-------|---------------|---------------|-------+
/// +-----------+ +-----------+ +-----------+
/// | polling | | polling | | polling |
/// | +-------+ | | +-------+ | | +-------+ |
/// | |Thread1| | | |Thread2| | | |Thread3| |
/// | +-------+ | | +-------+ | | +-------+ |
/// |io_service1| |io_service2| |io_service3|
/// +-----------+ +-----------+ +-----------+

class asio_network_provider : public connection_oriented_network
{
public:
Expand All @@ -45,17 +68,19 @@ class asio_network_provider : public connection_oriented_network

private:
void do_accept();
boost::asio::io_service &get_io_service();

private:
friend class asio_rpc_session;
friend class asio_network_provider_test;

std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor;
boost::asio::io_service _io_service;
std::vector<std::unique_ptr<boost::asio::io_service>> _io_services;
std::vector<std::shared_ptr<std::thread>> _workers;
::dsn::rpc_address _address;
};

// TODO(Tangyanzhao): change the network model like asio_network_provider
class asio_udp_provider : public network
{
public:
Expand Down
5 changes: 0 additions & 5 deletions src/runtime/rpc/asio_rpc_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ namespace tools {

void asio_rpc_session::set_options()
{
utils::auto_write_lock socket_guard(_socket_lock);

if (_socket->is_open()) {
boost::system::error_code ec;
Expand Down Expand Up @@ -83,8 +82,6 @@ void asio_rpc_session::do_read(int read_next)
void *ptr = _reader.read_buffer_ptr(read_next);
int remaining = _reader.read_buffer_capacity();

utils::auto_read_lock socket_guard(_socket_lock);

_socket->async_read_some(
boost::asio::buffer(ptr, remaining),
[this](boost::system::error_code ec, std::size_t length) {
Expand Down Expand Up @@ -142,7 +139,6 @@ void asio_rpc_session::send(uint64_t signature)

add_ref();

utils::auto_read_lock socket_guard(_socket_lock);
boost::asio::async_write(
*_socket, asio_wbufs, [this, signature](boost::system::error_code ec, std::size_t length) {
if (ec) {
Expand All @@ -169,7 +165,6 @@ asio_rpc_session::asio_rpc_session(asio_network_provider &net,

void asio_rpc_session::close()
{
utils::auto_write_lock socket_guard(_socket_lock);

boost::system::error_code ec;
_socket->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec);
Expand Down
1 change: 0 additions & 1 deletion src/runtime/rpc/asio_rpc_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class asio_rpc_session : public rpc_session
// boost::asio::socket is thread-unsafe, must use lock to prevent a
// reading/writing socket being modified or closed concurrently.
std::shared_ptr<boost::asio::ip::tcp::socket> _socket;
::dsn::utils::rw_lock_nr _socket_lock;
};

} // namespace tools
Expand Down

0 comments on commit 582c8bb

Please sign in to comment.