Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Added a new application method for sync from blockvault #9749

Merged
merged 8 commits into from
Dec 5, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace eosio { namespace chain { namespace plugin_interface {
namespace methods {
// synchronously push a block/trx to a single provider
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const std::optional<block_id_type>&), first_provider_policy>;
using blockvault_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
Expand Down
2 changes: 2 additions & 0 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class chain_plugin_impl {
,applied_transaction_channel(app().get_channel<channels::applied_transaction>())
,incoming_block_channel(app().get_channel<incoming::channels::block>())
,incoming_block_sync_method(app().get_method<incoming::methods::block_sync>())
,incoming_blockvault_sync_method(app().get_method<incoming::methods::blockvault_sync>())
,incoming_transaction_async_method(app().get_method<incoming::methods::transaction_async>())
{}

Expand Down Expand Up @@ -208,6 +209,7 @@ class chain_plugin_impl {

// retained references to methods for easy calling
incoming::methods::block_sync::method_type& incoming_block_sync_method;
incoming::methods::blockvault_sync::method_type& incoming_blockvault_sync_method;
incoming::methods::transaction_async::method_type& incoming_transaction_async_method;

// method provider handles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@ struct blockvault_sync_strategy : public sync_callback {
ilog("Received no data from blockvault.");
run_startup();
}

ilog("Sync from blockvault completed. ${snap}. ${blks} blocks received. ${ulnk} blocks unlinkable",
("snap", _received_snapshot ? "Got snapshot" : "No snapshot")
("blks", _num_blocks_received)("ulnk", _num_unlinkable_blocks));
}

void on_snapshot(const char* snapshot_filename) override final {
ilog("Received snapshot from blockvault ${fn}", ("fn", snapshot_filename));
EOS_ASSERT(!_received_snapshot, plugin_exception, "Received multiple snapshots from blockvault.", );
EOS_ASSERT(!_received_snapshot, plugin_exception, "Received multiple snapshots from blockvault." );
_received_snapshot = true;

if (_check_shutdown()) {
Expand All @@ -66,6 +70,10 @@ struct blockvault_sync_strategy : public sync_callback {
}

void on_block(eosio::chain::signed_block_ptr block) override final {
if (0 == (_num_blocks_received % 100)) {
ilog("Received block number ${bn}", ("bn", block->block_num()));
ndcgundlach marked this conversation as resolved.
Show resolved Hide resolved
}

if (_check_shutdown()) {
_shutdown();
}
Expand All @@ -76,7 +84,9 @@ struct blockvault_sync_strategy : public sync_callback {

try {
++_num_blocks_received;
_blockchain_provider.incoming_block_sync_method(block, block->calculate_id());
EOS_ASSERT(_blockchain_provider.incoming_blockvault_sync_method(block), plugin_exception,
"Unable to sync block from blockvault, block num=${bnum}, block id=${bid}",
("bnum", block->block_num())("bid", block->calculate_id()));
} catch (unlinkable_block_exception& e) {
if (block->block_num() == 2) {
elog("Received unlinkable block 2. Please double check if --genesis-json and --genesis-timestamp are "
Expand Down
4 changes: 1 addition & 3 deletions plugins/chain_plugin/test/test_blockvault_sync_strategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ struct mock_chain_plugin_t {
chain = std::make_unique<mock_chain_t>();
}

bool incoming_block_sync_method(const chain::signed_block_ptr& block, const chain::block_id_type& id) {
bool incoming_blockvault_sync_method(const chain::signed_block_ptr& block) {
_block = block;
_id = id;
return _accept_block_rc;
}

Expand Down Expand Up @@ -147,7 +146,6 @@ BOOST_FIXTURE_TEST_CASE(on_block_no_snapshot, TESTER) {
BOOST_TEST(plugin._startup_non_snapshot_called);
BOOST_TEST(!plugin.chain->_startup_reader_called);
BOOST_TEST(plugin._block->calculate_id() == b->calculate_id());
BOOST_TEST(plugin._block->calculate_id() == plugin._id);
}
FC_LOG_AND_RETHROW()
}
Expand Down
51 changes: 50 additions & 1 deletion plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
compat::channels::transaction_ack::channel_type& _transaction_ack_channel;

incoming::methods::block_sync::method_type::handle _incoming_block_sync_provider;
incoming::methods::blockvault_sync::method_type::handle _incoming_blockvault_sync_provider;
incoming::methods::transaction_async::method_type::handle _incoming_transaction_async_provider;

transaction_id_with_expiry_index _blacklisted_transactions;
Expand Down Expand Up @@ -292,9 +293,52 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}
};

bool on_sync_block(const signed_block_ptr& block) {
auto& chain = chain_plug->chain();

const auto& id = block->calculate_id();
auto blk_num = block->block_num();

fc_dlog(_log, "syncing blockvault block ${n} ${id}", ("n", blk_num)("id", id));

// start processing of block
auto bsf = chain.create_block_state_future( id, block );

// abort the pending block
_unapplied_transactions.add_aborted( chain.abort_block() );

// push the new block
auto handle_error = [&](const auto& e)
{
elog((e.to_detail_string()));
throw;
};

try {
block_state_ptr blk_state = chain.push_block( bsf, [this]( const branch_type& forked_branch ) {
_unapplied_transactions.add_forked( forked_branch );
}, [this]( const transaction_id_type& id ) {
return _unapplied_transactions.get_trx( id );
} );
} catch ( const guard_exception& e ) {
chain_plugin::handle_guard_exception(e);
return false;
} catch ( const std::bad_alloc& ) {
chain_plugin::handle_bad_alloc();
} catch ( boost::interprocess::bad_alloc& ) {
chain_plugin::handle_db_exhaustion();
} catch( const fc::exception& e ) {
handle_error(e);
ndcgundlach marked this conversation as resolved.
Show resolved Hide resolved
} catch (const std::exception& e) {
handle_error(fc::std_exception_wrapper::from_current_exception(e));
}

return true;
}

bool on_incoming_block(const signed_block_ptr& block, const std::optional<block_id_type>& block_id) {
auto& chain = chain_plug->chain();
if ( _pending_block_mode == pending_block_mode::producing ) {
if ( _pending_block_mode == pending_block_mode::producing) {
fc_wlog( _log, "dropped incoming block #${num} id: ${id}",
("num", block->block_num())("id", block_id ? (*block_id).str() : "UNKNOWN") );
return false;
Expand Down Expand Up @@ -813,6 +857,11 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
return my->on_incoming_block(block, block_id);
});

my->_incoming_blockvault_sync_provider = app().get_method<incoming::methods::blockvault_sync>().register_provider(
[this](const signed_block_ptr& block) {
return my->on_sync_block(block);
});

my->_incoming_transaction_async_provider = app().get_method<incoming::methods::transaction_async>().register_provider(
[this](const packed_transaction_ptr& trx, bool persist_until_expired, next_function<transaction_trace_ptr> next) -> void {
return my->on_incoming_transaction_async(trx, persist_until_expired, next );
Expand Down