diff --git a/libraries/libfc/include/fc/network/listener.hpp b/libraries/libfc/include/fc/network/listener.hpp new file mode 100644 index 0000000000..807323a8bd --- /dev/null +++ b/libraries/libfc/include/fc/network/listener.hpp @@ -0,0 +1,276 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +namespace fc { + +inline std::string to_string(const boost::asio::ip::tcp::endpoint& endpoint) { + const auto& ip_addr = endpoint.address(); + std::string ip_addr_string = ip_addr.to_string(); + if (ip_addr.is_v6()) { + ip_addr_string = "[" + ip_addr_string + "]"; + } + return ip_addr_string + ":" + std::to_string(endpoint.port()); +} + +inline std::pair split_host_port(std::string_view endpoint) { + std::string::size_type colon_pos = endpoint.rfind(':'); + if (colon_pos != std::string::npos) { + auto port = endpoint.substr(colon_pos + 1); + auto hostname = + (endpoint[0] == '[' && colon_pos >= 2) ? endpoint.substr(1, colon_pos - 2) : endpoint.substr(0, colon_pos); + return { std::string(hostname), std::string(port) }; + } else { + return { std::string(endpoint), {} }; + } +} + +///////////////////////////////////////////////////////////////////////////////////////////// +/// +/// fc::listener is template class to simplify the code for accepting new socket connections. +/// It can be used for both tcp or Unix socket connection. +/// +/// Example Usage: +/// \code{.cpp} +/// +/// class shared_state_type; +/// +/// template +/// struct example_session : std::enable_shared_from_this> { +/// using socket_type = Protocol::socket; +/// socket_type&& socket_; +/// shared_state_type& shared_state_; +/// example_session(socket_type&& socket, shared_state_type& shared_state) +/// : socket_(std::move(socket)), shared_state_(shared_state_) {} +/// +/// // ... +/// void start(); +/// }; +/// +/// template +/// struct example_listener : fc::listener, Protocol>{ +/// static constexpr uint32_t accept_timeout_ms = 200; +/// shared_state_type& shared_state_; +/// +/// example_listener(boost::asio::io_context& executor, +/// logger& logger, +/// const std::string& local_address, +/// const typename Protocol::endpoint& endpoint, +/// shared_state_type& shared_state) +/// : fc::listener, Protocol> +/// (executor, logger, boost::posix_time::milliseconds(accept_timeout_ms), local_address, endpoint) +/// , shared_state_(shared_state) {} +/// +/// std::string extra_listening_log_info() { +/// return shared_state_.info_to_be_printed_after_address_is_resolved_and_listening; +/// } +/// +/// void create_session(Protocol::socket&& sock) { +/// auto session = std::make_shared(std::move(sock), shared_state_); +/// session->start(); +/// } +/// }; +/// +/// int main() { +/// boost::asio::io_context ioc; +/// fc::logger logger = fc::logger::get(DEFAULT_LOGGER); +/// shared_state_type shared_state{...}; +/// +/// // usage for accepting tcp connection +/// // notice that it only throws std::system_error, not fc::exception +/// example_listener::create(executor, logger, "localhost:8080", std::ref(shared_state)); +/// +/// // usage for accepting unix socket connection +/// example_listener::create(executor, logger, "tmp.sock", +/// std::ref(shared_state)); +/// +/// ioc.run(); +/// return 0; +/// } +/// \endcode +/// +///////////////////////////////////////////////////////////////////////////////////////////// + +template +struct listener : std::enable_shared_from_this { + using endpoint_type = typename Protocol::endpoint; + + typename Protocol::acceptor acceptor_; + boost::asio::deadline_timer accept_error_timer_; + boost::posix_time::time_duration accept_timeout_; + logger& logger_; + std::string local_address_; + + listener(boost::asio::io_context& executor, logger& logger, boost::posix_time::time_duration accept_timeout, + const std::string& local_address, const endpoint_type& endpoint) + : acceptor_(executor, endpoint), accept_error_timer_(executor), accept_timeout_(accept_timeout), logger_(logger), + local_address_(std::is_same_v + ? local_address + : std::filesystem::absolute(local_address).string()) {} + + ~listener() { + if constexpr (std::is_same_v) { + std::filesystem::remove(local_address_); + } + } + + void do_accept() { + acceptor_.async_accept([self = this->shared_from_this()](boost::system::error_code ec, auto&& peer_socket) { + self->on_accept(ec, std::forward(peer_socket)); + }); + } + + template + void on_accept(boost::system::error_code ec, Socket&& socket) { + if (!ec) { + static_cast(this)->create_session(std::forward(socket)); + do_accept(); + } else if (ec == boost::system::errc::too_many_files_open) { + // retry accept() after timeout to avoid cpu loop on accept + fc_elog(logger_, "open file limit reached: not accepting new connections for next ${timeout}ms", + ("timeout", accept_timeout_.total_milliseconds())); + accept_error_timer_.expires_from_now(accept_timeout_); + accept_error_timer_.async_wait([self = this->shared_from_this()](boost::system::error_code ec) { + if (!ec) + self->do_accept(); + }); + } else if (int code = ec.value(); code == ENETDOWN || code == EPROTO || code == ENOPROTOOPT || + code == EHOSTDOWN || code == EHOSTUNREACH || code == EOPNOTSUPP || + code == ENETUNREACH +#ifdef ENONET + || code == ENONET +#endif + ) { + // according to https://man7.org/linux/man-pages/man2/accept.2.html, reliable application should + // retry when these error codes are returned + fc_wlog(logger_, "closing connection, accept error: ${m}", ("m", ec.message())); + do_accept(); + } else { + fc_elog(logger_, "Unrecoverable accept error, stop listening: ${msg}", ("m", ec.message())); + } + } + + const char* extra_listening_log_info() { return ""; } + + void log_listening(const endpoint_type& endpoint, const std::string& local_address) { + std::string info; + if constexpr (std::is_same_v) { + info = fc::to_string(endpoint) + " resolved from " + local_address; + } else { + info = "Unix socket " + local_address; + } + info += static_cast(this)->extra_listening_log_info(); + fc_ilog(logger_, "start listening on ${info}", ("info", info)); + } + + /// @brief Create listeners to listen on endpoints resolved from address + /// @param ...args The arguments to forward to the listener constructor so that they can be accessed + /// from create_session() to construct the customized session objects. + /// @throws std::system_error + template + static void create(boost::asio::io_context& executor, logger& logger, const std::string& address, Args&&... args) { + using tcp = boost::asio::ip::tcp; + if constexpr (std::is_same_v) { + auto [host, port] = split_host_port(address); + if (port.empty()) { + fc_elog(logger, "port is not specified for address ${addr}", ("addr", address)); + throw std::system_error(std::make_error_code(std::errc::bad_address)); + } + + boost::system::error_code ec; + tcp::resolver resolver(executor); + auto endpoints = resolver.resolve(host, port, tcp::resolver::passive, ec); + if (ec) { + fc_elog(logger, "failed to resolve address: ${msg}", ("msg", ec.message())); + throw std::system_error(ec); + } + + int listened = 0; + std::optional unspecified_ipv4_addr; + bool has_unspecified_ipv6_only = false; + + auto create_server = [&](const auto& endpoint) { + const auto& ip_addr = endpoint.address(); + try { + auto server = std::make_shared(executor, logger, address, endpoint, std::forward(args)...); + server->log_listening(endpoint, address); + server->do_accept(); + ++listened; + has_unspecified_ipv6_only = ip_addr.is_unspecified() && ip_addr.is_v6(); + if (has_unspecified_ipv6_only) { + boost::asio::ip::v6_only option; + server->acceptor_.get_option(option); + has_unspecified_ipv6_only &= option.value(); + } + + } catch (boost::system::system_error& ex) { + fc_wlog(logger, "unable to listen on ${ip_addr}:${port} resolved from ${address}: ${msg}", + ("ip_addr", ip_addr.to_string())("port", endpoint.port())("address", address)("msg", ex.what())); + } + }; + + for (const auto& ep : endpoints) { + const auto& endpoint = ep.endpoint(); + const auto& ip_addr = endpoint.address(); + if (ip_addr.is_unspecified() && ip_addr.is_v4() && endpoints.size() > 1) { + // it is an error to bind a socket to the same port for both ipv6 and ipv4 INADDR_ANY address when + // the system has ipv4-mapped ipv6 enabled by default, we just skip the ipv4 for now. + unspecified_ipv4_addr = endpoint; + continue; + } + create_server(endpoint); + } + + if (unspecified_ipv4_addr.has_value() && has_unspecified_ipv6_only) { + create_server(*unspecified_ipv4_addr); + } + + if (listened == 0) { + fc_elog(logger, "none of the addresses resolved from ${addr} can be listened to", ("addr", address)); + throw std::system_error(std::make_error_code(std::errc::bad_address)); + } + } else { + using stream_protocol = boost::asio::local::stream_protocol; + static_assert(std::is_same_v); + + namespace fs = std::filesystem; + auto cwd = fs::current_path(); + fs::path sock_path = address; + + fs::create_directories(sock_path.parent_path()); + // The maximum length of the socket path is defined by sockaddr_un::sun_path. On Linux, + // according to unix(7), it is 108 bytes. On FreeBSD, according to unix(4), it is 104 bytes. + // Therefore, we create the unix socket with the relative path to its parent path to avoid the + // problem. + fs::current_path(sock_path.parent_path()); + auto restore = fc::make_scoped_exit([cwd] { fs::current_path(cwd); }); + + endpoint_type endpoint{ sock_path.filename().string() }; + + boost::system::error_code ec; + stream_protocol::socket test_socket(executor); + test_socket.connect(endpoint, ec); + + // looks like a service is already running on that socket, don't touch it... fail out + if (ec == boost::system::errc::success) { + fc_elog(logger, "The unix socket path ${addr} is already in use", ("addr", address)); + throw std::system_error(std::make_error_code(std::errc::address_in_use)); + } + // socket exists but no one home, go ahead and remove it and continue on + else if (ec == boost::system::errc::connection_refused) + fs::remove(sock_path); + + auto server = std::make_shared(executor, logger, address, endpoint, std::forward(args)...); + server->log_listening(endpoint, address); + server->do_accept(); + } + } +}; +} // namespace fc diff --git a/plugins/http_plugin/http_plugin.cpp b/plugins/http_plugin/http_plugin.cpp index bd97965829..bc40bb7885 100644 --- a/plugins/http_plugin/http_plugin.cpp +++ b/plugins/http_plugin/http_plugin.cpp @@ -1,11 +1,12 @@ + #include #include -#include +#include #include #include #include -#include +#include #include #include @@ -110,6 +111,34 @@ namespace eosio { return result; } + template + struct beast_http_listener + : fc::listener, Protocol> { + using socket_type = typename Protocol::socket; + + static constexpr uint32_t accept_timeout_ms = 500; + http_plugin_state& state_; + api_category_set categories_ = {}; + + beast_http_listener(boost::asio::io_context& executor, fc::logger& logger, const std::string& local_address, + const typename Protocol::endpoint& endpoint, http_plugin_state& plugin_state, + api_category_set categories) + : fc::listener, Protocol>( + executor, logger, boost::posix_time::milliseconds(accept_timeout_ms), local_address, endpoint), + state_(plugin_state), categories_(categories) {} + + std::string extra_listening_log_info() { return " for API categories: " + category_names(categories_); } + + void create_session(socket_type&& socket) { + boost::system::error_code re_ec; + auto re = socket.remote_endpoint(re_ec); + std::string remote_endpoint = re_ec ? "unknown" : boost::lexical_cast(re); + std::make_shared>(std::move(socket), this->state_, std::move(remote_endpoint), + categories_, this->local_address_) + ->run_session(); + } + }; + class http_plugin_impl : public std::enable_shared_from_this { public: http_plugin_impl() = default; @@ -122,7 +151,7 @@ namespace eosio { std::map categories_by_address; - shared_ptr plugin_state = std::make_shared(logger()); + http_plugin_state plugin_state{logger()}; std::atomic listening; @@ -199,9 +228,9 @@ namespace eosio { bool on_loopback_only(const std::string& address) { if (is_unix_socket_address(address)) return true; - auto [host, port] = split_host_port(address); + auto [host, port] = fc::split_host_port(address); boost::system::error_code ec; - tcp::resolver resolver(plugin_state->thread_pool.get_executor()); + tcp::resolver resolver(plugin_state.thread_pool.get_executor()); auto endpoints = resolver.resolve(host, port, boost::asio::ip::tcp::resolver::passive, ec); if (ec) { fc_wlog(logger(), "Cannot resolve address ${addr}: ${msg}", ("addr", address)("msg", ec.message())); @@ -214,88 +243,16 @@ namespace eosio { void create_beast_server(const std::string& address, api_category_set categories) { try { - EOS_ASSERT(address.size() >= 2, chain::plugin_config_exception, "Invalid http server address: ${addr}", - ("addr", address)); - if (is_unix_socket_address(address)) { - namespace fs = std::filesystem; - auto cwd = fs::current_path(); + namespace fs = std::filesystem; fs::path sock_path = address; if (sock_path.is_relative()) sock_path = fs::weakly_canonical(app().data_dir() / sock_path); - fs::remove(sock_path); - fs::create_directories(sock_path.parent_path()); - // The maximum length of the socket path is defined by sockaddr_un::sun_path. On Linux, - // according to unix(7), it is 108 bytes. On FreeBSD, according to unix(4), it is 104 bytes. - // Therefore, we create the unix socket with the relative path to its parent path to avoid the - // problem. - fs::current_path(sock_path.parent_path()); - auto restore = fc::make_scoped_exit([cwd] { fs::current_path(cwd); }); - - using stream_protocol = asio::local::stream_protocol; - auto server = std::make_shared>( - plugin_state, categories, stream_protocol::endpoint{ sock_path.filename().string() }); - server->do_accept(); - fc_ilog(logger(), "created UNIX socket listener at ${addr} for API categories: ${cat}", - ("addr", sock_path)("cat", category_names(categories))); + beast_http_listener::create(plugin_state.thread_pool.get_executor(), logger(), + sock_path.string(), std::ref(plugin_state), categories); } else { - - auto [host, port] = split_host_port(address); - EOS_ASSERT(port.size(), chain::plugin_config_exception, "port is not specified"); - - boost::system::error_code ec; - tcp::resolver resolver( plugin_state->thread_pool.get_executor()); - auto endpoints = resolver.resolve(host, port, boost::asio::ip::tcp::resolver::passive, ec); - EOS_ASSERT(!ec, chain::plugin_config_exception, "failed to resolve address: ${msg}", - ("msg", ec.message())); - - int listened = 0; - std::optional unspecified_ipv4_addr; - bool has_unspecified_ipv6_only = false; - - auto create_ip_server = [&](const auto& endpoint) { - const auto& ip_addr = endpoint.address(); - std::string ip_addr_string = ip_addr.to_string(); - if (ip_addr.is_v6()) { - ip_addr_string = "[" + ip_addr_string + "]"; - } - try { - auto server = std::make_shared>( - plugin_state, categories, endpoint, address); - server->do_accept(); - ++listened; - fc_ilog( - logger(), - "start listening on ${ip_addr}:${port} resolved from ${address} for API categories: ${cat}", - ("ip_addr", ip_addr_string)("port", endpoint.port())("address", address)( - "cat", category_names(categories))); - has_unspecified_ipv6_only = - ip_addr.is_unspecified() && ip_addr.is_v6() && server->is_ip_v6_only(); - - } catch (boost::system::system_error& ex) { - fc_wlog(logger(), "unable to listen on ${ip_addr}:${port} resolved from ${address}: ${msg}", - ("ip_addr", ip_addr.to_string())("port", endpoint.port())("address", address)("msg", - ex.what())); - } - }; - - for (const auto& ep: endpoints) { - const auto& endpoint = ep.endpoint(); - const auto& ip_addr = endpoint.address(); - if (ip_addr.is_unspecified() && ip_addr.is_v4() && endpoints.size() > 1) { - // it is an error to bind a socket to the same port for both ipv6 and ipv4 INADDR_ANY address when - // the system has ipv4-mapped ipv6 enabled by default, we just skip the ipv4 for now. - unspecified_ipv4_addr = endpoint; - continue; - } - create_ip_server(endpoint); - } - - if (unspecified_ipv4_addr.has_value() && has_unspecified_ipv6_only) { - create_ip_server(*unspecified_ipv4_addr); - } - - EOS_ASSERT (listened > 0, chain::plugin_config_exception, "none of the resolved addresses can be listened to" ); + beast_http_listener::create(plugin_state.thread_pool.get_executor(), logger(), address, + std::ref(plugin_state), categories); } } catch (const fc::exception& e) { fc_elog(logger(), "http service failed to start for ${addr}: ${e}", @@ -378,33 +335,33 @@ namespace eosio { cfg.add_options() ("access-control-allow-origin", bpo::value()->notifier([this](const string& v) { - my->plugin_state->access_control_allow_origin = v; + my->plugin_state.access_control_allow_origin = v; fc_ilog( logger(), "configured http with Access-Control-Allow-Origin: ${o}", - ("o", my->plugin_state->access_control_allow_origin) ); + ("o", my->plugin_state.access_control_allow_origin) ); }), "Specify the Access-Control-Allow-Origin to be returned on each request") ("access-control-allow-headers", bpo::value()->notifier([this](const string& v) { - my->plugin_state->access_control_allow_headers = v; + my->plugin_state.access_control_allow_headers = v; fc_ilog( logger(), "configured http with Access-Control-Allow-Headers : ${o}", - ("o", my->plugin_state->access_control_allow_headers) ); + ("o", my->plugin_state.access_control_allow_headers) ); }), "Specify the Access-Control-Allow-Headers to be returned on each request") ("access-control-max-age", bpo::value()->notifier([this](const string& v) { - my->plugin_state->access_control_max_age = v; + my->plugin_state.access_control_max_age = v; fc_ilog( logger(), "configured http with Access-Control-Max-Age : ${o}", - ("o", my->plugin_state->access_control_max_age) ); + ("o", my->plugin_state.access_control_max_age) ); }), "Specify the Access-Control-Max-Age to be returned on each request.") ("access-control-allow-credentials", bpo::bool_switch()->notifier([this](bool v) { - my->plugin_state->access_control_allow_credentials = v; + my->plugin_state.access_control_allow_credentials = v; if( v ) fc_ilog( logger(), "configured http with Access-Control-Allow-Credentials: true" ); })->default_value(false), "Specify if Access-Control-Allow-Credentials: true should be returned on each request.") - ("max-body-size", bpo::value()->default_value(my->plugin_state->max_body_size), + ("max-body-size", bpo::value()->default_value(my->plugin_state.max_body_size), "The maximum body size in bytes allowed for incoming RPC requests") ("http-max-bytes-in-flight-mb", bpo::value()->default_value(500), "Maximum size in megabytes http_plugin should use for processing http requests. -1 for unlimited. 429 error response when exceeded." ) @@ -418,7 +375,7 @@ namespace eosio { "If set to false, then any incoming \"Host\" header is considered valid") ("http-alias", bpo::value>()->composing(), "Additionally acceptable values for the \"Host\" header of incoming HTTP requests, can be specified multiple times. Includes http/s_server_address by default.") - ("http-threads", bpo::value()->default_value( my->plugin_state->thread_pool_size ), + ("http-threads", bpo::value()->default_value( my->plugin_state.thread_pool_size ), "Number of worker threads in http thread pool") ("http-keep-alive", bpo::value()->default_value(true), "If set to false, do not keep HTTP connections alive, even if client requests.") @@ -428,38 +385,38 @@ namespace eosio { void http_plugin::plugin_initialize(const variables_map& options) { try { handle_sighup(); // setup logging - my->plugin_state->max_body_size = options.at( "max-body-size" ).as(); + my->plugin_state.max_body_size = options.at( "max-body-size" ).as(); verbose_http_errors = options.at( "verbose-http-errors" ).as(); - my->plugin_state->thread_pool_size = options.at( "http-threads" ).as(); - EOS_ASSERT( my->plugin_state->thread_pool_size > 0, chain::plugin_config_exception, - "http-threads ${num} must be greater than 0", ("num", my->plugin_state->thread_pool_size)); + my->plugin_state.thread_pool_size = options.at( "http-threads" ).as(); + EOS_ASSERT( my->plugin_state.thread_pool_size > 0, chain::plugin_config_exception, + "http-threads ${num} must be greater than 0", ("num", my->plugin_state.thread_pool_size)); auto max_bytes_mb = options.at( "http-max-bytes-in-flight-mb" ).as(); EOS_ASSERT( (max_bytes_mb >= -1 && max_bytes_mb < std::numeric_limits::max() / (1024 * 1024)), chain::plugin_config_exception, "http-max-bytes-in-flight-mb (${max_bytes_mb}) must be equal to or greater than -1 and less than ${max}", ("max_bytes_mb", max_bytes_mb) ("max", std::numeric_limits::max() / (1024 * 1024)) ); if ( max_bytes_mb == -1 ) { - my->plugin_state->max_bytes_in_flight = std::numeric_limits::max(); + my->plugin_state.max_bytes_in_flight = std::numeric_limits::max(); } else { - my->plugin_state->max_bytes_in_flight = max_bytes_mb * 1024 * 1024; + my->plugin_state.max_bytes_in_flight = max_bytes_mb * 1024 * 1024; } - my->plugin_state->max_requests_in_flight = options.at( "http-max-in-flight-requests" ).as(); + my->plugin_state.max_requests_in_flight = options.at( "http-max-in-flight-requests" ).as(); int64_t max_reponse_time_ms = options.at("http-max-response-time-ms").as(); EOS_ASSERT( max_reponse_time_ms == -1 || max_reponse_time_ms >= 0, chain::plugin_config_exception, "http-max-response-time-ms must be -1, or non-negative: ${m}", ("m", max_reponse_time_ms) ); - my->plugin_state->max_response_time = max_reponse_time_ms == -1 ? + my->plugin_state.max_response_time = max_reponse_time_ms == -1 ? fc::microseconds::maximum() : fc::microseconds( max_reponse_time_ms * 1000 ); - my->plugin_state->validate_host = options.at("http-validate-host").as(); + my->plugin_state.validate_host = options.at("http-validate-host").as(); if( options.count( "http-alias" )) { const auto& aliases = options["http-alias"].as>(); for (const auto& alias : aliases ) { - auto [host, port] = split_host_port(alias); - my->plugin_state->valid_hosts.insert(host); + auto [host, port] = fc::split_host_port(alias); + my->plugin_state.valid_hosts.insert(host); } } - my->plugin_state->keep_alive = options.at("http-keep-alive").as(); + my->plugin_state.keep_alive = options.at("http-keep-alive").as(); std::string http_server_address; if (options.count("http-server-address")) { @@ -507,7 +464,7 @@ namespace eosio { auto address = spec.substr(comma_pos+1); - auto [host, port] = split_host_port(address); + auto [host, port] = fc::split_host_port(address); if (port.size()) { auto [itr, inserted] = hostnames.try_emplace(port, host); EOS_ASSERT(inserted || host == itr->second, chain::plugin_config_exception, @@ -517,7 +474,7 @@ namespace eosio { my->categories_by_address[address].insert(category); } } - my->plugin_state->server_header = current_http_plugin_defaults.server_header; + my->plugin_state.server_header = current_http_plugin_defaults.server_header; //watch out for the returns above when adding new code here @@ -529,7 +486,7 @@ namespace eosio { { // The reason we post here is because we want blockchain replay to happen before we start listening. try { - my->plugin_state->thread_pool.start( my->plugin_state->thread_pool_size, [](const fc::exception& e) { + my->plugin_state.thread_pool.start( my->plugin_state.thread_pool_size, [](const fc::exception& e) { fc_elog( logger(), "Exception in http thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); app().quit(); } ); @@ -557,10 +514,10 @@ namespace eosio { } void http_plugin::plugin_shutdown() { - my->plugin_state->thread_pool.stop(); + my->plugin_state.thread_pool.stop(); // release http_plugin_impl_ptr shared_ptrs captured in url handlers - my->plugin_state->url_handlers.clear(); + my->plugin_state.url_handlers.clear(); fc_ilog( logger(), "exit shutdown"); } @@ -578,20 +535,20 @@ namespace eosio { void http_plugin::add_handler(api_entry&& entry, appbase::exec_queue q, int priority, http_content_type content_type) { log_add_handler(my.get(), entry); std::string path = entry.path; - auto p = my->plugin_state->url_handlers.emplace(path, my->make_app_thread_url_handler(std::move(entry), q, priority, my, content_type)); + auto p = my->plugin_state.url_handlers.emplace(path, my->make_app_thread_url_handler(std::move(entry), q, priority, my, content_type)); EOS_ASSERT( p.second, chain::plugin_config_exception, "http url ${u} is not unique", ("u", path) ); } void http_plugin::add_async_handler(api_entry&& entry, http_content_type content_type) { log_add_handler(my.get(), entry); std::string path = entry.path; - auto p = my->plugin_state->url_handlers.emplace(path, my->make_http_thread_url_handler(std::move(entry), content_type)); + auto p = my->plugin_state.url_handlers.emplace(path, my->make_http_thread_url_handler(std::move(entry), content_type)); EOS_ASSERT( p.second, chain::plugin_config_exception, "http url ${u} is not unique", ("u", path) ); } void http_plugin::post_http_thread_pool(std::function f) { if( f ) - boost::asio::post( my->plugin_state->thread_pool.get_executor(), f ); + boost::asio::post( my->plugin_state.thread_pool.get_executor(), f ); } void http_plugin::handle_exception( const char* api_name, const char* call_name, const string& body, const url_response_callback& cb) { @@ -663,15 +620,15 @@ namespace eosio { } fc::microseconds http_plugin::get_max_response_time()const { - return my->plugin_state->max_response_time; + return my->plugin_state.max_response_time; } size_t http_plugin::get_max_body_size()const { - return my->plugin_state->max_body_size; + return my->plugin_state.max_body_size; } void http_plugin::register_update_metrics(std::function&& fun) { - my->plugin_state->update_metrics = std::move(fun); + my->plugin_state.update_metrics = std::move(fun); } std::atomic& http_plugin::listening() { diff --git a/plugins/http_plugin/include/eosio/http_plugin/api_category.hpp b/plugins/http_plugin/include/eosio/http_plugin/api_category.hpp index f6d15d8ce6..031e2277d4 100644 --- a/plugins/http_plugin/include/eosio/http_plugin/api_category.hpp +++ b/plugins/http_plugin/include/eosio/http_plugin/api_category.hpp @@ -1,6 +1,6 @@ #pragma once -#include #include +#include namespace eosio { enum class api_category : uint32_t { @@ -19,7 +19,6 @@ enum class api_category : uint32_t { node = UINT32_MAX }; - class api_category_set { uint32_t data = {}; public: diff --git a/plugins/http_plugin/include/eosio/http_plugin/beast_http_listener.hpp b/plugins/http_plugin/include/eosio/http_plugin/beast_http_listener.hpp deleted file mode 100644 index 484f7fabc1..0000000000 --- a/plugins/http_plugin/include/eosio/http_plugin/beast_http_listener.hpp +++ /dev/null @@ -1,89 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -namespace eosio { -// since beast_http_listener handles both TCP and UNIX endpoints we need a template here -// to get the path if makes sense, so that we can call ::unlink() before opening socket -// in beast_http_listener::listen() by tdefault return blank string -template -std::string get_endpoint_path(const T& endpt) { return {}; } - -std::string get_endpoint_path(const stream_protocol::endpoint& endpt) { return endpt.path(); } - -// Accepts incoming connections and launches the sessions -// socket type must be the socket e.g, boost::asio::ip::tcp::socket -template -class beast_http_listener : public std::enable_shared_from_this> { -private: - std::shared_ptr plugin_state_; - - using protocol_type = typename socket_type::protocol_type; - typename protocol_type::acceptor acceptor_; - socket_type socket_; - std::string local_address_; - - boost::asio::deadline_timer accept_error_timer_; - api_category_set categories_ = {}; -public: - beast_http_listener() = default; - beast_http_listener(const beast_http_listener&) = delete; - beast_http_listener(beast_http_listener&&) = delete; - - beast_http_listener& operator=(const beast_http_listener&) = delete; - beast_http_listener& operator=(beast_http_listener&&) = delete; - - beast_http_listener(std::shared_ptr plugin_state, api_category_set categories, - typename protocol_type::endpoint endpoint, - const std::string& local_address="") - : plugin_state_(std::move(plugin_state)), acceptor_(plugin_state_->thread_pool.get_executor(), endpoint), - socket_(plugin_state_->thread_pool.get_executor()), local_address_(local_address), - accept_error_timer_(plugin_state_->thread_pool.get_executor()), categories_(categories) { - } - - virtual ~beast_http_listener() {}; - - void do_accept() { - auto self = this->shared_from_this(); - acceptor_.async_accept(socket_, [self](beast::error_code ec) { - if(ec == boost::system::errc::too_many_files_open) { - // retry accept() after timeout to avoid cpu loop on accept - fail(ec, "accept", self->plugin_state_->logger, "too many files open - waiting 500ms"); - self->accept_error_timer_.expires_from_now(boost::posix_time::milliseconds(500)); - self->accept_error_timer_.async_wait([self = self->shared_from_this()](beast::error_code ec) { - if (!ec) - self->do_accept(); - }); - } else { - if (ec) { - fail(ec, "accept", self->plugin_state_->logger, "closing connection"); - } else { - // Create the session object and run it - boost::system::error_code re_ec; - auto re = self->socket_.remote_endpoint(re_ec); - std::string remote_endpoint = re_ec ? "unknown" : boost::lexical_cast(re); - std::make_shared>( - std::move(self->socket_), - self->plugin_state_, - std::move(remote_endpoint), - self->categories_, - self->local_address_) - ->run_session(); - } - - // Accept another connection - self->do_accept(); - } - }); - } - - bool is_ip_v6_only() const { - boost::asio::ip::v6_only option; - acceptor_.get_option(option); - return option.value(); - } -};// end class beast_http_Listener -}// namespace eosio diff --git a/plugins/http_plugin/include/eosio/http_plugin/beast_http_session.hpp b/plugins/http_plugin/include/eosio/http_plugin/beast_http_session.hpp index e055a503a3..2ac3ebb5a4 100644 --- a/plugins/http_plugin/include/eosio/http_plugin/beast_http_session.hpp +++ b/plugins/http_plugin/include/eosio/http_plugin/beast_http_session.hpp @@ -17,21 +17,6 @@ namespace eosio { using std::chrono::steady_clock; -typedef asio::basic_stream_socket tcp_socket_t; - -using boost::asio::local::stream_protocol; - -#if BOOST_VERSION < 107300 -using local_stream = beast::basic_stream< - stream_protocol, - asio::executor, - beast::unlimited_rate_policy>; -#else -using local_stream = beast::basic_stream< - stream_protocol, - asio::any_io_executor, - beast::unlimited_rate_policy>; -#endif //------------------------------------------------------------------------------ // fail() @@ -43,12 +28,12 @@ void fail(beast::error_code ec, char const* what, fc::logger& logger, char const } -bool allow_host(const std::string& host_str, tcp_socket_t& socket, - const std::shared_ptr& plugin_state) { +bool allow_host(const std::string& host_str, tcp::socket& socket, + const http_plugin_state& plugin_state) { auto& lowest_layer = beast::get_lowest_layer(socket); auto local_endpoint = lowest_layer.local_endpoint(); - if(host_str.empty() || !host_is_valid(*plugin_state, + if(host_str.empty() || !host_is_valid(plugin_state, host_str, local_endpoint.address())) { return false; @@ -98,7 +83,7 @@ class beast_http_session : public detail::abstract_conn, // HTTP response object std::optional> res_; - std::shared_ptr plugin_state_; + http_plugin_state& plugin_state_; std::string remote_endpoint_; std::string local_address_; @@ -127,12 +112,12 @@ class beast_http_session : public detail::abstract_conn, res_->version(req.version()); res_->set(http::field::content_type, "application/json"); res_->keep_alive(req.keep_alive()); - if(plugin_state_->server_header.size()) - res_->set(http::field::server, plugin_state_->server_header); + if(plugin_state_.server_header.size()) + res_->set(http::field::server, plugin_state_.server_header); // Request path must be absolute and not contain "..". if(req.target().empty() || req.target()[0] != '/' || req.target().find("..") != beast::string_view::npos) { - fc_dlog( plugin_state_->logger, "Return bad_reqest: ${target}", ("target", std::string(req.target())) ); + fc_dlog( plugin_state_.get_logger(), "Return bad_reqest: ${target}", ("target", std::string(req.target())) ); error_results results{static_cast(http::status::bad_request), "Illegal request-target"}; send_response( fc::json::to_string( results, fc::time_point::maximum() ), static_cast(http::status::bad_request) ); @@ -141,23 +126,23 @@ class beast_http_session : public detail::abstract_conn, try { if(!allow_host(req)) { - fc_dlog( plugin_state_->logger, "bad host: ${HOST}", ("HOST", std::string(req["host"]))); + fc_dlog( plugin_state_.get_logger(), "bad host: ${HOST}", ("HOST", std::string(req["host"]))); error_results results{static_cast(http::status::bad_request), "Disallowed HTTP HOST header in the request"}; send_response( fc::json::to_string( results, fc::time_point::maximum() ), static_cast(http::status::bad_request) ); return; } - if(!plugin_state_->access_control_allow_origin.empty()) { - res_->set("Access-Control-Allow-Origin", plugin_state_->access_control_allow_origin); + if(!plugin_state_.access_control_allow_origin.empty()) { + res_->set("Access-Control-Allow-Origin", plugin_state_.access_control_allow_origin); } - if(!plugin_state_->access_control_allow_headers.empty()) { - res_->set("Access-Control-Allow-Headers", plugin_state_->access_control_allow_headers); + if(!plugin_state_.access_control_allow_headers.empty()) { + res_->set("Access-Control-Allow-Headers", plugin_state_.access_control_allow_headers); } - if(!plugin_state_->access_control_max_age.empty()) { - res_->set("Access-Control-Max-Age", plugin_state_->access_control_max_age); + if(!plugin_state_.access_control_max_age.empty()) { + res_->set("Access-Control-Max-Age", plugin_state_.access_control_max_age); } - if(plugin_state_->access_control_allow_credentials) { + if(plugin_state_.access_control_allow_credentials) { res_->set("Access-Control-Allow-Credentials", "true"); } @@ -167,21 +152,21 @@ class beast_http_session : public detail::abstract_conn, return; } - fc_dlog( plugin_state_->logger, "Request: ${ep} ${r}", + fc_dlog( plugin_state_.get_logger(), "Request: ${ep} ${r}", ("ep", remote_endpoint_)("r", to_log_string(req)) ); std::string resource = std::string(req.target()); // look for the URL handler to handle this resource - auto handler_itr = plugin_state_->url_handlers.find(resource); - if(handler_itr != plugin_state_->url_handlers.end() && categories_.contains(handler_itr->second.category)) { - if(plugin_state_->logger.is_enabled(fc::log_level::all)) - plugin_state_->logger.log(FC_LOG_MESSAGE(all, "resource: ${ep}", ("ep", resource))); + auto handler_itr = plugin_state_.url_handlers.find(resource); + if(handler_itr != plugin_state_.url_handlers.end() && categories_.contains(handler_itr->second.category)) { + if(plugin_state_.get_logger().is_enabled(fc::log_level::all)) + plugin_state_.get_logger().log(FC_LOG_MESSAGE(all, "resource: ${ep}", ("ep", resource))); std::string body = req.body(); auto content_type = handler_itr->second.content_type; set_content_type_header(content_type); - if (plugin_state_->update_metrics) - plugin_state_->update_metrics({resource}); + if (plugin_state_.update_metrics) + plugin_state_.update_metrics({resource}); handler_itr->second.fn(this->shared_from_this(), std::move(resource), @@ -189,13 +174,13 @@ class beast_http_session : public detail::abstract_conn, make_http_response_handler(plugin_state_, this->shared_from_this(), content_type)); } else if (resource == "/v1/node/get_supported_apis") { http_plugin::get_supported_apis_result result; - for (const auto& handler : plugin_state_->url_handlers) { + for (const auto& handler : plugin_state_.url_handlers) { if (categories_.contains(handler.second.category)) result.apis.push_back(handler.first); } send_response(fc::json::to_string(fc::variant(result), fc::time_point::maximum()), 200); } else { - fc_dlog( plugin_state_->logger, "404 - not found: ${ep}", ("ep", resource) ); + fc_dlog( plugin_state_.get_logger(), "404 - not found: ${ep}", ("ep", resource) ); error_results results{static_cast(http::status::not_found), "Not Found", error_results::error_info( fc::exception( FC_LOG_MESSAGE( error, "Unknown Endpoint" ) ), http_plugin::verbose_errors() )}; @@ -219,7 +204,7 @@ class beast_http_session : public detail::abstract_conn, res->result(http::status::unauthorized); continue_state_ = continue_state_t::reject; } - res->set(http::field::server, plugin_state_->server_header); + res->set(http::field::server, plugin_state_.server_header); http::async_write( socket_, @@ -242,21 +227,21 @@ class beast_http_session : public detail::abstract_conn, } virtual std::string verify_max_bytes_in_flight(size_t extra_bytes) final { - auto bytes_in_flight_size = plugin_state_->bytes_in_flight.load() + extra_bytes; - if(bytes_in_flight_size > plugin_state_->max_bytes_in_flight) { - fc_dlog(plugin_state_->logger, "429 - too many bytes in flight: ${bytes}", ("bytes", bytes_in_flight_size)); + auto bytes_in_flight_size = plugin_state_.bytes_in_flight.load() + extra_bytes; + if(bytes_in_flight_size > plugin_state_.max_bytes_in_flight) { + fc_dlog(plugin_state_.get_logger(), "429 - too many bytes in flight: ${bytes}", ("bytes", bytes_in_flight_size)); return "Too many bytes in flight: " + std::to_string( bytes_in_flight_size ); } return {}; } virtual std::string verify_max_requests_in_flight() final { - if(plugin_state_->max_requests_in_flight < 0) + if(plugin_state_.max_requests_in_flight < 0) return {}; - auto requests_in_flight_num = plugin_state_->requests_in_flight.load(); - if(requests_in_flight_num > plugin_state_->max_requests_in_flight) { - fc_dlog(plugin_state_->logger, "429 - too many requests in flight: ${requests}", ("requests", requests_in_flight_num)); + auto requests_in_flight_num = plugin_state_.requests_in_flight.load(); + if(requests_in_flight_num > plugin_state_.max_requests_in_flight) { + fc_dlog(plugin_state_.get_logger(), "429 - too many requests in flight: ${requests}", ("requests", requests_in_flight_num)); return "Too many requests in flight: " + std::to_string( requests_in_flight_num ); } return {}; @@ -266,13 +251,13 @@ class beast_http_session : public detail::abstract_conn, // shared_from_this() requires default constructor beast_http_session() = default; - beast_http_session(Socket&& socket, std::shared_ptr plugin_state, std::string remote_endpoint, + beast_http_session(Socket&& socket, http_plugin_state& plugin_state, std::string remote_endpoint, api_category_set categories, const std::string& local_address) - : socket_(std::move(socket)), categories_(categories), plugin_state_(std::move(plugin_state)), + : socket_(std::move(socket)), categories_(categories), plugin_state_(plugin_state), remote_endpoint_(std::move(remote_endpoint)), local_address_(local_address) { - plugin_state_->requests_in_flight += 1; + plugin_state_.requests_in_flight += 1; req_parser_.emplace(); - req_parser_->body_limit(plugin_state_->max_body_size); + req_parser_->body_limit(plugin_state_.max_body_size); res_.emplace(); session_begin_ = steady_clock::now(); @@ -284,14 +269,14 @@ class beast_http_session : public detail::abstract_conn, virtual ~beast_http_session() { is_send_exception_response_ = false; - plugin_state_->requests_in_flight -= 1; - if(plugin_state_->logger.is_enabled(fc::log_level::all)) { + plugin_state_.requests_in_flight -= 1; + if(plugin_state_.get_logger().is_enabled(fc::log_level::all)) { auto session_time = steady_clock::now() - session_begin_; auto session_time_us = std::chrono::duration_cast(session_time).count(); - plugin_state_->logger.log(FC_LOG_MESSAGE(all, "session time ${t}", ("t", session_time_us))); - plugin_state_->logger.log(FC_LOG_MESSAGE(all, " read ${t}", ("t", read_time_us_))); - plugin_state_->logger.log(FC_LOG_MESSAGE(all, " handle ${t}", ("t", handle_time_us_))); - plugin_state_->logger.log(FC_LOG_MESSAGE(all, " write ${t}", ("t", write_time_us_))); + plugin_state_.get_logger().log(FC_LOG_MESSAGE(all, "session time ${t}", ("t", session_time_us))); + plugin_state_.get_logger().log(FC_LOG_MESSAGE(all, " read ${t}", ("t", read_time_us_))); + plugin_state_.get_logger().log(FC_LOG_MESSAGE(all, " handle ${t}", ("t", handle_time_us_))); + plugin_state_.get_logger().log(FC_LOG_MESSAGE(all, " write ${t}", ("t", write_time_us_))); } } @@ -313,7 +298,7 @@ class beast_http_session : public detail::abstract_conn, if(ec == http::error::end_of_stream) // other side closed the connection return do_eof(); - return fail(ec, "read_header", plugin_state_->logger, "closing connection"); + return fail(ec, "read_header", plugin_state_.get_logger(), "closing connection"); } // Check for the Expect field value @@ -321,7 +306,7 @@ class beast_http_session : public detail::abstract_conn, bool do_continue = true; auto sv = req_parser_->get()[http::field::content_length]; if (uint64_t sz; !sv.empty() && std::from_chars(sv.data(), sv.data() + sv.size(), sz).ec == std::errc() && - sz > plugin_state_->max_body_size) { + sz > plugin_state_.max_body_size) { do_continue = false; } send_100_continue_response(do_continue); @@ -353,7 +338,7 @@ class beast_http_session : public detail::abstract_conn, if(ec == http::error::end_of_stream || ec == asio::error::connection_reset) return do_eof(); - return fail(ec, "read", plugin_state_->logger, "closing connection"); + return fail(ec, "read", plugin_state_.get_logger(), "closing connection"); } auto req = req_parser_->release(); @@ -372,7 +357,7 @@ class beast_http_session : public detail::abstract_conn, boost::ignore_unused(bytes_transferred); if(ec) { - return fail(ec, "write", plugin_state_->logger, "closing connection"); + return fail(ec, "write", plugin_state_.get_logger(), "closing connection"); } auto dt = steady_clock::now() - write_begin_; @@ -405,7 +390,7 @@ class beast_http_session : public detail::abstract_conn, // create a new parser to clear state req_parser_.emplace(); - req_parser_->body_limit(plugin_state_->max_body_size); + req_parser_->body_limit(plugin_state_.max_body_size); // Read another request do_read_header(); @@ -420,26 +405,26 @@ class beast_http_session : public detail::abstract_conn, throw; } catch(const fc::exception& e) { err_str = e.to_detail_string(); - fc_elog(plugin_state_->logger, "fc::exception: ${w}", ("w", err_str)); + fc_elog(plugin_state_.get_logger(), "fc::exception: ${w}", ("w", err_str)); if( is_send_exception_response_ ) { error_results results{static_cast(http::status::internal_server_error), "Internal Service Error", error_results::error_info( e, http_plugin::verbose_errors() )}; - err_str = fc::json::to_string( results, fc::time_point::now().safe_add(plugin_state_->max_response_time) ); + err_str = fc::json::to_string( results, fc::time_point::now().safe_add(plugin_state_.max_response_time) ); } } catch(std::exception& e) { err_str = e.what(); - fc_elog(plugin_state_->logger, "std::exception: ${w}", ("w", err_str)); + fc_elog(plugin_state_.get_logger(), "std::exception: ${w}", ("w", err_str)); if( is_send_exception_response_ ) { error_results results{static_cast(http::status::internal_server_error), "Internal Service Error", error_results::error_info( fc::exception( FC_LOG_MESSAGE( error, err_str ) ), http_plugin::verbose_errors() )}; - err_str = fc::json::to_string( results, fc::time_point::now().safe_add(plugin_state_->max_response_time) ); + err_str = fc::json::to_string( results, fc::time_point::now().safe_add(plugin_state_.max_response_time) ); } } catch(...) { err_str = "Unknown exception"; - fc_elog(plugin_state_->logger, err_str); + fc_elog(plugin_state_.get_logger(), err_str); if( is_send_exception_response_ ) { error_results results{static_cast(http::status::internal_server_error), "Internal Service Error", @@ -450,10 +435,10 @@ class beast_http_session : public detail::abstract_conn, } } } catch (fc::timeout_exception& e) { - fc_elog( plugin_state_->logger, "Timeout exception ${te} attempting to handle exception: ${e}", ("te", e.to_detail_string())("e", err_str) ); + fc_elog( plugin_state_.get_logger(), "Timeout exception ${te} attempting to handle exception: ${e}", ("te", e.to_detail_string())("e", err_str) ); err_str = R"xxx({"message": "Internal Server Error"})xxx"; } catch (...) { - fc_elog( plugin_state_->logger, "Exception attempting to handle exception: ${e}", ("e", err_str) ); + fc_elog( plugin_state_.get_logger(), "Exception attempting to handle exception: ${e}", ("e", err_str) ); err_str = R"xxx({"message": "Internal Server Error"})xxx"; } @@ -469,11 +454,11 @@ class beast_http_session : public detail::abstract_conn, } void increment_bytes_in_flight(size_t sz) { - plugin_state_->bytes_in_flight += sz; + plugin_state_.bytes_in_flight += sz; } void decrement_bytes_in_flight(size_t sz) { - plugin_state_->bytes_in_flight -= sz; + plugin_state_.bytes_in_flight -= sz; } virtual void send_response(std::string&& json, unsigned int code) final { @@ -488,9 +473,9 @@ class beast_http_session : public detail::abstract_conn, res_->prepare_payload(); // Determine if we should close the connection after - bool close = !(plugin_state_->keep_alive) || res_->need_eof(); + bool close = !(plugin_state_.keep_alive) || res_->need_eof(); - fc_dlog( plugin_state_->logger, "Response: ${ep} ${b}", + fc_dlog( plugin_state_.get_logger(), "Response: ${ep} ${b}", ("ep", remote_endpoint_)("b", to_log_string(*res_)) ); // Write the response @@ -526,7 +511,7 @@ class beast_http_session : public detail::abstract_conn, bool allow_host(const http::request& req) { - if constexpr(std::is_same_v) { + if constexpr(std::is_same_v) { const std::string host_str(req["host"]); if (host_str != local_address_) return eosio::allow_host(host_str, socket_, plugin_state_); diff --git a/plugins/http_plugin/include/eosio/http_plugin/common.hpp b/plugins/http_plugin/include/eosio/http_plugin/common.hpp index 006aacddb5..5f12134086 100644 --- a/plugins/http_plugin/include/eosio/http_plugin/common.hpp +++ b/plugins/http_plugin/include/eosio/http_plugin/common.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -33,6 +34,7 @@ #include #include + namespace eosio { static uint16_t const uri_default_port = 80; /// Default port for wss:// @@ -135,6 +137,8 @@ struct http_plugin_state { fc::logger& logger; std::function update_metrics; + fc::logger& get_logger() { return logger; } + explicit http_plugin_state(fc::logger& log) : logger(log) {} @@ -148,8 +152,8 @@ struct http_plugin_state { * @param session_ptr - beast_http_session object on which to invoke send_response * @return lambda suitable for url_response_callback */ -inline auto make_http_response_handler(std::shared_ptr plugin_state, detail::abstract_conn_ptr session_ptr, http_content_type content_type) { - return [plugin_state{std::move(plugin_state)}, +inline auto make_http_response_handler(http_plugin_state& plugin_state, detail::abstract_conn_ptr session_ptr, http_content_type content_type) { + return [&plugin_state, session_ptr{std::move(session_ptr)}, content_type](int code, std::optional response) { auto payload_size = detail::in_flight_sizeof(response); if(auto error_str = session_ptr->verify_max_bytes_in_flight(payload_size); !error_str.empty()) { @@ -157,13 +161,13 @@ inline auto make_http_response_handler(std::shared_ptr plugin return; } - plugin_state->bytes_in_flight += payload_size; + plugin_state.bytes_in_flight += payload_size; // post back to an HTTP thread to allow the response handler to be called from any thread - boost::asio::post(plugin_state->thread_pool.get_executor(), - [plugin_state, session_ptr, code, payload_size, response = std::move(response), content_type]() { + boost::asio::post(plugin_state.thread_pool.get_executor(), + [&plugin_state, session_ptr, code, payload_size, response = std::move(response), content_type]() { try { - plugin_state->bytes_in_flight -= payload_size; + plugin_state.bytes_in_flight -= payload_size; if (response.has_value()) { std::string json = (content_type == http_content_type::plaintext) ? response->as_string() : fc::json::to_string(*response, fc::time_point::maximum()); if (auto error_str = session_ptr->verify_max_bytes_in_flight(json.size()); error_str.empty()) @@ -181,17 +185,6 @@ inline auto make_http_response_handler(std::shared_ptr plugin } -inline std::pair split_host_port(std::string_view endpoint) { - std::string::size_type colon_pos = endpoint.rfind(':'); - if(colon_pos != std::string::npos) { - auto port = endpoint.substr(colon_pos + 1); - auto hostname = (endpoint[0] == '[' && colon_pos >= 2) ? endpoint.substr( 1, colon_pos-2 ) : endpoint.substr( 0, colon_pos ); - return {std::string(hostname), std::string(port)}; - } else { - return {std::string(endpoint), {}}; - } -} - inline bool host_is_valid(const http_plugin_state& plugin_state, const std::string& header_host_port, const asio::ip::address& addr) { @@ -199,7 +192,7 @@ inline bool host_is_valid(const http_plugin_state& plugin_state, return true; } - auto [hostname, port] = split_host_port(header_host_port); + auto [hostname, port] = fc::split_host_port(header_host_port); boost::system::error_code ec; auto header_addr = boost::asio::ip::make_address(hostname, ec); if (ec) diff --git a/plugins/http_plugin/tests/unit_tests.cpp b/plugins/http_plugin/tests/unit_tests.cpp index b0c5e13748..f76d3b01d7 100644 --- a/plugins/http_plugin/tests/unit_tests.cpp +++ b/plugins/http_plugin/tests/unit_tests.cpp @@ -422,7 +422,7 @@ struct http_response_for { net::io_context ioc; http::response response; http_response_for(const char* addr, const char* path) { - auto [host, port] = eosio::split_host_port(addr); + auto [host, port] = fc::split_host_port(addr); // These objects perform our I/O tcp::resolver resolver(ioc); beast::tcp_stream stream(ioc); diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index a31265112b..5e873aa03f 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -318,7 +319,6 @@ namespace eosio { class net_plugin_impl : public std::enable_shared_from_this, public auto_bp_peering::bp_connection_manager { public: - unique_ptr acceptor; std::atomic current_connection_id{0}; unique_ptr< sync_manager > sync_master; @@ -419,8 +419,6 @@ namespace eosio { uint32_t get_chain_lib_num() const; uint32_t get_chain_head_num() const; - void start_listen_loop(); - void on_accepted_block_header( const block_state_ptr& bs ); void on_accepted_block( const block_state_ptr& bs ); @@ -706,8 +704,7 @@ namespace eosio { class connection : public std::enable_shared_from_this { public: explicit connection( const string& endpoint ); - connection(); - + explicit connection( tcp::socket&& socket ); ~connection() = default; bool start_session(); @@ -1000,6 +997,28 @@ namespace eosio { } }; + + + std::tuple split_host_port_type(const std::string& peer_add) { + // host:port:[|] + if (peer_add.empty()) return {}; + + string::size_type p = peer_add[0] == '[' ? peer_add.find(']') : 0; + if (p == string::npos) { + fc_wlog( logger, "Invalid peer address: ${peer}", ("peer", peer_add) ); + return {}; + } + string::size_type colon = peer_add.find(':', p); + string::size_type colon2 = peer_add.find(':', colon + 1); + string::size_type end = colon2 == string::npos + ? string::npos : peer_add.find_first_of( " :+=.,<>!$%^&(*)|-#@\t", colon2 + 1 ); // future proof by including most symbols without using regex + string host = (p > 0) ? peer_add.substr( 1, p-1 ) : peer_add.substr( 0, colon ); + string port = peer_add.substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1)); + string type = colon2 == string::npos ? "" : end == string::npos ? + peer_add.substr( colon2 + 1 ) : peer_add.substr( colon2 + 1, end - (colon2 + 1) ); + return {std::move(host), std::move(port), std::move(type)}; + } + template void net_plugin_impl::for_each_connection( Function&& f ) const { @@ -1039,15 +1058,16 @@ namespace eosio { fc_ilog( logger, "created connection ${c} to ${n}", ("c", connection_id)("n", endpoint) ); } - connection::connection() + connection::connection(tcp::socket&& s) : peer_addr(), strand( my_impl->thread_pool.get_executor() ), - socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ), + socket( new tcp::socket( std::move(s) ) ), connection_id( ++my_impl->current_connection_id ), response_expected_timer( my_impl->thread_pool.get_executor() ), last_handshake_recv(), last_handshake_sent() { + set_heartbeat_timeout(my_impl->heartbeat_timeout); fc_dlog( logger, "new connection object created" ); } @@ -1066,17 +1086,8 @@ namespace eosio { } // called from connection strand - void connection::set_connection_type( const string& peer_add ) { - // host:port:[|] - string::size_type colon = peer_add.find(':'); - string::size_type colon2 = peer_add.find(':', colon + 1); - string::size_type end = colon2 == string::npos - ? string::npos : peer_add.find_first_of( " :+=.,<>!$%^&(*)|-#@\t", colon2 + 1 ); // future proof by including most symbols without using regex - string host = peer_add.substr( 0, colon ); - string port = peer_add.substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1)); - string type = colon2 == string::npos ? "" : end == string::npos ? - peer_add.substr( colon2 + 1 ) : peer_add.substr( colon2 + 1, end - (colon2 + 1) ); - + void connection::set_connection_type( const std::string& peer_add ) { + auto [host, port, type] = split_host_port_type(peer_add); if( type.empty() ) { fc_dlog( logger, "Setting connection ${c} type for: ${peer} to both transactions and blocks", ("c", connection_id)("peer", peer_add) ); connection_type = both; @@ -2434,17 +2445,14 @@ namespace eosio { } strand.post([c]() { - string::size_type colon = c->peer_address().find(':'); - string::size_type colon2 = c->peer_address().find(':', colon + 1); - string host = c->peer_address().substr( 0, colon ); - string port = c->peer_address().substr( colon + 1, colon2 == string::npos ? string::npos : colon2 - (colon + 1)); + auto [host, port, type] = split_host_port_type(c->peer_address()); c->set_connection_type( c->peer_address() ); auto resolver = std::make_shared( my_impl->thread_pool.get_executor() ); connection_wptr weak_conn = c; // Note: need to add support for IPv6 too - resolver->async_resolve( tcp::v4(), host, port, boost::asio::bind_executor( c->strand, - [resolver, weak_conn, host, port]( const boost::system::error_code& err, const tcp::resolver::results_type& endpoints ) { + resolver->async_resolve(host, port, boost::asio::bind_executor( c->strand, + [resolver, weak_conn, host = host, port = port]( const boost::system::error_code& err, const tcp::resolver::results_type& endpoints ) { auto c = weak_conn.lock(); if( !c ) return; if( !err ) { @@ -2482,85 +2490,70 @@ namespace eosio { } ) ); } - void net_plugin_impl::start_listen_loop() { - connection_ptr new_connection = std::make_shared(); - new_connection->connecting = true; - new_connection->strand.post( [this, new_connection = std::move( new_connection )](){ - acceptor->async_accept( *new_connection->socket, - boost::asio::bind_executor( new_connection->strand, [new_connection, socket=new_connection->socket, this]( boost::system::error_code ec ) { - if( !ec ) { - uint32_t visitors = 0; - uint32_t from_addr = 0; - boost::system::error_code rec; - const auto& paddr_add = socket->remote_endpoint( rec ).address(); - string paddr_str; - if( rec ) { - fc_elog( logger, "Error getting remote endpoint: ${m}", ("m", rec.message())); - } else { - paddr_str = paddr_add.to_string(); - for_each_connection( [&visitors, &from_addr, &paddr_str]( auto& conn ) { - if( conn->socket_is_open()) { - if( conn->peer_address().empty()) { - ++visitors; - std::lock_guard g_conn( conn->conn_mtx ); - if( paddr_str == conn->remote_endpoint_ip ) { - ++from_addr; - } - } - } - return true; - } ); - if( from_addr < max_nodes_per_host && (auto_bp_peering_enabled() || max_client_count == 0 || visitors < max_client_count)) { - fc_ilog( logger, "Accepted new connection: " + paddr_str ); - new_connection->set_heartbeat_timeout( heartbeat_timeout ); - if( new_connection->start_session()) { - std::lock_guard g_unique( connections_mtx ); - connections.insert( new_connection ); - } + struct p2p_listener : public fc::listener { + static constexpr uint32_t accept_timeout_ms = 100; + eosio::net_plugin_impl* state_; - } else { - if( from_addr >= max_nodes_per_host ) { - fc_dlog( logger, "Number of connections (${n}) from ${ra} exceeds limit ${l}", - ("n", from_addr + 1)( "ra", paddr_str )( "l", max_nodes_per_host )); - } else { - fc_dlog( logger, "max_client_count ${m} exceeded", ("m", max_client_count)); + p2p_listener(boost::asio::io_context& executor, fc::logger& logger, const std::string& local_address, + const tcp::endpoint& endpoint, eosio::net_plugin_impl* impl) + : fc::listener(executor, logger, boost::posix_time::milliseconds(accept_timeout_ms), + local_address, endpoint), + state_(impl) {} + + std::string extra_listening_log_info() { + return ", max clients is " + std::to_string(state_->max_client_count); + } + + void create_session(tcp::socket&& socket) { + uint32_t visitors = 0; + uint32_t from_addr = 0; + boost::system::error_code rec; + const auto& paddr_add = socket.remote_endpoint(rec).address(); + string paddr_str; + if (rec) { + fc_elog(logger, "Error getting remote endpoint: ${m}", ("m", rec.message())); + } else { + paddr_str = paddr_add.to_string(); + state_->for_each_connection([&visitors, &from_addr, &paddr_str](auto& conn) { + if (conn->socket_is_open()) { + if (conn->peer_address().empty()) { + ++visitors; + std::lock_guard g_conn(conn->conn_mtx); + if (paddr_str == conn->remote_endpoint_ip) { + ++from_addr; } - // new_connection never added to connections and start_session not called, lifetime will end - boost::system::error_code ec; - socket->shutdown( tcp::socket::shutdown_both, ec ); - socket->close( ec ); } } - } else { - fc_elog( logger, "Error accepting connection: ${m}", ("m", ec.message())); - // For the listed error codes below, recall start_listen_loop() - switch (ec.value()) { - case EMFILE: // same as boost::system::errc::too_many_files_open - { - // no file descriptors available to accept the connection. Wait on async_timer - // and retry listening using shorter 100ms timer than SHiP or http_plugin - // as net_pluging is more critical - accept_error_timer.expires_from_now(boost::posix_time::milliseconds(100)); - accept_error_timer.async_wait([this]( const boost::system::error_code &ec) { - if (!ec) - start_listen_loop(); - }); - return; // wait for timer!! + return true; + }); + if (from_addr < state_->max_nodes_per_host && + (state_->auto_bp_peering_enabled() || state_->max_client_count == 0 || + visitors < state_->max_client_count)) { + fc_ilog(logger, "Accepted new connection: " + paddr_str); + + connection_ptr new_connection = std::make_shared(std::move(socket)); + new_connection->strand.post([new_connection, state = state_]() { + if (new_connection->start_session()) { + std::lock_guard g_unique(state->connections_mtx); + state->connections.insert(new_connection); } - case ECONNABORTED: - case ENFILE: - case ENOBUFS: - case ENOMEM: - case EPROTO: - break; - default: - return; + }); + + } else { + if (from_addr >= state_->max_nodes_per_host) { + fc_dlog(logger, "Number of connections (${n}) from ${ra} exceeds limit ${l}", + ("n", from_addr + 1)("ra", paddr_str)("l", state_->max_nodes_per_host)); + } else { + fc_dlog(logger, "max_client_count ${m} exceeded", ("m", state_->max_client_count)); } + // new_connection never added to connections and start_session not called, lifetime will end + boost::system::error_code ec; + socket.shutdown(tcp::socket::shutdown_both, ec); + socket.close(ec); } - start_listen_loop(); - })); - } ); - } + } + } + }; // only called from strand thread void connection::start_read_message() { @@ -2859,12 +2852,6 @@ namespace eosio { } thread_pool.stop(); - - if( acceptor ) { - boost::system::error_code ec; - acceptor->cancel( ec ); - acceptor->close( ec ); - } } // call only from main application thread @@ -2968,8 +2955,8 @@ namespace eosio { my_impl->mark_bp_connection(this); if (my_impl->exceeding_connection_limit(this)) { - // When auto bp peering is enabled, the start_listen_loop check doesn't have enough information to determine - // if a client is a BP peer. In start_listen_loop, it only has the peer address which a node is connecting + // When auto bp peering is enabled, the p2p_listener check doesn't have enough information to determine + // if a client is a BP peer. In p2p_listener, it only has the peer address which a node is connecting // from, but it would be different from the address it is listening. The only way to make sure is when the // first handshake message is received with the p2p_address information in the message. Thus the connection // limit checking has to be here when auto bp peering is enabled. @@ -2979,7 +2966,9 @@ namespace eosio { } if( peer_address().empty() ) { - set_connection_type( msg.p2p_address ); + auto [host, port, type] = split_host_port_type(msg.p2p_address); + if (host.size()) + set_connection_type( msg.p2p_address ); } g_conn.lock(); @@ -3943,31 +3932,23 @@ namespace eosio { "***********************************\n" ); } - tcp::endpoint listen_endpoint; - if( !my->p2p_address.empty() ) { - auto host = my->p2p_address.substr( 0, my->p2p_address.find( ':' )); - auto port = my->p2p_address.substr( host.size() + 1, my->p2p_address.size()); - tcp::resolver resolver( my->thread_pool.get_executor() ); - // Note: need to add support for IPv6 too? - listen_endpoint = *resolver.resolve( tcp::v4(), host, port ); - - my->acceptor = std::make_unique( my_impl->thread_pool.get_executor() ); + std::string listen_address = my->p2p_address; + if( !my->p2p_address.empty() ) { + auto [host, port] = fc::split_host_port(listen_address); + if( !my->p2p_server_address.empty() ) { my->p2p_address = my->p2p_server_address; - } else { - if( listen_endpoint.address().to_v4() == address_v4::any()) { - boost::system::error_code ec; - host = host_name( ec ); - if( ec.value() != boost::system::errc::success ) { + } else if( host.empty() || host == "0.0.0.0" || host == "[::]") { + boost::system::error_code ec; + auto hostname = host_name( ec ); + if( ec.value() != boost::system::errc::success ) { - FC_THROW_EXCEPTION( fc::invalid_arg_exception, - "Unable to retrieve host_name. ${msg}", ("msg", ec.message())); + FC_THROW_EXCEPTION( fc::invalid_arg_exception, + "Unable to retrieve host_name. ${msg}", ("msg", ec.message())); - } - port = my->p2p_address.substr( my->p2p_address.find( ':' ), my->p2p_address.size()); - my->p2p_address = host + port; } + my->p2p_address = hostname + ":" + port; } } @@ -3993,21 +3974,16 @@ namespace eosio { my->incoming_transaction_ack_subscription = app().get_channel().subscribe( [me = my.get()](auto&& t) { me->transaction_ack(std::forward(t)); }); - app().executor().post(priority::highest, [my=my, listen_endpoint](){ - if( my->acceptor ) { + app().executor().post(priority::highest, [my=my, address = std::move(listen_address)](){ + if (address.size()) { try { - my->acceptor->open(listen_endpoint.protocol()); - my->acceptor->set_option(tcp::acceptor::reuse_address(true)); - my->acceptor->bind(listen_endpoint); - my->acceptor->listen(); + p2p_listener::create(my->thread_pool.get_executor(), logger, address, my.get()); } catch (const std::exception& e) { - fc_elog( logger, "net_plugin::plugin_startup failed to bind to port ${port}, ${what}", - ("port", listen_endpoint.port())("what", e.what()) ); + fc_elog( logger, "net_plugin::plugin_startup failed to listen on ${addr}, ${what}", + ("addr", address)("what", e.what()) ); app().quit(); return; } - fc_ilog( logger, "starting listener, max clients is ${mc}",("mc",my->max_client_count) ); - my->start_listen_loop(); } my->ticker(); diff --git a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp index 2a20f9f059..8d3acac35b 100644 --- a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp +++ b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp @@ -289,7 +289,7 @@ class blocks_result_send_queue_entry : public send_queue_entry_base, public std: template struct session : session_base, std::enable_shared_from_this> { private: - Plugin plugin; + Plugin& plugin; session_manager& session_mgr; std::optional> socket_stream; // ship thread only after creation std::string description; @@ -306,16 +306,16 @@ struct session : session_base, std::enable_shared_from_thisdefault_frame_size) { + , default_frame_size(plugin.default_frame_size) { description = to_description_string(); } void start() { - fc_ilog(plugin->logger(), "incoming connection from ${a}", ("a", description)); + fc_ilog(plugin.get_logger(), "incoming connection from ${a}", ("a", description)); socket_stream->auto_fragment(false); socket_stream->binary(true); if constexpr (std::is_same_v) { @@ -373,7 +373,7 @@ struct session : session_base, std::enable_shared_from_this& buf) { if (result.traces.has_value()) { - auto& optional_log = plugin->get_trace_log(); + auto& optional_log = plugin.get_trace_log(); if( optional_log ) { buf.emplace( optional_log->create_locked_decompress_stream() ); return optional_log->get_unpacked_entry( result.this_block->block_num, *buf ); @@ -385,7 +385,7 @@ struct session : session_base, std::enable_shared_from_this& buf) { if (result.deltas.has_value()) { - auto& optional_log = plugin->get_chain_state_log(); + auto& optional_log = plugin.get_chain_state_log(); if( optional_log ) { buf.emplace( optional_log->create_locked_decompress_stream() ); return optional_log->get_unpacked_entry( result.this_block->block_num, *buf ); @@ -395,7 +395,7 @@ struct session : session_base, std::enable_shared_from_thislogger(), "received get_status_request_v0"); + fc_dlog(plugin.get_logger(), "received get_status_request_v0"); auto self = this->shared_from_this(); auto entry_ptr = std::make_unique>(self); @@ -403,7 +403,7 @@ struct session : session_base, std::enable_shared_from_thislogger(), "received get_blocks_request_v0 = ${req}", ("req", req)); + fc_dlog(plugin.get_logger(), "received get_blocks_request_v0 = ${req}", ("req", req)); auto self = this->shared_from_this(); auto entry_ptr = std::make_unique>(self, std::move(req)); @@ -411,9 +411,9 @@ struct session : session_base, std::enable_shared_from_thislogger(), "received get_blocks_ack_request_v0 = ${req}", ("req", req)); + fc_dlog(plugin.get_logger(), "received get_blocks_ack_request_v0 = ${req}", ("req", req)); if (!current_request) { - fc_dlog(plugin->logger(), " no current get_blocks_request_v0, discarding the get_blocks_ack_request_v0"); + fc_dlog(plugin.get_logger(), " no current get_blocks_request_v0, discarding the get_blocks_ack_request_v0"); return; } @@ -423,48 +423,48 @@ struct session : session_base, std::enable_shared_from_thislogger(), "replying get_status_request_v0"); + fc_dlog(plugin.get_logger(), "replying get_status_request_v0"); state_history::get_status_result_v0 result; - result.head = plugin->get_block_head(); - result.last_irreversible = plugin->get_last_irreversible(); - result.chain_id = plugin->get_chain_id(); - auto&& trace_log = plugin->get_trace_log(); + result.head = plugin.get_block_head(); + result.last_irreversible = plugin.get_last_irreversible(); + result.chain_id = plugin.get_chain_id(); + auto&& trace_log = plugin.get_trace_log(); if (trace_log) { auto r = trace_log->block_range(); result.trace_begin_block = r.first; result.trace_end_block = r.second; } - auto&& chain_state_log = plugin->get_chain_state_log(); + auto&& chain_state_log = plugin.get_chain_state_log(); if (chain_state_log) { auto r = chain_state_log->block_range(); result.chain_state_begin_block = r.first; result.chain_state_end_block = r.second; } - fc_dlog(plugin->logger(), "pushing get_status_result_v0 to send queue"); + fc_dlog(plugin.get_logger(), "pushing get_status_result_v0 to send queue"); return result; } void update_current_request(state_history::get_blocks_request_v0& req) { - fc_dlog(plugin->logger(), "replying get_blocks_request_v0 = ${req}", ("req", req)); + fc_dlog(plugin.get_logger(), "replying get_blocks_request_v0 = ${req}", ("req", req)); to_send_block_num = req.start_block_num; for (auto& cp : req.have_positions) { if (req.start_block_num <= cp.block_num) continue; - auto id = plugin->get_block_id(cp.block_num); + auto id = plugin.get_block_id(cp.block_num); if (!id || *id != cp.block_id) req.start_block_num = std::min(req.start_block_num, cp.block_num); if (!id) { to_send_block_num = std::min(to_send_block_num, cp.block_num); - fc_dlog(plugin->logger(), "block ${block_num} is not available", ("block_num", cp.block_num)); + fc_dlog(plugin.get_logger(), "block ${block_num} is not available", ("block_num", cp.block_num)); } else if (*id != cp.block_id) { to_send_block_num = std::min(to_send_block_num, cp.block_num); - fc_dlog(plugin->logger(), "the id for block ${block_num} in block request have_positions does not match the existing", + fc_dlog(plugin.get_logger(), "the id for block ${block_num} in block request have_positions does not match the existing", ("block_num", cp.block_num)); } } - fc_dlog(plugin->logger(), " get_blocks_request_v0 start_block_num set to ${num}", ("num", to_send_block_num)); + fc_dlog(plugin.get_logger(), " get_blocks_request_v0 start_block_num set to ${num}", ("num", to_send_block_num)); if( !req.have_positions.empty() ) { position_it = req.have_positions.begin(); @@ -480,18 +480,18 @@ struct session : session_base, std::enable_shared_from_thisget_last_irreversible(); + result.last_irreversible = plugin.get_last_irreversible(); uint32_t current = current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num; if (to_send_block_num > current || to_send_block_num >= current_request->end_block_num) { - fc_dlog( plugin->logger(), "Not sending, to_send_block_num: ${s}, current: ${c} current_request.end_block_num: ${b}", + fc_dlog( plugin.get_logger(), "Not sending, to_send_block_num: ${s}, current: ${c} current_request.end_block_num: ${b}", ("s", to_send_block_num)("c", current)("b", current_request->end_block_num) ); session_mgr.pop_entry(false); return; } - auto block_id = plugin->get_block_id(to_send_block_num); + auto block_id = plugin.get_block_id(to_send_block_num); if (block_id && position_it && (*position_it)->block_num == to_send_block_num) { // This branch happens when the head block of nodeos is behind the head block of connecting client. @@ -512,22 +512,22 @@ struct session : session_base, std::enable_shared_from_thisget_block_id(to_send_block_num - 1); + auto prev_block_id = plugin.get_block_id(to_send_block_num - 1); if (prev_block_id) result.prev_block = state_history::block_position{to_send_block_num - 1, *prev_block_id}; if (current_request->fetch_block) - plugin->get_block(to_send_block_num, block_state, result.block); - if (current_request->fetch_traces && plugin->get_trace_log()) + plugin.get_block(to_send_block_num, block_state, result.block); + if (current_request->fetch_traces && plugin.get_trace_log()) result.traces.emplace(); - if (current_request->fetch_deltas && plugin->get_chain_state_log()) + if (current_request->fetch_deltas && plugin.get_chain_state_log()) result.deltas.emplace(); } ++to_send_block_num; // during syncing if block is older than 5 min, log every 1000th block - bool fresh_block = fc::time_point::now() - plugin->get_head_block_timestamp() < fc::minutes(5); + bool fresh_block = fc::time_point::now() - plugin.get_head_block_timestamp() < fc::minutes(5); if (fresh_block || (result.this_block && result.this_block->block_num % 1000 == 0)) { - fc_ilog(plugin->logger(), + fc_ilog(plugin.get_logger(), "pushing result " "{\"head\":{\"block_num\":${head}},\"last_irreversible\":{\"block_num\":${last_irr}},\"this_block\":{" "\"block_num\":${this_block}}} to send queue", @@ -557,7 +557,7 @@ struct session : session_base, std::enable_shared_from_thisget_block_head(); + result.head = plugin.get_block_head(); send_update(std::move(result), {}); } else { session_mgr.pop_entry(false); @@ -571,26 +571,26 @@ struct session : session_base, std::enable_shared_from_thislogger(), "${e}", ("e", e.to_detail_string()) ); + fc_elog( plugin.get_logger(), "${e}", ("e", e.to_detail_string()) ); } catch( const std::exception& e ) { - fc_elog( plugin->logger(), "${e}", ("e", e.what()) ); + fc_elog( plugin.get_logger(), "${e}", ("e", e.what()) ); } catch( ... ) { - fc_elog( plugin->logger(), "unknown exception" ); + fc_elog( plugin.get_logger(), "unknown exception" ); } } else { if (ec == boost::asio::error::operation_aborted || ec == boost::asio::error::connection_reset || ec == boost::asio::error::eof || ec == boost::beast::websocket::error::closed) { - fc_dlog(plugin->logger(), "${w}: ${m}", ("w", what)("m", ec.message())); + fc_dlog(plugin.get_logger(), "${w}: ${m}", ("w", what)("m", ec.message())); } else { - fc_elog(plugin->logger(), "${w}: ${m}", ("w", what)("m", ec.message())); + fc_elog(plugin.get_logger(), "${w}: ${m}", ("w", what)("m", ec.message())); } } // on exception allow session to be destroyed - fc_ilog(plugin->logger(), "Closing connection from ${a}", ("a", description)); + fc_ilog(plugin.get_logger(), "Closing connection from ${a}", ("a", description)); session_mgr.remove( this->shared_from_this(), active_entry ); } }; diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index e749987891..f783693193 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -7,8 +7,8 @@ #include #include #include -#include #include +#include #include #include @@ -19,17 +19,17 @@ #include #include +#include namespace ws = boost::beast::websocket; - namespace eosio { using namespace chain; using namespace state_history; using boost::signals2::scoped_connection; namespace bio = boost::iostreams; - static auto _state_history_plugin = application::register_plugin(); +static auto _state_history_plugin = application::register_plugin(); const std::string logger_name("state_history"); fc::logger _log; @@ -56,38 +56,22 @@ struct state_history_plugin_impl : std::enable_shared_from_this block_start_connection; std::optional accepted_block_connection; string endpoint_address; - uint16_t endpoint_port = 8080; string unix_path; state_history::trace_converter trace_converter; session_manager session_mgr; mutable std::mutex mtx; - block_id_type head_id; - block_id_type lib_id; - time_point head_timestamp; - - constexpr static uint64_t default_frame_size = 1024 * 1024; + block_id_type head_id; + block_id_type lib_id; + time_point head_timestamp; - template - struct generic_acceptor { - using socket_type = typename ACCEPTOR::protocol_type::socket; - explicit generic_acceptor(boost::asio::io_context& ioc) : acceptor_(ioc), socket_(ioc), error_timer_(ioc) {} - ACCEPTOR acceptor_; - socket_type socket_; - boost::asio::deadline_timer error_timer_; - }; - - using tcp_acceptor = generic_acceptor; - using unix_acceptor = generic_acceptor; - - using acceptor_type = std::variant, std::unique_ptr>; - std::set acceptors; + constexpr static uint64_t default_frame_size = 1024 * 1024; named_thread_pool thread_pool; bool plugin_started = false; - static fc::logger& logger() { return _log; } + static fc::logger& get_logger() { return _log; } std::optional& get_trace_log() { return trace_log; } std::optional& get_chain_state_log(){ return chain_state_log; } @@ -151,97 +135,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this(thread_pool.get_executor())); }; - auto init_unix_acceptor = [&]() { - // take a sniff and see if anything is already listening at the given socket path, or if the socket path exists - // but nothing is listening - { - boost::system::error_code test_ec; - boost::asio::local::stream_protocol::socket test_socket(app().get_io_service()); - test_socket.connect(unix_path.c_str(), test_ec); - - // looks like a service is already running on that socket, don't touch it... fail out - if (test_ec == boost::system::errc::success) - ec = boost::system::errc::make_error_code(boost::system::errc::address_in_use); - // socket exists but no one home, go ahead and remove it and continue on - else if (test_ec == boost::system::errc::connection_refused) - ::unlink(unix_path.c_str()); - else if (test_ec != boost::system::errc::no_such_file_or_directory) - ec = test_ec; - } - check_ec("open"); - acceptors.insert(std::make_unique(thread_pool.get_executor())); - }; - - // create and configure acceptors, can be both - if (!endpoint_address.empty()) init_tcp_acceptor(); - if (!unix_path.empty()) init_unix_acceptor(); - - // start it - std::for_each(acceptors.begin(), acceptors.end(), [&](const acceptor_type& acc) { - std::visit(overloaded{[&](const std::unique_ptr& tcp_acc) { - auto address = boost::asio::ip::make_address(endpoint_address); - auto endpoint = boost::asio::ip::tcp::endpoint{address, endpoint_port}; - tcp_acc->acceptor_.open(endpoint.protocol(), ec); - check_ec("open"); - tcp_acc->acceptor_.set_option(boost::asio::socket_base::reuse_address(true)); - tcp_acc->acceptor_.bind(endpoint, ec); - check_ec("bind"); - tcp_acc->acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); - check_ec("listen"); - do_accept(*tcp_acc); - }, - [&](const std::unique_ptr& unx_acc) { - unx_acc->acceptor_.open(boost::asio::local::stream_protocol::acceptor::protocol_type(), ec); - check_ec("open"); - unx_acc->acceptor_.bind(unix_path.c_str(), ec); - check_ec("bind"); - unx_acc->acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); - check_ec("listen"); - do_accept(*unx_acc); - }}, - acc); - }); - } - - template - void do_accept(Acceptor& acc) { - // &acceptor kept alive by self, reference into acceptors set - acc.acceptor_.async_accept(acc.socket_, [self = shared_from_this(), &acc](const boost::system::error_code& ec) { - if (ec == boost::system::errc::too_many_files_open) { - fc_elog(_log, "ship accept() error: too many files open - waiting 200ms"); - acc.error_timer_.expires_from_now(boost::posix_time::milliseconds(200)); - acc.error_timer_.async_wait([self = self->shared_from_this(), &acc](const boost::system::error_code& ec) { - if (!ec) - catch_and_log([&] { self->do_accept(acc); }); - }); - } else { - if (ec) - fc_elog(_log, "ship accept() error: ${m} - closing connection", ("m", ec.message())); - else { - // Create a session object and run it - catch_and_log([&] { - auto s = std::make_shared, typename Acceptor::socket_type>>(self, std::move(acc.socket_), self->session_mgr); - self->session_mgr.insert(s); - s->start(); - }); - } - - // Accept another connection - catch_and_log([&] { self->do_accept(acc); }); - } - }); - } + void listen(); // called from main thread void on_applied_transaction(const transaction_trace_ptr& p, const packed_transaction_ptr& t) { @@ -326,21 +220,47 @@ struct state_history_plugin_impl : std::enable_shared_from_this& a ) { - boost::system::error_code ec; - if( const auto ep = a->acceptor_.local_endpoint( ec ); !ec ) - ::unlink( ep.path().c_str() ); - }, - []( const std::unique_ptr& a) {} - }, acc); - }); } -}; // state_history_plugin_impl +}; // state_history_plugin_impl +template +struct ship_listener : fc::listener, Protocol> { + using socket_type = typename Protocol::socket; + static constexpr uint32_t accept_timeout_ms = 200; + + state_history_plugin_impl& state_; + + ship_listener(boost::asio::io_context& executor, logger& logger, const std::string& local_address, + const typename Protocol::endpoint& endpoint, state_history_plugin_impl& state) + : fc::listener, Protocol>( + executor, logger, boost::posix_time::milliseconds(accept_timeout_ms), local_address, endpoint) + , state_(state) {} + + void create_session(socket_type&& socket) { + // Create a session object and run it + catch_and_log([&] { + auto s = std::make_shared>( + state_, std::move(socket), state_.session_mgr); + state_.session_mgr.insert(s); + s->start(); + }); + } +}; + +void state_history_plugin_impl::listen() { + try { + if (!endpoint_address.empty()) { + ship_listener::create(thread_pool.get_executor(), _log, endpoint_address, *this); + } + if (!unix_path.empty()) { + ship_listener::create(thread_pool.get_executor(), _log, unix_path, *this); + } + } catch (std::exception&) { + FC_THROW_EXCEPTION(plugin_exception, "unable to open listen socket"); + } +} state_history_plugin::state_history_plugin() : my(std::make_shared()) {} @@ -408,17 +328,7 @@ void state_history_plugin::plugin_initialize(const variables_map& options) { if (auto resmon_plugin = app().find_plugin()) resmon_plugin->monitor_directory(state_history_dir); - auto ip_port = options.at("state-history-endpoint").as(); - - if (!ip_port.empty()) { - auto port = ip_port.substr(ip_port.find(':') + 1, ip_port.size()); - auto host = ip_port.substr(0, ip_port.find(':')); - my->endpoint_address = host; - my->endpoint_port = std::stoi(port); - - fc_dlog(_log, "PLUGIN_INITIALIZE ${ip_port} ${host} ${port}", - ("ip_port", ip_port)("host", host)("port", port)); - } + my->endpoint_address = options.at("state-history-endpoint").as(); if (options.count("state-history-unix-socket-path")) { std::filesystem::path sock_path = options.at("state-history-unix-socket-path").as(); diff --git a/plugins/state_history_plugin/tests/session_test.cpp b/plugins/state_history_plugin/tests/session_test.cpp index abe08a7732..6dcc79d76e 100644 --- a/plugins/state_history_plugin/tests/session_test.cpp +++ b/plugins/state_history_plugin/tests/session_test.cpp @@ -23,6 +23,7 @@ #include #include #include +#include namespace beast = boost::beast; // from namespace http = beast::http; // from @@ -118,7 +119,9 @@ struct mock_state_history_plugin { log.emplace("ship", log_dir.path(), conf); } - fc::logger logger() { return fc::logger::get(DEFAULT_LOGGER); } + fc::logger logger = fc::logger::get(DEFAULT_LOGGER); + + fc::logger& get_logger() { return logger; } void get_block(uint32_t block_num, const eosio::chain::block_state_ptr& block_state, std::optional& result) const { @@ -140,80 +143,22 @@ struct mock_state_history_plugin { }; -using session_type = eosio::session; - -// Accepts incoming connections and launches the sessions -class listener : public std::enable_shared_from_this { - mock_state_history_plugin* server_; - tcp::acceptor acceptor_; - - public: - listener(mock_state_history_plugin* server, tcp::endpoint& endpoint) - : server_(server) - , acceptor_(server->ship_ioc) { - beast::error_code ec; - - // Open the acceptor - acceptor_.open(endpoint.protocol(), ec); - if (ec) { - fail(ec, "open"); - return; - } - - // Allow address reuse - acceptor_.set_option(net::socket_base::reuse_address(true), ec); - if (ec) { - fail(ec, "set_option"); - return; - } +using session_type = eosio::session; - // Bind to the server address - acceptor_.bind(endpoint, ec); - if (ec) { - fail(ec, "bind"); - return; - } +struct listener : fc::listener { + mock_state_history_plugin& server_; - endpoint = acceptor_.local_endpoint(ec); - if (ec) { - fail(ec, "local_endpoint"); - return; - } - - // Start listening for connections - acceptor_.listen(net::socket_base::max_listen_connections, ec); - if (ec) { - fail(ec, "listen"); - return; - } + listener(boost::asio::io_context& executor, fc::logger& logger, const std::string& local_address, + tcp::endpoint& endpoint, mock_state_history_plugin& server) + : fc::listener(executor, logger, boost::posix_time::milliseconds(100), local_address, endpoint) + , server_(server) { + endpoint = acceptor_.local_endpoint(); } - // Start accepting incoming connections - void run() { do_accept(); } - - private: - void do_accept() { - // The new connection gets its own strand - acceptor_.async_accept(boost::asio::make_strand(server_->ship_ioc), - [self = shared_from_this()](beast::error_code ec, boost::asio::ip::tcp::socket&& socket) { - if( self->server_->stopping ) return; - if (ec) { - fail(ec, "async_accept"); - } else { - self->on_accept( ec, std::move( socket ) ); - } - }); - } - - void on_accept(beast::error_code ec, tcp::socket&& socket) { - if (ec) { - fail(ec, "accept"); - } else { - // Create the session and run it - auto s = std::make_shared(server_, std::move(socket), server_->session_mgr); - s->start(); - server_->add_session(s); - } + void create_session(tcp::socket&& peer_socket) { + auto s = std::make_shared(server_, std::move(peer_socket), server_.session_mgr); + s->start(); + server_.add_session(s); } }; @@ -230,13 +175,14 @@ struct test_server : mock_state_history_plugin { threads.emplace_back([this]{ ship_ioc.run(); }); // Create and launch a listening port - std::make_shared(this, local_address)->run(); + std::make_shared(ship_ioc, logger, "", local_address, *this)->do_accept(); } ~test_server() { stopping = true; ship_ioc_work.reset(); main_ioc_work.reset(); + ship_ioc.stop(); for (auto& thr : threads) { thr.join(); @@ -664,4 +610,3 @@ BOOST_FIXTURE_TEST_CASE(test_session_fork, state_history_test_fixture) { } FC_LOG_AND_RETHROW() } -