diff --git a/core/consensus/babe/impl/babe_config_repository_impl.cpp b/core/consensus/babe/impl/babe_config_repository_impl.cpp index 2c360bbe47..2bfa2f1218 100644 --- a/core/consensus/babe/impl/babe_config_repository_impl.cpp +++ b/core/consensus/babe/impl/babe_config_repository_impl.cpp @@ -82,9 +82,11 @@ namespace kagome::consensus::babe { BOOST_ASSERT(header_repo_ != nullptr); BOOST_ASSERT(babe_api_ != nullptr); - if (auto r = indexer_.init(); not r) { - logger_->error("Indexer::init error: {}", r.error()); - } + SAFE_UNIQUE(indexer_) { + if (auto r = indexer_.init(); not r) { + logger_->error("Indexer::init error: {}", r.error()); + } + }; app_state_manager.takeControl(*this); } @@ -104,27 +106,29 @@ namespace kagome::consensus::babe { return false; } - std::unique_lock lock{indexer_mutex_}; auto finalized = block_tree_->getLastFinalized(); auto finalized_header = block_tree_->getBlockHeader(finalized.hash).value(); - if (finalized.number - indexer_.last_finalized_indexed_.number - > kMaxUnindexedBlocksNum - and trie_storage_->getEphemeralBatchAt(finalized_header.state_root)) { - warp(lock, finalized); - } + SAFE_UNIQUE(indexer_) { + if (finalized.number - indexer_.last_finalized_indexed_.number + > kMaxUnindexedBlocksNum + and trie_storage_->getEphemeralBatchAt(finalized_header.state_root)) { + warp(indexer_, finalized); + } - if (!timings_) { - auto genesis_res = config({block_tree_->getGenesisBlockHash(), 0}, false); - if (genesis_res.has_value()) { - auto &genesis = genesis_res.value(); - timings_.init(genesis->slot_duration, genesis->epoch_length); - SL_DEBUG(logger_, - "Timing was initialized: slot is {}ms, epoch is {} slots", - timings_.slot_duration.count(), - timings_.epoch_length); + if (!timings_) { + auto genesis_res = + config(indexer_, {block_tree_->getGenesisBlockHash(), 0}, false); + if (genesis_res.has_value()) { + auto &genesis = genesis_res.value(); + timings_.init(genesis->slot_duration, genesis->epoch_length); + SL_DEBUG(logger_, + "Timing was initialized: slot is {}ms, epoch is {} slots", + timings_.slot_duration.count(), + timings_.epoch_length); + } } - } + }; [[maybe_unused]] bool active_ = false; @@ -133,7 +137,10 @@ namespace kagome::consensus::babe { auto consensus = consensus_selector_.get()->getProductionConsensus(best); if (std::dynamic_pointer_cast(consensus)) { active_ = true; - if (auto res = config(best, true); not res and not config_warp_sync_) { + auto res = SAFE_UNIQUE(indexer_) { + return config(indexer_, best, true); + }; + if (not res and not config_warp_sync_) { SL_ERROR(logger_, "get config at best {} error: {}", best, res.error()); auto best_header = block_tree_->getBlockHeader(best.hash).value(); if (not trie_storage_->getEphemeralBatchAt(best_header.state_root)) { @@ -146,8 +153,10 @@ namespace kagome::consensus::babe { chain_sub_.onFinalize([weak{weak_from_this()}]() { if (auto self = weak.lock()) { - std::unique_lock lock{self->indexer_mutex_}; - self->indexer_.finalize(); + auto &indexer_ = self->indexer_; + SAFE_UNIQUE(indexer_) { + indexer_.finalize(); + }; } }); @@ -165,8 +174,9 @@ namespace kagome::consensus::babe { slots_util_.get()->slotToEpoch(parent_info, parent_slot)); epoch_changed = epoch_number != parent_epoch; } - std::unique_lock lock{indexer_mutex_}; - return config(parent_info, epoch_changed); + return SAFE_UNIQUE(indexer_) { + return config(indexer_, parent_info, epoch_changed); + }; } outcome::result BabeConfigRepositoryImpl::getFirstBlockSlotNumber( @@ -209,18 +219,20 @@ namespace kagome::consensus::babe { return slot1.value(); } - void BabeConfigRepositoryImpl::warp(std::unique_lock &lock, + void BabeConfigRepositoryImpl::warp(Indexer &indexer_, const primitives::BlockInfo &block) { indexer_.put(block, {}, true); } void BabeConfigRepositoryImpl::warp(const primitives::BlockInfo &block) { - std::unique_lock lock{indexer_mutex_}; - warp(lock, block); + SAFE_UNIQUE(indexer_) { + warp(indexer_, block); + }; } outcome::result> - BabeConfigRepositoryImpl::config(const primitives::BlockInfo &block, + BabeConfigRepositoryImpl::config(Indexer &indexer_, + const primitives::BlockInfo &block, bool next_epoch) const { auto descent = indexer_.descend(block); outcome::result cb_res = outcome::success(); @@ -262,7 +274,7 @@ namespace kagome::consensus::babe { OUTCOME_TRY(header, block_tree_->getBlockHeader(info.hash)); if (HasBabeConsensusDigest digests{header}) { if (not prev_state) { - BOOST_OUTCOME_TRY(prev_state, loadPrev(prev)); + BOOST_OUTCOME_TRY(prev_state, loadPrev(indexer_, prev)); } auto state = applyDigests(getConfig(*prev_state), digests); BabeIndexedValue value{ @@ -294,10 +306,10 @@ namespace kagome::consensus::babe { return *r->second.value->state; } if (next_epoch) { - OUTCOME_TRY(load(r->first, r->second)); + OUTCOME_TRY(load(indexer_, r->first, r->second)); return *r->second.value->next_state; } - return loadPrev(r->second.prev); + return loadPrev(indexer_, r->second.prev); } std::shared_ptr BabeConfigRepositoryImpl::applyDigests( @@ -320,6 +332,7 @@ namespace kagome::consensus::babe { } outcome::result BabeConfigRepositoryImpl::load( + Indexer &indexer_, const primitives::BlockInfo &block, blockchain::Indexed &item) const { if (not item.value->next_state) { @@ -339,6 +352,7 @@ namespace kagome::consensus::babe { outcome::result> BabeConfigRepositoryImpl::loadPrev( + Indexer &indexer_, const std::optional &prev) const { if (not prev) { return Error::PREVIOUS_NOT_FOUND; @@ -350,7 +364,7 @@ namespace kagome::consensus::babe { if (not r->value) { return Error::PREVIOUS_NOT_FOUND; } - OUTCOME_TRY(load(*prev, *r)); + OUTCOME_TRY(load(indexer_, *prev, *r)); return *r->value->next_state; } } // namespace kagome::consensus::babe diff --git a/core/consensus/babe/impl/babe_config_repository_impl.hpp b/core/consensus/babe/impl/babe_config_repository_impl.hpp index bbcbc559d0..ec1d2bc1b9 100644 --- a/core/consensus/babe/impl/babe_config_repository_impl.hpp +++ b/core/consensus/babe/impl/babe_config_repository_impl.hpp @@ -8,8 +8,6 @@ #include "consensus/babe/babe_config_repository.hpp" -#include - #include "blockchain/indexer.hpp" #include "consensus/babe/has_babe_consensus_digest.hpp" #include "consensus/babe/types/scheduled_change.hpp" @@ -18,6 +16,7 @@ #include "primitives/block_data.hpp" #include "primitives/event_types.hpp" #include "storage/spaced_storage.hpp" +#include "utils/safe_object.hpp" namespace kagome::application { class AppStateManager; @@ -98,32 +97,36 @@ namespace kagome::consensus::babe { void warp(const primitives::BlockInfo &block) override; private: + using Indexer = blockchain::Indexer; + outcome::result getFirstBlockSlotNumber( const primitives::BlockInfo &parent_info) const; outcome::result> config( - const primitives::BlockInfo &block, bool next_epoch) const; + Indexer &indexer_, + const primitives::BlockInfo &block, + bool next_epoch) const; std::shared_ptr applyDigests( const NextConfigDataV1 &config, const HasBabeConsensusDigest &digests) const; outcome::result load( + Indexer &indexer_, const primitives::BlockInfo &block, blockchain::Indexed &item) const; outcome::result> loadPrev( + Indexer &indexer_, const std::optional &prev) const; - void warp(std::unique_lock &lock, - const primitives::BlockInfo &block); + void warp(Indexer &indexer_, const primitives::BlockInfo &block); std::shared_ptr persistent_storage_; bool config_warp_sync_; EpochTimings &timings_; std::shared_ptr block_tree_; - mutable std::mutex indexer_mutex_; - mutable blockchain::Indexer indexer_; + mutable SafeObject indexer_; std::shared_ptr header_repo_; LazySPtr consensus_selector_; std::shared_ptr babe_api_; diff --git a/core/consensus/timeline/types.hpp b/core/consensus/timeline/types.hpp index e417a5bc70..1e78686784 100644 --- a/core/consensus/timeline/types.hpp +++ b/core/consensus/timeline/types.hpp @@ -56,7 +56,9 @@ namespace kagome::consensus { *this); } - auto operator<=>(const SlotDuration &) const = default; + auto operator<=>(const SlotDuration &r) const { + return count() <=> r.count(); + } friend ::scale::ScaleEncoderStream &operator<<( ::scale::ScaleEncoderStream &s, const SlotDuration &duration) { diff --git a/core/utils/safe_object.hpp b/core/utils/safe_object.hpp index b4aa4107da..84b73e4f7e 100644 --- a/core/utils/safe_object.hpp +++ b/core/utils/safe_object.hpp @@ -10,6 +10,14 @@ #include #include #include +#include + +#define SAFE_UNIQUE_CAPTURE(x, ...) \ + x ^= __VA_ARGS__(typename std::remove_cvref_t::Type & x) +#define SAFE_SHARED_CAPTURE(x, ...) \ + x |= __VA_ARGS__(const typename std::remove_cvref_t::Type &x) +#define SAFE_UNIQUE(x) SAFE_UNIQUE_CAPTURE(x, [&]) +#define SAFE_SHARED(x) SAFE_SHARED_CAPTURE(x, [&]) // clang-format off /** @@ -55,6 +63,13 @@ struct SafeObject { return std::forward(f)(t_); } + auto operator^=(auto &&f) { + return exclusiveAccess(std::forward(f)); + } + auto operator|=(auto &&f) const { + return sharedAccess(std::forward(f)); + } + T &unsafeGet() { return t_; }