Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

net_plugin more quickly determine known block #6735

Merged
merged 5 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libraries/fc
148 changes: 83 additions & 65 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ namespace eosio {
void start_listen_loop();
void start_read_message(const connection_ptr& c);

/** \brief Process the next message from the pending message buffer
*
* Process the next message from the pending_message_buffer.
* message_length is the already determined length of the data
* part of the message that will handle the message.
* Returns true is successful. Returns false if an error was
* encountered unpacking or processing the message.
*/
bool process_next_message(const connection_ptr& conn, uint32_t message_length);

void close(const connection_ptr& c);
size_t count_open_sockets() const;

Expand Down Expand Up @@ -303,6 +313,8 @@ namespace eosio {
constexpr auto def_sync_fetch_span = 100;

constexpr auto message_header_size = 4;
constexpr uint32_t signed_block_which = 7; // see protocol net_message
constexpr uint32_t packed_transaction_which = 8; // see protocol net_message

/**
* For a while, network version was a 16 bit value equal to the second set of 16 bits
Expand Down Expand Up @@ -595,17 +607,6 @@ namespace eosio {
bool to_sync_queue = false);
void do_queue_write(int priority);

/** \brief Process the next message from the pending message buffer
*
* Process the next message from the pending_message_buffer.
* message_length is the already determined length of the data
* part of the message and impl in the net plugin implementation
* that will handle the message.
* Returns true is successful. Returns false if an error was
* encountered unpacking or processing the message.
*/
bool process_next_message(net_plugin_impl& impl, uint32_t message_length);

bool add_peer_block(const peer_block_state& pbs);
bool peer_has_block(const block_id_type& blkid);

Expand Down Expand Up @@ -1065,11 +1066,12 @@ namespace eosio {
close_after_send = m.get<go_away_message>().reason;
}

uint32_t payload_size = fc::raw::pack_size( m );
const uint32_t payload_size = fc::raw::pack_size( m );

char* header = reinterpret_cast<char*>(&payload_size);
size_t header_size = sizeof(payload_size);
size_t buffer_size = header_size + payload_size;
const char* const header = reinterpret_cast<const char* const>(&payload_size); // avoid variable size encoding of uint32_t
constexpr size_t header_size = sizeof(payload_size);
static_assert( header_size == message_header_size, "invalid message_header_size" );
const size_t buffer_size = header_size + payload_size;

auto send_buffer = std::make_shared<vector<char>>(buffer_size);
fc::datastream<char*> ds( send_buffer->data(), buffer_size);
Expand All @@ -1079,26 +1081,38 @@ namespace eosio {
enqueue_buffer( send_buffer, trigger_send, priority::low, close_after_send );
}

static std::shared_ptr<std::vector<char>> create_send_buffer( const signed_block_ptr& sb ) {
// this implementation is to avoid copy of signed_block to net_message
int which = 7; // matches which of net_message for signed_block

uint32_t which_size = fc::raw::pack_size( unsigned_int( which ));
uint32_t payload_size = which_size + fc::raw::pack_size( *sb );
template< typename T>
static std::shared_ptr<std::vector<char>> create_send_buffer( uint32_t which, const T& v ) {
// match net_message static_variant pack
const uint32_t which_size = fc::raw::pack_size( unsigned_int( which ) );
const uint32_t payload_size = which_size + fc::raw::pack_size( v );

char* header = reinterpret_cast<char*>(&payload_size);
size_t header_size = sizeof(payload_size);
size_t buffer_size = header_size + payload_size;
const char* const header = reinterpret_cast<const char* const>(&payload_size); // avoid variable size encoding of uint32_t
constexpr size_t header_size = sizeof( payload_size );
static_assert( header_size == message_header_size, "invalid message_header_size" );
const size_t buffer_size = header_size + payload_size;

auto send_buffer = std::make_shared<vector<char>>(buffer_size);
fc::datastream<char*> ds( send_buffer->data(), buffer_size);
auto send_buffer = std::make_shared<vector<char>>( buffer_size );
fc::datastream<char*> ds( send_buffer->data(), buffer_size );
ds.write( header, header_size );
fc::raw::pack( ds, unsigned_int( which ));
fc::raw::pack( ds, *sb );
fc::raw::pack( ds, unsigned_int( which ) );
fc::raw::pack( ds, v );

return send_buffer;
}

static std::shared_ptr<std::vector<char>> create_send_buffer( const signed_block_ptr& sb ) {
// this implementation is to avoid copy of signed_block to net_message
// matches which of net_message for signed_block
return create_send_buffer( signed_block_which, *sb );
}

static std::shared_ptr<std::vector<char>> create_send_buffer( const packed_transaction& trx ) {
// this implementation is to avoid copy of packed_transaction to net_message
// matches which of net_message for packed_transaction
return create_send_buffer( packed_transaction_which, trx );
}

void connection::enqueue_block( const signed_block_ptr& sb, bool trigger_send, bool to_sync_queue) {
enqueue_buffer( create_send_buffer( sb ), trigger_send, priority::low, no_reason, to_sync_queue);
}
Expand Down Expand Up @@ -1202,27 +1216,6 @@ namespace eosio {
sync_wait();
}

bool connection::process_next_message(net_plugin_impl& impl, uint32_t message_length) {
try {
auto ds = pending_message_buffer.create_datastream();
net_message msg;
fc::raw::unpack(ds, msg);
msg_handler m(impl, shared_from_this() );
if( msg.contains<signed_block>() ) {
m( std::move( msg.get<signed_block>() ) );
} else if( msg.contains<packed_transaction>() ) {
m( std::move( msg.get<packed_transaction>() ) );
} else {
msg.visit( m );
}
} catch( const fc::exception& e ) {
edump((e.to_detail_string() ));
impl.close( shared_from_this() );
return false;
}
return true;
}

bool connection::add_peer_block(const peer_block_state& entry) {
auto bptr = blk_state.get<by_id>().find(entry.id);
bool added = (bptr == blk_state.end());
Expand Down Expand Up @@ -1675,21 +1668,7 @@ namespace eosio {
time_point_sec trx_expiration = ptrx->packed_trx->expiration();
const packed_transaction& trx = *ptrx->packed_trx;

// this implementation is to avoid copy of packed_transaction to net_message
int which = 8; // matches which of net_message for packed_transaction

uint32_t which_size = fc::raw::pack_size( unsigned_int( which ));
uint32_t payload_size = which_size + fc::raw::pack_size( trx );

char* header = reinterpret_cast<char*>(&payload_size);
size_t header_size = sizeof(payload_size);
size_t buffer_size = header_size + payload_size;

auto buff = std::make_shared<vector<char>>(buffer_size);
fc::datastream<char*> ds( buff->data(), buffer_size);
ds.write( header, header_size );
fc::raw::pack( ds, unsigned_int( which ));
fc::raw::pack( ds, trx );
auto buff = create_send_buffer( trx );

node_transaction_state nts = {id, trx_expiration, 0, buff};
my_impl->local_txns.insert(std::move(nts));
Expand Down Expand Up @@ -2089,7 +2068,7 @@ namespace eosio {

if (bytes_in_buffer >= total_message_bytes) {
conn->pending_message_buffer.advance_read_ptr(message_header_size);
if (!conn->process_next_message(*this, message_length)) {
if (!process_next_message(conn, message_length)) {
return;
}
} else {
Expand Down Expand Up @@ -2139,6 +2118,45 @@ namespace eosio {
}
}

bool net_plugin_impl::process_next_message(const connection_ptr& conn, uint32_t message_length) {
try {
// if next message is a block we already have, exit early
auto peek_ds = conn->pending_message_buffer.create_peek_datastream();
unsigned_int which{};
fc::raw::unpack( peek_ds, which );
brianjohnson5972 marked this conversation as resolved.
Show resolved Hide resolved
if( which == signed_block_which ) {
block_header bh;
fc::raw::unpack( peek_ds, bh );

controller& cc = chain_plug->chain();
block_id_type blk_id = bh.id();
uint32_t blk_num = bh.block_num();
if( cc.fetch_block_by_id( blk_id ) ) {
sync_master->recv_block( conn, blk_id, blk_num );
conn->pending_message_buffer.advance_read_ptr( message_length );
return true;
}
}

auto ds = conn->pending_message_buffer.create_datastream();
net_message msg;
fc::raw::unpack( ds, msg );
msg_handler m( *this, conn );
if( msg.contains<signed_block>() ) {
m( std::move( msg.get<signed_block>() ) );
} else if( msg.contains<packed_transaction>() ) {
m( std::move( msg.get<packed_transaction>() ) );
} else {
msg.visit( m );
}
} catch( const fc::exception& e ) {
edump( (e.to_detail_string()) );
close( conn );
return false;
}
return true;
}

size_t net_plugin_impl::count_open_sockets() const
{
size_t count = 0;
Expand Down
39 changes: 39 additions & 0 deletions unittests/message_buffer_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,45 @@ BOOST_AUTO_TEST_CASE(message_buffer_read_peek_bounds_multi) {
BOOST_CHECK_THROW(mbuff.read(&throw_away_buffer, 1), fc::out_of_range_exception);
}

BOOST_AUTO_TEST_CASE(message_buffer_datastream) {
using my_message_buffer_t = fc::message_buffer<1024>;
my_message_buffer_t mbuff;

char buf[1024];
fc::datastream<char*> ds( buf, 1024 );

int v = 13;
fc::raw::pack( ds, v );
v = 42;
fc::raw::pack( ds, 42 );
fc::raw::pack( ds, std::string( "hello" ) );

memcpy(mbuff.write_ptr(), buf, 1024);
mbuff.advance_write_ptr(1024);

for( int i = 0; i < 3; ++i ) {
auto ds2 = mbuff.create_peek_datastream();
fc::raw::unpack( ds2, v );
BOOST_CHECK_EQUAL( 13, v );
fc::raw::unpack( ds2, v );
BOOST_CHECK_EQUAL( 42, v );
std::string s;
fc::raw::unpack( ds2, s );
BOOST_CHECK_EQUAL( s, std::string( "hello" ) );
}

{
auto ds2 = mbuff.create_datastream();
fc::raw::unpack( ds2, v );
BOOST_CHECK_EQUAL( 13, v );
fc::raw::unpack( ds2, v );
BOOST_CHECK_EQUAL( 42, v );
std::string s;
fc::raw::unpack( ds2, s );
BOOST_CHECK_EQUAL( s, std::string( "hello" ) );
}
}

BOOST_AUTO_TEST_SUITE_END()

} // namespace eosio