diff --git a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp index b3942fc3fc3..15f1c0623ff 100644 --- a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp @@ -6,10 +6,12 @@ namespace eos { using namespace chain; using namespace fc; + using node_id_type = fc::sha256; + struct handshake_message { int16_t network_version = 0; chain_id_type chain_id; ///< used to identify chain - fc::sha256 node_id; ///< used to identify peers and prevent self-connect + node_id_type node_id; ///< used to identify peers and prevent self-connect string p2p_address; uint32_t last_irreversible_block_num = 0; block_id_type last_irreversible_block_id; @@ -22,6 +24,7 @@ namespace eos { struct notice_message { vector known_trx; vector known_blocks; + vector known_to; }; @@ -33,6 +36,8 @@ namespace eos { struct block_summary_message { signed_block block; vector trx_ids; + vector known_to; + }; struct sync_request_message { @@ -41,7 +46,7 @@ namespace eos { }; struct peer_message { - vector peers; + vector peers; }; using net_message = static_variant; + using forward_message = static_variant; + } // namespace eos @@ -64,7 +73,7 @@ FC_REFLECT( eos::handshake_message, (os)(agent) ) FC_REFLECT( eos::block_summary_message, (block)(trx_ids) ) -FC_REFLECT( eos::notice_message, (known_trx)(known_blocks) ) +FC_REFLECT( eos::notice_message, (known_trx)(known_blocks)(known_to) ) FC_REFLECT( eos::request_message, (req_trx)(req_blocks) ) FC_REFLECT( eos::sync_request_message, (start_block)(end_block) ) FC_REFLECT( eos::peer_message, (peers) ) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index bc2cddc3f9f..f91b442a12a 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -126,8 +126,8 @@ namespace eos { sync_request_index in_sync_state; sync_request_index out_sync_state; socket_ptr socket; - std::set shared_peers; - + std::set shared_peers; + node_id_type peer_id; uint32_t pending_message_size; vector pending_message_buffer; @@ -220,6 +220,8 @@ namespace eos { } }; + + class net_plugin_impl { public: unique_ptr acceptor; @@ -231,13 +233,16 @@ namespace eos { std::set resolved_seed_nodes; std::set learned_nodes; + // cache of blocks received out of order due to parallel sync requests + std::set pending_sockets; - std::set< connection_ptr > connections; + std::set unknown_connections; + std::map connections; bool done = false; int16_t network_version = 0; chain_id_type chain_id; ///< used to identify chain - fc::sha256 node_id; ///< used to identify peers and prevent self-connect + node_id_type node_id; ///< used to identify peers and prevent self-connect std::string user_agent_name; chain_plugin* chain_plug; @@ -309,7 +314,7 @@ namespace eos { ilog( "starting network loop" ); while( !done ) { for( auto itr = connections.begin(); itr != connections.end(); ) { - auto con = *itr; + auto con = *itr.second(); if( !con->socket->is_open() ) { close(con); itr = connections.begin(); @@ -324,7 +329,7 @@ namespace eos { void start_session( connection_ptr con ) { - connections.insert (con); + unknown_connections.insert (con); uint32_t mtu = 1300; // need a way to query this if (mtu < just_send_it_max) { just_send_it_max = mtu; @@ -332,7 +337,6 @@ namespace eos { start_read_message( con ); con->send_handshake(); - send_peer_message(*con); // for now, we can just use the application main loop. // con->readloop_complete = bf::async( [=](){ read_loop( con ); } ); @@ -392,9 +396,8 @@ namespace eos { peer_message pm; pm.peers.resize(connections.size()); for (auto &c : connections) { - fc::ip::endpoint remote = asio_to_fc(c->socket->remote_endpoint()); - if (conn.shared_peers.find(remote) == conn.shared_peers.end()) { - pm.peers.push_back(remote); + if (conn.shared_peers.find(c.first) == conn.shared_peers.end()) { + pm.peers.push_back(c.first); } } if (!pm.peers.empty()) { @@ -405,41 +408,49 @@ namespace eos { // template void send_all (const SignedTransaction &msg) { for (auto &c : connections) { - ilog ("send_all bsm: peer in_sync ${insiz} out_sync ${outsiz}", ("insiz",c->in_sync_state.size())("outsiz",c->out_sync_state.size())); - if (c->out_sync_state.size() == 0) { - const auto& bs = c->trx_state.find(msg.id()); - if (bs == c->trx_state.end()) { - c->trx_state.insert((transaction_state){msg.id(),true,true,(uint32_t)-1, + if (c.second->out_sync_state.size() == 0) { + const auto& bs = c.second->trx_state.find(msg.id()); + if (bs == c.second->trx_state.end()) { + c.second->trx_state.insert((transaction_state){msg.id(),true,true,(uint32_t)-1, fc::time_point(),fc::time_point()}); } - c->send(msg); + c.second->send(msg); } } } void send_all (const block_summary_message &msg) { for (auto &c : connections) { - ilog ("send_all bsm: peer in_sync ${insiz} out_sync ${outsiz}", ("insiz",c->in_sync_state.size())("outsiz",c->out_sync_state.size())); - const auto& bs = c->block_state.find(msg.block.id()); - if (bs == c->block_state.end()) { - c->block_state.insert ((block_state){msg.block.id(),true,true,fc::time_point()}); - if (c->out_sync_state.size() == 0) - c->send(msg); + ilog ("send_all bsm: peer in_sync ${insiz} out_sync ${outsiz}", ("insiz",c.second->in_sync_state.size())("outsiz",c.second->out_sync_state.size())); + const auto& bs = c.second->block_state.find(msg.block.id()); + if (bs == c.second->block_state.end()) { + c.second->block_state.insert ((block_state){msg.block.id(),true,true,fc::time_point()}); + if (c.second->out_sync_state.size() == 0) + c.second->send(msg); } } } void send_all (const notice_message &msg) { for (auto &c : connections) { - ilog ("send_all nm: peer in_sync ${insiz} out_sync ${outsiz}", ("insiz",c->in_sync_state.size())("outsiz",c->out_sync_state.size())); - if (c->out_sync_state.size() == 0) { + bool skip = true; + if (c.second->out_sync_state.size() == 0) { + skip = false; + for (const auto& f : msg.known_to) { + if (f == c.first) { + skip = true; + break; + } + } + } + if (!skip) { for (const auto& b : msg.known_blocks) { - const auto& bs = c->block_state.find(b); - if (bs == c->block_state.end()) { - c->block_state.insert ((block_state){b,false,true,fc::time_point()}); + const auto& bs = c.second->block_state.find(b); + if (bs == c.second->block_state.end()) { + c.second->block_state.insert ((block_state){b,false,true,fc::time_point()}); } } - c->send(msg); + c.second->send(msg); } } } @@ -456,17 +467,17 @@ namespace eos { span = lastSpan; } sync_state req = {low+1, low+span, low, time_point::now() }; - cx->in_sync_state.insert (req); + cx.second->in_sync_state.insert (req); sync_request_message srm = {req.start_block, req.end_block }; - cx->send (srm); + cx.second->send (srm); low += span; } } void forward (connection_ptr source, const net_message &msg) { for (auto c : connections ) { - if (c != source) { - c->send (msg); + if (c.second != source) { + c.second->send (msg); } } } @@ -488,6 +499,10 @@ namespace eos { close (c); return; } + unknown_connections.erase(c); + c->peer_id = msg.node_id; + connections.insert(std::pair(msg.node_id, c)); + send_peer_message (*c); chain_controller& cc = chain_plug->chain(); uint32_t head = cc.head_block_num (); @@ -499,22 +514,21 @@ namespace eos { void handle_message (connection_ptr c, const peer_message &msg) { dlog ("got a peer message with ${pc}", ("pc", msg.peers.size())); - for (auto fcep : msg.peers) { - c->shared_peers.insert (fcep); - tcp::endpoint ep = fc_to_asio (fcep); - if (ep == listen_endpoint) { + for (auto id : msg.peers) { + c->shared_peers.insert (id); + if (id == node_id) { continue; } - +#if 0 if (resolved_seed_nodes.find(ep) == resolved_seed_nodes.end() && learned_nodes.find (fcep) == learned_nodes.end()) { learned_nodes.insert (fcep); } +#endif } } void handle_message (connection_ptr c, const notice_message &msg) { - dlog ("got a notice message"); notice_message fwd; request_message req; for (const auto& b : msg.known_blocks) { @@ -536,27 +550,46 @@ namespace eos { } } if (fwd.known_blocks.size() > 0 || fwd.known_trx.size() > 0) { + fwd.known_to = msg.known_to; + fwd.known_to.push_back(node_id); forward (c, fwd); c->send(req); } } void handle_message (connection_ptr c, const request_message &msg) { - dlog ("got a request_message"); #warning ("TODO: implement handling a request_message") + chain_controller &cc = chain_plug->chain(); + for (const auto& b : msg.req_blocks) { + optional blk = cc.fetch_block_by_id(b); + if (blk) { + c->send(*blk); + c->block_state.insert((block_state){b,true,true,fc::time_point()}); + } + } + + for (const auto& t : msg.req_trx) { + try { + const SignedTransaction &trx = cc.get_recent_transaction (t); + c->send(trx); + c->trx_state.insert((transaction_state){t,true,true,(uint32_t)-1, + fc::time_point(),fc::time_point()}); + } catch (const assert_exception &ex) { + // received + elog (" caught assertion #${n}",("n",t)); + // close (c); + } + } } void handle_message (connection_ptr c, const sync_request_message &msg) { - dlog ("got a sync request message for blocks ${s} to ${e}", - ("s",msg.start_block)("e", msg.end_block)); sync_state req = {msg.start_block,msg.end_block,msg.start_block-1,time_point::now()}; c->out_sync_state.insert (req); c->write_block_backlog (); } void handle_message (connection_ptr c, const block_summary_message &msg) { - dlog ("got a block summary message blkid = ${b}", ("b",msg.block.id())); #warning ("TODO: reconstruct actual block from cached transactions") const auto& itr = c->block_state.get(); auto bs = itr.find(msg.block.id()); @@ -572,6 +605,9 @@ namespace eos { c->block_state.insert (std::move(value)); forward (c, msg); } + else { + dlog ("not forwarding known block"); + } } chain_controller &cc = chain_plug->chain(); if (!cc.is_known_block(msg.block.id()) ) { @@ -590,7 +626,6 @@ namespace eos { } void handle_message (connection_ptr c, const SignedTransaction &msg) { - dlog ("got a SignedTransacton"); chain_controller &cc = chain_plug->chain(); if (!cc.is_known_transaction(msg.id())) { last_recd_txn_guard tls_guard(msg.id()); @@ -598,15 +633,16 @@ namespace eos { chain_plug->accept_transaction (msg); forward (c, msg); } + else { + dlog ("ignoring known SignedTransacton"); + } } void handle_message (connection_ptr c, const signed_block &msg) { - uint32_t bn = msg.block_num(); - dlog ("got a signed_block, num = ${n}", ("n", bn)); chain_controller &cc = chain_plug->chain(); if (cc.is_known_block(msg.id())) { - // dlog ("block id ${id} is known", ("id", msg.id()) ); + dlog ("ignoring known block"); return; } uint32_t num = 0; @@ -629,7 +665,7 @@ namespace eos { elog ("unable to accpt block #${n}",("n",num)); close (c); } catch (const assert_exception &ex) { - elog ("unable to accept block cuz my asserts! #${n}",("n",num)); + elog ("unable to accept block on assertion #${n}",("n",num)); close (c); } } @@ -643,6 +679,7 @@ namespace eos { template void operator()(const T &msg) const { + idump((msg)); impl.handle_message (c, msg); } }; @@ -677,7 +714,7 @@ namespace eos { void close( connection_ptr c ) { if( c->socket ) c->socket->close(); - connections.erase( c ); + connections.erase( c->peer_id ); c.reset (); } @@ -847,8 +884,8 @@ namespace eos { ilog( "close ${s} connections", ("s",my->connections.size()) ); auto cons = my->connections; for( auto con : cons ) { - con->socket->close(); - my->close (con); + con.second->socket->close(); + my->close (con.second); } idump((my->connections.size())); @@ -858,10 +895,12 @@ namespace eos { } FC_CAPTURE_AND_RETHROW() } void net_plugin::broadcast_transaction (const SignedTransaction &txn) { + wdump( (txn.id()) ); my->pending_txn (txn); } void net_plugin::broadcast_block (const chain::signed_block &sb) { + wdump( (sb.id()) ); vector trxs; if (!sb.cycles.empty()) { for (const auto& cyc : sb.cycles) { @@ -875,7 +914,7 @@ namespace eos { vector blks; blks.push_back (sb.id()); - notice_message nm = {blks, my->pending_notify}; + notice_message nm = {my->pending_notify, blks}; my->send_all (nm); block_summary_message bsm = {sb, trxs}; diff --git a/tests/p2p_tests/init/run_test.pl b/tests/p2p_tests/init/run_test.pl index 2a63b3dea6d..0f173595fa1 100755 --- a/tests/p2p_tests/init/run_test.pl +++ b/tests/p2p_tests/init/run_test.pl @@ -19,28 +19,31 @@ my $genesis = "$eos_home/genesis.json"; my $http_port_base = 8888; my $p2p_port_base = 9876; -my $data_dir_base = "ttdn"; +my $data_dir_base = "tdn"; my $hostname = "127.0.0.1"; -my $first_pause = 45; -my $launch_pause = 5; +my $first_pause = 0; +my $launch_pause = 0; my $run_duration = 60; my $topo = "ring"; -my $override_gts; # = "now"; +my $override_gts = ""; +my $no_delay=0; if (!GetOptions("nodes=i" => \$nodes, "first-pause=i" => \$first_pause, "launch-pause=i" => \$launch_pause, "duration=i" => \$run_duration, "topo=s" => \$topo, + "time-stamp=s" => \$override_gts, "pnodes=i" => \$pnodes)) { - print "usage: $ARGV[0] [--nodes=] [--pnodes=] [--topo=] [--first-pause=] [--launch-pause=] [--duration=]\n"; + print "usage: $ARGV[0] [--nodes=] [--pnodes=] [--topo=] [--first-pause=] [--launch-pause=] [--duration=] [--time-stamp=