From 75ee2e2a79c87e1ceacd8fadd804f4a8ccb7366b Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 16 May 2023 20:00:41 -0500 Subject: [PATCH 1/9] GH-1072 Refactor connections set into a connections_manager class --- .../eosio/net_plugin/auto_bp_peering.hpp | 10 +- plugins/net_plugin/net_plugin.cpp | 635 ++++++++++-------- .../tests/auto_bp_peering_unittest.cpp | 50 +- 3 files changed, 392 insertions(+), 303 deletions(-) diff --git a/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp b/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp index 7f138ef1d7..a394312669 100644 --- a/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp +++ b/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp @@ -145,7 +145,7 @@ class bp_connection_manager { // Only called from connection strand std::size_t num_established_clients() const { uint32_t num_clients = 0; - self()->for_each_connection([&num_clients](auto&& conn) { + self()->connections.for_each_connection([&num_clients](auto&& conn) { if (established_client_connection(conn)) { ++num_clients; } @@ -158,8 +158,8 @@ class bp_connection_manager { // This should only be called after the first handshake message is received to check if an incoming connection // has exceeded the pre-configured max_client_count limit. bool exceeding_connection_limit(Connection* new_connection) const { - return auto_bp_peering_enabled() && self()->max_client_count != 0 && - established_client_connection(new_connection) && num_established_clients() > self()->max_client_count; + return auto_bp_peering_enabled() && self()->connections.get_max_client_count() != 0 && + established_client_connection(new_connection) && num_established_clients() > self()->connections.get_max_client_count(); } // Only called from main thread @@ -182,7 +182,7 @@ class bp_connection_manager { fc_dlog(self()->get_logger(), "pending_downstream_neighbors: ${pending_downstream_neighbors}", ("pending_downstream_neighbors", to_string(pending_downstream_neighbors))); - for (auto neighbor : pending_downstream_neighbors) { self()->connect(config.bp_peer_addresses[neighbor]); } + for (auto neighbor : pending_downstream_neighbors) { self()->connections.connect(config.bp_peer_addresses[neighbor]); } pending_neighbors = std::move(pending_downstream_neighbors); finder.add_upstream_neighbors(pending_neighbors); @@ -222,7 +222,7 @@ class bp_connection_manager { std::back_inserter(peers_to_drop)); fc_dlog(self()->get_logger(), "peers to drop: ${peers_to_drop}", ("peers_to_drop", to_string(peers_to_drop))); - for (auto account : peers_to_drop) { self()->disconnect(config.bp_peer_addresses[account]); } + for (auto account : peers_to_drop) { self()->connections.disconnect(config.bp_peer_addresses[account]); } active_schedule_version = schedule.version; } } diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index a31265112b..d561c368fa 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -315,6 +315,86 @@ namespace eosio { constexpr uint32_t signed_block_which = fc::get_index(); // see protocol net_message constexpr uint32_t packed_transaction_which = fc::get_index(); // see protocol net_message + class connections_manager { + alignas(hardware_destructive_interference_size) + mutable std::shared_mutex connections_mtx; + std::set connections; + chain::flat_set supplied_peers; + + alignas(hardware_destructive_interference_size) + std::mutex connector_check_timer_mtx; + unique_ptr connector_check_timer; + + /// thread safe, only modified on startup + std::chrono::milliseconds heartbeat_timeout{def_keepalive_interval*2}; + fc::microseconds max_cleanup_time; + boost::asio::steady_timer::duration connector_period{0}; + uint32_t max_client_count{def_max_clients}; + std::function update_p2p_connection_metrics; + + private: // must call with held mutex + connection_ptr find_connection_impl(const string& host) const; + void add_impl(connection_ptr&& c); + void connect_impl(const string& peer); + + void connection_monitor(const std::weak_ptr& from_connection); + + public: + void add_supplied_peers(const vector& peers ) { + std::lock_guard g(connections_mtx); + supplied_peers.insert( peers.begin(), peers.end() ); + } + + // not thread safe, only call on startup + void init( std::chrono::milliseconds heartbeat_timeout_ms, + fc::microseconds conn_max_cleanup_time, + boost::asio::steady_timer::duration conn_period, + uint32_t maximum_client_count ) { + heartbeat_timeout = heartbeat_timeout_ms; + max_cleanup_time = conn_max_cleanup_time; + connector_period = conn_period; + max_client_count = maximum_client_count; + } + + uint32_t get_max_client_count() const { return max_client_count; } + fc::microseconds get_connector_period() const { + auto connector_period_us = std::chrono::duration_cast( connector_period ); + return fc::microseconds{ connector_period_us.count() }; + } + + void register_update_p2p_connection_metrics(std::function&& fun){ + update_p2p_connection_metrics = std::move(fun); + } + + void connect_supplied_peers() { + std::lock_guard g(connections_mtx); + for (const auto& seed_node : supplied_peers) { + connect_impl(seed_node); + } + } + + void start_conn_timer(); + void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); + void stop_conn_timer(); + + void add(connection_ptr c); + string connect(const string& host); + string disconnect(const string& host); + void close_all(); + + std::optional status(const string& host) const; + vector connection_statuses() const; + + // return the next connection after current in collection that has blocks above + connection_ptr round_robin_next(const connection_ptr& current, uint32_t sync_known_lib_num) const; + + template + void for_each_connection(Function&& f) const; + + template + void for_each_block_connection(Function&& f) const; + }; + class net_plugin_impl : public std::enable_shared_from_this, public auto_bp_peering::bp_connection_manager { public: @@ -323,6 +403,7 @@ namespace eosio { unique_ptr< sync_manager > sync_master; unique_ptr< dispatch_manager > dispatcher; + connections_manager connections; /** * Thread safe, only updated in plugin initialize @@ -331,7 +412,6 @@ namespace eosio { string p2p_address; string p2p_server_address; - chain::flat_set supplied_peers; vector allowed_peers; ///< peer keys allowed to connect std::map private_keys; ///< overlapping with producer keys, also authenticating non-producing nodes @@ -343,21 +423,14 @@ namespace eosio { }; possible_connections allowed_connections{None}; - boost::asio::steady_timer::duration connector_period{0}; boost::asio::steady_timer::duration txn_exp_period{0}; boost::asio::steady_timer::duration resp_expected_period{0}; std::chrono::milliseconds keepalive_interval{std::chrono::milliseconds{def_keepalive_interval}}; - std::chrono::milliseconds heartbeat_timeout{keepalive_interval * 2}; - int max_cleanup_time_ms = 0; - uint32_t max_client_count = 0; uint32_t max_nodes_per_host = 1; bool p2p_accept_transactions = true; fc::microseconds p2p_dedup_cache_expire_time_us{}; - /// Peer clock may be no more than 1 second skewed from our clock, including network latency. - const std::chrono::system_clock::duration peer_authentication_interval{std::chrono::seconds{1}}; - chain_id_type chain_id; fc::sha256 node_id; string user_agent_name; @@ -367,15 +440,6 @@ namespace eosio { bool use_socket_read_watermark = false; /** @} */ - alignas(hardware_destructive_interference_size) - mutable std::shared_mutex connections_mtx; - std::set< connection_ptr > connections; // todo: switch to a thread safe container to avoid big mutex over complete collection - - alignas(hardware_destructive_interference_size) - std::mutex connector_check_timer_mtx; - unique_ptr connector_check_timer; - int connector_checks_in_flight{0}; - alignas(hardware_destructive_interference_size) std::mutex expire_timer_mtx; unique_ptr expire_timer; @@ -404,7 +468,6 @@ namespace eosio { }; - std::function update_p2p_connection_metrics; std::function increment_failed_p2p_connections; std::function increment_dropped_trxs; @@ -427,12 +490,10 @@ namespace eosio { void transaction_ack(const std::pair&); void on_irreversible_block( const block_state_ptr& block ); - void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); void start_expire_timer(); void start_monitors(); void expire(); - void connection_monitor(const std::weak_ptr& from_connection, bool reschedule); /** \name Peer Timestamps * Time message handling * @{ @@ -467,13 +528,6 @@ namespace eosio { constexpr static uint16_t to_protocol_version(uint16_t v); - connection_ptr find_connection(const string& host)const; // must call with held mutex - string connect( const string& host ); - string disconnect( const string& host ); - - template - void for_each_connection(Function&& fun) const; - void plugin_shutdown(); bool in_sync() const; fc::logger& get_logger() { return logger; } @@ -943,7 +997,7 @@ namespace eosio { std::lock_guard g_conn( conn_mtx ); return !last_handshake_recv.p2p_address.empty(); } - }; + }; // class connection const string connection::unknown = ""; @@ -1002,7 +1056,7 @@ namespace eosio { template - void net_plugin_impl::for_each_connection( Function&& f ) const { + void connections_manager::for_each_connection( Function&& f ) const { std::shared_lock g( connections_mtx ); for( auto& c :connections ) { if( !f( c ) ) return; @@ -1010,14 +1064,9 @@ namespace eosio { } template - void for_each_connection( Function&& f ) { - my_impl->for_each_connection(std::forward(f)); - } - - template - void for_each_block_connection( Function f ) { - std::shared_lock g( my_impl->connections_mtx ); - for( auto& c : my_impl->connections ) { + void connections_manager::for_each_block_connection( Function&& f ) const { + std::shared_lock g( connections_mtx ); + for( auto& c : connections ) { if( c->is_transactions_only_connection() ) continue; if( !f( c ) ) return; } @@ -1176,7 +1225,7 @@ namespace eosio { self->closing = false; if( reconnect && !shutdown ) { - my_impl->start_conn_timer( std::chrono::milliseconds( 100 ), connection_wptr() ); + my_impl->connections.start_conn_timer( std::chrono::milliseconds( 100 ), connection_wptr() ); } } @@ -1706,7 +1755,7 @@ namespace eosio { // Closing connection, therefore its view of LIB can no longer be considered as we will no longer be connected. // Determine current LIB of remaining peers as our sync_known_lib_num. uint32_t highest_lib_num = 0; - for_each_block_connection( [&highest_lib_num]( const auto& cc ) { + my_impl->connections.for_each_block_connection( [&highest_lib_num]( const auto& cc ) { std::lock_guard g_conn( cc->conn_mtx ); if( cc->current() && cc->last_handshake_recv.last_irreversible_block_num > highest_lib_num ) { highest_lib_num = cc->last_handshake_recv.last_irreversible_block_num; @@ -1750,52 +1799,7 @@ namespace eosio { if (conn && conn->current() ) { new_sync_source = conn; } else { - std::shared_lock g( my_impl->connections_mtx ); - if( my_impl->connections.empty() ) { - new_sync_source.reset(); - } else if( my_impl->connections.size() == 1 ) { - if (!new_sync_source) { - new_sync_source = *my_impl->connections.begin(); - } - } else { - // init to a linear array search - auto cptr = my_impl->connections.begin(); - auto cend = my_impl->connections.end(); - // do we remember the previous source? - if (new_sync_source) { - //try to find it in the list - cptr = my_impl->connections.find( new_sync_source ); - cend = cptr; - if( cptr == my_impl->connections.end() ) { - //not there - must have been closed! cend is now connections.end, so just flatten the ring. - new_sync_source.reset(); - cptr = my_impl->connections.begin(); - } else { - //was found - advance the start to the next. cend is the old source. - if( ++cptr == my_impl->connections.end() && cend != my_impl->connections.end() ) { - cptr = my_impl->connections.begin(); - } - } - } - - //scan the list of peers looking for another able to provide sync blocks. - if( cptr != my_impl->connections.end() ) { - auto cstart_it = cptr; - do { - //select the first one which is current and has valid lib and break out. - if( !(*cptr)->is_transactions_only_connection() && (*cptr)->current() ) { - std::lock_guard g_conn( (*cptr)->conn_mtx ); - if( (*cptr)->last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num ) { - new_sync_source = *cptr; - break; - } - } - if( ++cptr == my_impl->connections.end() ) - cptr = my_impl->connections.begin(); - } while( cptr != cstart_it ); - } - // no need to check the result, either source advanced or the whole list was checked and the old source is reused. - } + new_sync_source = my_impl->connections.round_robin_next(new_sync_source, sync_known_lib_num); } // verify there is an available source @@ -1833,7 +1837,7 @@ namespace eosio { // static, thread safe void sync_manager::send_handshakes() { - for_each_connection( []( auto& ci ) { + my_impl->connections.for_each_connection( []( auto& ci ) { if( ci->current() ) { ci->send_handshake(); } @@ -2005,7 +2009,7 @@ namespace eosio { bool sync_manager::verify_catchup(const connection_ptr& c, uint32_t num, const block_id_type& id) { request_message req; req.req_blocks.mode = catch_up; - for_each_block_connection( [num, &id, &req]( const auto& cc ) { + my_impl->connections.for_each_block_connection( [num, &id, &req]( const auto& cc ) { std::lock_guard g_conn( cc->conn_mtx ); if( cc->fork_head_num > num || cc->fork_head == id ) { req.req_blocks.mode = none; @@ -2125,7 +2129,7 @@ namespace eosio { block_id_type null_id; bool set_state_to_head_catchup = false; - for_each_block_connection( [&null_id, blk_num, &blk_id, &c, &set_state_to_head_catchup]( const auto& cp ) { + my_impl->connections.for_each_block_connection( [&null_id, blk_num, &blk_id, &c, &set_state_to_head_catchup]( const auto& cp ) { std::unique_lock g_cp_conn( cp->conn_mtx ); uint32_t fork_head_num = cp->fork_head_num; block_id_type fork_head_id = cp->fork_head; @@ -2258,7 +2262,7 @@ namespace eosio { block_buffer_factory buff_factory; const auto bnum = b->block_num(); - for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) { + my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) { fc_dlog( logger, "socket_is_open ${s}, connecting ${c}, syncing ${ss}, connection ${cid}", ("s", cp->socket_is_open())("c", cp->connecting.load())("ss", cp->syncing.load())("cid", cp->connection_id) ); if( !cp->current() ) return true; @@ -2307,7 +2311,7 @@ namespace eosio { void dispatch_manager::bcast_transaction(const packed_transaction_ptr& trx) { trx_buffer_factory buff_factory; const fc::time_point_sec now{fc::time_point::now()}; - for_each_connection( [this, &trx, &now, &buff_factory]( auto& cp ) { + my_impl->connections.for_each_connection( [this, &trx, &now, &buff_factory]( auto& cp ) { if( cp->is_blocks_only_connection() || !cp->current() ) { return true; } @@ -2370,7 +2374,7 @@ namespace eosio { } last_req = *c->last_req; } - for_each_block_connection( [this, &c, &last_req, &bid]( auto& conn ) { + my_impl->connections.for_each_block_connection( [this, &c, &last_req, &bid]( auto& conn ) { if( conn == c ) return true; @@ -2426,9 +2430,9 @@ namespace eosio { connection_ptr c = shared_from_this(); if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) { - auto connector_period_us = std::chrono::duration_cast( my_impl->connector_period ); + fc::microseconds connector_period = my_impl->connections.get_connector_period(); std::lock_guard g( c->conn_mtx ); - if( last_close == fc::time_point() || last_close > fc::time_point::now() - fc::microseconds( connector_period_us.count() ) ) { + if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) { return true; // true so doesn't remove from valid connections } } @@ -2498,7 +2502,7 @@ namespace eosio { fc_elog( logger, "Error getting remote endpoint: ${m}", ("m", rec.message())); } else { paddr_str = paddr_add.to_string(); - for_each_connection( [&visitors, &from_addr, &paddr_str]( auto& conn ) { + my_impl->connections.for_each_connection( [&visitors, &from_addr, &paddr_str]( auto& conn ) { if( conn->socket_is_open()) { if( conn->peer_address().empty()) { ++visitors; @@ -2510,12 +2514,11 @@ namespace eosio { } return true; } ); + const uint32_t max_client_count = connections.get_max_client_count(); if( from_addr < max_nodes_per_host && (auto_bp_peering_enabled() || max_client_count == 0 || visitors < max_client_count)) { fc_ilog( logger, "Accepted new connection: " + paddr_str ); - new_connection->set_heartbeat_timeout( heartbeat_timeout ); if( new_connection->start_session()) { - std::lock_guard g_unique( connections_mtx ); - connections.insert( new_connection ); + connections.add( std::move(new_connection) ); } } else { @@ -2832,11 +2835,8 @@ namespace eosio { void net_plugin_impl::plugin_shutdown() { in_shutdown = true; - { - std::lock_guard g( connector_check_timer_mtx ); - if( connector_check_timer ) - connector_check_timer->cancel(); - } + + connections.stop_conn_timer(); { std::lock_guard g( expire_timer_mtx ); if( expire_timer ) @@ -2848,16 +2848,7 @@ namespace eosio { keepalive_timer->cancel(); } - { - fc_ilog( logger, "close ${s} connections", ("s", connections.size()) ); - std::lock_guard g( connections_mtx ); - for( auto& con : connections ) { - fc_dlog( logger, "close: ${cid}", ("cid", con->connection_id) ); - con->close( false, true ); - } - connections.clear(); - } - + connections.close_all(); thread_pool.stop(); if( acceptor ) { @@ -2973,8 +2964,8 @@ namespace eosio { // from, but it would be different from the address it is listening. The only way to make sure is when the // first handshake message is received with the p2p_address information in the message. Thus the connection // limit checking has to be here when auto bp peering is enabled. - fc_dlog(logger, "max_client_count ${m} exceeded", ("m", my_impl->max_client_count)); - my_impl->disconnect(peer_address()); + fc_dlog(logger, "max_client_count ${m} exceeded", ("m", my_impl->connections.get_max_client_count())); + my_impl->connections.disconnect(peer_address()); return; } @@ -2987,10 +2978,10 @@ namespace eosio { auto c_time = last_handshake_sent.time; g_conn.unlock(); peer_dlog( this, "checking for duplicate" ); - std::shared_lock g_cnts( my_impl->connections_mtx ); - for(const auto& check : my_impl->connections) { + bool dup = false; + my_impl->connections.for_each_connection([&](const auto& check) { if(check.get() == this) - continue; + return true; std::unique_lock g_check_conn( check->conn_mtx ); fc_dlog( logger, "dup check: connected ${c}, ${l} =? ${r}", ("c", check->connected())("l", check->last_handshake_recv.node_id)("r", msg.node_id) ); @@ -3004,31 +2995,35 @@ namespace eosio { auto check_time = check->last_handshake_sent.time + check->last_handshake_recv.time; g_check_conn.unlock(); if (msg.time + c_time <= check_time) - continue; + return true; } else if (net_version < proto_dup_node_id_goaway || msg.network_version < proto_dup_node_id_goaway) { if (my_impl->p2p_address < msg.p2p_address) { fc_dlog( logger, "my_impl->p2p_address '${lhs}' < msg.p2p_address '${rhs}'", ("lhs", my_impl->p2p_address)( "rhs", msg.p2p_address ) ); // only the connection from lower p2p_address to higher p2p_address will be considered as a duplicate, // so there is no chance for both connections to be closed - continue; + return true; } } else if (my_impl->node_id < msg.node_id) { fc_dlog( logger, "not duplicate, my_impl->node_id '${lhs}' < msg.node_id '${rhs}'", ("lhs", my_impl->node_id)("rhs", msg.node_id) ); // only the connection from lower node_id to higher node_id will be considered as a duplicate, // so there is no chance for both connections to be closed - continue; + return true; } - g_cnts.unlock(); - peer_dlog( this, "sending go_away duplicate, msg.p2p_address: ${add}", ("add", msg.p2p_address) ); - go_away_message gam(duplicate); - gam.node_id = conn_node_id; - enqueue(gam); - no_retry = duplicate; - return; + dup = true; + return false; } + return true; + }); + if (dup) { + peer_dlog( this, "sending go_away duplicate, msg.p2p_address: ${add}", ("add", msg.p2p_address) ); + go_away_message gam(duplicate); + gam.node_id = conn_node_id; + enqueue(gam); + no_retry = duplicate; + return; } } else { peer_dlog( this, "skipping duplicate check, addr == ${pa}, id = ${ni}", @@ -3463,28 +3458,6 @@ namespace eosio { } } - // called from any thread - void net_plugin_impl::start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection) { - if( in_shutdown ) return; - std::lock_guard g( connector_check_timer_mtx ); - ++connector_checks_in_flight; - connector_check_timer->expires_from_now( du ); - connector_check_timer->async_wait( [my = shared_from_this(), from_connection{std::move(from_connection)}](boost::system::error_code ec) mutable { - std::unique_lock g( my->connector_check_timer_mtx ); - int num_in_flight = --my->connector_checks_in_flight; - g.unlock(); - if( !ec ) { - my->connection_monitor(from_connection, num_in_flight == 0 ); - } else { - if( num_in_flight == 0 ) { - if( my->in_shutdown ) return; - fc_elog( logger, "Error from connection check monitor: ${m}", ("m", ec.message())); - my->start_conn_timer( my->connector_period, std::weak_ptr() ); - } - } - }); - } - // thread safe void net_plugin_impl::start_expire_timer() { if( in_shutdown ) return; @@ -3514,7 +3487,7 @@ namespace eosio { } tstamp current_time = connection::get_time(); - my->for_each_connection( [current_time]( auto& c ) { + my->connections.for_each_connection( [current_time]( auto& c ) { if( c->socket_is_open() ) { c->strand.post([c, current_time]() { c->check_heartbeat(current_time); @@ -3526,15 +3499,11 @@ namespace eosio { } void net_plugin_impl::start_monitors() { - { - std::lock_guard g( connector_check_timer_mtx ); - connector_check_timer = std::make_unique( my_impl->thread_pool.get_executor() ); - } { std::lock_guard g( expire_timer_mtx ); expire_timer = std::make_unique( my_impl->thread_pool.get_executor() ); } - start_conn_timer(connector_period, std::weak_ptr()); + connections.start_conn_timer(); start_expire_timer(); } @@ -3548,64 +3517,6 @@ namespace eosio { start_expire_timer(); } - // called from any thread - void net_plugin_impl::connection_monitor(const std::weak_ptr& from_connection, bool reschedule ) { - auto max_time = fc::time_point::now(); - max_time += fc::milliseconds(max_cleanup_time_ms); - auto from = from_connection.lock(); - std::unique_lock g( connections_mtx ); - auto it = (from ? connections.find(from) : connections.begin()); - if (it == connections.end()) it = connections.begin(); - size_t num_rm = 0, num_clients = 0, num_peers = 0, num_bp_peers = 0; - while (it != connections.end()) { - if (fc::time_point::now() >= max_time) { - connection_wptr wit = *it; - g.unlock(); - fc_dlog( logger, "Exiting connection monitor early, ran out of time: ${t}", ("t", max_time - fc::time_point::now()) ); - fc_ilog( logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}", - ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size()) ); - if( reschedule ) { - start_conn_timer( std::chrono::milliseconds( 1 ), wit ); // avoid exhausting - } - return; - } - if ((*it)->is_bp_connection) - ++num_bp_peers; - else if ((*it)->incoming()) - ++num_clients; - else - ++num_peers; - - if( !(*it)->socket_is_open() && !(*it)->connecting) { - if( !(*it)->incoming() ) { - if( !(*it)->resolve_and_connect() ) { - it = connections.erase(it); - --num_peers; ++num_rm; - continue; - } - } else { - --num_clients; ++num_rm; - it = connections.erase(it); - continue; - } - } - ++it; - } - g.unlock(); - - if (update_p2p_connection_metrics) { - update_p2p_connection_metrics({.num_peers = num_peers, .num_clients = num_clients}); - } - - if( num_clients > 0 || num_peers > 0 ) - fc_ilog( logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}, block producer peers: ${num_bp_peers}", - ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size())("num_bp_peers", num_bp_peers) ); - fc_dlog( logger, "connection monitor, removed ${n} connections", ("n", num_rm) ); - if( reschedule ) { - start_conn_timer( connector_period, std::weak_ptr()); - } - } - // called from application thread void net_plugin_impl::on_accepted_block_header(const block_state_ptr& bs) { update_chain_info(); @@ -3778,7 +3689,7 @@ namespace eosio { "Tuple of [PublicKey, WIF private key] (may specify multiple times)") ( "max-clients", bpo::value()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit") ( "connection-cleanup-period", bpo::value()->default_value(def_conn_retry_wait), "number of seconds to wait before cleaning up dead connections") - ( "max-cleanup-time-msec", bpo::value()->default_value(10), "max connection cleanup time per cleanup call in milliseconds") + ( "max-cleanup-time-msec", bpo::value()->default_value(10), "max connection cleanup time per cleanup call in milliseconds") ( "p2p-dedup-cache-expire-time-sec", bpo::value()->default_value(10), "Maximum time to track transaction for duplicate optimization") ( "net-threads", bpo::value()->default_value(my->thread_pool_size), "Number of worker threads in net_plugin thread pool" ) @@ -3814,12 +3725,9 @@ namespace eosio { my->sync_master = std::make_unique( options.at( "sync-fetch-span" ).as()); - my->connector_period = std::chrono::seconds( options.at( "connection-cleanup-period" ).as()); - my->max_cleanup_time_ms = options.at("max-cleanup-time-msec").as(); my->txn_exp_period = def_txn_expire_wait; my->p2p_dedup_cache_expire_time_us = fc::seconds( options.at( "p2p-dedup-cache-expire-time-sec" ).as() ); my->resp_expected_period = def_resp_expected_wait; - my->max_client_count = options.at( "max-clients" ).as(); my->max_nodes_per_host = options.at( "p2p-max-nodes-per-host" ).as(); my->p2p_accept_transactions = options.at( "p2p-accept-transactions" ).as(); @@ -3829,7 +3737,10 @@ namespace eosio { "p2p-keepalive_interval-ms must be greater than 0" ); if( options.count( "p2p-keepalive-interval-ms" )) { - my->heartbeat_timeout = std::chrono::milliseconds( options.at( "p2p-keepalive-interval-ms" ).as() * 2 ); + my->connections.init( std::chrono::milliseconds( options.at("p2p-keepalive-interval-ms").as() * 2 ), + fc::microseconds( options.at("max-cleanup-time-msec").as() ), + std::chrono::seconds( options.at("connection-cleanup-period").as() ), + options.at("max-clients").as() ); } if( options.count( "p2p-listen-endpoint" ) && options.at("p2p-listen-endpoint").as().length()) { @@ -3847,9 +3758,10 @@ namespace eosio { EOS_ASSERT( my->thread_pool_size > 0, chain::plugin_config_exception, "net-threads ${num} must be greater than 0", ("num", my->thread_pool_size) ); + std::vector peers; if( options.count( "p2p-peer-address" )) { - auto v = options.at( "p2p-peer-address" ).as >(); - my->supplied_peers.insert(v.begin(), v.end()); + peers = options.at( "p2p-peer-address" ).as>(); + my->connections.add_supplied_peers(peers); } if( options.count( "agent-name" )) { my->user_agent_name = options.at( "agent-name" ).as(); @@ -3859,8 +3771,8 @@ namespace eosio { if ( options.count( "p2p-auto-bp-peer")) { my->set_bp_peers(options.at( "p2p-auto-bp-peer" ).as>()); - my->for_each_bp_peer_address([this](const auto& addr) { - EOS_ASSERT(my->supplied_peers.count(addr) == 0, chain::plugin_config_exception, + my->for_each_bp_peer_address([&peers](const auto& addr) { + EOS_ASSERT(std::find(peers.begin(), peers.end(), addr) == peers.end(), chain::plugin_config_exception, "\"${addr}\" should only appear in either p2p-peer-address or p2p-auto-bp-peer option, not both.", ("addr",addr)); }); @@ -4006,16 +3918,14 @@ namespace eosio { app().quit(); return; } - fc_ilog( logger, "starting listener, max clients is ${mc}",("mc",my->max_client_count) ); + fc_ilog( logger, "starting listener, max clients is ${mc}",("mc",my->connections.get_max_client_count()) ); my->start_listen_loop(); } my->ticker(); my->start_monitors(); my->update_chain_info(); - for( const auto& seed_node : my->supplied_peers ) { - my->connect( seed_node ); - } + my->connections.connect_supplied_peers(); }); } catch( ... ) { @@ -4040,92 +3950,265 @@ namespace eosio { FC_CAPTURE_AND_RETHROW() } - /** - * Used to trigger a new connection from RPC API - */ + /// RPC API string net_plugin::connect( const string& host ) { - return my->connect( host ); + return my->connections.connect( host ); } - string net_plugin_impl::connect( const string& host ) { - std::lock_guard g( connections_mtx ); - if( find_connection( host ) ) - return "already connected"; + /// RPC API + string net_plugin::disconnect( const string& host ) { + return my->connections.disconnect(host); + } - connection_ptr c = std::make_shared( host ); - fc_dlog( logger, "calling active connector: ${h}", ("h", host) ); - if( c->resolve_and_connect() ) { - fc_dlog( logger, "adding new connection to the list: ${host} ${cid}", ("host", host)("cid", c->connection_id) ); - c->set_heartbeat_timeout( heartbeat_timeout ); - connections.insert( c ); + /// RPC API + std::optional net_plugin::status( const string& host )const { + return my->connections.status(host); + } + + /// RPC API + vector net_plugin::connections()const { + return my->connections.connection_statuses(); + } + + constexpr uint16_t net_plugin_impl::to_protocol_version(uint16_t v) { + if (v >= net_version_base) { + v -= net_version_base; + return (v > net_version_range) ? 0 : v; } + return 0; + } + + bool net_plugin_impl::in_sync() const { + return sync_master->is_in_sync(); + } + + void net_plugin::register_update_p2p_connection_metrics(std::function&& fun){ + my->connections.register_update_p2p_connection_metrics(std::move(fun)); + } + + void net_plugin::register_increment_failed_p2p_connections(std::function&& fun){ + my->increment_failed_p2p_connections = std::move(fun); + } + + void net_plugin::register_increment_dropped_trxs(std::function&& fun){ + my->increment_dropped_trxs = std::move(fun); + } + + //---------------------------------------------------------------------------- + + void connections_manager::add( connection_ptr c ) { + std::lock_guard g( connections_mtx ); + add_impl( std::move(c) ); + } + + string connections_manager::connect( const string& host ) { + std::lock_guard g( connections_mtx ); + if( find_connection_impl( host ) ) + return "already connected"; + + connect_impl( host ); return "added connection"; } - string net_plugin_impl::disconnect( const string& host ) { - std::lock_guard g( connections_mtx ); - for( auto itr = connections.begin(); itr != connections.end(); ++itr ) { - if( (*itr)->peer_address() == host ) { - fc_ilog( logger, "disconnecting: ${cid}", ("cid", (*itr)->connection_id) ); - (*itr)->close(); - connections.erase(itr); - return "connection removed"; - } + string connections_manager::disconnect( const string& host ) { + std::lock_guard g( connections_mtx ); + if( auto c = find_connection_impl( host ) ) { + fc_ilog( logger, "disconnecting: ${cid}", ("cid", c->connection_id) ); + c->close(); + connections.erase(c); + supplied_peers.erase(host); + return "connection removed"; } return "no known connection for host"; } - string net_plugin::disconnect( const string& host ) { - return my->disconnect(host); + void connections_manager::close_all() { + fc_ilog( logger, "close all ${s} connections", ("s", connections.size()) ); + std::lock_guard g( connections_mtx ); + for( auto& con : connections ) { + fc_dlog( logger, "close: ${cid}", ("cid", con->connection_id) ); + con->close( false, true ); + } + connections.clear(); } - std::optional net_plugin::status( const string& host )const { - std::shared_lock g( my->connections_mtx ); - auto con = my->find_connection( host ); - if( con ) + connection_ptr connections_manager::round_robin_next( const connection_ptr& current, uint32_t sync_known_lib_num ) const { + connection_ptr new_sync_source = current; + std::shared_lock g( connections_mtx ); + if( connections.empty() ) { + new_sync_source.reset(); + } else if( connections.size() == 1 ) { + if (!new_sync_source) { + new_sync_source = *connections.begin(); + } + } else { + // init to a linear array search + auto cptr = connections.begin(); + auto cend = connections.end(); + // do we remember the previous source? + if (current) { + //try to find it in the list + cptr = connections.find( current ); + cend = cptr; + if( cptr == connections.end() ) { + // not there - must have been closed! cend is now connections.end, so just flatten the ring. + new_sync_source.reset(); + cptr = connections.begin(); + } else { + // was found - advance the start to the next. cend is the old source. + if( ++cptr == connections.end() && cend != connections.end() ) { + cptr = connections.begin(); + } + } + } + + // scan the list of peers looking for another able to provide sync blocks. + if( cptr != connections.end() ) { + auto cstart_it = cptr; + do { + // select the first one which is current and has lib above current and break out. + if( !(*cptr)->is_transactions_only_connection() && (*cptr)->current() ) { + std::lock_guard g_conn( (*cptr)->conn_mtx ); + // TODO: change to a better heuristic than lib >= sync_known_lib_num + if( (*cptr)->last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num ) { + new_sync_source = *cptr; + break; + } + } + if( ++cptr == connections.end() ) + cptr = connections.begin(); + } while( cptr != cstart_it ); + } + + // no need to check the result, either source advanced or the whole list was checked and the old source is reused. + } + return new_sync_source; + } + + std::optional connections_manager::status( const string& host )const { + std::shared_lock g( connections_mtx ); + auto con = find_connection_impl( host ); + if( con ) { return con->get_status(); + } return {}; } - vector net_plugin::connections()const { + vector connections_manager::connection_statuses()const { vector result; - std::shared_lock g( my->connections_mtx ); - result.reserve( my->connections.size() ); - for( const auto& c : my->connections ) { + std::shared_lock g( connections_mtx ); + result.reserve( connections.size() ); + for( const auto& c : connections ) { result.push_back( c->get_status() ); } return result; } // call with connections_mtx - connection_ptr net_plugin_impl::find_connection( const string& host )const { - for( const auto& c : connections ) - if( c->peer_address() == host ) return c; + connection_ptr connections_manager::find_connection_impl( const string& host )const { + for( const auto& c : connections ) { + if (c->peer_address() == host) + return c; + } return {}; } - constexpr uint16_t net_plugin_impl::to_protocol_version(uint16_t v) { - if (v >= net_version_base) { - v -= net_version_base; - return (v > net_version_range) ? 0 : v; + // call with connections_mtx + void connections_manager::connect_impl( const string& host ) { + supplied_peers.insert(host); + connection_ptr c = std::make_shared( host ); + fc_dlog( logger, "calling active connector: ${h}", ("h", host) ); + if( c->resolve_and_connect() ) { + fc_dlog( logger, "adding new connection to the list: ${host} ${cid}", ("host", host)("cid", c->connection_id) ); + add_impl( std::move(c) ); } - return 0; } - bool net_plugin_impl::in_sync() const { - return sync_master->is_in_sync(); + // call with connections_mtx + void connections_manager::add_impl(connection_ptr&& c) { + c->set_heartbeat_timeout( heartbeat_timeout ); + connections.insert( std::move(c) ); } - void net_plugin::register_update_p2p_connection_metrics(std::function&& fun){ - my->update_p2p_connection_metrics = std::move(fun); + // called from any thread + void connections_manager::start_conn_timer() { + start_conn_timer(connector_period, {}); } - void net_plugin::register_increment_failed_p2p_connections(std::function&& fun){ - my->increment_failed_p2p_connections = std::move(fun); + // called from any thread + void connections_manager::start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection) { + std::lock_guard g( connector_check_timer_mtx ); + if (!connector_check_timer) { + connector_check_timer = std::make_unique( my_impl->thread_pool.get_executor() ); + } + connector_check_timer->expires_from_now( du ); + connector_check_timer->async_wait( [this, from_connection{std::move(from_connection)}](boost::system::error_code ec) mutable { + if( !ec ) { + connection_monitor(from_connection); + } + }); } - void net_plugin::register_increment_dropped_trxs(std::function&& fun){ - my->increment_dropped_trxs = std::move(fun); + void connections_manager::stop_conn_timer() { + std::lock_guard g( connector_check_timer_mtx ); + if (connector_check_timer) { + connector_check_timer->cancel(); + } + } + + // called from any thread + void connections_manager::connection_monitor(const std::weak_ptr& from_connection) { + auto max_time = fc::time_point::now().safe_add(max_cleanup_time); + auto from = from_connection.lock(); + std::unique_lock g( connections_mtx ); + auto it = (from ? connections.find(from) : connections.begin()); + if (it == connections.end()) it = connections.begin(); + size_t num_rm = 0, num_clients = 0, num_peers = 0, num_bp_peers = 0; + while (it != connections.end()) { + if (fc::time_point::now() >= max_time) { + connection_wptr wit = *it; + g.unlock(); + fc_dlog( logger, "Exiting connection monitor early, ran out of time: ${t}", ("t", max_time - fc::time_point::now()) ); + fc_ilog( logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}", + ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size()) ); + start_conn_timer( std::chrono::milliseconds( 1 ), wit ); // avoid exhausting + return; + } + if ((*it)->is_bp_connection) { + ++num_bp_peers; + } else if ((*it)->incoming()) { + ++num_clients; + } else { + ++num_peers; + } + + if( !(*it)->socket_is_open() && !(*it)->connecting) { + if( !(*it)->incoming() ) { + if( !(*it)->resolve_and_connect() ) { + it = connections.erase(it); + --num_peers; ++num_rm; + continue; + } + } else { + --num_clients; ++num_rm; + it = connections.erase(it); + continue; + } + } + ++it; + } + g.unlock(); + + if (update_p2p_connection_metrics) { + update_p2p_connection_metrics({.num_peers = num_peers, .num_clients = num_clients}); + } + + if( num_clients > 0 || num_peers > 0 ) { + fc_ilog(logger, "p2p client connections: ${num}/${max}, peer connections: ${pnum}/${pmax}, block producer peers: ${num_bp_peers}", + ("num", num_clients)("max", max_client_count)("pnum", num_peers)("pmax", supplied_peers.size())("num_bp_peers", num_bp_peers)); + } + fc_dlog( logger, "connection monitor, removed ${n} connections", ("n", num_rm) ); + start_conn_timer( connector_period, {}); } -} +} // namespace eosio diff --git a/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp b/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp index 485234e4d8..93ac898a5b 100644 --- a/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp +++ b/plugins/net_plugin/tests/auto_bp_peering_unittest.cpp @@ -14,13 +14,14 @@ struct mock_connection { using namespace eosio::chain::literals; using namespace std::literals::string_literals; -struct mock_net_plugin : eosio::auto_bp_peering::bp_connection_manager { - - uint32_t max_client_count; - bool is_in_sync = false; +struct mock_connections_manager { + uint32_t max_client_count = 0; std::vector connections; - bool in_sync() { return is_in_sync; } + std::function connect; + std::function disconnect; + + uint32_t get_max_client_count() const { return max_client_count; } template void for_each_connection(Function&& func) const { @@ -29,9 +30,14 @@ struct mock_net_plugin : eosio::auto_bp_peering::bp_connection_manager connect; - std::function disconnect; +struct mock_net_plugin : eosio::auto_bp_peering::bp_connection_manager { + + bool is_in_sync = false; + mock_connections_manager connections; + + bool in_sync() { return is_in_sync; } void setup_test_peers() { set_bp_peers({ "proda,127.0.0.1:8001:blk"s, "prodb,127.0.0.1:8002:trx"s, "prodc,127.0.0.1:8003"s, @@ -159,7 +165,7 @@ BOOST_AUTO_TEST_CASE(test_on_pending_schedule) { std::vector connected_hosts; - plugin.connect = [&connected_hosts](std::string host) { connected_hosts.push_back(host); }; + plugin.connections.connect = [&connected_hosts](std::string host) { connected_hosts.push_back(host); }; // make sure nothing happens when it is not in_sync plugin.is_in_sync = false; @@ -203,10 +209,10 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule1) { plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n }; plugin.active_neighbors = { "proda"_n, "prodh"_n, "prodn"_n }; - plugin.connect = [](std::string host) {}; + plugin.connections.connect = [](std::string host) {}; std::vector disconnected_hosts; - plugin.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); }; + plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); }; // make sure nothing happens when it is not in_sync plugin.is_in_sync = false; @@ -239,9 +245,9 @@ BOOST_AUTO_TEST_CASE(test_on_active_schedule2) { plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n }; plugin.active_neighbors = { "proda"_n, "prodh"_n, "prodn"_n }; - plugin.connect = [](std::string host) {}; + plugin.connections.connect = [](std::string host) {}; std::vector disconnected_hosts; - plugin.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); }; + plugin.connections.disconnect = [&disconnected_hosts](std::string host) { disconnected_hosts.push_back(host); }; // when pending and active schedules are changed simultaneosly plugin.is_in_sync = true; @@ -263,8 +269,8 @@ BOOST_AUTO_TEST_CASE(test_exceeding_connection_limit) { mock_net_plugin plugin; plugin.setup_test_peers(); plugin.config.my_bp_accounts = { "prodd"_n, "produ"_n }; - plugin.max_client_count = 1; - plugin.connections = { + plugin.connections.max_client_count = 1; + plugin.connections.connections = { { .is_bp_connection = true, .is_open = true, .handshake_received = true }, // 0 { .is_bp_connection = true, .is_open = true, .handshake_received = false }, // 1 { .is_bp_connection = true, .is_open = false, .handshake_received = true }, // 2 @@ -277,12 +283,12 @@ BOOST_AUTO_TEST_CASE(test_exceeding_connection_limit) { BOOST_CHECK_EQUAL(plugin.num_established_clients(), 2); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[0])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[1])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[2])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[3])); - BOOST_CHECK(plugin.exceeding_connection_limit(&plugin.connections[4])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[5])); - BOOST_CHECK(plugin.exceeding_connection_limit(&plugin.connections[6])); - BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections[7])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[0])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[1])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[2])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[3])); + BOOST_CHECK(plugin.exceeding_connection_limit(&plugin.connections.connections[4])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[5])); + BOOST_CHECK(plugin.exceeding_connection_limit(&plugin.connections.connections[6])); + BOOST_CHECK(!plugin.exceeding_connection_limit(&plugin.connections.connections[7])); } \ No newline at end of file From 31983be9312171c6a03f6b3920b7cd82652f0ef0 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 17 May 2023 07:07:54 -0500 Subject: [PATCH 2/9] GH-1072 Use uint32_t for max-clients --- plugins/net_plugin/net_plugin.cpp | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index d561c368fa..6edabb35b5 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -3687,7 +3687,7 @@ namespace eosio { ( "peer-key", bpo::value>()->composing()->multitoken(), "Optional public key of peer allowed to connect. May be used multiple times.") ( "peer-private-key", boost::program_options::value>()->composing()->multitoken(), "Tuple of [PublicKey, WIF private key] (may specify multiple times)") - ( "max-clients", bpo::value()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit") + ( "max-clients", bpo::value()->default_value(def_max_clients), "Maximum number of clients from which connections are accepted, use 0 for no limit") ( "connection-cleanup-period", bpo::value()->default_value(def_conn_retry_wait), "number of seconds to wait before cleaning up dead connections") ( "max-cleanup-time-msec", bpo::value()->default_value(10), "max connection cleanup time per cleanup call in milliseconds") ( "p2p-dedup-cache-expire-time-sec", bpo::value()->default_value(10), "Maximum time to track transaction for duplicate optimization") @@ -3736,12 +3736,10 @@ namespace eosio { EOS_ASSERT( my->keepalive_interval.count() > 0, chain::plugin_config_exception, "p2p-keepalive_interval-ms must be greater than 0" ); - if( options.count( "p2p-keepalive-interval-ms" )) { - my->connections.init( std::chrono::milliseconds( options.at("p2p-keepalive-interval-ms").as() * 2 ), - fc::microseconds( options.at("max-cleanup-time-msec").as() ), - std::chrono::seconds( options.at("connection-cleanup-period").as() ), - options.at("max-clients").as() ); - } + my->connections.init( std::chrono::milliseconds( options.at("p2p-keepalive-interval-ms").as() * 2 ), + fc::microseconds( options.at("max-cleanup-time-msec").as() ), + std::chrono::seconds( options.at("connection-cleanup-period").as() ), + options.at("max-clients").as() ); if( options.count( "p2p-listen-endpoint" ) && options.at("p2p-listen-endpoint").as().length()) { my->p2p_address = options.at( "p2p-listen-endpoint" ).as(); From 0efab27e1cf5a2ba8669ce9cd376012afe767d0c Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 22 May 2023 10:09:02 -0500 Subject: [PATCH 3/9] GH-1072 Use flat_set and some other misc cleanup --- plugins/net_plugin/net_plugin.cpp | 77 +++++++++++++++++++------------ 1 file changed, 47 insertions(+), 30 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 6edabb35b5..b6a5811a53 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -317,9 +317,9 @@ namespace eosio { class connections_manager { alignas(hardware_destructive_interference_size) - mutable std::shared_mutex connections_mtx; - std::set connections; - chain::flat_set supplied_peers; + mutable std::shared_mutex connections_mtx; + chain::flat_set connections; + chain::flat_set supplied_peers; alignas(hardware_destructive_interference_size) std::mutex connector_check_timer_mtx; @@ -340,38 +340,21 @@ namespace eosio { void connection_monitor(const std::weak_ptr& from_connection); public: - void add_supplied_peers(const vector& peers ) { - std::lock_guard g(connections_mtx); - supplied_peers.insert( peers.begin(), peers.end() ); - } + void add_supplied_peers(const vector& peers ); // not thread safe, only call on startup - void init( std::chrono::milliseconds heartbeat_timeout_ms, + void init(std::chrono::milliseconds heartbeat_timeout_ms, fc::microseconds conn_max_cleanup_time, boost::asio::steady_timer::duration conn_period, - uint32_t maximum_client_count ) { - heartbeat_timeout = heartbeat_timeout_ms; - max_cleanup_time = conn_max_cleanup_time; - connector_period = conn_period; - max_client_count = maximum_client_count; - } + uint32_t maximum_client_count); uint32_t get_max_client_count() const { return max_client_count; } - fc::microseconds get_connector_period() const { - auto connector_period_us = std::chrono::duration_cast( connector_period ); - return fc::microseconds{ connector_period_us.count() }; - } - void register_update_p2p_connection_metrics(std::function&& fun){ - update_p2p_connection_metrics = std::move(fun); - } + fc::microseconds get_connector_period() const; - void connect_supplied_peers() { - std::lock_guard g(connections_mtx); - for (const auto& seed_node : supplied_peers) { - connect_impl(seed_node); - } - } + void register_update_p2p_connection_metrics(std::function&& fun); + + void connect_supplied_peers(); void start_conn_timer(); void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); @@ -1059,7 +1042,7 @@ namespace eosio { void connections_manager::for_each_connection( Function&& f ) const { std::shared_lock g( connections_mtx ); for( auto& c :connections ) { - if( !f( c ) ) return; + if( !std::forward(f)( c ) ) return; } } @@ -1068,7 +1051,7 @@ namespace eosio { std::shared_lock g( connections_mtx ); for( auto& c : connections ) { if( c->is_transactions_only_connection() ) continue; - if( !f( c ) ) return; + if( !std::forward(f)( c ) ) return; } } @@ -3994,20 +3977,55 @@ namespace eosio { //---------------------------------------------------------------------------- + void connections_manager::add_supplied_peers(const vector& peers ) { + std::lock_guard g(connections_mtx); + supplied_peers.insert( peers.begin(), peers.end() ); + } + + // not thread safe, only call on startup + void connections_manager::init( std::chrono::milliseconds heartbeat_timeout_ms, + fc::microseconds conn_max_cleanup_time, + boost::asio::steady_timer::duration conn_period, + uint32_t maximum_client_count ) { + heartbeat_timeout = heartbeat_timeout_ms; + max_cleanup_time = conn_max_cleanup_time; + connector_period = conn_period; + max_client_count = maximum_client_count; + } + + fc::microseconds connections_manager::get_connector_period() const { + auto connector_period_us = std::chrono::duration_cast( connector_period ); + return fc::microseconds{ connector_period_us.count() }; + } + + void connections_manager::register_update_p2p_connection_metrics(std::function&& fun){ + update_p2p_connection_metrics = std::move(fun); + } + + void connections_manager::connect_supplied_peers() { + std::lock_guard g(connections_mtx); + for (const auto& peer : supplied_peers) { + connect_impl(peer); + } + } + void connections_manager::add( connection_ptr c ) { std::lock_guard g( connections_mtx ); add_impl( std::move(c) ); } + // called by API string connections_manager::connect( const string& host ) { std::lock_guard g( connections_mtx ); if( find_connection_impl( host ) ) return "already connected"; connect_impl( host ); + supplied_peers.insert(host); return "added connection"; } + // called by API string connections_manager::disconnect( const string& host ) { std::lock_guard g( connections_mtx ); if( auto c = find_connection_impl( host ) ) { @@ -4113,7 +4131,6 @@ namespace eosio { // call with connections_mtx void connections_manager::connect_impl( const string& host ) { - supplied_peers.insert(host); connection_ptr c = std::make_shared( host ); fc_dlog( logger, "calling active connector: ${h}", ("h", host) ); if( c->resolve_and_connect() ) { From 30d6fd5b4231212438dd48b716061ad1038eeffa Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 22 May 2023 10:15:29 -0500 Subject: [PATCH 4/9] GH-1072 Rename _impl methods to _i to distinguish from _impl types. --- plugins/net_plugin/net_plugin.cpp | 52 +++++++++++++++---------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index b6a5811a53..7a31df5d79 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -333,9 +333,9 @@ namespace eosio { std::function update_p2p_connection_metrics; private: // must call with held mutex - connection_ptr find_connection_impl(const string& host) const; - void add_impl(connection_ptr&& c); - void connect_impl(const string& peer); + connection_ptr find_connection_i(const string& host) const; + void add_i(connection_ptr&& c); + void connect_i(const string& peer); void connection_monitor(const std::weak_ptr& from_connection); @@ -906,7 +906,7 @@ namespace eosio { /** @} */ void blk_send_branch( const block_id_type& msg_head_id ); - void blk_send_branch_impl( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num ); + void blk_send_branch( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num ); void blk_send(const block_id_type& blkid); void stop_send(); @@ -1256,12 +1256,12 @@ namespace eosio { } else { if( on_fork ) msg_head_num = 0; // if peer on fork, start at their last lib, otherwise we can start at msg_head+1 - blk_send_branch_impl( msg_head_num, lib_num, head_num ); + blk_send_branch( msg_head_num, lib_num, head_num ); } } // called from connection strand - void connection::blk_send_branch_impl( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num ) { + void connection::blk_send_branch( uint32_t msg_head_num, uint32_t lib_num, uint32_t head_num ) { if( !peer_requested ) { auto last = msg_head_num != 0 ? msg_head_num : lib_num; peer_requested = peer_sync_state( last+1, head_num, last ); @@ -4005,22 +4005,22 @@ namespace eosio { void connections_manager::connect_supplied_peers() { std::lock_guard g(connections_mtx); for (const auto& peer : supplied_peers) { - connect_impl(peer); + connect_i(peer); } } void connections_manager::add( connection_ptr c ) { std::lock_guard g( connections_mtx ); - add_impl( std::move(c) ); + add_i( std::move(c) ); } // called by API string connections_manager::connect( const string& host ) { std::lock_guard g( connections_mtx ); - if( find_connection_impl( host ) ) + if( find_connection_i( host ) ) return "already connected"; - connect_impl( host ); + connect_i( host ); supplied_peers.insert(host); return "added connection"; } @@ -4028,7 +4028,7 @@ namespace eosio { // called by API string connections_manager::disconnect( const string& host ) { std::lock_guard g( connections_mtx ); - if( auto c = find_connection_impl( host ) ) { + if( auto c = find_connection_i( host ) ) { fc_ilog( logger, "disconnecting: ${cid}", ("cid", c->connection_id) ); c->close(); connections.erase(c); @@ -4082,17 +4082,17 @@ namespace eosio { if( cptr != connections.end() ) { auto cstart_it = cptr; do { - // select the first one which is current and has lib above current and break out. - if( !(*cptr)->is_transactions_only_connection() && (*cptr)->current() ) { - std::lock_guard g_conn( (*cptr)->conn_mtx ); - // TODO: change to a better heuristic than lib >= sync_known_lib_num - if( (*cptr)->last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num ) { - new_sync_source = *cptr; - break; + // select the first one which is current and has lib above current and break out. + if( !(*cptr)->is_transactions_only_connection() && (*cptr)->current() ) { + std::lock_guard g_conn( (*cptr)->conn_mtx ); + // TODO: change to a better heuristic than lib >= sync_known_lib_num + if( (*cptr)->last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num ) { + new_sync_source = *cptr; + break; + } } - } - if( ++cptr == connections.end() ) - cptr = connections.begin(); + if( ++cptr == connections.end() ) + cptr = connections.begin(); } while( cptr != cstart_it ); } @@ -4103,7 +4103,7 @@ namespace eosio { std::optional connections_manager::status( const string& host )const { std::shared_lock g( connections_mtx ); - auto con = find_connection_impl( host ); + auto con = find_connection_i( host ); if( con ) { return con->get_status(); } @@ -4121,7 +4121,7 @@ namespace eosio { } // call with connections_mtx - connection_ptr connections_manager::find_connection_impl( const string& host )const { + connection_ptr connections_manager::find_connection_i( const string& host )const { for( const auto& c : connections ) { if (c->peer_address() == host) return c; @@ -4130,17 +4130,17 @@ namespace eosio { } // call with connections_mtx - void connections_manager::connect_impl( const string& host ) { + void connections_manager::connect_i( const string& host ) { connection_ptr c = std::make_shared( host ); fc_dlog( logger, "calling active connector: ${h}", ("h", host) ); if( c->resolve_and_connect() ) { fc_dlog( logger, "adding new connection to the list: ${host} ${cid}", ("host", host)("cid", c->connection_id) ); - add_impl( std::move(c) ); + add_i( std::move(c) ); } } // call with connections_mtx - void connections_manager::add_impl(connection_ptr&& c) { + void connections_manager::add_i(connection_ptr&& c) { c->set_heartbeat_timeout( heartbeat_timeout ); connections.insert( std::move(c) ); } From a80f5bc227116726a8b7794037787524179bb8e3 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 22 May 2023 10:17:56 -0500 Subject: [PATCH 5/9] GH-1072 Fix indentation --- plugins/net_plugin/net_plugin.cpp | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 7a31df5d79..986b07936d 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -4147,7 +4147,7 @@ namespace eosio { // called from any thread void connections_manager::start_conn_timer() { - start_conn_timer(connector_period, {}); + start_conn_timer(connector_period, {}); // this locks mutex } // called from any thread @@ -4197,18 +4197,20 @@ namespace eosio { ++num_peers; } - if( !(*it)->socket_is_open() && !(*it)->connecting) { - if( !(*it)->incoming() ) { - if( !(*it)->resolve_and_connect() ) { + if (!(*it)->socket_is_open() && !(*it)->connecting) { + if (!(*it)->incoming()) { + if (!(*it)->resolve_and_connect()) { + it = connections.erase(it); + --num_peers; + ++num_rm; + continue; + } + } else { + --num_clients; + ++num_rm; it = connections.erase(it); - --num_peers; ++num_rm; continue; } - } else { - --num_clients; ++num_rm; - it = connections.erase(it); - continue; - } } ++it; } From a6b5259b344e2a243c429889fd2d205a3a152aa5 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 22 May 2023 10:48:41 -0500 Subject: [PATCH 6/9] GH-1072 Add `any_of_connection` and use it for duplicate implementation --- plugins/net_plugin/net_plugin.cpp | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 986b07936d..4f439c0d1c 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -376,6 +376,12 @@ namespace eosio { template void for_each_block_connection(Function&& f) const; + + template + bool any_of_connections(UnaryPredicate&& p) const { + std::shared_lock g(connections_mtx); + return std::find_if(connections.cbegin(), connections.cend(), std::forward(p)) != connections.cend(); + } }; class net_plugin_impl : public std::enable_shared_from_this, @@ -1040,7 +1046,7 @@ namespace eosio { template void connections_manager::for_each_connection( Function&& f ) const { - std::shared_lock g( connections_mtx ); + std::shared_lock g( connections_mtx ); for( auto& c :connections ) { if( !std::forward(f)( c ) ) return; } @@ -1048,7 +1054,7 @@ namespace eosio { template void connections_manager::for_each_block_connection( Function&& f ) const { - std::shared_lock g( connections_mtx ); + std::shared_lock g( connections_mtx ); for( auto& c : connections ) { if( c->is_transactions_only_connection() ) continue; if( !std::forward(f)( c ) ) return; @@ -2961,10 +2967,9 @@ namespace eosio { auto c_time = last_handshake_sent.time; g_conn.unlock(); peer_dlog( this, "checking for duplicate" ); - bool dup = false; - my_impl->connections.for_each_connection([&](const auto& check) { + auto is_duplicate = [&](const auto& check) { if(check.get() == this) - return true; + return false; std::unique_lock g_check_conn( check->conn_mtx ); fc_dlog( logger, "dup check: connected ${c}, ${l} =? ${r}", ("c", check->connected())("l", check->last_handshake_recv.node_id)("r", msg.node_id) ); @@ -2978,29 +2983,27 @@ namespace eosio { auto check_time = check->last_handshake_sent.time + check->last_handshake_recv.time; g_check_conn.unlock(); if (msg.time + c_time <= check_time) - return true; + return false; } else if (net_version < proto_dup_node_id_goaway || msg.network_version < proto_dup_node_id_goaway) { if (my_impl->p2p_address < msg.p2p_address) { fc_dlog( logger, "my_impl->p2p_address '${lhs}' < msg.p2p_address '${rhs}'", ("lhs", my_impl->p2p_address)( "rhs", msg.p2p_address ) ); // only the connection from lower p2p_address to higher p2p_address will be considered as a duplicate, // so there is no chance for both connections to be closed - return true; + return false; } } else if (my_impl->node_id < msg.node_id) { fc_dlog( logger, "not duplicate, my_impl->node_id '${lhs}' < msg.node_id '${rhs}'", ("lhs", my_impl->node_id)("rhs", msg.node_id) ); // only the connection from lower node_id to higher node_id will be considered as a duplicate, // so there is no chance for both connections to be closed - return true; + return false; } - - dup = true; - return false; + return true; } - return true; - }); - if (dup) { + return false; + }; + if (my_impl->connections.any_of_connections(std::move(is_duplicate))) { peer_dlog( this, "sending go_away duplicate, msg.p2p_address: ${add}", ("add", msg.p2p_address) ); go_away_message gam(duplicate); gam.node_id = conn_node_id; From 7470c68b4a11c7abc144fd761efbf21bd09bffd1 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 22 May 2023 10:51:50 -0500 Subject: [PATCH 7/9] GH-1072 Remove unneeded check --- plugins/net_plugin/net_plugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 4f439c0d1c..24188c5218 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -4075,7 +4075,7 @@ namespace eosio { cptr = connections.begin(); } else { // was found - advance the start to the next. cend is the old source. - if( ++cptr == connections.end() && cend != connections.end() ) { + if( ++cptr == connections.end() ) { cptr = connections.begin(); } } From 6a5bf21d8d1972d822f4c7ddc9848a24d12a316f Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 22 May 2023 12:43:49 -0500 Subject: [PATCH 8/9] GH-1072 Add any_of_block_connections and change for_each_connection and for_each_block_connection to not short circuit --- plugins/net_plugin/net_plugin.cpp | 64 ++++++++++++++++--------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 24188c5218..622a6b412f 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -380,7 +380,13 @@ namespace eosio { template bool any_of_connections(UnaryPredicate&& p) const { std::shared_lock g(connections_mtx); - return std::find_if(connections.cbegin(), connections.cend(), std::forward(p)) != connections.cend(); + return std::any_of(connections.cbegin(), connections.cend(), std::forward(p)); + } + + template + bool any_of_block_connections(UnaryPredicate&& p) const { + std::shared_lock g(connections_mtx); + return std::any_of(connections.cbegin(), connections.cend(), std::forward(p)); } }; @@ -1047,9 +1053,7 @@ namespace eosio { template void connections_manager::for_each_connection( Function&& f ) const { std::shared_lock g( connections_mtx ); - for( auto& c :connections ) { - if( !std::forward(f)( c ) ) return; - } + std::for_each(connections.begin(), connections.end(), std::forward(f)); } template @@ -1057,7 +1061,7 @@ namespace eosio { std::shared_lock g( connections_mtx ); for( auto& c : connections ) { if( c->is_transactions_only_connection() ) continue; - if( !std::forward(f)( c ) ) return; + f( c ); } } @@ -1749,7 +1753,6 @@ namespace eosio { if( cc->current() && cc->last_handshake_recv.last_irreversible_block_num > highest_lib_num ) { highest_lib_num = cc->last_handshake_recv.last_irreversible_block_num; } - return true; } ); sync_known_lib_num = highest_lib_num; @@ -1830,7 +1833,6 @@ namespace eosio { if( ci->current() ) { ci->send_handshake(); } - return true; } ); } @@ -1998,14 +2000,17 @@ namespace eosio { bool sync_manager::verify_catchup(const connection_ptr& c, uint32_t num, const block_id_type& id) { request_message req; req.req_blocks.mode = catch_up; - my_impl->connections.for_each_block_connection( [num, &id, &req]( const auto& cc ) { + auto is_fork_head_greater = [num, &id, &req]( const auto& cc ) { std::lock_guard g_conn( cc->conn_mtx ); if( cc->fork_head_num > num || cc->fork_head == id ) { req.req_blocks.mode = none; - return false; + return true; } - return true; - } ); + return false; + }; + if (my_impl->connections.any_of_block_connections(is_fork_head_greater)) { + req.req_blocks.mode = none; + } if( req.req_blocks.mode == catch_up ) { { std::lock_guard g( sync_mtx ); @@ -2132,7 +2137,6 @@ namespace eosio { } else { set_state_to_head_catchup = true; } - return true; } ); if( set_state_to_head_catchup ) { @@ -2254,11 +2258,11 @@ namespace eosio { my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) { fc_dlog( logger, "socket_is_open ${s}, connecting ${c}, syncing ${ss}, connection ${cid}", ("s", cp->socket_is_open())("c", cp->connecting.load())("ss", cp->syncing.load())("cid", cp->connection_id) ); - if( !cp->current() ) return true; + if( !cp->current() ) return; if( !add_peer_block( id, cp->connection_id ) ) { fc_dlog( logger, "not bcast block ${b} to connection ${cid}", ("b", bnum)("cid", cp->connection_id) ); - return true; + return; } send_buffer_type sb = buff_factory.get_send_buffer( b ); @@ -2271,7 +2275,6 @@ namespace eosio { cp->enqueue_buffer( sb, no_reason ); } }); - return true; } ); } @@ -2302,10 +2305,10 @@ namespace eosio { const fc::time_point_sec now{fc::time_point::now()}; my_impl->connections.for_each_connection( [this, &trx, &now, &buff_factory]( auto& cp ) { if( cp->is_blocks_only_connection() || !cp->current() ) { - return true; + return; } if( !add_peer_txn(trx->id(), trx->expiration(), cp->connection_id, now) ) { - return true; + return; } send_buffer_type sb = buff_factory.get_send_buffer( trx ); @@ -2313,7 +2316,6 @@ namespace eosio { cp->strand.post( [cp, sb{std::move(sb)}]() { cp->enqueue_buffer( sb, no_reason ); } ); - return true; } ); } @@ -2363,14 +2365,14 @@ namespace eosio { } last_req = *c->last_req; } - my_impl->connections.for_each_block_connection( [this, &c, &last_req, &bid]( auto& conn ) { + auto request_from_peer = [this, &c, &last_req, &bid]( auto& conn ) { if( conn == c ) - return true; + return false; { std::lock_guard guard( conn->conn_mtx ); if( conn->last_req ) { - return true; + return false; } } @@ -2382,16 +2384,18 @@ namespace eosio { std::lock_guard g_conn_conn( conn->conn_mtx ); conn->last_req = last_req; } ); - return false; + return true; } - return true; - } ); + return false; + }; - // at this point no other peer has it, re-request or do nothing? - peer_wlog( c, "no peer has last_req" ); - if( c->connected() ) { - c->enqueue( last_req ); - c->fetch_wait(); + if (!my_impl->connections.any_of_block_connections(request_from_peer)) { + // at this point no other peer has it, re-request or do nothing? + peer_wlog(c, "no peer has last_req"); + if (c->connected()) { + c->enqueue(last_req); + c->fetch_wait(); + } } } @@ -2501,7 +2505,6 @@ namespace eosio { } } } - return true; } ); const uint32_t max_client_count = connections.get_max_client_count(); if( from_addr < max_nodes_per_host && (auto_bp_peering_enabled() || max_client_count == 0 || visitors < max_client_count)) { @@ -3479,7 +3482,6 @@ namespace eosio { c->check_heartbeat(current_time); } ); } - return true; } ); } ); } From 01dae4ef0bd46a1e1efa9e451ae04519592c4545 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 22 May 2023 13:03:36 -0500 Subject: [PATCH 9/9] GH-1072 Fix any_of_block_connections to verify not transaction only connection --- plugins/net_plugin/net_plugin.cpp | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 622a6b412f..27707d6269 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -378,16 +378,10 @@ namespace eosio { void for_each_block_connection(Function&& f) const; template - bool any_of_connections(UnaryPredicate&& p) const { - std::shared_lock g(connections_mtx); - return std::any_of(connections.cbegin(), connections.cend(), std::forward(p)); - } + bool any_of_connections(UnaryPredicate&& p) const; template - bool any_of_block_connections(UnaryPredicate&& p) const { - std::shared_lock g(connections_mtx); - return std::any_of(connections.cbegin(), connections.cend(), std::forward(p)); - } + bool any_of_block_connections(UnaryPredicate&& p) const; }; class net_plugin_impl : public std::enable_shared_from_this, @@ -1065,6 +1059,24 @@ namespace eosio { } } + template + bool connections_manager::any_of_connections(UnaryPredicate&& p) const { + std::shared_lock g(connections_mtx); + return std::any_of(connections.cbegin(), connections.cend(), std::forward(p)); + } + + template + bool connections_manager::any_of_block_connections(UnaryPredicate&& p) const { + std::shared_lock g( connections_mtx ); + for( auto& c : connections ) { + if( c->is_transactions_only_connection() ) continue; + if (p(c)) + return true; + } + return false; + } + + //--------------------------------------------------------------------------- connection::connection( const string& endpoint )