Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Optimizations #9760

Merged
merged 6 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 75 additions & 14 deletions libraries/chain/block_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
#include <eosio/chain/log_index.hpp>
#include <eosio/chain/log_catalog.hpp>
#include <eosio/chain/block_log_config.hpp>
#include <eosio/chain/thread_utils.hpp>
#include <fc/bitutil.hpp>
#include <fc/io/raw.hpp>
#include <future>
#include <regex>

namespace eosio { namespace chain {
Expand Down Expand Up @@ -521,10 +523,11 @@ namespace eosio { namespace chain {
fc::datastream<fc::cfile> index_file;
bool genesis_written_to_block_log = false;
block_log_preamble preamble;
size_t stride = std::numeric_limits<size_t>::max();
uint32_t future_version;
const size_t stride;
static uint32_t default_version;

block_log_impl(const block_log::config_type& config);
explicit block_log_impl(const block_log::config_type& config);

static void ensure_file_exists(fc::cfile& f) {
if (fc::exists(f.get_file_path()))
Expand All @@ -541,7 +544,13 @@ namespace eosio { namespace chain {

uint64_t append(const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression);

uint64_t write_log_entry(const signed_block& b, packed_transaction::cf_compression_type segment_compression);
// create futures for append, must call in order of blocks
std::future<std::tuple<signed_block_ptr, std::vector<char>>>
create_append_future(boost::asio::io_context& thread_pool,
const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression);
uint64_t append(std::future<std::tuple<signed_block_ptr, std::vector<char>>> f);

uint64_t write_log_entry(const std::vector<char>& block_buffer);

void split_log();
bool recover_from_incomplete_block_head(block_log_data& log_data, block_log_index& index);
Expand All @@ -562,15 +571,16 @@ namespace eosio { namespace chain {
void block_log::set_version(uint32_t ver) { detail::block_log_impl::default_version = ver; }
uint32_t block_log::version() const { return my->preamble.version; }

detail::block_log_impl::block_log_impl(const block_log::config_type& config) {
detail::block_log_impl::block_log_impl(const block_log::config_type& config)
: stride( config.stride )
{

if (!fc::is_directory(config.log_dir))
fc::create_directories(config.log_dir);

catalog.open(config.log_dir, config.retained_dir, config.archive_dir, "blocks");

catalog.max_retained_files = config.max_retained_files;
this->stride = config.stride;

block_file.set_file_path(config.log_dir / "blocks.log");
index_file.set_file_path(config.log_dir / "blocks.index");
Expand Down Expand Up @@ -601,6 +611,7 @@ namespace eosio { namespace chain {
ilog("Log is nonempty");
block_log_data log_data(block_file.get_file_path());
preamble = log_data.get_preamble();
future_version = preamble.version;

EOS_ASSERT(catalog.verifier.chain_id.empty() || catalog.verifier.chain_id == preamble.chain_id(), block_log_exception,
"block log file ${path} has a different chain id", ("path", block_file.get_file_path()));
Expand Down Expand Up @@ -649,20 +660,26 @@ namespace eosio { namespace chain {
read_head();
}

uint64_t detail::block_log_impl::write_log_entry(const signed_block& b, packed_transaction::cf_compression_type segment_compression) {
uint64_t pos = block_file.tellp();
std::vector<char> create_block_buffer( const signed_block& b, uint32_t version, packed_transaction::cf_compression_type segment_compression ) {
std::vector<char> buffer;
if (preamble.version >= pruned_transaction_version) {

if (version >= pruned_transaction_version) {
buffer = pack(b, segment_compression);
} else {
auto block_ptr = b.to_signed_block_v0();
EOS_ASSERT(block_ptr, block_log_append_fail, "Unable to convert block to legacy format");
EOS_ASSERT(segment_compression == packed_transaction::cf_compression_type::none, block_log_append_fail,
"the compression must be \"none\" for legacy format");
"the compression must be \"none\" for legacy format");
buffer = fc::raw::pack(*block_ptr);
}
block_file.write(buffer.data(), buffer.size());

return buffer;
}

uint64_t detail::block_log_impl::write_log_entry(const std::vector<char>& block_buffer) {
uint64_t pos = block_file.tellp();

block_file.write(block_buffer.data(), block_buffer.size());
block_file.write((char*)&pos, sizeof(pos));
index_file.write((char*)&pos, sizeof(pos));
flush();
Expand All @@ -673,19 +690,43 @@ namespace eosio { namespace chain {
return my->append(b, segment_compression);
}

uint64_t detail::block_log_impl::append(const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression) {
uint64_t detail::block_log_impl::append(const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression) {
try {
EOS_ASSERT( genesis_written_to_block_log, block_log_append_fail, "Cannot append to block log until the genesis is first written" );

block_file.seek_end(0);
index_file.seek_end(0);
EOS_ASSERT(index_file.tellp() == sizeof(uint64_t) * (b->block_num() - preamble.first_block_num),
block_log_append_fail,
"Append to index file occuring at wrong position.",
("position", (uint64_t) index_file.tellp())
("expected", (b->block_num() - preamble.first_block_num) * sizeof(uint64_t)));

std::vector<char> buffer = create_block_buffer( *b, preamble.version, segment_compression );
auto pos = write_log_entry(buffer);
head = b;
if (b->block_num() % stride == 0) {
split_log();
}
return pos;
}
FC_LOG_AND_RETHROW()
}

uint64_t detail::block_log_impl::append(std::future<std::tuple<signed_block_ptr, std::vector<char>>> f) {
try {
EOS_ASSERT( genesis_written_to_block_log, block_log_append_fail, "Cannot append to block log until the genesis is first written" );

block_file.seek_end(0);
index_file.seek_end(0);
auto[b, buffer] = f.get();
EOS_ASSERT(index_file.tellp() == sizeof(uint64_t) * (b->block_num() - preamble.first_block_num),
block_log_append_fail,
"Append to index file occuring at wrong position.",
("position", (uint64_t) index_file.tellp())
("expected", (b->block_num() - preamble.first_block_num) * sizeof(uint64_t)));

auto pos = write_log_entry(*b, segment_compression);
auto pos = write_log_entry(buffer);
head = b;
if (b->block_num() % stride == 0) {
split_log();
Expand All @@ -695,7 +736,26 @@ namespace eosio { namespace chain {
FC_LOG_AND_RETHROW()
}

void detail::block_log_impl::split_log() {
std::future<std::tuple<signed_block_ptr, std::vector<char>>>
block_log::create_append_future(boost::asio::io_context& thread_pool, const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression) {
return my->create_append_future(thread_pool, b, segment_compression);
}

std::future<std::tuple<signed_block_ptr, std::vector<char>>>
detail::block_log_impl::create_append_future(boost::asio::io_context& thread_pool, const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression) {
future_version = (b->block_num() % stride == 0) ? block_log::max_supported_version : future_version;
std::promise<std::tuple<signed_block_ptr, std::vector<char>>> p;
std::future<std::tuple<signed_block_ptr, std::vector<char>>> f = p.get_future();
return async_thread_pool( thread_pool, [b, version=future_version, segment_compression]() {
return std::make_tuple(b, create_block_buffer(*b, version, segment_compression));
} );
}

uint64_t block_log::append(std::future<std::tuple<signed_block_ptr, std::vector<char>>> f) {
return my->append( std::move( f ) );
}

void detail::block_log_impl::split_log() {
block_file.close();
index_file.close();

Expand All @@ -720,6 +780,7 @@ namespace eosio { namespace chain {
block_file.open(fc::cfile::truncate_rw_mode);
index_file.open(fc::cfile::truncate_rw_mode);

future_version = block_log_impl::default_version;
preamble.version = block_log_impl::default_version;
preamble.first_block_num = first_bnum;
preamble.chain_context = std::move(chain_context);
Expand Down
40 changes: 32 additions & 8 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,18 @@ struct controller_impl {
if( fork_head->dpos_irreversible_blocknum <= lib_num )
return;

const auto branch = fork_db.fetch_branch( fork_head->id, fork_head->dpos_irreversible_blocknum );
auto branch = fork_db.fetch_branch( fork_head->id, fork_head->dpos_irreversible_blocknum );
try {
const auto& rbi = reversible_blocks.get_index<reversible_block_index,by_num>();

std::vector<std::future<std::tuple<signed_block_ptr, std::vector<char>>>> v;
v.reserve( branch.size() );
for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
v.emplace_back( blog.create_append_future( thread_pool.get_executor(), (*bitr)->block,
packed_transaction::cf_compression_type::none ) );
}
auto it = v.begin();

for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
if( read_mode == db_read_mode::IRREVERSIBLE ) {
apply_block( *bitr, controller::block_status::complete, trx_meta_cache_lookup{} );
Expand All @@ -354,7 +362,8 @@ struct controller_impl {

// blog.append could fail due to failures like running out of space.
// Do it before commit so that in case it throws, DB can be rolled back.
blog.append( (*bitr)->block, packed_transaction::cf_compression_type::none );
blog.append( std::move( *it ) );
++it;

kv_db.commit( (*bitr)->block_num );
root_id = (*bitr)->id;
Expand All @@ -373,8 +382,12 @@ struct controller_impl {
}

if( root_id != fork_db.root()->id ) {
branch.emplace_back(fork_db.root());
fork_db.advance_root( root_id );
}

// delete branch in thread pool
boost::asio::post( thread_pool.get_executor(), [branch{std::move(branch)}]() {} );
}

/**
Expand Down Expand Up @@ -1536,6 +1549,21 @@ struct controller_impl {

auto& pbhs = pending->get_pending_block_header_state();

auto& bb = std::get<building_block>(pending->_block_stage);

auto action_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( bb._action_receipt_digests )}]() mutable {
return merkle( std::move( ids ) );
} );
const bool calc_trx_merkle = !std::holds_alternative<checksum256_type>(bb._trx_mroot_or_receipt_digests);
std::future<checksum256_type> trx_merkle_fut;
if( calc_trx_merkle ) {
trx_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) )}]() mutable {
return merkle( std::move( ids ) );
} );
}

// Update resource limits:
resource_limits.process_account_limit_updates();
const auto& chain_config = self.get_global_properties().configuration;
Expand All @@ -1546,14 +1574,10 @@ struct controller_impl {
);
resource_limits.process_block_usage(pbhs.block_num);

auto& bb = std::get<building_block>(pending->_block_stage);

// Create (unsigned) block:
auto block_ptr = std::make_shared<signed_block>( pbhs.make_block_header(
std::holds_alternative<checksum256_type>(bb._trx_mroot_or_receipt_digests) ?
std::get<checksum256_type>(bb._trx_mroot_or_receipt_digests) :
merkle( std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) ) ),
merkle( std::move( std::get<building_block>(pending->_block_stage)._action_receipt_digests ) ),
calc_trx_merkle ? trx_merkle_fut.get() : std::get<checksum256_type>(bb._trx_mroot_or_receipt_digests),
action_merkle_fut.get(),
bb._new_pending_producer_schedule,
std::move( bb._new_protocol_feature_activations ),
protocol_features.get_protocol_feature_set()
Expand Down
7 changes: 7 additions & 0 deletions libraries/chain/include/eosio/chain/block_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <eosio/chain/block.hpp>
#include <eosio/chain/genesis_state.hpp>
#include <eosio/chain/block_log_config.hpp>
#include <future>

namespace eosio { namespace chain {

Expand Down Expand Up @@ -46,6 +47,12 @@ namespace eosio { namespace chain {

uint64_t append(const signed_block_ptr& block, packed_transaction::cf_compression_type segment_compression);

// create futures for append, must call in order of blocks
std::future<std::tuple<signed_block_ptr, std::vector<char>>>
create_append_future(boost::asio::io_context& thread_pool,
const signed_block_ptr& b, packed_transaction::cf_compression_type segment_compression);
uint64_t append(std::future<std::tuple<signed_block_ptr, std::vector<char>>> f);

void reset( const genesis_state& gs, const signed_block_ptr& genesis_block, packed_transaction::cf_compression_type segment_compression);
void reset( const chain_id_type& chain_id, uint32_t first_block_num );

Expand Down
8 changes: 4 additions & 4 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,13 +539,13 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
if( exception_is_exhausted( *trace->except, deadline_is_subjective )) {
_unapplied_transactions.add_incoming( trx, persist_until_expired, next );
if( _pending_block_mode == pending_block_mode::producing ) {
fc_dlog(_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ",
fc_dlog(_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING, ec: ${c} ",
("block_num", chain.head_block_num() + 1)
("prod", get_pending_block_producer())
("txid", trx->id()));
("txid", trx->id())("c", trace->except->code()));
} else {
fc_dlog(_log, "[TRX_TRACE] Speculative execution COULD NOT FIT tx: ${txid} RETRYING",
("txid", trx->id()));
fc_dlog(_log, "[TRX_TRACE] Speculative execution COULD NOT FIT tx: ${txid} RETRYING, ec: ${c}",
("txid", trx->id())("c", trace->except->code()));
}
if( !exhausted )
exhausted = block_is_exhausted();
Expand Down