Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

bnet threading issues #5417

Merged
merged 4 commits into from
Aug 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libraries/chain/include/eosio/chain/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ namespace eosio { namespace chain {

time_point_sec expiration()const;
transaction_id_type id()const;
bytes get_raw_transaction()const;
transaction_id_type get_uncached_id()const; // thread safe
bytes get_raw_transaction()const; // thread safe
vector<bytes> get_context_free_data()const;
transaction get_transaction()const;
signed_transaction get_signed_transaction()const;
Expand Down
6 changes: 6 additions & 0 deletions libraries/chain/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,12 @@ transaction_id_type packed_transaction::id()const
return get_transaction().id();
}

transaction_id_type packed_transaction::get_uncached_id()const
{
const auto raw = get_raw_transaction();
return fc::raw::unpack<transaction>( raw ).id();
}

void packed_transaction::local_unpack()const
{
if (!unpacked_trx) {
Expand Down
37 changes: 34 additions & 3 deletions plugins/bnet_plugin/bnet_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,15 @@ namespace eosio {
_block_status.insert( block_status(id, false, false) );
}
}
}

void on_accepted_block( const block_state_ptr& s ) {
verify_strand_in_this_thread(_strand, __func__, __LINE__);
//idump((_block_status.size())(_transaction_status.size()));
//ilog( "accepted block ${n}", ("n",s->block_num) );

const auto& id = s->id;

_local_head_block_id = id;
_local_head_block_num = block_header::num_from_id(id);

Expand All @@ -549,7 +554,9 @@ namespace eosio {
*/
for( const auto& receipt : s->block->transactions ) {
if( receipt.trx.which() == 1 ) {
const auto tid = receipt.trx.get<packed_transaction>().id();
const auto& pt = receipt.trx.get<packed_transaction>();
// get id via get_uncached_id() as packed_transaction.id() mutates internal transaction state
const auto& tid = pt.get_uncached_id();
auto itr = _transaction_status.find( tid );
if( itr != _transaction_status.end() )
_transaction_status.erase(itr);
Expand Down Expand Up @@ -1006,7 +1013,9 @@ namespace eosio {
void mark_block_transactions_known_by_peer( const signed_block_ptr& b ) {
for( const auto& receipt : b->transactions ) {
if( receipt.trx.which() == 1 ) {
auto id = receipt.trx.get<packed_transaction>().id();
const auto& pt = receipt.trx.get<packed_transaction>();
// get id via get_uncached_id() as packed_transaction.id() mutates internal transaction state
const auto& id = pt.get_uncached_id();
mark_transaction_known_by_peer(id);
}
}
Expand Down Expand Up @@ -1041,10 +1050,12 @@ namespace eosio {
if( app().get_plugin<chain_plugin>().chain().get_read_mode() == chain::db_read_mode::READ_ONLY )
return;

auto id = p->id();
// ilog( "recv trx ${n}", ("n", id) );
if( p->expiration() < fc::time_point::now() ) return;

// get id via get_uncached_id() as packed_transaction.id() mutates internal transaction state
const auto& id = p->get_uncached_id();

if( mark_transaction_known_by_peer( id ) )
return;

Expand Down Expand Up @@ -1204,6 +1215,18 @@ namespace eosio {
for_each_session( [s]( auto ses ){ ses->on_new_lib( s ); } );
}

/**
* Notify all active connections of the new accepted block so
* they can relay it. This method also pre-packages the block
* as a packed bnet_message so the connections can simply relay
* it on.
*/
void on_accepted_block( block_state_ptr s ) {
_ioc->post( [s,this] { /// post this to the thread pool because packing can be intensive
for_each_session( [s]( auto ses ){ ses->on_accepted_block( s ); } );
});
}

void on_accepted_block_header( block_state_ptr s ) {
_ioc->post( [s,this] { /// post this to the thread pool because packing can be intensive
for_each_session( [s]( auto ses ){ ses->on_accepted_block_header( s ); } );
Expand Down Expand Up @@ -1349,6 +1372,9 @@ namespace eosio {

wlog( "bnet startup " );

auto& chain = app().get_plugin<chain_plugin>().chain();
FC_ASSERT ( chain.get_read_mode() != chain::db_read_mode::IRREVERSIBLE, "bnet is not compatible with \"irreversible\" read_mode");

my->_on_appled_trx_handle = app().get_channel<channels::accepted_transaction>()
.subscribe( [this]( transaction_metadata_ptr t ){
my->on_accepted_transaction(t);
Expand All @@ -1359,6 +1385,11 @@ namespace eosio {
my->on_irreversible_block(s);
});

my->_on_accepted_block_handle = app().get_channel<channels::accepted_block>()
.subscribe( [this]( block_state_ptr s ){
my->on_accepted_block(s);
});

my->_on_accepted_block_header_handle = app().get_channel<channels::accepted_block_header>()
.subscribe( [this]( block_state_ptr s ){
my->on_accepted_block_header(s);
Expand Down