From c8a90c71df518b9e4605d3c99e95b0357a140ddf Mon Sep 17 00:00:00 2001 From: iceseer Date: Sun, 26 Mar 2023 13:52:34 +0300 Subject: [PATCH 01/13] parallel backing Signed-off-by: iceseer --- core/blockchain/impl/block_tree_impl.cpp | 23 ++- core/blockchain/impl/block_tree_impl.hpp | 2 +- core/consensus/babe/impl/babe_impl.cpp | 2 +- .../approval/approval_distribution.cpp | 67 +++++-- .../approval/approval_distribution.hpp | 2 + core/parachain/availability/store/store.hpp | 32 ++-- .../availability/store/store_impl.cpp | 178 ++++++++++-------- .../availability/store/store_impl.hpp | 46 ++--- .../validator/impl/parachain_processor.cpp | 132 ++++++++----- .../validator/parachain_processor.hpp | 2 + core/primitives/event_types.hpp | 7 +- 11 files changed, 299 insertions(+), 194 deletions(-) diff --git a/core/blockchain/impl/block_tree_impl.cpp b/core/blockchain/impl/block_tree_impl.cpp index 29087e0a87..fe9ea93f83 100644 --- a/core/blockchain/impl/block_tree_impl.cpp +++ b/core/blockchain/impl/block_tree_impl.cpp @@ -11,6 +11,7 @@ #include "blockchain/block_tree_error.hpp" #include "blockchain/impl/cached_tree.hpp" #include "blockchain/impl/justification_storage_policy.hpp" +#include "blockchain/impl/storage_util.hpp" #include "consensus/babe/impl/babe_digests_util.hpp" #include "consensus/babe/is_primary.hpp" #include "crypto/blake2/blake2b.h" @@ -693,7 +694,20 @@ namespace kagome::blockchain { node->finalized = true; node->has_justification = true; - OUTCOME_TRY(prune(node)); + OUTCOME_TRY(retired_hashes, prune(node)); + retired_hashes.emplace_back(node->getBlockInfo().hash); + for (primitives::BlockNumber n = last_finalized_block_info.number + 1ull; + n < node->getBlockInfo().number; + ++n) { + OUTCOME_TRY(hash_opt, storage_->getBlockHash(n)); + if (hash_opt) { + retired_hashes.emplace_back(std::move(*hash_opt)); + } + } + + chain_events_engine_->notify( + primitives::events::ChainEventType::kDeactivateAfterFinalization, + retired_hashes); tree_->updateTreeRoot(node, justification); @@ -1159,7 +1173,7 @@ namespace kagome::blockchain { } } - outcome::result BlockTreeImpl::prune( + outcome::result> BlockTreeImpl::prune( const std::shared_ptr &lastFinalizedNode) { std::deque> to_remove; @@ -1191,9 +1205,12 @@ namespace kagome::blockchain { } std::vector extrinsics; + std::vector retired_hashes; // remove from storage + retired_hashes.reserve(to_remove.size()); for (const auto &node : to_remove) { + retired_hashes.emplace_back(node->block_hash); OUTCOME_TRY(block_body_res, storage_->getBlockBody(node->block_hash)); if (block_body_res.has_value()) { extrinsics.reserve(extrinsics.size() + block_body_res.value().size()); @@ -1223,7 +1240,7 @@ namespace kagome::blockchain { } } - return outcome::success(); + return retired_hashes; } outcome::result BlockTreeImpl::reorganize() { diff --git a/core/blockchain/impl/block_tree_impl.hpp b/core/blockchain/impl/block_tree_impl.hpp index 18dae814f8..ef5934c1c2 100644 --- a/core/blockchain/impl/block_tree_impl.hpp +++ b/core/blockchain/impl/block_tree_impl.hpp @@ -168,7 +168,7 @@ namespace kagome::blockchain { */ std::vector getLeavesSorted() const; - outcome::result prune( + outcome::result> prune( const std::shared_ptr &lastFinalizedNode); outcome::result reorganize(); diff --git a/core/consensus/babe/impl/babe_impl.cpp b/core/consensus/babe/impl/babe_impl.cpp index 64bf9014c4..f453d1bc3d 100644 --- a/core/consensus/babe/impl/babe_impl.cpp +++ b/core/consensus/babe/impl/babe_impl.cpp @@ -699,7 +699,7 @@ namespace kagome::consensus::babe { // Slot processing begins in 1/3 slot time before end auto finish_time = babe_util_->slotFinishTime(current_slot_) - - babe_config_repo_->slotDuration() / 3; + - babe_config_repo_->slotDuration() / 10; SL_VERBOSE(log_, "Starting a slot {} in epoch {} (remains {:.2f} sec.)", diff --git a/core/parachain/approval/approval_distribution.cpp b/core/parachain/approval/approval_distribution.cpp index 070525939f..acba1bebe5 100644 --- a/core/parachain/approval/approval_distribution.cpp +++ b/core/parachain/approval/approval_distribution.cpp @@ -526,6 +526,30 @@ namespace kagome::parachain { } }); + chain_sub_ = std::make_shared( + peer_view_->intoChainEventsEngine()); + chain_sub_->subscribe( + chain_sub_->generateSubscriptionSetId(), + primitives::events::ChainEventType::kDeactivateAfterFinalization); + chain_sub_->setCallback( + [wptr{weak_from_this()}]( + auto /*set_id*/, + auto && /*internal_obj*/, + auto /*event_type*/, + const primitives::events::ChainEventParams &event) { + if (auto self = wptr.lock()) { + if (auto const value = if_type< + const primitives::events::RemoveAfterFinalizationParams>( + event)) { + for (auto const &lost : value->get()) { + self->logger_->trace( + "Cleaning up stale pending messages.(block hash={})", lost); + self->pending_known_.erase(lost); + } + } + } + }); + return true; } @@ -1071,16 +1095,6 @@ namespace kagome::parachain { [[maybe_unused]] auto &_ = pending_known_[result.value()]; } - for (auto it = pending_known_.begin(); it != pending_known_.end();) { - if (updated.view.contains(it->first)) { - ++it; - } else { - logger_->trace("Cleaning up stale pending messages.(block hash={})", - it->first); - it = pending_known_.erase(it); - } - } - handle_new_head( result.value(), updated, @@ -1397,7 +1411,31 @@ namespace kagome::parachain { source ? source->get().toBase58() : "our", block_hash, validator_index); + auto &entry = opt_entry->get(); + if (claimed_candidate_index >= entry.candidates.size()) { + logger_->warn( + "Unexpected candidate entry. (candidate index={}, block hash={})", + claimed_candidate_index, + block_hash); + return; + } + + auto &candidate_entry = entry.candidates[claimed_candidate_index]; + if (auto it = candidate_entry.messages.find(validator_index); + it != candidate_entry.messages.end()) { + if (auto state{boost::get( + &it->second.approval_state)}) { + logger_->warn( + "Already have approved state. (candidate index={}, " + "block hash={}, validator index={})", + claimed_candidate_index, + block_hash, + validator_index); + return; + } + } + if (source) { /// TODO(iceseer): vector-clock for knowledge switch ( @@ -1430,15 +1468,6 @@ namespace kagome::parachain { } auto const local = !source; - if (claimed_candidate_index >= entry.candidates.size()) { - logger_->warn( - "Unexpected candidate entry. (candidate index={}, block hash={})", - claimed_candidate_index, - block_hash); - return; - } - - auto &candidate_entry = entry.candidates[claimed_candidate_index]; [[maybe_unused]] auto &message_state = candidate_entry.messages .emplace(validator_index, diff --git a/core/parachain/approval/approval_distribution.hpp b/core/parachain/approval/approval_distribution.hpp index 194f3ef8ea..6780d58df9 100644 --- a/core/parachain/approval/approval_distribution.hpp +++ b/core/parachain/approval/approval_distribution.hpp @@ -619,6 +619,8 @@ namespace kagome::parachain { const ApprovalVotingSubsystem config_; std::shared_ptr peer_view_; network::PeerView::MyViewSubscriberPtr my_view_sub_; + std::shared_ptr chain_sub_; + Store>, StorePair, StorePair, diff --git a/core/parachain/availability/store/store.hpp b/core/parachain/availability/store/store.hpp index 59494d2726..9320f4f5b2 100644 --- a/core/parachain/availability/store/store.hpp +++ b/core/parachain/availability/store/store.hpp @@ -26,37 +26,29 @@ namespace kagome::parachain { /// Has ErasureChunk virtual bool hasChunk(const CandidateHash &candidate_hash, - ValidatorIndex index) = 0; + ValidatorIndex index) const = 0; /// Has PoV - virtual bool hasPov(const CandidateHash &candidate_hash) = 0; + virtual bool hasPov(const CandidateHash &candidate_hash) const = 0; /// Has PersistedValidationData - virtual bool hasData(const CandidateHash &candidate_hash) = 0; + virtual bool hasData(const CandidateHash &candidate_hash) const = 0; /// Get ErasureChunk virtual std::optional getChunk( - const CandidateHash &candidate_hash, ValidatorIndex index) = 0; + const CandidateHash &candidate_hash, ValidatorIndex index) const = 0; /// Get PoV virtual std::optional getPov( - const CandidateHash &candidate_hash) = 0; + const CandidateHash &candidate_hash) const = 0; /// Get AvailableData (PoV and PersistedValidationData) virtual std::optional getPovAndData( - const CandidateHash &candidate_hash) = 0; + const CandidateHash &candidate_hash) const = 0; // Get list of ErasureChunk virtual std::vector getChunks( - const CandidateHash &candidate_hash) = 0; + const CandidateHash &candidate_hash) const = 0; /// Store ErasureChunk - virtual void putChunk(const CandidateHash &candidate_hash, - const ErasureChunk &chunk) = 0; - virtual void putChunkSet(const CandidateHash &candidate_hash, - std::vector &&chunks) = 0; - /// Store PoV - virtual void putPov(const CandidateHash &candidate_hash, - const ParachainBlock &pov) = 0; - /// Store PersistedValidationData - virtual void putData(const CandidateHash &candidate_hash, - const PersistedValidationData &data) = 0; - /// Registers relay_parent -> candidate_hash - virtual void registerCandidate(network::RelayHash const &relay_parent, - CandidateHash const &candidate_hash) = 0; + virtual void storeData(network::RelayHash const &relay_parent, + CandidateHash const &candidate_hash, + std::vector &&chunks, + ParachainBlock const &pov, + PersistedValidationData const &data) = 0; /// Clears all data according to this relay_parent virtual void remove(network::RelayHash const &relay_parent) = 0; }; diff --git a/core/parachain/availability/store/store_impl.cpp b/core/parachain/availability/store/store_impl.cpp index eba685f0da..bc0783e69a 100644 --- a/core/parachain/availability/store/store_impl.cpp +++ b/core/parachain/availability/store/store_impl.cpp @@ -7,111 +7,125 @@ namespace kagome::parachain { bool AvailabilityStoreImpl::hasChunk(const CandidateHash &candidate_hash, - ValidatorIndex index) { - auto it = per_candidate_.find(candidate_hash); - if (it == per_candidate_.end()) { - return false; - } - return it->second.chunks.count(index) != 0; + ValidatorIndex index) const { + return state_.sharedAccess([&](auto const &state) { + auto it = state.per_candidate_.find(candidate_hash); + if (it == state.per_candidate_.end()) { + return false; + } + return it->second.chunks.count(index) != 0; + }); } - bool AvailabilityStoreImpl::hasPov(const CandidateHash &candidate_hash) { - auto it = per_candidate_.find(candidate_hash); - if (it == per_candidate_.end()) { - return false; - } - return it->second.pov.has_value(); + bool AvailabilityStoreImpl::hasPov( + const CandidateHash &candidate_hash) const { + return state_.sharedAccess([&](auto const &state) { + auto it = state.per_candidate_.find(candidate_hash); + if (it == state.per_candidate_.end()) { + return false; + } + return it->second.pov.has_value(); + }); } - bool AvailabilityStoreImpl::hasData(const CandidateHash &candidate_hash) { - auto it = per_candidate_.find(candidate_hash); - if (it == per_candidate_.end()) { - return false; - } - return it->second.data.has_value(); + bool AvailabilityStoreImpl::hasData( + const CandidateHash &candidate_hash) const { + return state_.sharedAccess([&](auto const &state) { + auto it = state.per_candidate_.find(candidate_hash); + if (it == state.per_candidate_.end()) { + return false; + } + return it->second.data.has_value(); + }); } std::optional AvailabilityStoreImpl::getChunk(const CandidateHash &candidate_hash, - ValidatorIndex index) { - auto it = per_candidate_.find(candidate_hash); - if (it == per_candidate_.end()) { - return std::nullopt; - } - auto it2 = it->second.chunks.find(index); - if (it2 == it->second.chunks.end()) { - return std::nullopt; - } - return it2->second; + ValidatorIndex index) const { + return state_.sharedAccess( + [&](auto const &state) + -> std::optional { + auto it = state.per_candidate_.find(candidate_hash); + if (it == state.per_candidate_.end()) { + return std::nullopt; + } + auto it2 = it->second.chunks.find(index); + if (it2 == it->second.chunks.end()) { + return std::nullopt; + } + return it2->second; + }); } std::optional - AvailabilityStoreImpl::getPov(const CandidateHash &candidate_hash) { - auto it = per_candidate_.find(candidate_hash); - if (it == per_candidate_.end()) { - return std::nullopt; - } - return it->second.pov; + AvailabilityStoreImpl::getPov(const CandidateHash &candidate_hash) const { + return state_.sharedAccess( + [&](auto const &state) + -> std::optional { + auto it = state.per_candidate_.find(candidate_hash); + if (it == state.per_candidate_.end()) { + return std::nullopt; + } + return it->second.pov; + }); } std::optional - AvailabilityStoreImpl::getPovAndData(const CandidateHash &candidate_hash) { - auto it = per_candidate_.find(candidate_hash); - if (it == per_candidate_.end()) { - return std::nullopt; - } - if (not it->second.pov or not it->second.data) { - return std::nullopt; - } - return AvailableData{*it->second.pov, *it->second.data}; + AvailabilityStoreImpl::getPovAndData( + const CandidateHash &candidate_hash) const { + return state_.sharedAccess( + [&](auto const &state) + -> std::optional { + auto it = state.per_candidate_.find(candidate_hash); + if (it == state.per_candidate_.end()) { + return std::nullopt; + } + if (not it->second.pov or not it->second.data) { + return std::nullopt; + } + return AvailableData{*it->second.pov, *it->second.data}; + }); } std::vector AvailabilityStoreImpl::getChunks( - const CandidateHash &candidate_hash) { - std::vector chunks; - auto it = per_candidate_.find(candidate_hash); - if (it != per_candidate_.end()) { - for (auto &p : it->second.chunks) { - chunks.emplace_back(p.second); + const CandidateHash &candidate_hash) const { + return state_.sharedAccess([&](auto const &state) { + std::vector chunks; + auto it = state.per_candidate_.find(candidate_hash); + if (it != state.per_candidate_.end()) { + for (auto &p : it->second.chunks) { + chunks.emplace_back(p.second); + } } - } - return chunks; - } - - void AvailabilityStoreImpl::putChunk(const CandidateHash &candidate_hash, - const ErasureChunk &chunk) { - per_candidate_[candidate_hash].chunks[chunk.index] = chunk; - } - - void AvailabilityStoreImpl::putChunkSet(const CandidateHash &candidate_hash, - std::vector &&chunks) { - for (auto &&chunk : std::move(chunks)) { - per_candidate_[candidate_hash].chunks[chunk.index] = std::move(chunk); - } - } - - void AvailabilityStoreImpl::putPov(const CandidateHash &candidate_hash, - const ParachainBlock &pov) { - per_candidate_[candidate_hash].pov = pov; + return chunks; + }); } - void AvailabilityStoreImpl::putData(const CandidateHash &candidate_hash, - const PersistedValidationData &data) { - per_candidate_[candidate_hash].data = data; - } - - void AvailabilityStoreImpl::registerCandidate( - network::RelayHash const &relay_parent, - CandidateHash const &candidate_hash) { - candidates_[relay_parent].insert(candidate_hash); + void AvailabilityStoreImpl::storeData(network::RelayHash const &relay_parent, + CandidateHash const &candidate_hash, + std::vector &&chunks, + ParachainBlock const &pov, + PersistedValidationData const &data) { + state_.exclusiveAccess([&](auto &state) { + state.candidates_[relay_parent].insert(candidate_hash); + auto &candidate_data = state.per_candidate_[candidate_hash]; + for (auto &&chunk : std::move(chunks)) { + candidate_data.chunks[chunk.index] = std::move(chunk); + } + candidate_data.pov = pov; + candidate_data.data = data; + }); } void AvailabilityStoreImpl::remove(network::RelayHash const &relay_parent) { - if (auto it = candidates_.find(relay_parent); it != candidates_.end()) { - for (auto const &l : it->second) { - per_candidate_.erase(l); + state_.exclusiveAccess([&](auto &state) { + if (auto it = state.candidates_.find(relay_parent); + it != state.candidates_.end()) { + for (auto const &l : it->second) { + state.per_candidate_.erase(l); + } + state.candidates_.erase(it); } - candidates_.erase(it); - } + }); } } // namespace kagome::parachain diff --git a/core/parachain/availability/store/store_impl.hpp b/core/parachain/availability/store/store_impl.hpp index acd2bbf328..948f60c805 100644 --- a/core/parachain/availability/store/store_impl.hpp +++ b/core/parachain/availability/store/store_impl.hpp @@ -10,6 +10,7 @@ #include #include +#include "utils/safe_object.hpp" namespace kagome::parachain { class AvailabilityStoreImpl : public AvailabilityStore { @@ -17,39 +18,38 @@ namespace kagome::parachain { ~AvailabilityStoreImpl() override = default; bool hasChunk(const CandidateHash &candidate_hash, - ValidatorIndex index) override; - bool hasPov(const CandidateHash &candidate_hash) override; - bool hasData(const CandidateHash &candidate_hash) override; + ValidatorIndex index) const override; + bool hasPov(const CandidateHash &candidate_hash) const override; + bool hasData(const CandidateHash &candidate_hash) const override; std::optional getChunk(const CandidateHash &candidate_hash, - ValidatorIndex index) override; + ValidatorIndex index) const override; std::optional getPov( - const CandidateHash &candidate_hash) override; + const CandidateHash &candidate_hash) const override; std::optional getPovAndData( - const CandidateHash &candidate_hash) override; + const CandidateHash &candidate_hash) const override; std::vector getChunks( - const CandidateHash &candidate_hash) override; - void putChunk(const CandidateHash &candidate_hash, - const ErasureChunk &chunk) override; - void putChunkSet(const CandidateHash &candidate_hash, - std::vector &&chunks) override; - void putPov(const CandidateHash &candidate_hash, - const ParachainBlock &pov) override; - void putData(const CandidateHash &candidate_hash, - const PersistedValidationData &data) override; - void registerCandidate(network::RelayHash const &relay_parent, - CandidateHash const &candidate_hash) override; + const CandidateHash &candidate_hash) const override; + void storeData(network::RelayHash const &relay_parent, + CandidateHash const &candidate_hash, + std::vector &&chunks, + ParachainBlock const &pov, + PersistedValidationData const &data) override; void remove(network::RelayHash const &relay_parent) override; private: struct PerCandidate { - std::unordered_map chunks; - std::optional pov; - std::optional data; + std::unordered_map chunks{}; + std::optional pov{}; + std::optional data{}; }; - std::unordered_map per_candidate_; - std::unordered_map> - candidates_; + struct State { + std::unordered_map per_candidate_{}; + std::unordered_map> + candidates_{}; + }; + + SafeObject state_{}; }; } // namespace kagome::parachain diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index 605bbd4e39..40bd477bac 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -23,6 +23,7 @@ #include "parachain/candidate_view.hpp" #include "parachain/peer_relay_parent_knowledge.hpp" #include "scale/scale.hpp" +#include "utils/async_sequence.hpp" OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, ParachainProcessorImpl::Error, @@ -156,6 +157,34 @@ namespace kagome::parachain { } }); + chain_sub_ = std::make_shared( + peer_view_->intoChainEventsEngine()); + chain_sub_->subscribe( + chain_sub_->generateSubscriptionSetId(), + primitives::events::ChainEventType::kDeactivateAfterFinalization); + chain_sub_->setCallback( + [wptr{weak_from_this()}]( + auto /*set_id*/, + auto && /*internal_obj*/, + auto /*event_type*/, + const primitives::events::ChainEventParams &event) { + if (auto self = wptr.lock()) { + if (auto const value = if_type< + const primitives::events::RemoveAfterFinalizationParams>( + event)) { + for (auto const &lost : value->get()) { + SL_TRACE(self->logger_, + "Remove from storages.(relay parent={})", + lost); + + self->backing_store_->remove(lost); + self->av_store_->remove(lost); + self->bitfield_store_->remove(lost); + } + } + } + }); + my_view_sub_ = std::make_shared( peer_view_->getMyViewObservable(), false); my_view_sub_->subscribe(my_view_sub_->generateSubscriptionSetId(), @@ -176,9 +205,6 @@ namespace kagome::parachain { self->our_current_state_.state_by_relay_parent.erase(lost); self->pending_candidates.exclusiveAccess( [&](auto &container) { container.erase(lost); }); - self->backing_store_->remove(lost); - self->av_store_->remove(lost); - self->bitfield_store_->remove(lost); } if (auto r = self->canProcessParachains(); r.has_error()) { @@ -404,14 +430,6 @@ namespace kagome::parachain { BOOST_ASSERT_MSG( bd, "BitfieldDistribution is not present. Check message format."); - auto opt_parachain_state = tryGetStateByRelayParent(bd->relay_parent); - if (!opt_parachain_state) { - logger_->debug("Handled bitfield from {}:{} out of view", - peer_id, - bd->relay_parent); - return; - } - logger_->info( "Imported bitfield {} {}", bd->data.payload.ix, bd->relay_parent); bitfield_store_->putBitfield(bd->relay_parent, bd->data); @@ -546,34 +564,64 @@ namespace kagome::parachain { relay_parent, peer_id); - std::optional validate_and_second_result = - std::nullopt; - if (auto result = validateAndMakeAvailable(std::move(candidate), - std::move(pov), - peer_id, - relay_parent, - n_validators); - result.has_error()) { - logger_->warn("Validation task failed.(error={})", - result.error().message()); - return; - } else { - validate_and_second_result = std::move(result.value()); - } - BOOST_ASSERT(validate_and_second_result); - - logger_->info("Async validation complete.(relay parent={}, para_id={})", - validate_and_second_result->relay_parent, - validate_and_second_result->candidate.descriptor.para_id); - - parachain_state.awaiting_validation.erase( - candidateHashFrom(validate_and_second_result->candidate)); + sequenceIgnore( + thread_pool_->io_context()->wrap( + asAsync([wself{weak_from_this()}, + candidate{std::move(candidate)}, + pov{std::move(pov)}, + peer_id, + relay_parent, + n_validators]() mutable + -> outcome::result< + ParachainProcessorImpl::ValidateAndSecondResult> { + if (auto self = wself.lock()) { + if (auto result = + self->validateAndMakeAvailable(std::move(candidate), + std::move(pov), + peer_id, + relay_parent, + n_validators); + result.has_error()) { + self->logger_->warn("Validation task failed.(error={})", + result.error().message()); + return result.as_failure(); + } else { + return result; + } + } + return Error::NO_INSTANCE; + })), + this_context_->wrap( + asAsync([wself{weak_from_this()}, peer_id, candidate_hash]( + auto &&validate_and_second_result) mutable + -> outcome::result { + if (auto self = wself.lock()) { + auto parachain_state = self->tryGetStateByRelayParent( + validate_and_second_result.relay_parent); + if (!parachain_state) { + self->logger_->warn( + "After validation no parachain state on relay_parent {}", + validate_and_second_result.relay_parent); + return Error::OUT_OF_VIEW; + } - if constexpr (kMode == ValidationTaskType::kSecond) { - onValidationComplete(peer_id, std::move(*validate_and_second_result)); - } else { - onAttestComplete(peer_id, std::move(*validate_and_second_result)); - } + self->logger_->info( + "Async validation complete.(relay parent={}, para_id={})", + validate_and_second_result.relay_parent, + validate_and_second_result.candidate.descriptor.para_id); + + parachain_state->get().awaiting_validation.erase( + candidate_hash); + auto q{std::move(validate_and_second_result)}; + if constexpr (kMode == ValidationTaskType::kSecond) { + self->onValidationComplete(peer_id, std::move(q)); + } else { + self->onAttestComplete(peer_id, std::move(q)); + } + return outcome::success(); + } + return Error::NO_INSTANCE; + }))); } template @@ -1228,12 +1276,10 @@ namespace kagome::parachain { network::ParachainBlock const &pov, runtime::PersistedValidationData const &data) { makeTrieProof(chunks); - av_store_->registerCandidate(relay_parent, candidate_hash); - av_store_->putChunkSet(candidate_hash, std::move(chunks)); + /// TODO(iceseer): remove copy + av_store_->storeData( + relay_parent, candidate_hash, std::move(chunks), pov, data); logger_->trace("Put chunks set.(candidate={})", candidate_hash); - - av_store_->putPov(candidate_hash, pov); - av_store_->putData(candidate_hash, data); /// TODO(iceseer): remove copy } outcome::result diff --git a/core/parachain/validator/parachain_processor.hpp b/core/parachain/validator/parachain_processor.hpp index 6ea9979fd7..a40b2faa95 100644 --- a/core/parachain/validator/parachain_processor.hpp +++ b/core/parachain/validator/parachain_processor.hpp @@ -422,6 +422,8 @@ namespace kagome::parachain { const application::AppConfiguration &app_config_; primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable_; primitives::events::BabeStateEventSubscriberPtr babe_status_observer_; + + std::shared_ptr chain_sub_; }; } // namespace kagome::parachain diff --git a/core/primitives/event_types.hpp b/core/primitives/event_types.hpp index 6380d3f45d..d448983d8b 100644 --- a/core/primitives/event_types.hpp +++ b/core/primitives/event_types.hpp @@ -38,7 +38,8 @@ namespace kagome::primitives::events { kFinalizedHeads = 2, kAllHeads = 3, kFinalizedRuntimeVersion = 4, - kNewRuntime = 5 + kNewRuntime = 5, + kDeactivateAfterFinalization = 6, }; enum struct BabeStateEventType : uint32_t { kSynchronized = 1 }; @@ -46,11 +47,13 @@ namespace kagome::primitives::events { using HeadsEventParams = ref_t; using RuntimeVersionEventParams = ref_t; using NewRuntimeEventParams = ref_t; + using RemoveAfterFinalizationParams = std::vector; using ChainEventParams = boost::variant; + NewRuntimeEventParams, + RemoveAfterFinalizationParams>; struct BabeStateEventParams { primitives::BlockInfo best_block; From 605212261e7817f6cd0a5b3700afa280fa62f0b7 Mon Sep 17 00:00:00 2001 From: iceseer Date: Sun, 26 Mar 2023 21:59:38 +0300 Subject: [PATCH 02/13] fixes Signed-off-by: iceseer --- core/blockchain/impl/block_tree_impl.cpp | 6 ++--- core/consensus/babe/impl/babe_impl.cpp | 3 +-- .../validator/impl/parachain_processor.cpp | 5 +++- core/utils/profiler.hpp | 27 +++++++++++-------- test/core/blockchain/block_tree_test.cpp | 6 +++-- 5 files changed, 28 insertions(+), 19 deletions(-) diff --git a/core/blockchain/impl/block_tree_impl.cpp b/core/blockchain/impl/block_tree_impl.cpp index fe9ea93f83..77aad83ddd 100644 --- a/core/blockchain/impl/block_tree_impl.cpp +++ b/core/blockchain/impl/block_tree_impl.cpp @@ -699,9 +699,9 @@ namespace kagome::blockchain { for (primitives::BlockNumber n = last_finalized_block_info.number + 1ull; n < node->getBlockInfo().number; ++n) { - OUTCOME_TRY(hash_opt, storage_->getBlockHash(n)); - if (hash_opt) { - retired_hashes.emplace_back(std::move(*hash_opt)); + if (auto result = storage_->getBlockHash(n); + result.has_value() && result.value()) { + retired_hashes.emplace_back(std::move(*result.value())); } } diff --git a/core/consensus/babe/impl/babe_impl.cpp b/core/consensus/babe/impl/babe_impl.cpp index f453d1bc3d..032d931ae4 100644 --- a/core/consensus/babe/impl/babe_impl.cpp +++ b/core/consensus/babe/impl/babe_impl.cpp @@ -698,8 +698,7 @@ namespace kagome::consensus::babe { } while (rewind_slots); // Slot processing begins in 1/3 slot time before end - auto finish_time = babe_util_->slotFinishTime(current_slot_) - - babe_config_repo_->slotDuration() / 10; + auto finish_time = babe_util_->slotFinishTime(current_slot_); SL_VERBOSE(log_, "Starting a slot {} in epoch {} (remains {:.2f} sec.)", diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index 40bd477bac..ae2c5f944a 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -24,6 +24,7 @@ #include "parachain/peer_relay_parent_knowledge.hpp" #include "scale/scale.hpp" #include "utils/async_sequence.hpp" +#include "utils/profiler.hpp" OUTCOME_CPP_DEFINE_CATEGORY(kagome::parachain, ParachainProcessorImpl::Error, @@ -1289,8 +1290,9 @@ namespace kagome::parachain { libp2p::peer::PeerId const &peer_id, primitives::BlockHash const &relay_parent, size_t n_validators) { - auto const candidate_hash{candidateHashFrom(candidate)}; + TicToc _measure{"Parachain validation", logger_}; + auto const candidate_hash{candidateHashFrom(candidate)}; auto validation_result = validateCandidate(candidate, pov); if (!validation_result) { logger_->warn( @@ -1315,6 +1317,7 @@ namespace kagome::parachain { candidate_hash, available_data.pov, available_data.validation_data); + return ValidateAndSecondResult{ .result = outcome::success(), .relay_parent = relay_parent, diff --git a/core/utils/profiler.hpp b/core/utils/profiler.hpp index a0b76b055c..c876e2b212 100644 --- a/core/utils/profiler.hpp +++ b/core/utils/profiler.hpp @@ -5,30 +5,35 @@ #include class TicToc { - std::string name_; + std::string_view name_; const kagome::log::Logger &log_; std::chrono::time_point t_; public: - TicToc(const std::string &name, const kagome::log::Logger &log) + TicToc(std::string_view name, const kagome::log::Logger &log) : name_(name), log_(log) { t_ = std::chrono::high_resolution_clock::now(); } - void toc(int line = -1) { + template + inline void toc(int line = -1) { auto prev = t_; t_ = std::chrono::high_resolution_clock::now(); - auto str = name_; - if (line != -1) { - str += "at line " + std::to_string(line); + if constexpr (kLine != -1) { + log_->info( + "{} at line {} lasted for {} sec", + name_, + std::to_string(kLine), + std::chrono::duration_cast(t_ - prev).count()); + } else { + log_->info( + "{} lasted for {} sec", + name_, + std::chrono::duration_cast(t_ - prev).count()); } - log_->info( - "{} lasted for {} sec", - str, - std::chrono::duration_cast(t_ - prev).count()); } ~TicToc() { - toc(); + toc<>(); } }; diff --git a/test/core/blockchain/block_tree_test.cpp b/test/core/blockchain/block_tree_test.cpp index ab942f2bc6..0e3d3049a1 100644 --- a/test/core/blockchain/block_tree_test.cpp +++ b/test/core/blockchain/block_tree_test.cpp @@ -75,8 +75,10 @@ struct BlockTreeTest : public testing::Test { EXPECT_CALL(*storage_, setBlockTreeLeaves(_)) .WillRepeatedly(Return(outcome::success())); - EXPECT_CALL(*storage_, getBlockHash(kFirstBlockInfo.number)) - .WillRepeatedly(Return(kFirstBlockInfo.hash)); + for (kagome::primitives::BlockNumber i = 1; i < 100; ++i) { + EXPECT_CALL(*storage_, getBlockHash(i)) + .WillRepeatedly(Return(kFirstBlockInfo.hash)); + } EXPECT_CALL(*storage_, hasBlockHeader(kFirstBlockInfo.hash)) .WillRepeatedly(Return(true)); From bdd765c5cff8afe6ee6e088d85dfcc98db062757 Mon Sep 17 00:00:00 2001 From: iceseer Date: Mon, 27 Mar 2023 17:36:17 +0300 Subject: [PATCH 03/13] remove logs Signed-off-by: iceseer --- core/consensus/babe/impl/babe_impl.cpp | 2 +- core/parachain/approval/approval_distribution.cpp | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/core/consensus/babe/impl/babe_impl.cpp b/core/consensus/babe/impl/babe_impl.cpp index 032d931ae4..d6adcaf6df 100644 --- a/core/consensus/babe/impl/babe_impl.cpp +++ b/core/consensus/babe/impl/babe_impl.cpp @@ -698,7 +698,7 @@ namespace kagome::consensus::babe { } while (rewind_slots); // Slot processing begins in 1/3 slot time before end - auto finish_time = babe_util_->slotFinishTime(current_slot_); + auto finish_time = babe_util_->slotFinishTime(current_slot_)- babe_config_repo_->slotDuration() / 3;; SL_VERBOSE(log_, "Starting a slot {} in epoch {} (remains {:.2f} sec.)", diff --git a/core/parachain/approval/approval_distribution.cpp b/core/parachain/approval/approval_distribution.cpp index acba1bebe5..ec6301d3a5 100644 --- a/core/parachain/approval/approval_distribution.cpp +++ b/core/parachain/approval/approval_distribution.cpp @@ -1896,12 +1896,12 @@ namespace kagome::parachain { auto const block_number = block_entry.block_number; auto const tick_now = ::tickNow(); - logger_->info( - "Advance approval state.(candidate {}, block {}, " - "validator {})", - candidate_hash, - block_hash, - validator_index); + SL_TRACE(logger_, + "Advance approval state.(candidate {}, block {}, " + "validator {})", + candidate_hash, + block_hash, + validator_index); auto result = approval_status(block_entry, candidate_entry); if (!result) { @@ -2007,7 +2007,8 @@ namespace kagome::parachain { primitives::BlockNumber block_number, CandidateHash const &candidate_hash, Tick tick) { - logger_->info( + SL_TRACE( + logger_, "Scheduling wakeup. Block hash {}, candidate hash {}, block number {}, " "tick {}", block_hash, From 631993f4d31ae683ea094b5f402d89cf177f94c8 Mon Sep 17 00:00:00 2001 From: iceseer Date: Tue, 28 Mar 2023 10:51:21 +0300 Subject: [PATCH 04/13] logs Signed-off-by: iceseer --- core/network/impl/protocols/parachain_protocol.hpp | 2 ++ core/network/impl/protocols/protocol_base_impl.hpp | 9 ++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/core/network/impl/protocols/parachain_protocol.hpp b/core/network/impl/protocols/parachain_protocol.hpp index cf9bf2c2b6..c4df46088d 100644 --- a/core/network/impl/protocols/parachain_protocol.hpp +++ b/core/network/impl/protocols/parachain_protocol.hpp @@ -68,6 +68,8 @@ namespace kagome::network { } void onIncomingStream(std::shared_ptr stream) override { + SL_INFO(base_.logger(), + "Incoming parachain protocol {} from {}", base_.protocolName(), stream->remotePeerId()); BOOST_ASSERT(stream->remotePeerId().has_value()); doCollatorHandshake( stream, diff --git a/core/network/impl/protocols/protocol_base_impl.hpp b/core/network/impl/protocols/protocol_base_impl.hpp index e214a0c31a..93fc9c70c3 100644 --- a/core/network/impl/protocols/protocol_base_impl.hpp +++ b/core/network/impl/protocols/protocol_base_impl.hpp @@ -32,7 +32,9 @@ namespace kagome::network { class ProtocolBaseImpl final : NonCopyable, NonMovable { public: ProtocolBaseImpl() = delete; - ~ProtocolBaseImpl() = default; + ~ProtocolBaseImpl() { + SL_INFO(log_, "Destroyed base protocol"); + } ProtocolBaseImpl(ProtocolName name, libp2p::Host &host, @@ -43,10 +45,15 @@ namespace kagome::network { protocols_{std::move(protocols)}, log_{std::move(logger)} { BOOST_ASSERT(!protocols_.empty()); + SL_INFO(log_, "Created parachain protocol"); } template bool start(std::weak_ptr wptr) { + SL_INFO(logger(), "Start base protocol"); + for (auto const &p : protocols_) { + SL_INFO(logger(), "{}", p); + } host_.setProtocolHandler( protocols_, [log{logger()}, wp(std::move(wptr))](auto &&stream_and_proto) { From 97151a4c386d19763f48971448c368d174e93e3e Mon Sep 17 00:00:00 2001 From: iceseer Date: Wed, 29 Mar 2023 13:28:09 +0300 Subject: [PATCH 05/13] remove up to last finalized from blockchain Signed-off-by: iceseer --- core/blockchain/impl/block_tree_impl.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/blockchain/impl/block_tree_impl.cpp b/core/blockchain/impl/block_tree_impl.cpp index 77aad83ddd..7963107e8c 100644 --- a/core/blockchain/impl/block_tree_impl.cpp +++ b/core/blockchain/impl/block_tree_impl.cpp @@ -695,8 +695,7 @@ namespace kagome::blockchain { node->has_justification = true; OUTCOME_TRY(retired_hashes, prune(node)); - retired_hashes.emplace_back(node->getBlockInfo().hash); - for (primitives::BlockNumber n = last_finalized_block_info.number + 1ull; + for (primitives::BlockNumber n = last_finalized_block_info.number; n < node->getBlockInfo().number; ++n) { if (auto result = storage_->getBlockHash(n); From b85e371fb20c861c31ec76891b7a6c44d9e4f25d Mon Sep 17 00:00:00 2001 From: iceseer Date: Wed, 29 Mar 2023 15:29:35 +0300 Subject: [PATCH 06/13] formatting Signed-off-by: iceseer --- core/consensus/babe/impl/babe_impl.cpp | 3 ++- core/network/impl/protocols/parachain_protocol.hpp | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/consensus/babe/impl/babe_impl.cpp b/core/consensus/babe/impl/babe_impl.cpp index d6adcaf6df..64bf9014c4 100644 --- a/core/consensus/babe/impl/babe_impl.cpp +++ b/core/consensus/babe/impl/babe_impl.cpp @@ -698,7 +698,8 @@ namespace kagome::consensus::babe { } while (rewind_slots); // Slot processing begins in 1/3 slot time before end - auto finish_time = babe_util_->slotFinishTime(current_slot_)- babe_config_repo_->slotDuration() / 3;; + auto finish_time = babe_util_->slotFinishTime(current_slot_) + - babe_config_repo_->slotDuration() / 3; SL_VERBOSE(log_, "Starting a slot {} in epoch {} (remains {:.2f} sec.)", diff --git a/core/network/impl/protocols/parachain_protocol.hpp b/core/network/impl/protocols/parachain_protocol.hpp index c4df46088d..8379fab5f6 100644 --- a/core/network/impl/protocols/parachain_protocol.hpp +++ b/core/network/impl/protocols/parachain_protocol.hpp @@ -69,7 +69,9 @@ namespace kagome::network { void onIncomingStream(std::shared_ptr stream) override { SL_INFO(base_.logger(), - "Incoming parachain protocol {} from {}", base_.protocolName(), stream->remotePeerId()); + "Incoming parachain protocol {} from {}", + base_.protocolName(), + stream->remotePeerId()); BOOST_ASSERT(stream->remotePeerId().has_value()); doCollatorHandshake( stream, From be196ae7e9ec710fd5a548f970438b095307b714 Mon Sep 17 00:00:00 2001 From: iceseer Date: Wed, 29 Mar 2023 15:32:51 +0300 Subject: [PATCH 07/13] remove logs Signed-off-by: iceseer --- core/network/impl/protocols/parachain_protocol.hpp | 4 ---- core/network/impl/protocols/protocol_base_impl.hpp | 9 +-------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/core/network/impl/protocols/parachain_protocol.hpp b/core/network/impl/protocols/parachain_protocol.hpp index 8379fab5f6..cf9bf2c2b6 100644 --- a/core/network/impl/protocols/parachain_protocol.hpp +++ b/core/network/impl/protocols/parachain_protocol.hpp @@ -68,10 +68,6 @@ namespace kagome::network { } void onIncomingStream(std::shared_ptr stream) override { - SL_INFO(base_.logger(), - "Incoming parachain protocol {} from {}", - base_.protocolName(), - stream->remotePeerId()); BOOST_ASSERT(stream->remotePeerId().has_value()); doCollatorHandshake( stream, diff --git a/core/network/impl/protocols/protocol_base_impl.hpp b/core/network/impl/protocols/protocol_base_impl.hpp index 93fc9c70c3..e214a0c31a 100644 --- a/core/network/impl/protocols/protocol_base_impl.hpp +++ b/core/network/impl/protocols/protocol_base_impl.hpp @@ -32,9 +32,7 @@ namespace kagome::network { class ProtocolBaseImpl final : NonCopyable, NonMovable { public: ProtocolBaseImpl() = delete; - ~ProtocolBaseImpl() { - SL_INFO(log_, "Destroyed base protocol"); - } + ~ProtocolBaseImpl() = default; ProtocolBaseImpl(ProtocolName name, libp2p::Host &host, @@ -45,15 +43,10 @@ namespace kagome::network { protocols_{std::move(protocols)}, log_{std::move(logger)} { BOOST_ASSERT(!protocols_.empty()); - SL_INFO(log_, "Created parachain protocol"); } template bool start(std::weak_ptr wptr) { - SL_INFO(logger(), "Start base protocol"); - for (auto const &p : protocols_) { - SL_INFO(logger(), "{}", p); - } host_.setProtocolHandler( protocols_, [log{logger()}, wp(std::move(wptr))](auto &&stream_and_proto) { From dc71b59b7bd03cdc912fbf6cd2395a6f3bda7e63 Mon Sep 17 00:00:00 2001 From: turuslan Date: Thu, 30 Mar 2023 15:53:55 +0300 Subject: [PATCH 08/13] fetch chunk Signed-off-by: turuslan --- core/injector/application_injector.cpp | 2 + core/parachain/CMakeLists.txt | 1 + .../availability/bitfield/signer.cpp | 43 +++++--- .../availability/bitfield/signer.hpp | 6 +- core/parachain/availability/fetch/fetch.hpp | 25 +++++ .../availability/fetch/fetch_impl.cpp | 104 ++++++++++++++++++ .../availability/fetch/fetch_impl.hpp | 50 +++++++++ 7 files changed, 216 insertions(+), 15 deletions(-) create mode 100644 core/parachain/availability/fetch/fetch.hpp create mode 100644 core/parachain/availability/fetch/fetch_impl.cpp create mode 100644 core/parachain/availability/fetch/fetch_impl.hpp diff --git a/core/injector/application_injector.cpp b/core/injector/application_injector.cpp index 8f9e916cf2..7fbbcebd5f 100644 --- a/core/injector/application_injector.cpp +++ b/core/injector/application_injector.cpp @@ -112,6 +112,7 @@ #include "outcome/outcome.hpp" #include "parachain/approval/approval_distribution.hpp" #include "parachain/availability/bitfield/store_impl.hpp" +#include "parachain/availability/fetch/fetch_impl.hpp" #include "parachain/availability/recovery/recovery_impl.hpp" #include "parachain/availability/store/store_impl.hpp" #include "parachain/backing/store_impl.hpp" @@ -939,6 +940,7 @@ namespace { bind_by_lambda(get_state_observer_impl), bind_by_lambda(get_sync_observer_impl), di::bind.template to(), + di::bind.template to(), di::bind.template to(), di::bind.template to(), di::bind.template to(), diff --git a/core/parachain/CMakeLists.txt b/core/parachain/CMakeLists.txt index 5d2ca86847..0a90703877 100644 --- a/core/parachain/CMakeLists.txt +++ b/core/parachain/CMakeLists.txt @@ -7,6 +7,7 @@ add_library(validator_parachain availability/bitfield/signer.cpp availability/bitfield/store_impl.cpp availability/erasure_coding_error.cpp + availability/fetch/fetch_impl.cpp availability/recovery/recovery_impl.cpp availability/store/store_impl.cpp backing/store_impl.cpp diff --git a/core/parachain/availability/bitfield/signer.cpp b/core/parachain/availability/bitfield/signer.cpp index 6ced8229da..ba82c9656e 100644 --- a/core/parachain/availability/bitfield/signer.cpp +++ b/core/parachain/availability/bitfield/signer.cpp @@ -23,12 +23,14 @@ namespace kagome::parachain { std::shared_ptr scheduler, std::shared_ptr parachain_api, std::shared_ptr store, + std::shared_ptr fetch, std::shared_ptr bitfield_store) : hasher_{std::move(hasher)}, signer_factory_{std::move(signer_factory)}, scheduler_{std::move(scheduler)}, parachain_api_{std::move(parachain_api)}, store_{std::move(store)}, + fetch_{std::move(fetch)}, bitfield_store_{std::move(bitfield_store)} {} void BitfieldSigner::start( @@ -62,21 +64,21 @@ namespace kagome::parachain { broadcast_ = std::move(callback); } - outcome::result BitfieldSigner::sign(const ValidatorSigner &signer) { + outcome::result BitfieldSigner::sign(const ValidatorSigner &signer, + const Candidates &candidates) { BlockHash const &relay_parent = signer.relayParent(); scale::BitVec bitfield; - OUTCOME_TRY(cores, parachain_api_->availability_cores(relay_parent)); - bitfield.bits.reserve(cores.size()); - for (auto &core : cores) { - auto occupied = boost::get(&core); - if (occupied) { + bitfield.bits.reserve(candidates.size()); + for (auto &candidate : candidates) { + bitfield.bits.push_back( + candidate && store_->hasChunk(*candidate, signer.validatorIndex())); + if (candidate) { SL_TRACE(logger_, "Signing bitfields.(relay_parent={}, validator index={}, has " "chunk={})", relay_parent, signer.validatorIndex(), - store_->hasChunk(occupied->candidate_hash, - signer.validatorIndex())); + bitfield.bits.back()); } else { SL_TRACE(logger_, "Signing bitfields.(relay_parent={}, validator index={}, NOT " @@ -84,9 +86,6 @@ namespace kagome::parachain { relay_parent, signer.validatorIndex()); } - bitfield.bits.push_back(occupied != nullptr - && store_->hasChunk(occupied->candidate_hash, - signer.validatorIndex())); } OUTCOME_TRY(signed_bitfield, signer.sign(bitfield)); @@ -104,11 +103,27 @@ namespace kagome::parachain { if (not signer.has_value()) { return outcome::success(); } - // TODO(turuslan): fetch_chunks(candidates, signer.validatorIndex()); + Candidates candidates; + OUTCOME_TRY(cores, parachain_api_->availability_cores(relay_parent)); + OUTCOME_TRY( + session, + parachain_api_->session_info(relay_parent, signer->getSessionIndex())); + candidates.reserve(cores.size()); + for (auto &core : cores) { + if (auto occupied = boost::get(&core)) { + candidates.emplace_back(occupied->candidate_hash); + fetch_->fetch(signer->validatorIndex(), *occupied, *session); + } else { + candidates.emplace_back(std::nullopt); + } + } + scheduler_->schedule( - [weak = weak_from_this(), signer{std::move(*signer)}]() mutable { + [weak = weak_from_this(), + signer{std::move(*signer)}, + candidates{std::move(candidates)}]() mutable { if (auto self = weak.lock()) { - auto r = self->sign(signer); + auto r = self->sign(signer, candidates); if (r.has_error()) { SL_WARN(log(), "sign error {}", r.error()); } diff --git a/core/parachain/availability/bitfield/signer.hpp b/core/parachain/availability/bitfield/signer.hpp index 4597c53472..9d2815d5a9 100644 --- a/core/parachain/availability/bitfield/signer.hpp +++ b/core/parachain/availability/bitfield/signer.hpp @@ -11,6 +11,7 @@ #include "crypto/hasher.hpp" #include "log/logger.hpp" #include "parachain/availability/bitfield/store.hpp" +#include "parachain/availability/fetch/fetch.hpp" #include "parachain/availability/store/store.hpp" #include "parachain/validator/signer.hpp" #include "primitives/event_types.hpp" @@ -29,6 +30,7 @@ namespace kagome::parachain { std::shared_ptr scheduler, std::shared_ptr parachain_api, std::shared_ptr store, + std::shared_ptr fetch, std::shared_ptr bitfield_store); /// Subscribes to new heads. @@ -36,7 +38,8 @@ namespace kagome::parachain { chain_sub_engine); /// Sign bitfield for given block. - outcome::result sign(const ValidatorSigner &signer); + outcome::result sign(const ValidatorSigner &signer, + const Candidates &candidates); void setBroadcastCallback(BroadcastCallback &&callback); @@ -50,6 +53,7 @@ namespace kagome::parachain { std::shared_ptr scheduler_; std::shared_ptr parachain_api_; std::shared_ptr store_; + std::shared_ptr fetch_; std::shared_ptr bitfield_store_; std::shared_ptr chain_sub_; BroadcastCallback broadcast_; diff --git a/core/parachain/availability/fetch/fetch.hpp b/core/parachain/availability/fetch/fetch.hpp new file mode 100644 index 0000000000..686318222f --- /dev/null +++ b/core/parachain/availability/fetch/fetch.hpp @@ -0,0 +1,25 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef KAGOME_PARACHAIN_AVAILABILITY_FETCH_FETCH_HPP +#define KAGOME_PARACHAIN_AVAILABILITY_FETCH_FETCH_HPP + +#include "runtime/runtime_api/parachain_host_types.hpp" + +namespace kagome::parachain { + /** + * Fetch chunk for availability bitfield voting. + */ + class Fetch { + public: + virtual ~Fetch() = default; + + virtual void fetch(ValidatorIndex chunk_index, + const runtime::OccupiedCore &core, + const runtime::SessionInfo &session) = 0; + }; +} // namespace kagome::parachain + +#endif // KAGOME_PARACHAIN_AVAILABILITY_FETCH_FETCH_HPP diff --git a/core/parachain/availability/fetch/fetch_impl.cpp b/core/parachain/availability/fetch/fetch_impl.cpp new file mode 100644 index 0000000000..f081374671 --- /dev/null +++ b/core/parachain/availability/fetch/fetch_impl.cpp @@ -0,0 +1,104 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "parachain/availability/fetch/fetch_impl.hpp" + +#include "parachain/availability/proof.hpp" + +namespace kagome::parachain { + inline auto log() { + return log::createLogger("FetchImpl", "parachain"); + } + + FetchImpl::FetchImpl(std::shared_ptr av_store, + std::shared_ptr query_audi, + std::shared_ptr router) + : av_store_{std::move(av_store)}, + query_audi_{std::move(query_audi)}, + router_{std::move(router)} {} + + void FetchImpl::fetch(ValidatorIndex chunk_index, + const runtime::OccupiedCore &core, + const runtime::SessionInfo &session) { + std::unique_lock lock{mutex_}; + if (active_.find(core.candidate_hash) != active_.end()) { + return; + } + if (av_store_->hasChunk(core.candidate_hash, chunk_index)) { + return; + } + Active active; + active.chunk_index = chunk_index; + active.erasure_encoding_root = + core.candidate_descriptor.erasure_encoding_root; + for (auto &validator_index : + session.validator_groups[core.group_responsible]) { + active.validators.emplace_back(session.discovery_keys[validator_index]); + } + active_.emplace(core.candidate_hash, std::move(active)); + lock.unlock(); + fetch(core.candidate_hash); + } + + void FetchImpl::fetch(const CandidateHash &candidate_hash) { + std::unique_lock lock{mutex_}; + auto it = active_.find(candidate_hash); + if (it == active_.end()) { + return; + } + auto &active = it->second; + while (not active.validators.empty()) { + auto peer = query_audi_->get(active.validators.back()); + active.validators.pop_back(); + if (peer) { + router_->getFetchChunkProtocol()->doRequest( + *peer, + {candidate_hash, active.chunk_index}, + [=, weak{weak_from_this()}]( + outcome::result r) { + auto self = weak.lock(); + if (not self) { + return; + } + self->fetch(candidate_hash, std::move(r)); + }); + return; + } + } + SL_WARN(log(), + "candidate={} chunk={} not found", + candidate_hash, + active.chunk_index); + active_.erase(it); + } + + void FetchImpl::fetch(const CandidateHash &candidate_hash, + outcome::result _chunk) { + std::unique_lock lock{mutex_}; + auto it = active_.find(candidate_hash); + if (it == active_.end()) { + return; + } + auto &active = it->second; + if (_chunk) { + if (auto chunk2 = boost::get(&_chunk.value())) { + network::ErasureChunk chunk{std::move(chunk2->data), + active.chunk_index, + std::move(chunk2->proof)}; + if (checkTrieProof(chunk, active.erasure_encoding_root)) { + av_store_->putChunk(candidate_hash, chunk); + SL_VERBOSE(log(), + "candidate={} chunk={} fetched", + candidate_hash, + active.chunk_index); + active_.erase(it); + return; + } + } + } + lock.unlock(); + fetch(candidate_hash); + } +} // namespace kagome::parachain diff --git a/core/parachain/availability/fetch/fetch_impl.hpp b/core/parachain/availability/fetch/fetch_impl.hpp new file mode 100644 index 0000000000..d1de4d0d59 --- /dev/null +++ b/core/parachain/availability/fetch/fetch_impl.hpp @@ -0,0 +1,50 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef KAGOME_PARACHAIN_AVAILABILITY_FETCH_FETCH_IMPL_HPP +#define KAGOME_PARACHAIN_AVAILABILITY_FETCH_FETCH_IMPL_HPP + +#include "parachain/availability/fetch/fetch.hpp" + +#include +#include + +#include "authority_discovery/query/query.hpp" +#include "network/router.hpp" +#include "parachain/availability/store/store.hpp" + +namespace kagome::parachain { + class FetchImpl : public Fetch, + public std::enable_shared_from_this { + public: + FetchImpl(std::shared_ptr av_store, + std::shared_ptr query_audi, + std::shared_ptr router); + + void fetch(ValidatorIndex chunk_index, + const runtime::OccupiedCore &core, + const runtime::SessionInfo &session) override; + + private: + struct Active { + ValidatorIndex chunk_index; + std::vector validators; + storage::trie::RootHash erasure_encoding_root; + }; + + void fetch(const CandidateHash &candidate_hash); + void fetch(const CandidateHash &candidate_hash, + outcome::result _chunk); + + std::shared_ptr av_store_; + std::shared_ptr query_audi_; + std::shared_ptr router_; + + std::mutex mutex_; + std::unordered_map active_; + }; +} // namespace kagome::parachain + +#endif // KAGOME_PARACHAIN_AVAILABILITY_FETCH_FETCH_IMPL_HPP From 24229d622547da7635f533b0a5762ca14a9e155e Mon Sep 17 00:00:00 2001 From: iceseer Date: Thu, 30 Mar 2023 16:08:30 +0300 Subject: [PATCH 09/13] registered statement is not from our group logs Signed-off-by: iceseer --- core/consensus/babe/impl/babe_impl.cpp | 4 ++-- core/parachain/validator/impl/parachain_processor.cpp | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/consensus/babe/impl/babe_impl.cpp b/core/consensus/babe/impl/babe_impl.cpp index 64bf9014c4..aae2079df6 100644 --- a/core/consensus/babe/impl/babe_impl.cpp +++ b/core/consensus/babe/impl/babe_impl.cpp @@ -943,8 +943,8 @@ namespace kagome::consensus::babe { bitfield_store_->getBitfields(relay_parent); paras_inherent_data.backed_candidates = backing_store_->get(relay_parent); - log_->trace("Get backed candidates from store.(count={})", - paras_inherent_data.backed_candidates.size()); + SL_INFO(log_, "Get backed candidates from store.(count={}, relay_parent={})", + paras_inherent_data.backed_candidates.size(), relay_parent); auto best_block_header_res = block_tree_->getBlockHeader(best_block_.hash); diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index ae2c5f944a..643b067969 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -683,7 +683,7 @@ namespace kagome::parachain { "Registered statement from not our group(our: {}, registered: {}).", assignment, result->imported.group_id); - // return; + return; } logger_->trace( @@ -904,9 +904,9 @@ namespace kagome::parachain { relayParentState.table_context)) { if (auto backed = table_attested_to_backed( std::move(*attested), relayParentState.table_context)) { - logger_->trace("Candidate backed.(candidate={}, para id={})", + logger_->trace("Candidate backed.(candidate={}, para id={}, relay_parent={})", import_result->imported.candidate, - import_result->imported.group_id); + import_result->imported.group_id, relay_parent); backing_store_->add(relay_parent, std::move(*backed)); } } From b4f5389b11bfca517da8df549ef078e63cccd3d1 Mon Sep 17 00:00:00 2001 From: iceseer Date: Thu, 30 Mar 2023 16:47:29 +0300 Subject: [PATCH 10/13] build fix Signed-off-by: iceseer --- core/parachain/availability/bitfield/signer.cpp | 2 +- core/parachain/availability/fetch/fetch.hpp | 2 +- core/parachain/availability/fetch/fetch_impl.cpp | 14 +++++++------- core/parachain/availability/fetch/fetch_impl.hpp | 6 +++--- core/parachain/availability/store/store.hpp | 3 +++ core/parachain/availability/store/store_impl.cpp | 8 ++++++++ core/parachain/availability/store/store_impl.hpp | 2 ++ 7 files changed, 25 insertions(+), 12 deletions(-) diff --git a/core/parachain/availability/bitfield/signer.cpp b/core/parachain/availability/bitfield/signer.cpp index ba82c9656e..e261417e74 100644 --- a/core/parachain/availability/bitfield/signer.cpp +++ b/core/parachain/availability/bitfield/signer.cpp @@ -112,7 +112,7 @@ namespace kagome::parachain { for (auto &core : cores) { if (auto occupied = boost::get(&core)) { candidates.emplace_back(occupied->candidate_hash); - fetch_->fetch(signer->validatorIndex(), *occupied, *session); + fetch_->fetch(relay_parent, signer->validatorIndex(), *occupied, *session); } else { candidates.emplace_back(std::nullopt); } diff --git a/core/parachain/availability/fetch/fetch.hpp b/core/parachain/availability/fetch/fetch.hpp index 686318222f..30135e73ba 100644 --- a/core/parachain/availability/fetch/fetch.hpp +++ b/core/parachain/availability/fetch/fetch.hpp @@ -16,7 +16,7 @@ namespace kagome::parachain { public: virtual ~Fetch() = default; - virtual void fetch(ValidatorIndex chunk_index, + virtual void fetch(network::RelayHash const &relay_parent, ValidatorIndex chunk_index, const runtime::OccupiedCore &core, const runtime::SessionInfo &session) = 0; }; diff --git a/core/parachain/availability/fetch/fetch_impl.cpp b/core/parachain/availability/fetch/fetch_impl.cpp index f081374671..cdeae7b8e9 100644 --- a/core/parachain/availability/fetch/fetch_impl.cpp +++ b/core/parachain/availability/fetch/fetch_impl.cpp @@ -19,7 +19,7 @@ namespace kagome::parachain { query_audi_{std::move(query_audi)}, router_{std::move(router)} {} - void FetchImpl::fetch(ValidatorIndex chunk_index, + void FetchImpl::fetch(network::RelayHash const &relay_parent, ValidatorIndex chunk_index, const runtime::OccupiedCore &core, const runtime::SessionInfo &session) { std::unique_lock lock{mutex_}; @@ -39,10 +39,10 @@ namespace kagome::parachain { } active_.emplace(core.candidate_hash, std::move(active)); lock.unlock(); - fetch(core.candidate_hash); + fetch(relay_parent, core.candidate_hash); } - void FetchImpl::fetch(const CandidateHash &candidate_hash) { + void FetchImpl::fetch(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash) { std::unique_lock lock{mutex_}; auto it = active_.find(candidate_hash); if (it == active_.end()) { @@ -62,7 +62,7 @@ namespace kagome::parachain { if (not self) { return; } - self->fetch(candidate_hash, std::move(r)); + self->fetch(relay_parent, candidate_hash, std::move(r)); }); return; } @@ -74,7 +74,7 @@ namespace kagome::parachain { active_.erase(it); } - void FetchImpl::fetch(const CandidateHash &candidate_hash, + void FetchImpl::fetch(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash, outcome::result _chunk) { std::unique_lock lock{mutex_}; auto it = active_.find(candidate_hash); @@ -88,17 +88,17 @@ namespace kagome::parachain { active.chunk_index, std::move(chunk2->proof)}; if (checkTrieProof(chunk, active.erasure_encoding_root)) { - av_store_->putChunk(candidate_hash, chunk); SL_VERBOSE(log(), "candidate={} chunk={} fetched", candidate_hash, active.chunk_index); + av_store_->putChunk(relay_parent, candidate_hash, std::move(chunk)); active_.erase(it); return; } } } lock.unlock(); - fetch(candidate_hash); + fetch(relay_parent, candidate_hash); } } // namespace kagome::parachain diff --git a/core/parachain/availability/fetch/fetch_impl.hpp b/core/parachain/availability/fetch/fetch_impl.hpp index d1de4d0d59..9d42463a16 100644 --- a/core/parachain/availability/fetch/fetch_impl.hpp +++ b/core/parachain/availability/fetch/fetch_impl.hpp @@ -23,7 +23,7 @@ namespace kagome::parachain { std::shared_ptr query_audi, std::shared_ptr router); - void fetch(ValidatorIndex chunk_index, + void fetch(network::RelayHash const &relay_parent, ValidatorIndex chunk_index, const runtime::OccupiedCore &core, const runtime::SessionInfo &session) override; @@ -34,8 +34,8 @@ namespace kagome::parachain { storage::trie::RootHash erasure_encoding_root; }; - void fetch(const CandidateHash &candidate_hash); - void fetch(const CandidateHash &candidate_hash, + void fetch(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash); + void fetch(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash, outcome::result _chunk); std::shared_ptr av_store_; diff --git a/core/parachain/availability/store/store.hpp b/core/parachain/availability/store/store.hpp index 9320f4f5b2..ade824693c 100644 --- a/core/parachain/availability/store/store.hpp +++ b/core/parachain/availability/store/store.hpp @@ -49,6 +49,9 @@ namespace kagome::parachain { std::vector &&chunks, ParachainBlock const &pov, PersistedValidationData const &data) = 0; + virtual void putChunk(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash, + ErasureChunk &&chunk) = 0; + /// Clears all data according to this relay_parent virtual void remove(network::RelayHash const &relay_parent) = 0; }; diff --git a/core/parachain/availability/store/store_impl.cpp b/core/parachain/availability/store/store_impl.cpp index bc0783e69a..c23787689f 100644 --- a/core/parachain/availability/store/store_impl.cpp +++ b/core/parachain/availability/store/store_impl.cpp @@ -117,6 +117,14 @@ namespace kagome::parachain { }); } + void AvailabilityStoreImpl::putChunk(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash, + ErasureChunk &&chunk) { + state_.exclusiveAccess([&](auto &state) { + state.candidates_[relay_parent].insert(candidate_hash); + state.per_candidate_[candidate_hash].chunks[chunk.index] = std::move(chunk); + }); + } + void AvailabilityStoreImpl::remove(network::RelayHash const &relay_parent) { state_.exclusiveAccess([&](auto &state) { if (auto it = state.candidates_.find(relay_parent); diff --git a/core/parachain/availability/store/store_impl.hpp b/core/parachain/availability/store/store_impl.hpp index 948f60c805..2ff10ffcac 100644 --- a/core/parachain/availability/store/store_impl.hpp +++ b/core/parachain/availability/store/store_impl.hpp @@ -34,6 +34,8 @@ namespace kagome::parachain { std::vector &&chunks, ParachainBlock const &pov, PersistedValidationData const &data) override; + void putChunk(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash, + ErasureChunk &&chunk) override; void remove(network::RelayHash const &relay_parent) override; private: From c318174107546b3cb051975bc644323188ea63f7 Mon Sep 17 00:00:00 2001 From: iceseer Date: Thu, 30 Mar 2023 16:49:58 +0300 Subject: [PATCH 11/13] formatting Signed-off-by: iceseer --- core/consensus/babe/impl/babe_impl.cpp | 6 ++++-- core/parachain/availability/bitfield/signer.cpp | 3 ++- core/parachain/availability/fetch/fetch.hpp | 3 ++- core/parachain/availability/fetch/fetch_impl.cpp | 9 ++++++--- core/parachain/availability/fetch/fetch_impl.hpp | 9 ++++++--- core/parachain/availability/store/store.hpp | 7 ++++--- core/parachain/availability/store/store_impl.cpp | 6 ++++-- core/parachain/availability/store/store_impl.hpp | 5 +++-- core/parachain/validator/impl/parachain_processor.cpp | 9 ++++++--- 9 files changed, 37 insertions(+), 20 deletions(-) diff --git a/core/consensus/babe/impl/babe_impl.cpp b/core/consensus/babe/impl/babe_impl.cpp index aae2079df6..1c3a312b38 100644 --- a/core/consensus/babe/impl/babe_impl.cpp +++ b/core/consensus/babe/impl/babe_impl.cpp @@ -943,8 +943,10 @@ namespace kagome::consensus::babe { bitfield_store_->getBitfields(relay_parent); paras_inherent_data.backed_candidates = backing_store_->get(relay_parent); - SL_INFO(log_, "Get backed candidates from store.(count={}, relay_parent={})", - paras_inherent_data.backed_candidates.size(), relay_parent); + SL_TRACE(log_, + "Get backed candidates from store.(count={}, relay_parent={})", + paras_inherent_data.backed_candidates.size(), + relay_parent); auto best_block_header_res = block_tree_->getBlockHeader(best_block_.hash); diff --git a/core/parachain/availability/bitfield/signer.cpp b/core/parachain/availability/bitfield/signer.cpp index e261417e74..37dd95ba18 100644 --- a/core/parachain/availability/bitfield/signer.cpp +++ b/core/parachain/availability/bitfield/signer.cpp @@ -112,7 +112,8 @@ namespace kagome::parachain { for (auto &core : cores) { if (auto occupied = boost::get(&core)) { candidates.emplace_back(occupied->candidate_hash); - fetch_->fetch(relay_parent, signer->validatorIndex(), *occupied, *session); + fetch_->fetch( + relay_parent, signer->validatorIndex(), *occupied, *session); } else { candidates.emplace_back(std::nullopt); } diff --git a/core/parachain/availability/fetch/fetch.hpp b/core/parachain/availability/fetch/fetch.hpp index 30135e73ba..3cc0ef1933 100644 --- a/core/parachain/availability/fetch/fetch.hpp +++ b/core/parachain/availability/fetch/fetch.hpp @@ -16,7 +16,8 @@ namespace kagome::parachain { public: virtual ~Fetch() = default; - virtual void fetch(network::RelayHash const &relay_parent, ValidatorIndex chunk_index, + virtual void fetch(network::RelayHash const &relay_parent, + ValidatorIndex chunk_index, const runtime::OccupiedCore &core, const runtime::SessionInfo &session) = 0; }; diff --git a/core/parachain/availability/fetch/fetch_impl.cpp b/core/parachain/availability/fetch/fetch_impl.cpp index cdeae7b8e9..1f053a3814 100644 --- a/core/parachain/availability/fetch/fetch_impl.cpp +++ b/core/parachain/availability/fetch/fetch_impl.cpp @@ -19,7 +19,8 @@ namespace kagome::parachain { query_audi_{std::move(query_audi)}, router_{std::move(router)} {} - void FetchImpl::fetch(network::RelayHash const &relay_parent, ValidatorIndex chunk_index, + void FetchImpl::fetch(network::RelayHash const &relay_parent, + ValidatorIndex chunk_index, const runtime::OccupiedCore &core, const runtime::SessionInfo &session) { std::unique_lock lock{mutex_}; @@ -42,7 +43,8 @@ namespace kagome::parachain { fetch(relay_parent, core.candidate_hash); } - void FetchImpl::fetch(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash) { + void FetchImpl::fetch(network::RelayHash const &relay_parent, + const CandidateHash &candidate_hash) { std::unique_lock lock{mutex_}; auto it = active_.find(candidate_hash); if (it == active_.end()) { @@ -74,7 +76,8 @@ namespace kagome::parachain { active_.erase(it); } - void FetchImpl::fetch(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash, + void FetchImpl::fetch(network::RelayHash const &relay_parent, + const CandidateHash &candidate_hash, outcome::result _chunk) { std::unique_lock lock{mutex_}; auto it = active_.find(candidate_hash); diff --git a/core/parachain/availability/fetch/fetch_impl.hpp b/core/parachain/availability/fetch/fetch_impl.hpp index 9d42463a16..10e8d3dd03 100644 --- a/core/parachain/availability/fetch/fetch_impl.hpp +++ b/core/parachain/availability/fetch/fetch_impl.hpp @@ -23,7 +23,8 @@ namespace kagome::parachain { std::shared_ptr query_audi, std::shared_ptr router); - void fetch(network::RelayHash const &relay_parent, ValidatorIndex chunk_index, + void fetch(network::RelayHash const &relay_parent, + ValidatorIndex chunk_index, const runtime::OccupiedCore &core, const runtime::SessionInfo &session) override; @@ -34,8 +35,10 @@ namespace kagome::parachain { storage::trie::RootHash erasure_encoding_root; }; - void fetch(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash); - void fetch(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash, + void fetch(network::RelayHash const &relay_parent, + const CandidateHash &candidate_hash); + void fetch(network::RelayHash const &relay_parent, + const CandidateHash &candidate_hash, outcome::result _chunk); std::shared_ptr av_store_; diff --git a/core/parachain/availability/store/store.hpp b/core/parachain/availability/store/store.hpp index ade824693c..2143e9b5fe 100644 --- a/core/parachain/availability/store/store.hpp +++ b/core/parachain/availability/store/store.hpp @@ -49,9 +49,10 @@ namespace kagome::parachain { std::vector &&chunks, ParachainBlock const &pov, PersistedValidationData const &data) = 0; - virtual void putChunk(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash, - ErasureChunk &&chunk) = 0; - + virtual void putChunk(network::RelayHash const &relay_parent, + const CandidateHash &candidate_hash, + ErasureChunk &&chunk) = 0; + /// Clears all data according to this relay_parent virtual void remove(network::RelayHash const &relay_parent) = 0; }; diff --git a/core/parachain/availability/store/store_impl.cpp b/core/parachain/availability/store/store_impl.cpp index c23787689f..499d15604b 100644 --- a/core/parachain/availability/store/store_impl.cpp +++ b/core/parachain/availability/store/store_impl.cpp @@ -117,11 +117,13 @@ namespace kagome::parachain { }); } - void AvailabilityStoreImpl::putChunk(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash, + void AvailabilityStoreImpl::putChunk(network::RelayHash const &relay_parent, + const CandidateHash &candidate_hash, ErasureChunk &&chunk) { state_.exclusiveAccess([&](auto &state) { state.candidates_[relay_parent].insert(candidate_hash); - state.per_candidate_[candidate_hash].chunks[chunk.index] = std::move(chunk); + state.per_candidate_[candidate_hash].chunks[chunk.index] = + std::move(chunk); }); } diff --git a/core/parachain/availability/store/store_impl.hpp b/core/parachain/availability/store/store_impl.hpp index 2ff10ffcac..2f76b90ed0 100644 --- a/core/parachain/availability/store/store_impl.hpp +++ b/core/parachain/availability/store/store_impl.hpp @@ -34,8 +34,9 @@ namespace kagome::parachain { std::vector &&chunks, ParachainBlock const &pov, PersistedValidationData const &data) override; - void putChunk(network::RelayHash const &relay_parent, const CandidateHash &candidate_hash, - ErasureChunk &&chunk) override; + void putChunk(network::RelayHash const &relay_parent, + const CandidateHash &candidate_hash, + ErasureChunk &&chunk) override; void remove(network::RelayHash const &relay_parent) override; private: diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index 643b067969..26bf0f3932 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -904,9 +904,12 @@ namespace kagome::parachain { relayParentState.table_context)) { if (auto backed = table_attested_to_backed( std::move(*attested), relayParentState.table_context)) { - logger_->trace("Candidate backed.(candidate={}, para id={}, relay_parent={})", - import_result->imported.candidate, - import_result->imported.group_id, relay_parent); + SL_TRACE( + logger_, + "Candidate backed.(candidate={}, para id={}, relay_parent={})", + import_result->imported.candidate, + import_result->imported.group_id, + relay_parent); backing_store_->add(relay_parent, std::move(*backed)); } } From 5b996b92a97667df680065e85072bd3cced856da Mon Sep 17 00:00:00 2001 From: iceseer Date: Fri, 31 Mar 2023 11:16:33 +0300 Subject: [PATCH 12/13] request POV with peers rotation Signed-off-by: iceseer --- core/injector/application_injector.cpp | 3 +- .../impl/protocols/protocol_req_pov.cpp | 4 +- .../impl/protocols/protocol_req_pov.hpp | 2 +- core/network/protocols/req_pov_protocol.hpp | 2 +- .../validator/impl/parachain_processor.cpp | 162 +++++++++++------- .../validator/parachain_processor.hpp | 14 +- 6 files changed, 114 insertions(+), 73 deletions(-) diff --git a/core/injector/application_injector.cpp b/core/injector/application_injector.cpp index 7fbbcebd5f..bb8567a252 100644 --- a/core/injector/application_injector.cpp +++ b/core/injector/application_injector.cpp @@ -510,7 +510,8 @@ namespace { injector .template create>(), injector.template create< - primitives::events::BabeStateSubscriptionEnginePtr>()); + primitives::events::BabeStateSubscriptionEnginePtr>(), + injector.template create>()); auto protocol_factory = injector.template create>(); diff --git a/core/network/impl/protocols/protocol_req_pov.cpp b/core/network/impl/protocols/protocol_req_pov.cpp index e72cab1996..44bb6af8e6 100644 --- a/core/network/impl/protocols/protocol_req_pov.cpp +++ b/core/network/impl/protocols/protocol_req_pov.cpp @@ -95,12 +95,12 @@ namespace kagome::network { } void ReqPovProtocol::request( - const PeerId &peer_id, + const PeerInfo &peer_info, RequestPov request, std::function)> &&response_handler) { BOOST_ASSERT(impl_ && !!"ReqPovProtocolImpl must be initialized!"); return impl_->doRequest( - peer_id, std::move(request), std::move(response_handler)); + peer_info, std::move(request), std::move(response_handler)); } } // namespace kagome::network diff --git a/core/network/impl/protocols/protocol_req_pov.hpp b/core/network/impl/protocols/protocol_req_pov.hpp index 3be0b73a19..6323208769 100644 --- a/core/network/impl/protocols/protocol_req_pov.hpp +++ b/core/network/impl/protocols/protocol_req_pov.hpp @@ -46,7 +46,7 @@ namespace kagome::network { std::function>)> &&cb) override; - void request(const PeerId &peer_id, + void request(const PeerInfo &peer_info, RequestPov, std::function)> &&response_handler) override; diff --git a/core/network/protocols/req_pov_protocol.hpp b/core/network/protocols/req_pov_protocol.hpp index f9729cbff1..0bce56ec56 100644 --- a/core/network/protocols/req_pov_protocol.hpp +++ b/core/network/protocols/req_pov_protocol.hpp @@ -17,7 +17,7 @@ namespace kagome::network { class IReqPovProtocol : public ProtocolBase { public: - virtual void request(const PeerId &peer_id, + virtual void request(const PeerInfo &peer_info, RequestPov, std::function)> &&response_handler) = 0; diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index 26bf0f3932..a2d64ee5cd 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -75,7 +75,8 @@ namespace kagome::parachain { std::shared_ptr signer_factory, const application::AppConfiguration &app_config, std::shared_ptr app_state_manager, - primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable) + primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable, + std::shared_ptr query_audi) : pm_(std::move(pm)), crypto_provider_(std::move(crypto_provider)), router_(std::move(router)), @@ -92,7 +93,8 @@ namespace kagome::parachain { av_store_(std::move(av_store)), parachain_host_(std::move(parachain_host)), app_config_(app_config), - babe_status_observable_(std::move(babe_status_observable)) { + babe_status_observable_(std::move(babe_status_observable)), + query_audi_{std::move(query_audi)} { BOOST_ASSERT(pm_); BOOST_ASSERT(peer_view_); BOOST_ASSERT(crypto_provider_); @@ -108,6 +110,7 @@ namespace kagome::parachain { BOOST_ASSERT(parachain_host_); BOOST_ASSERT(signer_factory_); BOOST_ASSERT(babe_status_observable_); + BOOST_ASSERT(query_audi_); app_state_manager->takeControl(*this); } @@ -448,26 +451,41 @@ namespace kagome::parachain { } template - void ParachainProcessorImpl::requestPoV(libp2p::peer::PeerId const &peer_id, - CandidateHash const &candidate_hash, - F &&callback) { + void ParachainProcessorImpl::requestPoV( + libp2p::peer::PeerInfo const &peer_info, + CandidateHash const &candidate_hash, + F &&callback) { /// TODO(iceseer): request PoV from validator, who seconded candidate /// But now we can assume, that if we received either `seconded` or `valid` /// from some peer, than we expect this peer has valid PoV, which we can /// request. - logger_->info( - "Requesting PoV.(candidate hash={}, peer={})", candidate_hash, peer_id); + logger_->info("Requesting PoV.(candidate hash={}, peer={})", + candidate_hash, + peer_info.id); auto protocol = router_->getReqPovProtocol(); BOOST_ASSERT(protocol); - protocol->request(peer_id, candidate_hash, std::forward(callback)); + protocol->request(peer_info, candidate_hash, std::forward(callback)); + } + + std::optional + ParachainProcessorImpl::retrieveSessionInfo(RelayHash const &relay_parent) { + if (auto session_index = + parachain_host_->session_index_for_child(relay_parent); + session_index.has_value()) { + if (auto session_info = parachain_host_->session_info( + relay_parent, session_index.value()); + session_info.has_value()) { + return session_info.value(); + } + } + return std::nullopt; } void ParachainProcessorImpl::kickOffValidationWork( RelayHash const &relay_parent, - libp2p::peer::PeerId const &peer_id, AttestingData &attesting_data, RelayParentState ¶chain_state) { auto const candidate_hash{candidateHashFrom(attesting_data.candidate)}; @@ -483,54 +501,72 @@ namespace kagome::parachain { return; } - requestPoV( - peer_id, - candidate_hash, - [candidate{attesting_data.candidate}, - candidate_hash, - wself{weak_from_this()}, - relay_parent, - peer_id](auto &&pov_response_result) mutable { - if (auto self = wself.lock()) { - auto parachain_state = self->tryGetStateByRelayParent(relay_parent); - if (!parachain_state) { - self->logger_->warn( - "After request pov no parachain state on relay_parent {}", - relay_parent); - return; - } + auto session_info = retrieveSessionInfo(relay_parent); + if (!session_info) { + SL_WARN(logger_, "No session info.(relay_parent={})", relay_parent); + return; + } - if (!pov_response_result) { - self->logger_->warn("Request PoV on relay_parent {} failed {}", - relay_parent, - pov_response_result.error().message()); - return; - } + if (session_info->discovery_keys.size() >= attesting_data.from_validator) { + SL_ERROR(logger_, + "Invalid validator index.(relay_parent={}, validator_index={})", + relay_parent, + attesting_data.from_validator); + return; + } - network::ResponsePov &opt_pov = pov_response_result.value(); - auto p{boost::get(&opt_pov)}; - if (!p) { - /// TODO(iceseer): Implement validators rotation to request PoV - self->logger_->warn( - "No PoV relay_parent {}. Should request next validator. Not " - "implemented.", - relay_parent); - return; - } + auto const &authority_id = + session_info->discovery_keys[attesting_data.from_validator]; + if (auto peer = query_audi_->get(authority_id)) { + requestPoV( + *peer, + candidate_hash, + [candidate{attesting_data.candidate}, + candidate_hash, + wself{weak_from_this()}, + relay_parent, + peer_id{peer->id}](auto &&pov_response_result) mutable { + if (auto self = wself.lock()) { + auto parachain_state = + self->tryGetStateByRelayParent(relay_parent); + if (!parachain_state) { + self->logger_->warn( + "After request pov no parachain state on relay_parent {}", + relay_parent); + return; + } - self->logger_->info("PoV received.(candidate hash={}, peer={})", - candidate_hash, - peer_id); - self->appendAsyncValidationTask( - std::move(candidate), - std::move(*p), - relay_parent, - peer_id, - parachain_state->get(), - candidate_hash, - parachain_state->get().table_context.validators.size()); - } - }); + if (!pov_response_result) { + self->logger_->warn("Request PoV on relay_parent {} failed {}", + relay_parent, + pov_response_result.error().message()); + return; + } + + network::ResponsePov &opt_pov = pov_response_result.value(); + auto p{boost::get(&opt_pov)}; + if (!p) { + self->logger_->warn("No PoV.(candidate={})", candidate_hash); + self->onAttestNoPoVComplete(relay_parent, candidate_hash); + return; + } + + self->logger_->info( + "PoV received.(relay_parent={}, candidate hash={}, peer={})", + relay_parent, + candidate_hash, + peer_id); + self->appendAsyncValidationTask( + std::move(candidate), + std::move(*p), + relay_parent, + peer_id, + parachain_state->get(), + candidate_hash, + parachain_state->get().table_context.validators.size()); + } + }); + } } outcome::result @@ -739,7 +775,7 @@ namespace kagome::parachain { if (attesting_ref) { kickOffValidationWork( - relay_parent, peer_id, attesting_ref->get(), parachain_state); + relay_parent, attesting_ref->get(), parachain_state); } } } @@ -1362,18 +1398,19 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onAttestNoPoVComplete( - libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result) { - auto parachain_state = tryGetStateByRelayParent(result.relay_parent); + network::RelayHash const &relay_parent, + CandidateHash const &candidate_hash) { + auto parachain_state = tryGetStateByRelayParent(relay_parent); if (!parachain_state) { logger_->warn( - "onAttestNoPoVComplete result based on unexpected relay_parent {}", - result.relay_parent); + "onAttestNoPoVComplete result based on unexpected relay_parent. " + "(relay_parent={}, candidate={})", + relay_parent, + candidate_hash); return; } - auto const candidate_hash = candidateHashFrom(result.candidate); auto it = parachain_state->get().fallbacks.find(candidate_hash); - if (it == parachain_state->get().fallbacks.end()) { logger_->error( "Internal error. Fallbacks doesn't contain candidate hash {}", @@ -1386,8 +1423,7 @@ namespace kagome::parachain { if (!attesting.backing.empty()) { attesting.from_validator = attesting.backing.front(); attesting.backing.pop(); - kickOffValidationWork( - result.relay_parent, peer_id, attesting, *parachain_state); + kickOffValidationWork(relay_parent, attesting, *parachain_state); } } diff --git a/core/parachain/validator/parachain_processor.hpp b/core/parachain/validator/parachain_processor.hpp index a40b2faa95..89ec1d082d 100644 --- a/core/parachain/validator/parachain_processor.hpp +++ b/core/parachain/validator/parachain_processor.hpp @@ -19,6 +19,7 @@ #include #include "application/app_configuration.hpp" +#include "authority_discovery/query/query.hpp" #include "common/visitor.hpp" #include "crypto/hasher.hpp" #include "network/peer_manager.hpp" @@ -92,7 +93,8 @@ namespace kagome::parachain { const application::AppConfiguration &app_config, std::shared_ptr app_state_manager, primitives::events::BabeStateSubscriptionEnginePtr - babe_status_observable); + babe_status_observable, + std::shared_ptr query_audi); ~ParachainProcessorImpl() = default; bool start(); @@ -204,7 +206,7 @@ namespace kagome::parachain { primitives::BlockHash const &relay_parent, size_t n_validators); template - void requestPoV(libp2p::peer::PeerId const &peer_id, + void requestPoV(libp2p::peer::PeerInfo const &peer_info, CandidateHash const &candidate_hash, F &&callback); @@ -225,8 +227,8 @@ namespace kagome::parachain { ValidateAndSecondResult &&result); void onAttestComplete(libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result); - void onAttestNoPoVComplete(libp2p::peer::PeerId const &peer_id, - ValidateAndSecondResult &&result); + void onAttestNoPoVComplete(network::RelayHash const &relay_parent, + CandidateHash const &candidate_hash); template void appendAsyncValidationTask(network::CandidateReceipt &&candidate, @@ -237,9 +239,10 @@ namespace kagome::parachain { const primitives::BlockHash &candidate_hash, size_t n_validators); void kickOffValidationWork(RelayHash const &relay_parent, - libp2p::peer::PeerId const &peer_id, AttestingData &attesting_data, RelayParentState ¶chain_state); + std::optional retrieveSessionInfo( + RelayHash const &relay_parent); void handleFetchedCollation(network::CollationEvent &&pending_collation, network::CollationFetchingResponse &&response); template @@ -422,6 +425,7 @@ namespace kagome::parachain { const application::AppConfiguration &app_config_; primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable_; primitives::events::BabeStateEventSubscriberPtr babe_status_observer_; + std::shared_ptr query_audi_; std::shared_ptr chain_sub_; }; From a21398f112e63df290d2ca131c436ee1d3c1a7b1 Mon Sep 17 00:00:00 2001 From: iceseer Date: Fri, 31 Mar 2023 13:05:58 +0300 Subject: [PATCH 13/13] issues fixes Signed-off-by: iceseer --- core/utils/profiler.hpp | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/core/utils/profiler.hpp b/core/utils/profiler.hpp index c876e2b212..240ac4a048 100644 --- a/core/utils/profiler.hpp +++ b/core/utils/profiler.hpp @@ -10,30 +10,33 @@ class TicToc { std::chrono::time_point t_; public: + TicToc(std::string &&) = delete; + TicToc(std::string const &) = delete; TicToc(std::string_view name, const kagome::log::Logger &log) : name_(name), log_(log) { t_ = std::chrono::high_resolution_clock::now(); } - template - inline void toc(int line = -1) { + void toc() { auto prev = t_; t_ = std::chrono::high_resolution_clock::now(); - if constexpr (kLine != -1) { - log_->info( - "{} at line {} lasted for {} sec", - name_, - std::to_string(kLine), - std::chrono::duration_cast(t_ - prev).count()); - } else { - log_->info( - "{} lasted for {} sec", - name_, - std::chrono::duration_cast(t_ - prev).count()); - } + log_->info( + "{} lasted for {} sec", + name_, + std::chrono::duration_cast(t_ - prev).count()); + } + + void toc(int line) { + auto prev = t_; + t_ = std::chrono::high_resolution_clock::now(); + log_->info( + "{} at line {} lasted for {} sec", + name_, + std::to_string(line), + std::chrono::duration_cast(t_ - prev).count()); } ~TicToc() { - toc<>(); + toc(); } };