Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #507 from EOSIO/p2p-liveness-checking-395
Browse files Browse the repository at this point in the history
P2p liveness checking #395
  • Loading branch information
bytemaster authored Sep 26, 2017
2 parents b506836 + 8b79606 commit 1dd3f04
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 1 deletion.
15 changes: 15 additions & 0 deletions plugins/net_plugin/include/eos/net_plugin/protocol.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <eos/chain/block.hpp>
#include <eos/chain/types.hpp>
#include <chrono>

namespace eos {
using namespace chain;
Expand All @@ -19,6 +20,18 @@ namespace eos {
string agent;
};

typedef std::chrono::system_clock::duration::rep tstamp;
typedef int32_t tdist;

static_assert(sizeof(std::chrono::system_clock::duration::rep) >= 8, "system_clock is expected to be at least 64 bits");

struct time_message {
tstamp org; //!< origin timestamp
tstamp rec; //!< receive timestamp
tstamp xmt; //!< transmit timestamp
mutable tstamp dst; //!< destination timestamp
};

struct notice_message {
vector<transaction_id_type> known_trx;
vector<block_id_type> known_blocks;
Expand All @@ -41,6 +54,7 @@ namespace eos {
};

using net_message = static_variant<handshake_message,
time_message,
notice_message,
request_message,
sync_request_message,
Expand All @@ -58,6 +72,7 @@ FC_REFLECT( eos::handshake_message,
(head_num)(head_id)
(os)(agent) )

FC_REFLECT( eos::time_message, (org)(rec)(xmt)(dst) )
FC_REFLECT( eos::block_summary_message, (block)(trx_ids) )
FC_REFLECT( eos::notice_message, (known_trx)(known_blocks) )
FC_REFLECT( eos::request_message, (req_trx)(req_blocks) )
Expand Down
144 changes: 143 additions & 1 deletion plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ namespace eos {

unique_ptr<boost::asio::steady_timer> connector_check;
unique_ptr<boost::asio::steady_timer> transaction_check;
unique_ptr<boost::asio::steady_timer> keepalive_timer;
boost::asio::steady_timer::duration connector_period;
boost::asio::steady_timer::duration txn_exp_period;
boost::asio::steady_timer::duration resp_expected_period;
boost::asio::steady_timer::duration keepalive_interval{std::chrono::seconds{32}};

int16_t network_version;
chain_id_type chain_id;
Expand Down Expand Up @@ -137,6 +139,21 @@ namespace eos {
void broadcast_block_impl( const signed_block &sb);

void handle_message( connection_ptr c, const handshake_message &msg);
/** \name Peer Timestamps
* Time message handling
* @{
*/
/** \brief Process time_message
*
* Calculate offset, delay and dispersion. Note carefully the
* implied processing. The first-order difference is done
* directly in 64-bit arithmetic, then the result is converted
* to floating double. All further processing is in
* floating-double arithmetic with rounding done by the hardware.
* This is necessary in order to avoid overflow and preserve precision.
*/
void handle_message( connection_ptr c, const time_message &msg);
/** @} */
void handle_message( connection_ptr c, const notice_message &msg);
void handle_message( connection_ptr c, const request_message &msg);
void handle_message( connection_ptr c, const sync_request_message &msg);
Expand All @@ -150,6 +167,14 @@ namespace eos {

void expire_txns( );
void connection_monitor( );
/** \name Peer Timestamps
* Time message handling
* @{
*/
/** \brief Peer heartbeat ticker.
*/
void ticker();
/** @} */
};

static net_plugin_impl *my_impl;
Expand Down Expand Up @@ -262,12 +287,55 @@ namespace eos {
string peer_addr;
unique_ptr<boost::asio::steady_timer> response_expected;

/** \name Peer Timestamps
* Time message handling
* @{
*/
// Members set from network data
tstamp org{0}; //!< originate timestamp
tstamp rec{0}; //!< receive timestamp
tstamp dst{0}; //!< destination timestamp
tstamp xmt{0}; //!< transmit timestamp

// Computed data
double offset{0}; //!< peer offset

static const size_t ts_buffer_size{32};
char ts[ts_buffer_size]; //!< working buffer for making human readable timestamps
/** @} */

bool connected ();
bool current ();
void reset ();
void close ();
void send_handshake ();

/** \name Peer Timestamps
* Time message handling
*/
/** @{ */
/** \brief Convert an std::chrono nanosecond rep to a human readable string
*/
char* convert_tstamp(const tstamp& t);
/** \brief Populate and queue time_message
*/
void send_time();
/** \brief Populate and queue time_message immediately using incoming time_message
*/
void send_time(const time_message& msg);
/** \brief Read system time and convert to a 64 bit integer.
*
* There are only two calls on this routine in the program. One
* when a packet arrives from the network and the other when a
* packet is placed on the send queue. Calls the kernel time of
* day routine and converts to a (at least) 64 bit integer.
*/
tstamp get_time()
{
return std::chrono::system_clock::now().time_since_epoch().count();
}
/** @} */

void enqueue( const net_message &msg );
void enqueue_sync_block ();

Expand Down Expand Up @@ -391,6 +459,32 @@ namespace eos {
enqueue (hello);
}

char* connection::convert_tstamp(const tstamp& t)
{
const long NsecPerSec{1000000000};
time_t seconds = t / NsecPerSec;
strftime(ts, ts_buffer_size, "%F %T", localtime(&seconds));
snprintf(ts+19, ts_buffer_size-19, ".%ld", t % NsecPerSec);
return ts;
}

void connection::send_time () {
time_message xpkt;
xpkt.org = rec;
xpkt.rec = dst;
xpkt.xmt = get_time();
org = xpkt.xmt;
enqueue(xpkt);
}

void connection::send_time (const time_message& msg) {
time_message xpkt;
xpkt.org = msg.xmt;
xpkt.rec = msg.dst;
xpkt.xmt = get_time();
enqueue(xpkt);
}

void connection::enqueue( const net_message &m ) {
out_queue.push_back( m );
if( out_queue.size() == 1 ) {
Expand Down Expand Up @@ -805,6 +899,36 @@ namespace eos {
c->last_handshake = msg;
}

void net_plugin_impl::handle_message (connection_ptr c, const time_message &msg) {
/* We've already lost however many microseconds it took to dispatch
* the message, but it can't be helped.
*/
msg.dst = c->get_time();

// If the transmit timestamp is zero, the peer is horribly broken.
if(msg.xmt == 0)
return; /* invalid timestamp */

if(msg.xmt == c->xmt)
return; /* duplicate packet */

c->xmt = msg.xmt;
c->rec = msg.rec;
c->dst = msg.dst;

if(msg.org == 0)
{
c->send_time(msg);
return; // We don't have enough data to perform the calculation yet.
}

c->offset = (double(c->rec - c->org) + double(msg.xmt - c->dst)) / 2;
double NsecPerUsec{1000};
dlog("Clock offset is ${o}ns (${us}us)", ("o", c->offset)("us", c->offset/NsecPerUsec));
c->org = 0;
c->rec = 0;
}

void net_plugin_impl::handle_message (connection_ptr c, const notice_message &msg) {
//peer tells us about one or more blocks or txns. We need to forward only those
//we don't already know about. and for each peer note that it knows
Expand Down Expand Up @@ -843,7 +967,7 @@ namespace eos {
try {
send_now.push_back(cc.get_recent_transaction(t));
} catch (...) {
elog( "failed to retieve transaction");
elog( "failed to retrieve transaction");
}
}
else {
Expand Down Expand Up @@ -1104,6 +1228,21 @@ namespace eos {
});
}

void net_plugin_impl::ticker () {
keepalive_timer->expires_from_now (keepalive_interval);
keepalive_timer->async_wait ([&](boost::system::error_code ec) {
ticker ();
if (ec) {
wlog ("Peer keepalive ticked sooner than expected: ${m}", ("m", ec.message()));
}
for (auto &c : connections ) {
if (c->socket->is_open()) {
c->send_time();
}
}
});
}

void net_plugin_impl::start_monitors () {
connector_check.reset(new boost::asio::steady_timer (app().get_io_service()));
transaction_check.reset(new boost::asio::steady_timer (app().get_io_service()));
Expand Down Expand Up @@ -1360,6 +1499,9 @@ namespace eos {
my->chain_plug = app().find_plugin<chain_plugin>();
my->chain_plug->get_chain_id(my->chain_id);
fc::rand_pseudo_bytes(my->node_id.data(), my->node_id.data_size());

my->keepalive_timer.reset(new boost::asio::steady_timer (app().get_io_service()));
my->ticker();
}

void net_plugin::plugin_startup() {
Expand Down

0 comments on commit 1dd3f04

Please sign in to comment.