Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P use round trip of time_message to calculate latency #1243

Merged
merged 24 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c776ac9
GH-1072 No reason to post if syncing from peer
heifner Jun 5, 2023
703f8eb
GH-1072 Use round trip time_message for calculation of latency.
heifner Jun 6, 2023
1341215
GH-1072 Fix for net_latency_ns not being calculated yet.
heifner Jun 6, 2023
cdd05a8
GH-1072 Do not print out complete blocklog on failure as it is way to…
heifner Jun 6, 2023
668d2a4
GH-1072 Update comments
heifner Jun 7, 2023
2fab062
GH-1072 Revert check for in sync as we need to always attempt a start…
heifner Jun 7, 2023
73644fd
GH-1072 Revert: No reason to post if syncing from peer.
heifner Jun 7, 2023
fed0e8a
Merge branch 'main' into GH-1072-latency
heifner Jun 7, 2023
1600c88
GH-1027 No need to sync or send handshake if already syncing
heifner Jun 7, 2023
fa5eeb7
GH-1027 Remove unneeded handshake
heifner Jun 7, 2023
91346b1
GH-1027 start_sync will determine if sync needed
heifner Jun 7, 2023
7032cda
GH-1027 Prevent infinite time_message loops by making sure only one i…
heifner Jun 8, 2023
ef1b486
GH-1027 No need to normalize msg.org
heifner Jun 8, 2023
438d125
GH-1027 Use constants for normalize
heifner Jun 8, 2023
5b34d53
GH-1027 If two time_message loops in progress, drop all but the expec…
heifner Jun 8, 2023
39df8b5
GH-1027 Rename net_latency_ns to peer_ping_time_ns
heifner Jun 8, 2023
9fe62d5
GH-1027 Reset sync_source when unable to request range
heifner Jun 8, 2023
aa25f11
GH-1027 Remove notice_message that is a no-op
heifner Jun 9, 2023
2c43180
GH-1027 Reduce handshake storm by only sending handshake if head chan…
heifner Jun 9, 2023
d74af81
GH-1027 Simplify time_message logic
heifner Jun 9, 2023
ea230ce
GH-1027 Simplify time_message logic
heifner Jun 9, 2023
c86ca47
GH-1027 catch_up message with head was being ignored. This was likely…
heifner Jun 9, 2023
b2e7857
GH-1027 Since we now sync ahead the sync_next_expected_num needs rese…
heifner Jun 9, 2023
84fa137
Merge branch 'main' into GH-1072-send-time-latency
heifner Jun 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 107 additions & 74 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,12 +774,12 @@ namespace eosio {
hb_timeout = dur.count();
}

uint64_t get_net_latency_ns() const { return net_latency_ns; }
uint64_t get_peer_ping_time_ns() const { return peer_ping_time_ns; }

private:
static const string unknown;

std::atomic<uint64_t> net_latency_ns = std::numeric_limits<uint64_t>::max();
std::atomic<uint64_t> peer_ping_time_ns = std::numeric_limits<uint64_t>::max();

std::optional<peer_sync_state> peer_requested; // this peer is requesting info from us

Expand Down Expand Up @@ -861,11 +861,11 @@ namespace eosio {
* Time message handling
* @{
*/
// Members set from network data
tstamp org{0}; //!< originate timestamp
tstamp rec{0}; //!< receive timestamp
tstamp dst{0}; //!< destination timestamp
tstamp xmt{0}; //!< transmit timestamp
// See NTP protocol. https://datatracker.ietf.org/doc/rfc5905/
tstamp org{0}; //!< origin timestamp. Time at the client when the request departed for the server.
// tstamp (not used) rec{0}; //!< receive timestamp. Time at the server when the request arrived from the client.
tstamp xmt{0}; //!< transmit timestamp, Time at the server when the response left for the client.
// tstamp (not used) dst{0}; //!< destination timestamp, Time at the client when the reply arrived from the server.
/** @} */
// timestamp for the lastest message
tstamp latest_msg_time{0};
Expand Down Expand Up @@ -926,7 +926,7 @@ namespace eosio {
* day routine and converts to a (at least) 64 bit integer.
*/
static tstamp get_time() {
return std::chrono::system_clock::now().time_since_epoch().count();
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
/** @} */

Expand Down Expand Up @@ -985,7 +985,7 @@ namespace eosio {
void handle_message( packed_transaction_ptr trx );

// returns calculated number of blocks combined latency
uint32_t update_latency(const handshake_message& msg);
uint32_t calc_block_latency();

void process_signed_block( const block_id_type& id, signed_block_ptr block, block_state_ptr bsp );

Expand Down Expand Up @@ -1256,10 +1256,10 @@ namespace eosio {

// thread safe
bool connection::should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const {
fc_dlog(logger, "id: ${id} blocks conn: ${t} current: ${c} socket_open: ${so} syncing from us: ${s} state: ${con} peer_start_block: ${sb} peer_head: ${h} latency: ${lat}us no_retry: ${g}",
fc_dlog(logger, "id: ${id} blocks conn: ${t} current: ${c} socket_open: ${so} syncing from us: ${s} state: ${con} peer_start_block: ${sb} peer_head: ${h} ping: ${p}us no_retry: ${g}",
("id", connection_id)("t", is_blocks_connection())
("c", current())("so", socket_is_open())("s", peer_syncing_from_us.load())("con", state_str(state()))
("sb", peer_start_block_num.load())("h", peer_head_block_num.load())("lat", get_net_latency_ns()/1000)("g", reason_str(no_retry)));
("sb", peer_start_block_num.load())("h", peer_head_block_num.load())("p", get_peer_ping_time_ns()/1000)("g", reason_str(no_retry)));
if (is_blocks_connection() && current()) {
if (no_retry == go_away_reason::no_reason) {
if (peer_start_block_num <= sync_next_expected_num) { // has blocks we want
Expand Down Expand Up @@ -1315,6 +1315,7 @@ namespace eosio {
peer_ilog( self, "closing" );
self->cancel_wait();
self->sync_last_requested_block = 0;
self->org = 0;
self->set_state(connection_state::closed);

if( reconnect && !shutdown ) {
Expand Down Expand Up @@ -1456,25 +1457,33 @@ namespace eosio {

}

org = 0;
send_time();
}

// called from connection strand
void connection::send_time() {
time_message xpkt;
xpkt.org = rec;
xpkt.rec = dst;
xpkt.xmt = get_time();
org = xpkt.xmt;
enqueue(xpkt);
if (org == 0) { // do not send if there is already a time loop in progress
org = get_time();
// xpkt.org == 0 means we are initiating a ping. Actual origin time is in xpkt.xmt.
time_message xpkt{
.org = 0,
.rec = 0,
.xmt = org,
.dst = 0 };
peer_dlog(this, "send init time_message: ${t}", ("t", xpkt));
enqueue(xpkt);
}
}

// called from connection strand
void connection::send_time(const time_message& msg) {
time_message xpkt;
xpkt.org = msg.xmt;
xpkt.rec = msg.dst;
xpkt.xmt = get_time();
time_message xpkt{
.org = msg.xmt,
.rec = msg.dst,
.xmt = get_time(),
.dst = 0 };
peer_dlog( this, "send time_message: ${t}, org: ${o}", ("t", xpkt)("o", org) );
enqueue(xpkt);
}

Expand Down Expand Up @@ -1884,7 +1893,7 @@ namespace eosio {
});
if (conns.size() > sync_peer_limit) {
std::partial_sort(conns.begin(), conns.begin() + sync_peer_limit, conns.end(), [](const connection_ptr& lhs, const connection_ptr& rhs) {
return lhs->get_net_latency_ns() < rhs->get_net_latency_ns();
return lhs->get_peer_ping_time_ns() < rhs->get_peer_ping_time_ns();
});
conns.resize(sync_peer_limit);
}
Expand All @@ -1908,8 +1917,8 @@ namespace eosio {
uint32_t lowest_ordinal = std::numeric_limits<uint32_t>::max();
for (size_t i = 0; i < conns.size() && lowest_ordinal != 0; ++i) {
uint32_t sync_ord = conns[i]->sync_ordinal;
fc_dlog(logger, "compare sync ords, conn: ${lcid}, ord: ${l} < ${r}, latency: ${lat}us",
("lcid", conns[i]->connection_id)("l", sync_ord)("r", lowest_ordinal)("lat", conns[i]->get_net_latency_ns()/1000));
fc_dlog(logger, "compare sync ords, conn: ${lcid}, ord: ${l} < ${r}, ping: ${p}us",
("lcid", conns[i]->connection_id)("l", sync_ord)("r", lowest_ordinal)("p", conns[i]->get_peer_ping_time_ns()/1000));
if (sync_ord < lowest_ordinal) {
the_one = i;
lowest_ordinal = sync_ord;
Expand Down Expand Up @@ -1977,6 +1986,7 @@ namespace eosio {
}
}
if( !request_sent ) {
sync_source.reset();
g_sync.unlock();
fc_wlog(logger, "Unable to request range, sending handshakes to everyone");
send_handshakes();
Expand Down Expand Up @@ -2012,7 +2022,6 @@ namespace eosio {
if( !is_sync_required( chain_info.head_num ) || target <= chain_info.lib_num ) {
peer_dlog( c, "We are already caught up, my irr = ${b}, head = ${h}, target = ${t}",
("b", chain_info.lib_num)( "h", chain_info.head_num )( "t", target ) );
c->send_handshake();
return;
}

Expand Down Expand Up @@ -2071,11 +2080,6 @@ namespace eosio {
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 0, lib ${l}",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16))("l", chain_info.lib_num) );
c->peer_syncing_from_us = false;
notice_message note;
note.known_blocks.mode = none;
note.known_trx.mode = catch_up;
note.known_trx.pending = 0;
c->enqueue( note );
return;
}
if (chain_info.head_num < msg.last_irreversible_block_num) {
Expand Down Expand Up @@ -2225,9 +2229,7 @@ namespace eosio {
c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending;
}
sync_reset_lib_num(c, false);
if (is_in_sync()) {
start_sync(c, msg.known_trx.pending);
}
start_sync(c, msg.known_trx.pending);
}
}

Expand Down Expand Up @@ -2628,6 +2630,7 @@ namespace eosio {
if( !err && socket->is_open() && socket == c->socket ) {
if( c->start_session() ) {
c->send_handshake();
c->send_time();
}
} else {
fc_elog( logger, "connection failed to ${a}, ${error}", ("a", c->peer_address())( "error", err.message()));
Expand Down Expand Up @@ -3230,28 +3233,22 @@ namespace eosio {
}
}

uint32_t nblk_combined_latency = update_latency(msg);
uint32_t nblk_combined_latency = calc_block_latency();
my_impl->sync_master->recv_handshake( shared_from_this(), msg, nblk_combined_latency );
}

// called from connection strand
uint32_t connection::update_latency(const handshake_message& msg) {
auto current_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
int64_t network_latency_ns = current_time_ns - msg.time; // net latency in nanoseconds
if( network_latency_ns < 0 ) {
peer_wlog(this, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000));
network_latency_ns = -network_latency_ns; // use absolute value because it might be this node with the skew
}
// number of blocks syncing node is behind from a peer node, round up
uint32_t nblk_behind_by_net_latency = std::lround( static_cast<double>(network_latency_ns) / static_cast<double>(block_interval_ns) );
// 2x for time it takes for message to reach back to peer node
uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency;
// message in the log below is used in p2p_high_latency_test.py test
peer_dlog(this, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received",
("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency));

net_latency_ns = network_latency_ns;

uint32_t connection::calc_block_latency() {
uint32_t nblk_combined_latency = 0;
if (peer_ping_time_ns != std::numeric_limits<uint64_t>::max()) {
// number of blocks syncing node is behind from a peer node, round up
uint32_t nblk_behind_by_net_latency = std::lround(static_cast<double>(peer_ping_time_ns.load()) / static_cast<double>(block_interval_ns));
// peer_ping_time_ns includes time there and back, include round trip time as the block latency is used to compensate for communication back
nblk_combined_latency = nblk_behind_by_net_latency;
// message in the log below is used in p2p_high_latency_test.py test
peer_dlog(this, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received",
("lat", peer_ping_time_ns / 2 / 1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency));
}
return nblk_combined_latency;
}

Expand All @@ -3277,51 +3274,81 @@ namespace eosio {
close( retry ); // reconnect if wrong_version
}

void connection::handle_message( const time_message& msg ) {
peer_ilog( this, "received time_message" );
// some clients before leap 5.0 provided microsecond epoch instead of nanosecond epoch
tstamp normalize_epoch_to_ns(tstamp x) {
// 1686211688888 milliseconds - 2023-06-08T08:08:08.888, 5yrs from EOS genesis 2018-06-08T08:08:08.888
// 1686211688888000 microseconds
// 1686211688888000000 nanoseconds
if (x >= 1686211688888000000) // nanoseconds
return x;
if (x >= 1686211688888000) // microseconds
return x*1000;
if (x >= 1686211688888) // milliseconds
return x*1000*1000;
if (x >= 1686211688) // seconds
return x*1000*1000*1000;
return 0; // unknown or is zero
}

/* We've already lost however many microseconds it took to dispatch
* the message, but it can't be helped.
*/
msg.dst = get_time();
void connection::handle_message( const time_message& msg ) {
peer_dlog( this, "received time_message: ${t}, org: ${o}", ("t", msg)("o", org) );

// If the transmit timestamp is zero, the peer is horribly broken.
if(msg.xmt == 0)
return; /* invalid timestamp */
return; // invalid timestamp

if(msg.xmt == xmt)
return; /* duplicate packet */
// We've already lost however many microseconds it took to dispatch the message, but it can't be helped.
msg.dst = get_time();

xmt = msg.xmt;
rec = msg.rec;
dst = msg.dst;
if (msg.org != 0) {
if (msg.org == org) {
auto ping = msg.dst - msg.org;
peer_dlog(this, "send_time ping ${p}us", ("p", ping / 1000));
peer_ping_time_ns = ping;
} else {
// a diff time loop is in progress, ignore this message as it is not the one we want
return;
}
}

auto msg_xmt = normalize_epoch_to_ns(msg.xmt);
if (msg_xmt == xmt)
return; // duplicate packet

xmt = msg_xmt;

if( msg.org == 0 ) {
send_time( msg );
return; // We don't have enough data to perform the calculation yet.
}

double offset = (double(rec - org) + double(msg.xmt - dst)) / 2;
double NsecPerUsec{1000};
if (org != 0) {
auto rec = normalize_epoch_to_ns(msg.rec);
int64_t offset = (double(rec - org) + double(msg_xmt - msg.dst)) / 2.0;

if( logger.is_enabled( fc::log_level::all ) )
logger.log( FC_LOG_MESSAGE( all, "Clock offset is ${o}ns (${us}us)",
("o", offset)( "us", offset / NsecPerUsec ) ) );
if (std::abs(offset) > block_interval_ns) {
peer_wlog(this, "Clock offset is ${of}us, calculation: (rec ${r} - org ${o} + xmt ${x} - dst ${d})/2",
("of", offset / 1000)("r", rec)("o", org)("x", msg_xmt)("d", msg.dst));
}
}
org = 0;
rec = 0;

std::unique_lock<std::mutex> g_conn( conn_mtx );
if( last_handshake_recv.generation == 0 ) {
g_conn.unlock();
send_handshake();
}

// make sure we also get the latency we need
if (peer_ping_time_ns == std::numeric_limits<uint64_t>::max()) {
send_time();
}
}

void connection::handle_message( const notice_message& msg ) {
// peer tells us about one or more blocks or txns. When done syncing, forward on
// notices of previously unknown blocks or txns,
//
peer_dlog( this, "received notice_message" );
set_state(connection_state::connected);
if( msg.known_blocks.ids.size() > 2 ) {
peer_elog( this, "Invalid notice_message, known_blocks.ids.size ${s}, closing connection",
Expand Down Expand Up @@ -3683,9 +3710,10 @@ namespace eosio {
// called from application thread
void net_plugin_impl::on_accepted_block_header(const block_state_ptr& bs) {
update_chain_info();
dispatcher->strand.post( [bs]() {
fc_dlog( logger, "signaled accepted_block_header, blk num = ${num}, id = ${id}", ("num", bs->block_num)("id", bs->id) );
my_impl->dispatcher->bcast_block( bs->block, bs->id );

dispatcher->strand.post([bs]() {
fc_dlog(logger, "signaled accepted_block_header, blk num = ${num}, id = ${id}", ("num", bs->block_num)("id", bs->id));
my_impl->dispatcher->bcast_block(bs->block, bs->id);
});
}

Expand Down Expand Up @@ -3779,8 +3807,13 @@ namespace eosio {
// call from connection strand
bool connection::populate_handshake( handshake_message& hello ) const {
namespace sc = std::chrono;
hello.network_version = net_version_base + net_version;
auto chain_info = my_impl->get_chain_info();
auto now = sc::duration_cast<sc::nanoseconds>(sc::system_clock::now().time_since_epoch()).count();
constexpr int64_t hs_delay = sc::duration_cast<sc::nanoseconds>(sc::milliseconds(50)).count();
// nothing as changed since last handshake and one was sent recently, so skip sending
if (chain_info.head_id == hello.head_id && (hello.time + hs_delay > now))
return false;
hello.network_version = net_version_base + net_version;
hello.last_irreversible_block_num = chain_info.lib_num;
hello.last_irreversible_block_id = chain_info.lib_id;
hello.head_num = chain_info.head_num;
Expand Down
17 changes: 9 additions & 8 deletions tests/nodeos_forked_chain_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,14 +559,15 @@ def getBlock(self, blockNum):
finally:
TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)

if not testSuccessful:
Print(Utils.FileDivider)
Print("Compare Blocklog")
cluster.compareBlockLogs()
Print(Utils.FileDivider)
Print("Print Blocklog")
cluster.printBlockLog()
Print(Utils.FileDivider)
# Too much output for ci/cd
# if not testSuccessful:
# Print(Utils.FileDivider)
# Print("Compare Blocklog")
# cluster.compareBlockLogs()
# Print(Utils.FileDivider)
# Print("Print Blocklog")
# cluster.printBlockLog()
# Print(Utils.FileDivider)

exitCode = 0 if testSuccessful else 1
exit(exitCode)
Loading