Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge pull request #9749 from EOSIO/bv_add_new_sync
Browse files Browse the repository at this point in the history
Added a new application method for sync from blockvault
  • Loading branch information
ndcgundlach authored Dec 5, 2020
2 parents d5a2642 + 00af187 commit f5c9db3
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace eosio { namespace chain { namespace plugin_interface {
namespace methods {
// synchronously push a block/trx to a single provider
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const std::optional<block_id_type>&), first_provider_policy>;
using blockvault_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, bool), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
Expand Down
2 changes: 2 additions & 0 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class chain_plugin_impl {
,applied_transaction_channel(app().get_channel<channels::applied_transaction>())
,incoming_block_channel(app().get_channel<incoming::channels::block>())
,incoming_block_sync_method(app().get_method<incoming::methods::block_sync>())
,incoming_blockvault_sync_method(app().get_method<incoming::methods::blockvault_sync>())
,incoming_transaction_async_method(app().get_method<incoming::methods::transaction_async>())
{}

Expand Down Expand Up @@ -208,6 +209,7 @@ class chain_plugin_impl {

// retained references to methods for easy calling
incoming::methods::block_sync::method_type& incoming_block_sync_method;
incoming::methods::blockvault_sync::method_type& incoming_blockvault_sync_method;
incoming::methods::transaction_async::method_type& incoming_transaction_async_method;

// method provider handles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@ struct blockvault_sync_strategy : public sync_callback {
ilog("Received no data from blockvault.");
run_startup();
}

ilog("Sync from blockvault completed. ${snap}. ${blks} blocks received. ${ulnk} blocks unlinkable",
("snap", _received_snapshot ? "Got snapshot" : "No snapshot")
("blks", _num_blocks_received)("ulnk", _num_unlinkable_blocks));
}

void on_snapshot(const char* snapshot_filename) override final {
ilog("Received snapshot from blockvault ${fn}", ("fn", snapshot_filename));
EOS_ASSERT(!_received_snapshot, plugin_exception, "Received multiple snapshots from blockvault.", );
EOS_ASSERT(!_received_snapshot, plugin_exception, "Received multiple snapshots from blockvault." );
_received_snapshot = true;

if (_check_shutdown()) {
Expand All @@ -63,9 +67,15 @@ struct blockvault_sync_strategy : public sync_callback {
_startup_run = true;

infile.close();

_snapshot_height = _blockchain_provider.chain->head_block_num();
}

void on_block(eosio::chain::signed_block_ptr block) override final {
if (0 == (_num_blocks_received % 100)) {
dlog("Received block number ${bn}", ("bn", block->block_num()));
}

if (_check_shutdown()) {
_shutdown();
}
Expand All @@ -75,8 +85,14 @@ struct blockvault_sync_strategy : public sync_callback {
}

try {

++_num_blocks_received;
_blockchain_provider.incoming_block_sync_method(block, block->calculate_id());
auto rc = _blockchain_provider.incoming_blockvault_sync_method(block,
!(_received_snapshot && block->block_num() == _snapshot_height +1));

EOS_ASSERT(rc, plugin_exception,
"Unable to sync block from blockvault, block num=${bnum}, block id=${bid}",
("bnum", block->block_num())("bid", block->calculate_id()));
} catch (unlinkable_block_exception& e) {
if (block->block_num() == 2) {
elog("Received unlinkable block 2. Please double check if --genesis-json and --genesis-timestamp are "
Expand All @@ -96,6 +112,7 @@ struct blockvault_sync_strategy : public sync_callback {
bool _received_snapshot;
uint32_t _num_unlinkable_blocks = 0;
uint32_t _num_blocks_received = 0;
uint32_t _snapshot_height = 0;
};

} // namespace blockvault
Expand Down
7 changes: 4 additions & 3 deletions plugins/chain_plugin/test/test_blockvault_sync_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ struct mock_chain_t {
std::shared_ptr<istream_snapshot_reader> _reader;

bool _startup_reader_called;

uint32_t _head_block_num = 0;
uint32_t head_block_num() {return _head_block_num;}
};

struct mock_blockvault_t : public block_vault_interface {
Expand Down Expand Up @@ -70,9 +73,8 @@ struct mock_chain_plugin_t {
chain = std::make_unique<mock_chain_t>();
}

bool incoming_block_sync_method(const chain::signed_block_ptr& block, const chain::block_id_type& id) {
bool incoming_blockvault_sync_method(const chain::signed_block_ptr& block, bool check_connectivity) {
_block = block;
_id = id;
return _accept_block_rc;
}

Expand Down Expand Up @@ -147,7 +149,6 @@ BOOST_FIXTURE_TEST_CASE(on_block_no_snapshot, TESTER) {
BOOST_TEST(plugin._startup_non_snapshot_called);
BOOST_TEST(!plugin.chain->_startup_reader_called);
BOOST_TEST(plugin._block->calculate_id() == b->calculate_id());
BOOST_TEST(plugin._block->calculate_id() == plugin._id);
}
FC_LOG_AND_RETHROW()
}
Expand Down
60 changes: 59 additions & 1 deletion plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
compat::channels::transaction_ack::channel_type& _transaction_ack_channel;

incoming::methods::block_sync::method_type::handle _incoming_block_sync_provider;
incoming::methods::blockvault_sync::method_type::handle _incoming_blockvault_sync_provider;
incoming::methods::transaction_async::method_type::handle _incoming_transaction_async_provider;

transaction_id_with_expiry_index _blacklisted_transactions;
Expand Down Expand Up @@ -292,9 +293,61 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}
};

bool on_sync_block(const signed_block_ptr& block, bool check_connectivity) {
auto& chain = chain_plug->chain();

const auto& id = block->calculate_id();
auto blk_num = block->block_num();

fc_dlog(_log, "syncing blockvault block ${n} ${id}", ("n", blk_num)("id", id));

if (check_connectivity) {
auto previous = chain.fetch_block_by_id(block->previous);
if (!previous) {
dlog("Don't have previous block for block number ${bn}, looking for block id ${pbi}",
("bn", block->block_num())("pbi", block->previous));
return true;
}
}

// start processing of block
auto bsf = chain.create_block_state_future( id, block );

// abort the pending block
_unapplied_transactions.add_aborted( chain.abort_block() );

// push the new block
auto handle_error = [&](const auto& e)
{
elog((e.to_detail_string()));
throw;
};

try {
block_state_ptr blk_state = chain.push_block( bsf, [this]( const branch_type& forked_branch ) {
_unapplied_transactions.add_forked( forked_branch );
}, [this]( const transaction_id_type& id ) {
return _unapplied_transactions.get_trx( id );
} );
} catch ( const guard_exception& e ) {
chain_plugin::handle_guard_exception(e);
return false;
} catch ( const std::bad_alloc& ) {
chain_plugin::handle_bad_alloc();
} catch ( boost::interprocess::bad_alloc& ) {
chain_plugin::handle_db_exhaustion();
} catch( const fc::exception& e ) {
handle_error(e);
} catch (const std::exception& e) {
handle_error(fc::std_exception_wrapper::from_current_exception(e));
}

return true;
}

bool on_incoming_block(const signed_block_ptr& block, const std::optional<block_id_type>& block_id) {
auto& chain = chain_plug->chain();
if ( _pending_block_mode == pending_block_mode::producing ) {
if ( _pending_block_mode == pending_block_mode::producing) {
fc_wlog( _log, "dropped incoming block #${num} id: ${id}",
("num", block->block_num())("id", block_id ? (*block_id).str() : "UNKNOWN") );
return false;
Expand Down Expand Up @@ -813,6 +866,11 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
return my->on_incoming_block(block, block_id);
});

my->_incoming_blockvault_sync_provider = app().get_method<incoming::methods::blockvault_sync>().register_provider(
[this](const signed_block_ptr& block, bool check_connectivity) {
return my->on_sync_block(block, check_connectivity);
});

my->_incoming_transaction_async_provider = app().get_method<incoming::methods::transaction_async>().register_provider(
[this](const packed_transaction_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next) -> void {
return my->on_incoming_transaction_async(trx, persist_until_expired, next );
Expand Down

0 comments on commit f5c9db3

Please sign in to comment.