Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

produce and execute blocks on thread pool #1646

Merged
merged 8 commits into from
Sep 14, 2023
62 changes: 41 additions & 21 deletions core/consensus/babe/impl/babe_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "storage/changes_trie/impl/storage_changes_tracker_impl.hpp"
#include "storage/trie/serialization/ordered_trie_hash.hpp"
#include "storage/trie/trie_storage.hpp"
#include "utils/thread_pool.hpp"

namespace {
constexpr const char *kIsMajorSyncing = "kagome_sub_libp2p_is_major_syncing";
Expand Down Expand Up @@ -69,6 +70,7 @@ namespace kagome::consensus::babe {
std::shared_ptr<application::AppStateManager> app_state_manager,
std::shared_ptr<BabeLottery> lottery,
std::shared_ptr<BabeConfigRepository> babe_config_repo,
const ThreadPool &thread_pool,
std::shared_ptr<authorship::Proposer> proposer,
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<network::BlockAnnounceTransmitter>
Expand Down Expand Up @@ -100,6 +102,7 @@ namespace kagome::consensus::babe {
app_state_manager_(app_state_manager),
lottery_{std::move(lottery)},
babe_config_repo_{std::move(babe_config_repo)},
io_context_{thread_pool.io_context()},
proposer_{std::move(proposer)},
block_tree_{std::move(block_tree)},
block_announce_transmitter_{std::move(block_announce_transmitter)},
Expand Down Expand Up @@ -825,8 +828,8 @@ namespace kagome::consensus::babe {
metric_is_relaychain_validator_->set(false);
if (app_config_validator_) {
SL_VERBOSE(log_,
"Authority not known, skipping slot processing. "
"Probably authority list has changed.");
"Authority not known, skipping slot processing. "
"Probably authority list has changed.");
}
} else {
metric_is_relaychain_validator_->set(true);
Expand Down Expand Up @@ -1065,7 +1068,7 @@ namespace kagome::consensus::babe {
return;
}

auto timer = metric_block_proposal_time.manual();
auto proposal_start = std::chrono::steady_clock::now();
// calculate babe_pre_digest
auto babe_pre_digest_res =
babePreDigest(slot_type, output, authority_index);
Expand All @@ -1075,26 +1078,43 @@ namespace kagome::consensus::babe {
}
const auto &babe_pre_digest = babe_pre_digest_res.value();

auto changes_tracker =
std::make_shared<storage::changes_trie::StorageChangesTrackerImpl>();

// create new block
auto pre_seal_block_res =
proposer_->propose(best_block_,
babe_util_->slotFinishTime(current_slot_)
- babe_config_repo_->slotDuration() / 3,
inherent_data,
{babe_pre_digest},
changes_tracker);
if (!pre_seal_block_res) {
SL_ERROR(log_, "Cannot propose a block: {}", pre_seal_block_res.error());
return;
}
auto propose = [this,
inherent_data{std::move(inherent_data)},
now,
proposal_start,
babe_pre_digest{std::move(babe_pre_digest)}] {
auto changes_tracker =
std::make_shared<storage::changes_trie::StorageChangesTrackerImpl>();

// create new block
auto res = proposer_->propose(best_block_,
babe_util_->slotFinishTime(current_slot_)
- babe_config_repo_->slotDuration() / 3,
inherent_data,
{babe_pre_digest},
changes_tracker);
if (not res) {
SL_ERROR(log_, "Cannot propose a block: {}", res.error());
return;
}

auto duration_ms = timer().count();
SL_DEBUG(log_, "Block has been built in {} ms", duration_ms);
processSlotLeadershipProposed(now,
proposal_start,
std::move(changes_tracker),
std::move(res.value()));
};
io_context_->post(std::move(propose));
}

auto block = pre_seal_block_res.value();
void BabeImpl::processSlotLeadershipProposed(
uint64_t now,
clock::SteadyClock::TimePoint proposal_start,
std::shared_ptr<storage::changes_trie::StorageChangesTrackerImpl>
&&changes_tracker,
primitives::Block &&block) {
auto duration_ms =
metric_block_proposal_time.observe(proposal_start).count();
SL_DEBUG(log_, "Block has been built in {} ms", duration_ms);

// Ensure block's extrinsics root matches extrinsics in block's body
BOOST_ASSERT_MSG(
Expand Down
29 changes: 28 additions & 1 deletion core/consensus/babe/impl/babe_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@
#include "primitives/inherent_data.hpp"
#include "telemetry/service.hpp"

namespace boost::asio {
class io_context;
} // namespace boost::asio

namespace kagome {
class ThreadPool;
} // namespace kagome

namespace kagome::application {
class AppStateManager;
} // namespace kagome::application
Expand Down Expand Up @@ -73,7 +81,11 @@ namespace kagome::consensus::babe {

namespace kagome::storage::trie {
class TrieStorage;
}
} // namespace kagome::storage::trie

namespace kagome::storage::changes_trie {
class StorageChangesTrackerImpl;
} // namespace kagome::storage::changes_trie

namespace kagome::consensus::babe {

Expand All @@ -100,6 +112,7 @@ namespace kagome::consensus::babe {
std::shared_ptr<application::AppStateManager> app_state_manager,
std::shared_ptr<BabeLottery> lottery,
std::shared_ptr<BabeConfigRepository> babe_config_repo,
const ThreadPool &thread_pool,
std::shared_ptr<authorship::Proposer> proposer,
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<network::BlockAnnounceTransmitter>
Expand Down Expand Up @@ -186,6 +199,19 @@ namespace kagome::consensus::babe {
clock::SystemClock::TimePoint slot_timestamp,
std::optional<std::reference_wrapper<const crypto::VRFOutput>> output,
primitives::AuthorityIndex authority_index);
/**
* `processSlotLeadership` coroutine piece
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be confusing because it's not actually a coroutine, I'd rather that you describe it in the comment the way it is.

* processSlotLeadership() {
* await propose()
* // processSlotLeadershipProposed()
* }
*/
void processSlotLeadershipProposed(
turuslan marked this conversation as resolved.
Show resolved Hide resolved
uint64_t now,
clock::SteadyClock::TimePoint proposal_start,
std::shared_ptr<storage::changes_trie::StorageChangesTrackerImpl>
&&changes_tracker,
primitives::Block &&block);

void changeLotteryEpoch(
const EpochDescriptor &epoch,
Expand All @@ -205,6 +231,7 @@ namespace kagome::consensus::babe {
std::shared_ptr<application::AppStateManager> app_state_manager_;
std::shared_ptr<BabeLottery> lottery_;
std::shared_ptr<BabeConfigRepository> babe_config_repo_;
std::shared_ptr<boost::asio::io_context> io_context_;
std::shared_ptr<authorship::Proposer> proposer_;
std::shared_ptr<blockchain::BlockTree> block_tree_;
std::shared_ptr<network::BlockAnnounceTransmitter>
Expand Down
68 changes: 54 additions & 14 deletions core/consensus/babe/impl/block_executor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "consensus/babe/impl/block_executor_impl.hpp"

#include "application/app_configuration.hpp"
#include "blockchain/block_tree.hpp"
#include "blockchain/block_tree_error.hpp"
#include "blockchain/digest_tracker.hpp"
Expand All @@ -22,6 +23,7 @@
#include "storage/changes_trie/impl/storage_changes_tracker_impl.hpp"
#include "transaction_pool/transaction_pool.hpp"
#include "transaction_pool/transaction_pool_error.hpp"
#include "utils/thread_pool.hpp"

namespace kagome::consensus::babe {
metrics::HistogramTimer metric_block_execution_time{
Expand All @@ -31,15 +33,19 @@ namespace kagome::consensus::babe {
};

BlockExecutorImpl::BlockExecutorImpl(
const application::AppConfiguration &app_config,
std::shared_ptr<blockchain::BlockTree> block_tree,
const ThreadPool &thread_pool,
std::shared_ptr<runtime::Core> core,
std::shared_ptr<transaction_pool::TransactionPool> tx_pool,
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<runtime::OffchainWorkerApi> offchain_worker_api,
primitives::events::StorageSubscriptionEnginePtr storage_sub_engine,
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine,
std::unique_ptr<BlockAppenderBase> appender)
: block_tree_{std::move(block_tree)},
: app_config_{app_config},
block_tree_{std::move(block_tree)},
io_context_{thread_pool.io_context()},
core_{std::move(core)},
tx_pool_{std::move(tx_pool)},
hasher_{std::move(hasher)},
Expand Down Expand Up @@ -79,7 +85,7 @@ namespace kagome::consensus::babe {
}

// get current time to measure performance if block execution
auto start_time = std::chrono::high_resolution_clock::now();
auto start_time = std::chrono::steady_clock::now();

bool block_was_applied_earlier = false;

Expand All @@ -101,15 +107,13 @@ namespace kagome::consensus::babe {
return;
}

std::optional<ConsistencyGuard> consistency_guard{};
if (auto res =
appender_->observeDigestsAndValidateHeader(block, block_context);
res.has_value()) {
consistency_guard.emplace(std::move(res.value()));
} else {
callback(res.as_failure());
auto consistency_guard_res =
appender_->observeDigestsAndValidateHeader(block, block_context);
if (not consistency_guard_res) {
callback(consistency_guard_res.as_failure());
return;
}
auto &consistency_guard = consistency_guard_res.value();

// Calculate best block before new one will be applied
auto last_finalized_block = block_tree_->getLastFinalized();
Expand All @@ -118,7 +122,26 @@ namespace kagome::consensus::babe {
BOOST_ASSERT(previous_best_block_res.has_value());
const auto &previous_best_block = previous_best_block_res.value();

if (not block_was_applied_earlier) {
if (block_was_applied_earlier) {
applyBlockExecuted(std::move(block),
justification,
std::move(callback),
block_info,
start_time,
consistency_guard,
previous_best_block);
return;
}
auto execute = [this,
self{shared_from_this()},
block{std::move(block)},
justification,
callback{std::move(callback)},
block_info,
start_time,
consistency_guard{std::make_shared<ConsistencyGuard>(
std::move(consistency_guard))},
previous_best_block]() mutable {
auto timer = metric_block_execution_time.manual();

auto parent =
Expand Down Expand Up @@ -166,14 +189,31 @@ namespace kagome::consensus::babe {

changes_tracker->onBlockAdded(
block_info.hash, storage_sub_engine_, chain_subscription_engine_);
}

applyBlockExecuted(std::move(block),
justification,
std::move(callback),
block_info,
start_time,
*consistency_guard,
previous_best_block);
};
io_context_->post(std::move(execute));
}

void BlockExecutorImpl::applyBlockExecuted(
primitives::Block &&block,
const std::optional<primitives::Justification> &justification,
ApplyJustificationCb &&callback,
const primitives::BlockInfo &block_info,
clock::SteadyClock::TimePoint start_time,
ConsistencyGuard &consistency_guard,
const primitives::BlockInfo &previous_best_block) {
/// TODO(iceseer): in a case we change the authority set, we can get an
/// error with the following behavior: the finalisation will commit the
/// authority change and the step of the next block processing will be
/// failed because of VRF error
BOOST_ASSERT(consistency_guard);
consistency_guard->commit();
consistency_guard.commit();

appender_->applyJustifications(
block_info,
Expand Down Expand Up @@ -236,7 +276,7 @@ namespace kagome::consensus::babe {
lag_msg = " (lag <1 min.)";
}

auto now = std::chrono::high_resolution_clock::now();
auto now = std::chrono::steady_clock::now();

self->logger_->info(
"Imported block {} within {} ms.{}",
Expand Down
26 changes: 26 additions & 0 deletions core/consensus/babe/impl/block_executor_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@
#include "primitives/event_types.hpp"
#include "telemetry/service.hpp"

namespace boost::asio {
class io_context;
} // namespace boost::asio

namespace kagome {
class ThreadPool;
} // namespace kagome

namespace kagome::application {
class AppConfiguration;
} // namespace kagome::application

namespace kagome::runtime {
class OffchainWorkerApi;
class Core;
Expand All @@ -36,13 +48,16 @@ namespace kagome::transaction_pool {
namespace kagome::consensus::babe {

class BlockAppenderBase;
class ConsistencyGuard;

class BlockExecutorImpl
: public BlockExecutor,
public std::enable_shared_from_this<BlockExecutorImpl> {
public:
BlockExecutorImpl(
const application::AppConfiguration &app_config,
std::shared_ptr<blockchain::BlockTree> block_tree,
const ThreadPool &thread_pool,
std::shared_ptr<runtime::Core> core,
std::shared_ptr<transaction_pool::TransactionPool> tx_pool,
std::shared_ptr<crypto::Hasher> hasher,
Expand All @@ -59,7 +74,18 @@ namespace kagome::consensus::babe {
ApplyJustificationCb &&callback) override;

private:
void applyBlockExecuted(
primitives::Block &&block,
const std::optional<primitives::Justification> &justification,
ApplyJustificationCb &&callback,
const primitives::BlockInfo &block_info,
clock::SteadyClock::TimePoint start_time,
ConsistencyGuard &consistency_guard,
const primitives::BlockInfo &previous_best_block);

const application::AppConfiguration &app_config_;
std::shared_ptr<blockchain::BlockTree> block_tree_;
std::shared_ptr<boost::asio::io_context> io_context_;
std::shared_ptr<runtime::Core> core_;
std::shared_ptr<transaction_pool::TransactionPool> tx_pool_;
std::shared_ptr<crypto::Hasher> hasher_;
Expand Down
Loading