diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 7bc3fe0a98..4a2922f308 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -227,11 +227,11 @@ namespace eosio { private: constexpr static auto stage_str( stages s ); bool set_state( stages newstate ); - bool is_sync_required( uint32_t fork_head_block_num ); + bool is_sync_required( uint32_t fork_head_block_num ); // call with locked mutex void request_next_chunk( std::unique_lock g_sync, const connection_ptr& conn = connection_ptr() ); - connection_ptr find_next_sync_node(); - void start_sync( const connection_ptr& c, uint32_t target ); - bool verify_catchup( const connection_ptr& c, uint32_t num, const block_id_type& id ); + connection_ptr find_next_sync_node(); // call with locked mutex + void start_sync( const connection_ptr& c, uint32_t target ); // locks mutex + bool verify_catchup( const connection_ptr& c, uint32_t num, const block_id_type& id ); // locks mutex public: explicit sync_manager( uint32_t span, uint32_t sync_peer_limit ); @@ -776,12 +776,12 @@ namespace eosio { hb_timeout = dur.count(); } - uint64_t get_net_latency_ns() const { return net_latency_ns; } + uint64_t get_peer_ping_time_ns() const { return peer_ping_time_ns; } private: static const string unknown; - std::atomic net_latency_ns = std::numeric_limits::max(); + std::atomic peer_ping_time_ns = std::numeric_limits::max(); std::optional peer_requested; // this peer is requesting info from us @@ -863,11 +863,11 @@ namespace eosio { * Time message handling * @{ */ - // Members set from network data - tstamp org{0}; //!< originate timestamp - tstamp rec{0}; //!< receive timestamp - tstamp dst{0}; //!< destination timestamp - tstamp xmt{0}; //!< transmit timestamp + // See NTP protocol. https://datatracker.ietf.org/doc/rfc5905/ + tstamp org{0}; //!< origin timestamp. Time at the client when the request departed for the server. 
+ // tstamp (not used) rec{0}; //!< receive timestamp. Time at the server when the request arrived from the client. + tstamp xmt{0}; //!< transmit timestamp, Time at the server when the response left for the client. + // tstamp (not used) dst{0}; //!< destination timestamp, Time at the client when the reply arrived from the server. /** @} */ // timestamp for the lastest message tstamp latest_msg_time{0}; @@ -928,7 +928,7 @@ namespace eosio { * day routine and converts to a (at least) 64 bit integer. */ static tstamp get_time() { - return std::chrono::system_clock::now().time_since_epoch().count(); + return std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); } /** @} */ @@ -987,7 +987,7 @@ namespace eosio { void handle_message( packed_transaction_ptr trx ); // returns calculated number of blocks combined latency - uint32_t update_latency(const handshake_message& msg); + uint32_t calc_block_latency(); void process_signed_block( const block_id_type& id, signed_block_ptr block, block_state_ptr bsp ); @@ -1258,10 +1258,10 @@ namespace eosio { // thread safe bool connection::should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const { - fc_dlog(logger, "id: ${id} blocks conn: ${t} current: ${c} socket_open: ${so} syncing from us: ${s} state: ${con} peer_start_block: ${sb} peer_head: ${h} latency: ${lat}us no_retry: ${g}", + fc_dlog(logger, "id: ${id} blocks conn: ${t} current: ${c} socket_open: ${so} syncing from us: ${s} state: ${con} peer_start_block: ${sb} peer_head: ${h} ping: ${p}us no_retry: ${g}", ("id", connection_id)("t", is_blocks_connection()) ("c", current())("so", socket_is_open())("s", peer_syncing_from_us.load())("con", state_str(state())) - ("sb", peer_start_block_num.load())("h", peer_head_block_num.load())("lat", get_net_latency_ns()/1000)("g", reason_str(no_retry))); + ("sb", peer_start_block_num.load())("h", peer_head_block_num.load())("p", get_peer_ping_time_ns()/1000)("g", 
reason_str(no_retry))); if (is_blocks_connection() && current()) { if (no_retry == go_away_reason::no_reason) { if (peer_start_block_num <= sync_next_expected_num) { // has blocks we want @@ -1317,6 +1317,7 @@ namespace eosio { peer_ilog( this, "closing" ); cancel_wait(); sync_last_requested_block = 0; + org = 0; set_state(connection_state::closed); if( reconnect && !shutdown ) { @@ -1458,25 +1459,33 @@ namespace eosio { } + org = 0; send_time(); } // called from connection strand void connection::send_time() { - time_message xpkt; - xpkt.org = rec; - xpkt.rec = dst; - xpkt.xmt = get_time(); - org = xpkt.xmt; - enqueue(xpkt); + if (org == 0) { // do not send if there is already a time loop in progress + org = get_time(); + // xpkt.org == 0 means we are initiating a ping. Actual origin time is in xpkt.xmt. + time_message xpkt{ + .org = 0, + .rec = 0, + .xmt = org, + .dst = 0 }; + peer_dlog(this, "send init time_message: ${t}", ("t", xpkt)); + enqueue(xpkt); + } } // called from connection strand void connection::send_time(const time_message& msg) { - time_message xpkt; - xpkt.org = msg.xmt; - xpkt.rec = msg.dst; - xpkt.xmt = get_time(); + time_message xpkt{ + .org = msg.xmt, + .rec = msg.dst, + .xmt = get_time(), + .dst = 0 }; + peer_dlog( this, "send time_message: ${t}, org: ${o}", ("t", xpkt)("o", org) ); enqueue(xpkt); } @@ -1864,8 +1873,8 @@ namespace eosio { } ); sync_known_lib_num = highest_lib_num; - // if closing the connection we are currently syncing from or not syncing, then request from a diff peer - if( !sync_source || c == sync_source ) { + // if closing the connection we are currently syncing from then request from a diff peer + if( c == sync_source ) { sync_last_requested_num = 0; // if starting to sync need to always start from lib as we might be on our own fork uint32_t lib_num = my_impl->get_chain_lib_num(); @@ -1886,7 +1895,7 @@ namespace eosio { }); if (conns.size() > sync_peer_limit) { std::partial_sort(conns.begin(), conns.begin() + 
sync_peer_limit, conns.end(), [](const connection_ptr& lhs, const connection_ptr& rhs) { - return lhs->get_net_latency_ns() < rhs->get_net_latency_ns(); + return lhs->get_peer_ping_time_ns() < rhs->get_peer_ping_time_ns(); }); conns.resize(sync_peer_limit); } @@ -1910,8 +1919,8 @@ namespace eosio { uint32_t lowest_ordinal = std::numeric_limits::max(); for (size_t i = 0; i < conns.size() && lowest_ordinal != 0; ++i) { uint32_t sync_ord = conns[i]->sync_ordinal; - fc_dlog(logger, "compare sync ords, conn: ${lcid}, ord: ${l} < ${r}, latency: ${lat}us", - ("lcid", conns[i]->connection_id)("l", sync_ord)("r", lowest_ordinal)("lat", conns[i]->get_net_latency_ns()/1000)); + fc_dlog(logger, "compare sync ords, conn: ${lcid}, ord: ${l} < ${r}, ping: ${p}us", + ("lcid", conns[i]->connection_id)("l", sync_ord)("r", lowest_ordinal)("p", conns[i]->get_peer_ping_time_ns()/1000)); if (sync_ord < lowest_ordinal) { the_one = i; lowest_ordinal = sync_ord; @@ -1930,7 +1939,7 @@ namespace eosio { ("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span)("h", chain_info.head_num) ); if( chain_info.head_num + sync_req_span < sync_last_requested_num && sync_source && sync_source->current() ) { - fc_wlog( logger, "ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}", + fc_dlog( logger, "ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}", ("h", chain_info.head_num)("r", sync_last_requested_num)("e", sync_next_expected_num) ("k", sync_known_lib_num)("s", sync_req_span)("c", sync_source->connection_id) ); return; @@ -1979,6 +1988,7 @@ namespace eosio { } } if( !request_sent ) { + sync_source.reset(); g_sync.unlock(); fc_wlog(logger, "Unable to request range, sending handshakes to everyone"); send_handshakes(); @@ -2014,7 +2024,6 @@ namespace eosio { if( 
!is_sync_required( chain_info.head_num ) || target <= chain_info.lib_num ) { peer_dlog( c, "We are already caught up, my irr = ${b}, head = ${h}, target = ${t}", ("b", chain_info.lib_num)( "h", chain_info.head_num )( "t", target ) ); - c->send_handshake(); return; } @@ -2073,11 +2082,6 @@ namespace eosio { peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 0, lib ${l}", ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16))("l", chain_info.lib_num) ); c->peer_syncing_from_us = false; - notice_message note; - note.known_blocks.mode = none; - note.known_trx.mode = catch_up; - note.known_trx.pending = 0; - c->enqueue( note ); return; } if (chain_info.head_num < msg.last_irreversible_block_num) { @@ -2227,9 +2231,7 @@ namespace eosio { c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending; } sync_reset_lib_num(c, false); - if (is_in_sync()) { - start_sync(c, msg.known_trx.pending); - } + start_sync(c, msg.known_trx.pending); } } @@ -2238,6 +2240,9 @@ namespace eosio { c->block_status_monitor_.rejected(); std::unique_lock g( sync_mtx ); sync_last_requested_num = 0; + if (blk_num < sync_next_expected_num) { + sync_next_expected_num = my_impl->get_chain_lib_num(); + } if( c->block_status_monitor_.max_events_violated()) { peer_wlog( c, "block ${bn} not accepted, closing connection", ("bn", blk_num) ); sync_source.reset(); @@ -2245,7 +2250,7 @@ namespace eosio { c->close(); } else { g.unlock(); - peer_dlog(c, "rejected block, sending handshake"); + peer_dlog(c, "rejected block ${bn}, sending handshake", ("bn", blk_num)); c->send_handshake(); } } @@ -2630,6 +2635,7 @@ namespace eosio { if( !err && socket->is_open() && socket == c->socket ) { if( c->start_session() ) { c->send_handshake(); + c->send_time(); } } else { fc_elog( logger, "connection failed to ${a}, ${error}", ("a", c->peer_address())( "error", err.message())); @@ -3232,28 +3238,22 @@ namespace eosio { } } - uint32_t 
nblk_combined_latency = update_latency(msg); + uint32_t nblk_combined_latency = calc_block_latency(); my_impl->sync_master->recv_handshake( shared_from_this(), msg, nblk_combined_latency ); } // called from connection strand - uint32_t connection::update_latency(const handshake_message& msg) { - auto current_time_ns = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - int64_t network_latency_ns = current_time_ns - msg.time; // net latency in nanoseconds - if( network_latency_ns < 0 ) { - peer_wlog(this, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000)); - network_latency_ns = -network_latency_ns; // use absolute value because it might be this node with the skew - } - // number of blocks syncing node is behind from a peer node, round up - uint32_t nblk_behind_by_net_latency = std::lround( static_cast(network_latency_ns) / static_cast(block_interval_ns) ); - // 2x for time it takes for message to reach back to peer node - uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency; - // message in the log below is used in p2p_high_latency_test.py test - peer_dlog(this, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received", - ("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency)); - - net_latency_ns = network_latency_ns; - + uint32_t connection::calc_block_latency() { + uint32_t nblk_combined_latency = 0; + if (peer_ping_time_ns != std::numeric_limits::max()) { + // number of blocks syncing node is behind from a peer node, rounded to the nearest block + uint32_t nblk_behind_by_net_latency = std::lround(static_cast(peer_ping_time_ns.load()) / static_cast(block_interval_ns)); + // peer_ping_time_ns includes time there and back, include round trip time as the block latency is used to compensate for communication back + nblk_combined_latency = 
nblk_behind_by_net_latency; + // message in the log below is used in p2p_high_latency_test.py test + peer_dlog(this, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received", + ("lat", peer_ping_time_ns / 2 / 1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency)); + } return nblk_combined_latency; } @@ -3279,51 +3279,81 @@ namespace eosio { close( retry ); // reconnect if wrong_version } - void connection::handle_message( const time_message& msg ) { - peer_ilog( this, "received time_message" ); + // some clients before leap 5.0 provided microsecond epoch instead of nanosecond epoch + tstamp normalize_epoch_to_ns(tstamp x) { + // 1686211688888 milliseconds - 2023-06-08T08:08:08.888, 5yrs from EOS genesis 2018-06-08T08:08:08.888 + // 1686211688888000 microseconds + // 1686211688888000000 nanoseconds + if (x >= 1686211688888000000) // nanoseconds + return x; + if (x >= 1686211688888000) // microseconds + return x*1000; + if (x >= 1686211688888) // milliseconds + return x*1000*1000; + if (x >= 1686211688) // seconds + return x*1000*1000*1000; + return 0; // unknown or is zero + } - /* We've already lost however many microseconds it took to dispatch - * the message, but it can't be helped. - */ - msg.dst = get_time(); + void connection::handle_message( const time_message& msg ) { + peer_dlog( this, "received time_message: ${t}, org: ${o}", ("t", msg)("o", org) ); // If the transmit timestamp is zero, the peer is horribly broken. if(msg.xmt == 0) - return; /* invalid timestamp */ + return; // invalid timestamp + + // We've already lost however many microseconds it took to dispatch the message, but it can't be helped. 
+ msg.dst = get_time(); - if(msg.xmt == xmt) - return; /* duplicate packet */ + if (msg.org != 0) { + if (msg.org == org) { + auto ping = msg.dst - msg.org; + peer_dlog(this, "send_time ping ${p}us", ("p", ping / 1000)); + peer_ping_time_ns = ping; + } else { + // a diff time loop is in progress, ignore this message as it is not the one we want + return; + } + } + + auto msg_xmt = normalize_epoch_to_ns(msg.xmt); + if (msg_xmt == xmt) + return; // duplicate packet - xmt = msg.xmt; - rec = msg.rec; - dst = msg.dst; + xmt = msg_xmt; if( msg.org == 0 ) { send_time( msg ); return; // We don't have enough data to perform the calculation yet. } - double offset = (double(rec - org) + double(msg.xmt - dst)) / 2; - double NsecPerUsec{1000}; + if (org != 0) { + auto rec = normalize_epoch_to_ns(msg.rec); + int64_t offset = (double(rec - org) + double(msg_xmt - msg.dst)) / 2.0; - if( logger.is_enabled( fc::log_level::all ) ) - logger.log( FC_LOG_MESSAGE( all, "Clock offset is ${o}ns (${us}us)", - ("o", offset)( "us", offset / NsecPerUsec ) ) ); + if (std::abs(offset) > block_interval_ns) { + peer_wlog(this, "Clock offset is ${of}us, calculation: (rec ${r} - org ${o} + xmt ${x} - dst ${d})/2", + ("of", offset / 1000)("r", rec)("o", org)("x", msg_xmt)("d", msg.dst)); + } + } org = 0; - rec = 0; std::unique_lock g_conn( conn_mtx ); if( last_handshake_recv.generation == 0 ) { g_conn.unlock(); send_handshake(); } + + // make sure we also get the latency we need + if (peer_ping_time_ns == std::numeric_limits::max()) { + send_time(); + } } void connection::handle_message( const notice_message& msg ) { // peer tells us about one or more blocks or txns. 
When done syncing, forward on // notices of previously unknown blocks or txns, // - peer_dlog( this, "received notice_message" ); set_state(connection_state::connected); if( msg.known_blocks.ids.size() > 2 ) { peer_elog( this, "Invalid notice_message, known_blocks.ids.size ${s}, closing connection", @@ -3342,15 +3372,13 @@ namespace eosio { switch (msg.known_trx.mode) { case none: break; - case last_irr_catch_up: { + case last_irr_catch_up: + case catch_up : { std::unique_lock g_conn( conn_mtx ); last_handshake_recv.head_num = msg.known_blocks.pending; g_conn.unlock(); break; } - case catch_up : { - break; - } case normal: { my_impl->dispatcher->recv_notice( shared_from_this(), msg, false ); } @@ -3685,9 +3713,10 @@ namespace eosio { // called from application thread void net_plugin_impl::on_accepted_block_header(const block_state_ptr& bs) { update_chain_info(); - dispatcher->strand.post( [bs]() { - fc_dlog( logger, "signaled accepted_block_header, blk num = ${num}, id = ${id}", ("num", bs->block_num)("id", bs->id) ); - my_impl->dispatcher->bcast_block( bs->block, bs->id ); + + dispatcher->strand.post([bs]() { + fc_dlog(logger, "signaled accepted_block_header, blk num = ${num}, id = ${id}", ("num", bs->block_num)("id", bs->id)); + my_impl->dispatcher->bcast_block(bs->block, bs->id); }); } @@ -3781,8 +3810,13 @@ namespace eosio { // call from connection strand bool connection::populate_handshake( handshake_message& hello ) const { namespace sc = std::chrono; - hello.network_version = net_version_base + net_version; auto chain_info = my_impl->get_chain_info(); + auto now = sc::duration_cast(sc::system_clock::now().time_since_epoch()).count(); + constexpr int64_t hs_delay = sc::duration_cast(sc::milliseconds(50)).count(); + // nothing has changed since last handshake and one was sent recently, so skip sending + if (chain_info.head_id == hello.head_id && (hello.time + hs_delay > now)) + return false; + hello.network_version = net_version_base + net_version; 
hello.last_irreversible_block_num = chain_info.lib_num; hello.last_irreversible_block_id = chain_info.lib_id; hello.head_num = chain_info.head_num; diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 0fbaa4beb0..36ea7d22f0 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -627,7 +627,7 @@ class producer_plugin_impl : public std::enable_shared_from_this().publish(priority::medium, block); throw; }; diff --git a/tests/nodeos_forked_chain_test.py b/tests/nodeos_forked_chain_test.py index 5951d35918..67e27f5828 100755 --- a/tests/nodeos_forked_chain_test.py +++ b/tests/nodeos_forked_chain_test.py @@ -559,14 +559,15 @@ def getBlock(self, blockNum): finally: TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails) - if not testSuccessful: - Print(Utils.FileDivider) - Print("Compare Blocklog") - cluster.compareBlockLogs() - Print(Utils.FileDivider) - Print("Print Blocklog") - cluster.printBlockLog() - Print(Utils.FileDivider) +# Too much output for ci/cd +# if not testSuccessful: +# Print(Utils.FileDivider) +# Print("Compare Blocklog") +# cluster.compareBlockLogs() +# Print(Utils.FileDivider) +# Print("Print Blocklog") +# cluster.printBlockLog() +# Print(Utils.FileDivider) exitCode = 0 if testSuccessful else 1 exit(exitCode) diff --git a/tests/nodeos_high_transaction_test.py b/tests/nodeos_high_transaction_test.py index 9ab51019c4..0b0550408a 100755 --- a/tests/nodeos_high_transaction_test.py +++ b/tests/nodeos_high_transaction_test.py @@ -375,14 +375,16 @@ def findTransInBlock(transId, transToBlock, node): testSuccessful = not delayedReportError finally: TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails) - if not testSuccessful: - Print(Utils.FileDivider) - Print("Compare Blocklog") - cluster.compareBlockLogs() - Print(Utils.FileDivider) - Print("Print 
Blocklog") - cluster.printBlockLog() - Print(Utils.FileDivider) + +# Too much output for ci/cd +# if not testSuccessful: +# Print(Utils.FileDivider) +# Print("Compare Blocklog") +# cluster.compareBlockLogs() +# Print(Utils.FileDivider) +# Print("Print Blocklog") +# cluster.printBlockLog() +# Print(Utils.FileDivider) errorCode = 0 if testSuccessful else 1 exit(errorCode) diff --git a/tests/nodeos_short_fork_take_over_test.py b/tests/nodeos_short_fork_take_over_test.py index 55887f9644..f468cae52c 100755 --- a/tests/nodeos_short_fork_take_over_test.py +++ b/tests/nodeos_short_fork_take_over_test.py @@ -409,14 +409,15 @@ def getMinHeadAndLib(prodNodes): finally: TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails) - if not testSuccessful: - Print(Utils.FileDivider) - Print("Compare Blocklog") - cluster.compareBlockLogs() - Print(Utils.FileDivider) - Print("Compare Blocklog") - cluster.printBlockLog() - Print(Utils.FileDivider) +# Too much output for ci/cd +# if not testSuccessful: +# Print(Utils.FileDivider) +# Print("Compare Blocklog") +# cluster.compareBlockLogs() +# Print(Utils.FileDivider) +# Print("Print Blocklog") +# cluster.printBlockLog() +# Print(Utils.FileDivider) exitCode = 0 if testSuccessful else 1 exit(exitCode)