Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #6735 from EOSIO/net-msg-opt
Browse files Browse the repository at this point in the history
net_plugin more quickly determine known block
  • Loading branch information
heifner authored Feb 15, 2019
2 parents f7d01e7 + 1dba360 commit 7fc6b87
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 66 deletions.
2 changes: 1 addition & 1 deletion libraries/fc
148 changes: 83 additions & 65 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ namespace eosio {
void start_listen_loop();
void start_read_message(const connection_ptr& c);

/** \brief Process the next message from the pending message buffer
*
* Process the next message from the pending_message_buffer.
* message_length is the already determined length of the data
* part of the message that will handle the message.
* Returns true is successful. Returns false if an error was
* encountered unpacking or processing the message.
*/
bool process_next_message(const connection_ptr& conn, uint32_t message_length);

void close(const connection_ptr& c);
size_t count_open_sockets() const;

Expand Down Expand Up @@ -303,6 +313,8 @@ namespace eosio {
constexpr auto def_sync_fetch_span = 100;

constexpr auto message_header_size = 4;
constexpr uint32_t signed_block_which = 7; // see protocol net_message
constexpr uint32_t packed_transaction_which = 8; // see protocol net_message

/**
* For a while, network version was a 16 bit value equal to the second set of 16 bits
Expand Down Expand Up @@ -596,17 +608,6 @@ namespace eosio {
bool to_sync_queue = false);
void do_queue_write(int priority);

/** \brief Process the next message from the pending message buffer
*
* Process the next message from the pending_message_buffer.
* message_length is the already determined length of the data
* part of the message and impl in the net plugin implementation
* that will handle the message.
* Returns true is successful. Returns false if an error was
* encountered unpacking or processing the message.
*/
bool process_next_message(net_plugin_impl& impl, uint32_t message_length);

bool add_peer_block(const peer_block_state& pbs);
bool peer_has_block(const block_id_type& blkid);

Expand Down Expand Up @@ -1068,11 +1069,12 @@ namespace eosio {
close_after_send = m.get<go_away_message>().reason;
}

uint32_t payload_size = fc::raw::pack_size( m );
const uint32_t payload_size = fc::raw::pack_size( m );

char* header = reinterpret_cast<char*>(&payload_size);
size_t header_size = sizeof(payload_size);
size_t buffer_size = header_size + payload_size;
const char* const header = reinterpret_cast<const char* const>(&payload_size); // avoid variable size encoding of uint32_t
constexpr size_t header_size = sizeof(payload_size);
static_assert( header_size == message_header_size, "invalid message_header_size" );
const size_t buffer_size = header_size + payload_size;

auto send_buffer = std::make_shared<vector<char>>(buffer_size);
fc::datastream<char*> ds( send_buffer->data(), buffer_size);
Expand All @@ -1082,26 +1084,38 @@ namespace eosio {
enqueue_buffer( send_buffer, trigger_send, priority::low, close_after_send );
}

static std::shared_ptr<std::vector<char>> create_send_buffer( const signed_block_ptr& sb ) {
// this implementation is to avoid copy of signed_block to net_message
int which = 7; // matches which of net_message for signed_block

uint32_t which_size = fc::raw::pack_size( unsigned_int( which ));
uint32_t payload_size = which_size + fc::raw::pack_size( *sb );
template< typename T>
static std::shared_ptr<std::vector<char>> create_send_buffer( uint32_t which, const T& v ) {
// match net_message static_variant pack
const uint32_t which_size = fc::raw::pack_size( unsigned_int( which ) );
const uint32_t payload_size = which_size + fc::raw::pack_size( v );

char* header = reinterpret_cast<char*>(&payload_size);
size_t header_size = sizeof(payload_size);
size_t buffer_size = header_size + payload_size;
const char* const header = reinterpret_cast<const char* const>(&payload_size); // avoid variable size encoding of uint32_t
constexpr size_t header_size = sizeof( payload_size );
static_assert( header_size == message_header_size, "invalid message_header_size" );
const size_t buffer_size = header_size + payload_size;

auto send_buffer = std::make_shared<vector<char>>(buffer_size);
fc::datastream<char*> ds( send_buffer->data(), buffer_size);
auto send_buffer = std::make_shared<vector<char>>( buffer_size );
fc::datastream<char*> ds( send_buffer->data(), buffer_size );
ds.write( header, header_size );
fc::raw::pack( ds, unsigned_int( which ));
fc::raw::pack( ds, *sb );
fc::raw::pack( ds, unsigned_int( which ) );
fc::raw::pack( ds, v );

return send_buffer;
}

static std::shared_ptr<std::vector<char>> create_send_buffer( const signed_block_ptr& sb ) {
// this implementation is to avoid copy of signed_block to net_message
// matches which of net_message for signed_block
return create_send_buffer( signed_block_which, *sb );
}

static std::shared_ptr<std::vector<char>> create_send_buffer( const packed_transaction& trx ) {
// this implementation is to avoid copy of packed_transaction to net_message
// matches which of net_message for packed_transaction
return create_send_buffer( packed_transaction_which, trx );
}

void connection::enqueue_block( const signed_block_ptr& sb, bool trigger_send, bool to_sync_queue) {
enqueue_buffer( create_send_buffer( sb ), trigger_send, priority::low, no_reason, to_sync_queue);
}
Expand Down Expand Up @@ -1205,27 +1219,6 @@ namespace eosio {
sync_wait();
}

bool connection::process_next_message(net_plugin_impl& impl, uint32_t message_length) {
try {
auto ds = pending_message_buffer.create_datastream();
net_message msg;
fc::raw::unpack(ds, msg);
msg_handler m(impl, shared_from_this() );
if( msg.contains<signed_block>() ) {
m( std::move( msg.get<signed_block>() ) );
} else if( msg.contains<packed_transaction>() ) {
m( std::move( msg.get<packed_transaction>() ) );
} else {
msg.visit( m );
}
} catch( const fc::exception& e ) {
edump((e.to_detail_string() ));
impl.close( shared_from_this() );
return false;
}
return true;
}

bool connection::add_peer_block(const peer_block_state& entry) {
auto bptr = blk_state.get<by_id>().find(entry.id);
bool added = (bptr == blk_state.end());
Expand Down Expand Up @@ -1678,21 +1671,7 @@ namespace eosio {
time_point_sec trx_expiration = ptrx->packed_trx->expiration();
const packed_transaction& trx = *ptrx->packed_trx;

// this implementation is to avoid copy of packed_transaction to net_message
int which = 8; // matches which of net_message for packed_transaction

uint32_t which_size = fc::raw::pack_size( unsigned_int( which ));
uint32_t payload_size = which_size + fc::raw::pack_size( trx );

char* header = reinterpret_cast<char*>(&payload_size);
size_t header_size = sizeof(payload_size);
size_t buffer_size = header_size + payload_size;

auto buff = std::make_shared<vector<char>>(buffer_size);
fc::datastream<char*> ds( buff->data(), buffer_size);
ds.write( header, header_size );
fc::raw::pack( ds, unsigned_int( which ));
fc::raw::pack( ds, trx );
auto buff = create_send_buffer( trx );

node_transaction_state nts = {id, trx_expiration, 0, buff};
my_impl->local_txns.insert(std::move(nts));
Expand Down Expand Up @@ -2092,7 +2071,7 @@ namespace eosio {

if (bytes_in_buffer >= total_message_bytes) {
conn->pending_message_buffer.advance_read_ptr(message_header_size);
if (!conn->process_next_message(*this, message_length)) {
if (!process_next_message(conn, message_length)) {
return;
}
} else {
Expand Down Expand Up @@ -2142,6 +2121,45 @@ namespace eosio {
}
}

bool net_plugin_impl::process_next_message(const connection_ptr& conn, uint32_t message_length) {
try {
// if next message is a block we already have, exit early
auto peek_ds = conn->pending_message_buffer.create_peek_datastream();
unsigned_int which{};
fc::raw::unpack( peek_ds, which );
if( which == signed_block_which ) {
block_header bh;
fc::raw::unpack( peek_ds, bh );

controller& cc = chain_plug->chain();
block_id_type blk_id = bh.id();
uint32_t blk_num = bh.block_num();
if( cc.fetch_block_by_id( blk_id ) ) {
sync_master->recv_block( conn, blk_id, blk_num );
conn->pending_message_buffer.advance_read_ptr( message_length );
return true;
}
}

auto ds = conn->pending_message_buffer.create_datastream();
net_message msg;
fc::raw::unpack( ds, msg );
msg_handler m( *this, conn );
if( msg.contains<signed_block>() ) {
m( std::move( msg.get<signed_block>() ) );
} else if( msg.contains<packed_transaction>() ) {
m( std::move( msg.get<packed_transaction>() ) );
} else {
msg.visit( m );
}
} catch( const fc::exception& e ) {
edump( (e.to_detail_string()) );
close( conn );
return false;
}
return true;
}

size_t net_plugin_impl::count_open_sockets() const
{
size_t count = 0;
Expand Down
39 changes: 39 additions & 0 deletions unittests/message_buffer_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,45 @@ BOOST_AUTO_TEST_CASE(message_buffer_read_peek_bounds_multi) {
BOOST_CHECK_THROW(mbuff.read(&throw_away_buffer, 1), fc::out_of_range_exception);
}

BOOST_AUTO_TEST_CASE(message_buffer_datastream) {
using my_message_buffer_t = fc::message_buffer<1024>;
my_message_buffer_t mbuff;

char buf[1024];
fc::datastream<char*> ds( buf, 1024 );

int v = 13;
fc::raw::pack( ds, v );
v = 42;
fc::raw::pack( ds, 42 );
fc::raw::pack( ds, std::string( "hello" ) );

memcpy(mbuff.write_ptr(), buf, 1024);
mbuff.advance_write_ptr(1024);

for( int i = 0; i < 3; ++i ) {
auto ds2 = mbuff.create_peek_datastream();
fc::raw::unpack( ds2, v );
BOOST_CHECK_EQUAL( 13, v );
fc::raw::unpack( ds2, v );
BOOST_CHECK_EQUAL( 42, v );
std::string s;
fc::raw::unpack( ds2, s );
BOOST_CHECK_EQUAL( s, std::string( "hello" ) );
}

{
auto ds2 = mbuff.create_datastream();
fc::raw::unpack( ds2, v );
BOOST_CHECK_EQUAL( 13, v );
fc::raw::unpack( ds2, v );
BOOST_CHECK_EQUAL( 42, v );
std::string s;
fc::raw::unpack( ds2, s );
BOOST_CHECK_EQUAL( s, std::string( "hello" ) );
}
}

BOOST_AUTO_TEST_SUITE_END()

} // namespace eosio

0 comments on commit 7fc6b87

Please sign in to comment.