Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #8285 from EOSIO/net-plugin-handshake-2.0
Browse files Browse the repository at this point in the history
Net plugin handshake - 2.0
  • Loading branch information
heifner authored Dec 9, 2019
2 parents ca3ec60 + ecde55b commit 05b48ff
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 36 deletions.
80 changes: 44 additions & 36 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,10 +667,10 @@ namespace eosio {
void blk_send(const block_id_type& blkid);
void stop_send();

void enqueue( const net_message &msg, bool trigger_send = true );
void enqueue_block( const signed_block_ptr& sb, bool trigger_send = true, bool to_sync_queue = false);
void enqueue( const net_message &msg );
void enqueue_block( const signed_block_ptr& sb, bool to_sync_queue = false);
void enqueue_buffer( const std::shared_ptr<std::vector<char>>& send_buffer,
bool trigger_send, int priority, go_away_reason close_after_send,
go_away_reason close_after_send,
bool to_sync_queue = false);
void cancel_sync(go_away_reason);
void flush_queues();
Expand All @@ -684,11 +684,9 @@ namespace eosio {
void fetch_timeout(boost::system::error_code ec);

void queue_write(const std::shared_ptr<vector<char>>& buff,
bool trigger_send,
int priority,
std::function<void(boost::system::error_code, std::size_t)> callback,
bool to_sync_queue = false);
void do_queue_write(int priority);
void do_queue_write();

static bool is_valid( const handshake_message& msg );

Expand Down Expand Up @@ -1085,8 +1083,6 @@ namespace eosio {
}

void connection::queue_write(const std::shared_ptr<vector<char>>& buff,
bool trigger_send,
int priority,
std::function<void(boost::system::error_code, std::size_t)> callback,
bool to_sync_queue) {
if( !buffer_queue.add_write_queue( buff, callback, to_sync_queue )) {
Expand All @@ -1095,27 +1091,28 @@ namespace eosio {
close();
return;
}
if( buffer_queue.is_out_queue_empty() && trigger_send) {
do_queue_write( priority );
}
do_queue_write();
}

void connection::do_queue_write(int priority) {
void connection::do_queue_write() {
if( !buffer_queue.ready_to_send() )
return;
connection_ptr c(shared_from_this());

std::vector<boost::asio::const_buffer> bufs;
buffer_queue.fill_out_buffer( bufs );

strand.dispatch( [c{std::move(c)}, bufs{std::move(bufs)}, priority]() {
strand.dispatch( [c{std::move(c)}, bufs{std::move(bufs)}]() {
boost::asio::async_write( *c->socket, bufs,
boost::asio::bind_executor( c->strand, [c, socket=c->socket, priority]( boost::system::error_code ec, std::size_t w ) {
boost::asio::bind_executor( c->strand, [c, socket=c->socket]( boost::system::error_code ec, std::size_t w ) {
try {
c->buffer_queue.clear_out_queue();
// May have closed connection and cleared buffer_queue
if( !c->socket_is_open() ) return;

c->buffer_queue.out_callback( ec, w );
if( !c->socket_is_open() || socket != c->socket ) {
fc_ilog( logger, "async write socket ${r} before callback: ${p}",
("r", c->socket_is_open() ? "changed" : "closed")("p", c->peer_name()) );
return;
}

if( ec ) {
if( ec.value() != boost::asio::error::eof ) {
Expand All @@ -1126,9 +1123,11 @@ namespace eosio {
c->close();
return;
}
c->buffer_queue.clear_out_queue();

c->buffer_queue.out_callback( ec, w );

c->enqueue_sync_block();
c->do_queue_write( priority );
c->do_queue_write();
} catch( const std::exception& ex ) {
fc_elog( logger, "Exception in do_queue_write to ${p} ${s}", ("p", c->peer_name())( "s", ex.what() ) );
} catch( const fc::exception& ex ) {
Expand Down Expand Up @@ -1165,32 +1164,32 @@ namespace eosio {
fc_dlog( logger, "enqueue sync block ${num}", ("num", peer_requested->last + 1) );
}
uint32_t num = ++peer_requested->last;
bool trigger_send = true;
if(num == peer_requested->end_block) {
peer_requested.reset();
fc_ilog( logger, "completing enqueue_sync_block ${num} to ${p}", ("num", num)("p", peer_name()) );
}
connection_wptr weak = shared_from_this();
app().post( priority::medium, [num, trigger_send, weak{std::move(weak)}]() {
app().post( priority::medium, [num, weak{std::move(weak)}]() {
connection_ptr c = weak.lock();
if( !c ) return;
controller& cc = my_impl->chain_plug->chain();
signed_block_ptr sb = cc.fetch_block_by_number( num );
if( sb ) {
c->strand.post( [c, sb{std::move(sb)}, trigger_send]() {
c->enqueue_block( sb, trigger_send, true );
c->strand.post( [c, sb{std::move(sb)}]() {
c->enqueue_block( sb, true );
});
} else {
c->strand.post( [c, num]() {
peer_ilog( c, "enqueue sync, unable to fetch block ${num}", ("num", num) );
c->send_handshake();
});
}
});

return true;
}

void connection::enqueue( const net_message& m, bool trigger_send ) {
void connection::enqueue( const net_message& m ) {
verify_strand_in_this_thread( strand, __func__, __LINE__ );
go_away_reason close_after_send = no_reason;
if (m.contains<go_away_message>()) {
Expand All @@ -1209,7 +1208,7 @@ namespace eosio {
ds.write( header, header_size );
fc::raw::pack( ds, m );

enqueue_buffer( send_buffer, trigger_send, priority::low, close_after_send );
enqueue_buffer( send_buffer, close_after_send );
}

template< typename T>
Expand Down Expand Up @@ -1245,19 +1244,20 @@ namespace eosio {
return create_send_buffer( packed_transaction_which, trx );
}

void connection::enqueue_block( const signed_block_ptr& sb, bool trigger_send, bool to_sync_queue) {
void connection::enqueue_block( const signed_block_ptr& sb, bool to_sync_queue) {
fc_dlog( logger, "enqueue block ${num}", ("num", sb->block_num()) );
verify_strand_in_this_thread( strand, __func__, __LINE__ );
enqueue_buffer( create_send_buffer( sb ), trigger_send, priority::medium, no_reason, to_sync_queue);
enqueue_buffer( create_send_buffer( sb ), no_reason, to_sync_queue);
}

void connection::enqueue_buffer( const std::shared_ptr<std::vector<char>>& send_buffer,
bool trigger_send, int priority, go_away_reason close_after_send,
go_away_reason close_after_send,
bool to_sync_queue)
{
connection_ptr self = shared_from_this();
queue_write(send_buffer,trigger_send, priority,
[conn{std::move(self)}, close_after_send](boost::system::error_code ec, std::size_t ) {
queue_write(send_buffer,
[conn{std::move(self)}, close_after_send](boost::system::error_code ec, std::size_t ) {
if (ec) return;
if (close_after_send != no_reason) {
fc_ilog( logger, "sent a go away message: ${r}, closing connection to ${p}",
("r", reason_str(close_after_send))("p", conn->peer_name()) );
Expand Down Expand Up @@ -1787,6 +1787,7 @@ namespace eosio {
set_state( head_catchup );
} else {
set_state( in_sync );
send_handshakes();
}
} else if( state == lib_catchup ) {
if( blk_num == sync_known_lib_num ) {
Expand Down Expand Up @@ -1959,7 +1960,7 @@ namespace eosio {
return;
}
fc_dlog( logger, "bcast block ${b} to ${p}", ("b", bnum)("p", cp->peer_name()) );
cp->enqueue_buffer( send_buffer, true, priority::medium, no_reason );
cp->enqueue_buffer( send_buffer, no_reason );
}
});
return true;
Expand Down Expand Up @@ -2034,7 +2035,7 @@ namespace eosio {

cp->strand.post( [cp, send_buffer]() {
fc_dlog( logger, "sending trx to ${n}", ("n", cp->peer_name()) );
cp->enqueue_buffer( send_buffer, true, priority::low, no_reason );
cp->enqueue_buffer( send_buffer, no_reason );
} );
return true;
} );
Expand Down Expand Up @@ -2199,10 +2200,11 @@ namespace eosio {
}
connecting = true;
pending_message_buffer.reset();
buffer_queue.clear_out_queue();
boost::asio::async_connect( *socket, endpoints,
boost::asio::bind_executor( strand,
[resolver, c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) {
if( !err && socket->is_open() ) {
if( !err && socket->is_open() && socket == c->socket ) {
if( c->start_session() ) {
c->send_handshake();
}
Expand Down Expand Up @@ -2314,7 +2316,7 @@ namespace eosio {
boost::asio::bind_executor( strand,
[conn = shared_from_this(), socket=socket]( boost::system::error_code ec, std::size_t bytes_transferred ) {
// may have closed connection and cleared pending_message_buffer
if( !conn->socket_is_open() ) return;
if( !conn->socket_is_open() || socket != conn->socket ) return;

bool close_connection = false;
try {
Expand Down Expand Up @@ -2427,7 +2429,7 @@ namespace eosio {
std::unique_lock<std::mutex> g( conn_mtx );
const auto last_sent_lib = last_handshake_sent.last_irreversible_block_num;
g.unlock();
if( !peer_requested && blk_num < last_sent_lib ) {
if( blk_num < last_sent_lib ) {
fc_ilog( logger, "received block ${n} less than sent lib ${lib}", ("n", blk_num)("lib", last_sent_lib) );
close();
} else {
Expand Down Expand Up @@ -2702,6 +2704,12 @@ namespace eosio {
("o", offset)( "us", offset / NsecPerUsec ) ) );
org = 0;
rec = 0;

std::unique_lock<std::mutex> g_conn( conn_mtx );
if( last_handshake_recv.generation == 0 ) {
g_conn.unlock();
send_handshake();
}
}

void connection::handle_message( const notice_message& msg ) {
Expand All @@ -2725,7 +2733,7 @@ namespace eosio {
break;
case last_irr_catch_up: {
std::unique_lock<std::mutex> g_conn( conn_mtx );
last_handshake_recv.head_num = msg.known_trx.pending;
last_handshake_recv.head_num = msg.known_blocks.pending;
g_conn.unlock();
break;
}
Expand Down
2 changes: 2 additions & 0 deletions tests/testUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import json
import shlex
import socket
from datetime import datetime
from sys import stdout
from sys import exit
import traceback
Expand Down Expand Up @@ -76,6 +77,7 @@ class Utils:
def Print(*args, **kwargs):
stackDepth=len(inspect.stack())-2
s=' '*stackDepth
stdout.write(datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f "))
stdout.write(s)
print(*args, **kwargs)

Expand Down

0 comments on commit 05b48ff

Please sign in to comment.