diff --git a/src/runtime/rpc/asio_net_provider.cpp b/src/runtime/rpc/asio_net_provider.cpp index 8d2f0fb095..7d0f02ad8c 100644 --- a/src/runtime/rpc/asio_net_provider.cpp +++ b/src/runtime/rpc/asio_net_provider.cpp @@ -29,14 +29,27 @@ #include "asio_net_provider.h" #include "asio_rpc_session.h" +#include namespace dsn { namespace tools { +DSN_DEFINE_uint32("network", + io_service_worker_count, + 1, + "thread number for io service (timer and boost network)"); + +const int threads_per_event_loop = 1; + asio_network_provider::asio_network_provider(rpc_engine *srv, network *inner_provider) - : connection_oriented_network(srv, inner_provider) + : connection_oriented_network(srv, inner_provider), _acceptor(nullptr) { - _acceptor = nullptr; + for (auto i = 0; i < FLAGS_io_service_worker_count; i++) { + // Using thread-local operation queues in single-threaded use cases (i.e. when + // concurrency_hint is 1) to eliminate a lock/unlock pair. + _io_services.emplace_back( + std::make_unique(threads_per_event_loop)); + } } asio_network_provider::~asio_network_provider() @@ -44,7 +57,10 @@ asio_network_provider::~asio_network_provider() if (_acceptor) { _acceptor->close(); } - _io_service.stop(); + for (auto &io_service : _io_services) { + io_service->stop(); + } + for (auto &w : _workers) { w->join(); } @@ -55,17 +71,11 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie if (_acceptor != nullptr) return ERR_SERVICE_ALREADY_RUNNING; - int io_service_worker_count = - (int)dsn_config_get_value_uint64("network", - "io_service_worker_count", - 1, - "thread number for io service (timer and boost network)"); - // get connection threshold from config, default value 0 means no threshold _cfg_conn_threshold_per_ip = (uint32_t)dsn_config_get_value_uint64( "network", "conn_threshold_per_ip", 0, "max connection count to each server per ip"); - for (int i = 0; i < io_service_worker_count; i++) { + for (int i = 0; i < FLAGS_io_service_worker_count; i++) { _workers.push_back(std::make_shared([this, i]() { task::set_tls_dsn_context(node(), nullptr); @@ -74,9 +84,9 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie sprintf(buffer, "%s.asio.%d", name, i); task_worker::set_name(buffer); - boost::asio::io_service::work work(_io_service); + boost::asio::io_service::work work(*_io_services[i]); boost::system::error_code ec; - _io_service.run(ec); + _io_services[i]->run(ec); if (ec) { dassert(false, "boost::asio::io_service run failed: err(%s)", ec.message().data()); } @@ -95,7 +105,7 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie auto v4_addr = boost::asio::ip::address_v4::any(); //(ntohl(_address.ip)); ::boost::asio::ip::tcp::endpoint endpoint(v4_addr, _address.port()); boost::system::error_code ec; - _acceptor.reset(new boost::asio::ip::tcp::acceptor(_io_service)); + _acceptor.reset(new boost::asio::ip::tcp::acceptor(get_io_service())); _acceptor->open(endpoint.protocol(), ec); if (ec) { derror("asio tcp acceptor open failed, error = %s", ec.message().c_str()); @@ -126,14 +136,14 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie rpc_session_ptr asio_network_provider::create_client_session(::dsn::rpc_address server_addr) { - auto sock = std::make_shared(_io_service); + auto sock = std::make_shared(get_io_service()); message_parser_ptr parser(new_message_parser(_client_hdr_format)); return rpc_session_ptr(new asio_rpc_session(*this, server_addr, sock, parser, true)); } void asio_network_provider::do_accept() { - auto socket = std::make_shared(_io_service); + auto socket = std::make_shared(get_io_service()); _acceptor->async_accept(*socket, [this, socket](boost::system::error_code ec) { if (!ec) { @@ -394,5 +404,12 @@ error_code asio_udp_provider::start(rpc_channel channel, int port, bool client_o return ERR_OK; } + +// use a round-robin scheme to choose the next io_service to use. +boost::asio::io_service &asio_network_provider::get_io_service() +{ + return *_io_services[rand::next_u32(0, FLAGS_io_service_worker_count - 1)]; +} + } // namespace tools } // namespace dsn diff --git a/src/runtime/rpc/asio_net_provider.h b/src/runtime/rpc/asio_net_provider.h index aeace24ac3..4adc37d495 100644 --- a/src/runtime/rpc/asio_net_provider.h +++ b/src/runtime/rpc/asio_net_provider.h @@ -32,6 +32,29 @@ namespace dsn { namespace tools { +/// asio_network_provider is a wrapper of Asio library for rDSN to accept a connection and create +/// sockets. Each io_service only allows one thread polling, so the operations of the single socket +/// are always done in a single thread. we create many io_service instances to take advantage of the +/// multi-core capabilities of the processor, and use the round-robin scheme to decide which +/// io_service for socket to choose. +/// +/// +-----------------------------------------------+ +/// |Linux kernel | +/// | +-----------+ +-----------+ +-----------+ | +/// | | Epoll1 | | Epoll2 | | Epoll3 | | +/// | | | | | | | | +/// | | rfd 1,2,3 | | rfd 4,5,6 | | rfd 7,8,9 | | +/// | | | | | | | | +/// | +-----^-----+ +-----^-----+ +-----^-----+ | +/// +-------|---------------|---------------|-------+ +/// +-----------+ +-----------+ +-----------+ +/// | polling | | polling | | polling | +/// | +-------+ | | +-------+ | | +-------+ | +/// | |Thread1| | | |Thread2| | | |Thread3| | +/// | +-------+ | | +-------+ | | +-------+ | +/// |io_service1| |io_service2| |io_service3| +/// +-----------+ +-----------+ +-----------+ + class asio_network_provider : public connection_oriented_network { public: @@ -45,17 +68,19 @@ class asio_network_provider : public connection_oriented_network private: void do_accept(); + boost::asio::io_service &get_io_service(); private: friend class asio_rpc_session; friend class asio_network_provider_test; std::shared_ptr _acceptor; - boost::asio::io_service _io_service; + std::vector> _io_services; std::vector> _workers; ::dsn::rpc_address _address; }; +// TODO(Tangyanzhao): change the network model like asio_network_provider class asio_udp_provider : public network { public: diff --git a/src/runtime/rpc/asio_rpc_session.cpp b/src/runtime/rpc/asio_rpc_session.cpp index 94ba669ac7..4b32007546 100644 --- a/src/runtime/rpc/asio_rpc_session.cpp +++ b/src/runtime/rpc/asio_rpc_session.cpp @@ -31,7 +31,6 @@ namespace tools { void asio_rpc_session::set_options() { - utils::auto_write_lock socket_guard(_socket_lock); if (_socket->is_open()) { boost::system::error_code ec; @@ -83,8 +82,6 @@ void asio_rpc_session::do_read(int read_next) void *ptr = _reader.read_buffer_ptr(read_next); int remaining = _reader.read_buffer_capacity(); - utils::auto_read_lock socket_guard(_socket_lock); - _socket->async_read_some( boost::asio::buffer(ptr, remaining), [this](boost::system::error_code ec, std::size_t length) { @@ -142,7 +139,6 @@ void asio_rpc_session::send(uint64_t signature) add_ref(); - utils::auto_read_lock socket_guard(_socket_lock); boost::asio::async_write( *_socket, asio_wbufs, [this, signature](boost::system::error_code ec, std::size_t length) { if (ec) { @@ -169,7 +165,6 @@ asio_rpc_session::asio_rpc_session(asio_network_provider &net, void asio_rpc_session::close() { - utils::auto_write_lock socket_guard(_socket_lock); boost::system::error_code ec; _socket->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec); diff --git a/src/runtime/rpc/asio_rpc_session.h b/src/runtime/rpc/asio_rpc_session.h index 6b76ee6bd1..de736f3a8c 100644 --- a/src/runtime/rpc/asio_rpc_session.h +++ b/src/runtime/rpc/asio_rpc_session.h @@ -68,7 +68,6 @@ class asio_rpc_session : public rpc_session // boost::asio::socket is thread-unsafe, must use lock to prevent a // reading/writing socket being modified or closed concurrently. std::shared_ptr _socket; - ::dsn::utils::rw_lock_nr _socket_lock; }; } // namespace tools