diff --git a/core/injector/application_injector.cpp b/core/injector/application_injector.cpp index 15015115ce..720af3926c 100644 --- a/core/injector/application_injector.cpp +++ b/core/injector/application_injector.cpp @@ -506,7 +506,8 @@ namespace { injector .template create>(), injector.template create< - primitives::events::BabeStateSubscriptionEnginePtr>()); + primitives::events::BabeStateSubscriptionEnginePtr>(), + injector.template create>()); auto protocol_factory = injector.template create>(); diff --git a/core/network/impl/protocols/protocol_req_pov.cpp b/core/network/impl/protocols/protocol_req_pov.cpp index e72cab1996..6d16680597 100644 --- a/core/network/impl/protocols/protocol_req_pov.cpp +++ b/core/network/impl/protocols/protocol_req_pov.cpp @@ -54,7 +54,8 @@ namespace kagome::network { } void onTxRequest(RequestPov const &request) override { - base().logger()->info("Transmit PoV request(candidate hash={})", request); + base().logger()->trace("Transmit PoV request(candidate hash={})", + request); } private: @@ -95,12 +96,12 @@ namespace kagome::network { } void ReqPovProtocol::request( - const PeerId &peer_id, + const PeerInfo &peer_info, RequestPov request, std::function)> &&response_handler) { BOOST_ASSERT(impl_ && !!"ReqPovProtocolImpl must be initialized!"); return impl_->doRequest( - peer_id, std::move(request), std::move(response_handler)); + peer_info, std::move(request), std::move(response_handler)); } } // namespace kagome::network diff --git a/core/network/impl/protocols/protocol_req_pov.hpp b/core/network/impl/protocols/protocol_req_pov.hpp index 3be0b73a19..6323208769 100644 --- a/core/network/impl/protocols/protocol_req_pov.hpp +++ b/core/network/impl/protocols/protocol_req_pov.hpp @@ -46,7 +46,7 @@ namespace kagome::network { std::function>)> &&cb) override; - void request(const PeerId &peer_id, + void request(const PeerInfo &peer_info, RequestPov, std::function)> &&response_handler) override; diff --git a/core/network/impl/protocols/request_response_protocol.hpp b/core/network/impl/protocols/request_response_protocol.hpp index 38a42d8a3b..42a94dce18 100644 --- a/core/network/impl/protocols/request_response_protocol.hpp +++ b/core/network/impl/protocols/request_response_protocol.hpp @@ -122,10 +122,10 @@ namespace kagome::network { const PeerInfo &peer_info, std::function>)> &&cb) override { - SL_INFO(base_.logger(), - "Connect for {} stream with {}", - protocolName(), - peer_info.id); + SL_TRACE(base_.logger(), + "Connect for {} stream with {}", + protocolName(), + peer_info.id); base_.host().newStream( peer_info, diff --git a/core/network/protocols/req_pov_protocol.hpp b/core/network/protocols/req_pov_protocol.hpp index f9729cbff1..0bce56ec56 100644 --- a/core/network/protocols/req_pov_protocol.hpp +++ b/core/network/protocols/req_pov_protocol.hpp @@ -17,7 +17,7 @@ namespace kagome::network { class IReqPovProtocol : public ProtocolBase { public: - virtual void request(const PeerId &peer_id, + virtual void request(const PeerInfo &peer_info, RequestPov, std::function)> &&response_handler) = 0; diff --git a/core/parachain/approval/approval_distribution.cpp b/core/parachain/approval/approval_distribution.cpp index ec6301d3a5..fd6e1a4c3e 100644 --- a/core/parachain/approval/approval_distribution.cpp +++ b/core/parachain/approval/approval_distribution.cpp @@ -1086,6 +1086,12 @@ namespace kagome::parachain { void ApprovalDistribution::on_active_leaves_update( const network::ExView &updated) { + /* + * THIS CODE DISABLED TO PREVENT EXTRA CPU USAGE BECAUSE OF INFINITE TASKS + * IN MAIN THREAD. + */ + return; + if (!parachain_processor_->canProcessParachains()) { return; } @@ -1569,6 +1575,15 @@ namespace kagome::parachain { void ApprovalDistribution::onValidationProtocolMsg( libp2p::peer::PeerId const &peer_id, network::ValidatorProtocolMessage const &message) { + /* + * THIS CODE DISABLED TO PREVENT EXTRA CPU USAGE BECAUSE OF INFINITE TASKS + * IN MAIN THREAD. + */ + return; + + if (!parachain_processor_->canProcessParachains()) { + return; + } if (auto m{boost::get(&message)}) { visit_in_place( *m, diff --git a/core/parachain/validator/impl/parachain_processor.cpp b/core/parachain/validator/impl/parachain_processor.cpp index 26bf0f3932..5e58e4af73 100644 --- a/core/parachain/validator/impl/parachain_processor.cpp +++ b/core/parachain/validator/impl/parachain_processor.cpp @@ -75,7 +75,8 @@ namespace kagome::parachain { std::shared_ptr signer_factory, const application::AppConfiguration &app_config, std::shared_ptr app_state_manager, - primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable) + primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable, + std::shared_ptr query_audi) : pm_(std::move(pm)), crypto_provider_(std::move(crypto_provider)), router_(std::move(router)), @@ -92,7 +93,8 @@ namespace kagome::parachain { av_store_(std::move(av_store)), parachain_host_(std::move(parachain_host)), app_config_(app_config), - babe_status_observable_(std::move(babe_status_observable)) { + babe_status_observable_(std::move(babe_status_observable)), + query_audi_{std::move(query_audi)} { BOOST_ASSERT(pm_); BOOST_ASSERT(peer_view_); BOOST_ASSERT(crypto_provider_); @@ -108,6 +110,7 @@ namespace kagome::parachain { BOOST_ASSERT(parachain_host_); BOOST_ASSERT(signer_factory_); BOOST_ASSERT(babe_status_observable_); + BOOST_ASSERT(query_audi_); app_state_manager->takeControl(*this); } @@ -431,15 +434,18 @@ namespace kagome::parachain { BOOST_ASSERT_MSG( bd, "BitfieldDistribution is not present. Check message format."); - logger_->info( - "Imported bitfield {} {}", bd->data.payload.ix, bd->relay_parent); + SL_TRACE(logger_, + "Imported bitfield {} {}", + bd->data.payload.ix, + bd->relay_parent); bitfield_store_->putBitfield(bd->relay_parent, bd->data); return; } if (auto msg{boost::get(&message)}) { if (auto statement_msg{boost::get(msg)}) { - logger_->info("Imported statement on {}", statement_msg->relay_parent); + SL_TRACE( + logger_, "Imported statement on {}", statement_msg->relay_parent); handleStatement( peer_id, statement_msg->relay_parent, statement_msg->statement); } @@ -448,32 +454,49 @@ namespace kagome::parachain { } template - void ParachainProcessorImpl::requestPoV(libp2p::peer::PeerId const &peer_id, - CandidateHash const &candidate_hash, - F &&callback) { + void ParachainProcessorImpl::requestPoV( + libp2p::peer::PeerInfo const &peer_info, + CandidateHash const &candidate_hash, + F &&callback) { /// TODO(iceseer): request PoV from validator, who seconded candidate /// But now we can assume, that if we received either `seconded` or `valid` /// from some peer, than we expect this peer has valid PoV, which we can /// request. - logger_->info( - "Requesting PoV.(candidate hash={}, peer={})", candidate_hash, peer_id); + logger_->info("Requesting PoV.(candidate hash={}, peer={})", + candidate_hash, + peer_info.id); auto protocol = router_->getReqPovProtocol(); BOOST_ASSERT(protocol); - protocol->request(peer_id, candidate_hash, std::forward(callback)); + protocol->request(peer_info, candidate_hash, std::forward(callback)); + } + + std::optional + ParachainProcessorImpl::retrieveSessionInfo(RelayHash const &relay_parent) { + if (auto session_index = + parachain_host_->session_index_for_child(relay_parent); + session_index.has_value()) { + if (auto session_info = parachain_host_->session_info( + relay_parent, session_index.value()); + session_info.has_value()) { + return session_info.value(); + } + } + return std::nullopt; } void ParachainProcessorImpl::kickOffValidationWork( RelayHash const &relay_parent, - libp2p::peer::PeerId const &peer_id, AttestingData &attesting_data, RelayParentState ¶chain_state) { auto const candidate_hash{candidateHashFrom(attesting_data.candidate)}; BOOST_ASSERT(this_context_->get_executor().running_in_this_thread()); - parachain_state.awaiting_validation.insert(candidate_hash); + if (!parachain_state.awaiting_validation.insert(candidate_hash).second) { + return; + } auto const &collator_id = collatorIdFromDescriptor(attesting_data.candidate.descriptor); @@ -483,54 +506,72 @@ namespace kagome::parachain { return; } - requestPoV( - peer_id, - candidate_hash, - [candidate{attesting_data.candidate}, - candidate_hash, - wself{weak_from_this()}, - relay_parent, - peer_id](auto &&pov_response_result) mutable { - if (auto self = wself.lock()) { - auto parachain_state = self->tryGetStateByRelayParent(relay_parent); - if (!parachain_state) { - self->logger_->warn( - "After request pov no parachain state on relay_parent {}", - relay_parent); - return; - } + auto session_info = retrieveSessionInfo(relay_parent); + if (!session_info) { + SL_WARN(logger_, "No session info.(relay_parent={})", relay_parent); + return; + } - if (!pov_response_result) { - self->logger_->warn("Request PoV on relay_parent {} failed {}", - relay_parent, - pov_response_result.error().message()); - return; - } + if (session_info->discovery_keys.size() <= attesting_data.from_validator) { + SL_ERROR(logger_, + "Invalid validator index.(relay_parent={}, validator_index={})", + relay_parent, + attesting_data.from_validator); + return; + } - network::ResponsePov &opt_pov = pov_response_result.value(); - auto p{boost::get(&opt_pov)}; - if (!p) { - /// TODO(iceseer): Implement validators rotation to request PoV - self->logger_->warn( - "No PoV relay_parent {}. Should request next validator. Not " - "implemented.", - relay_parent); - return; - } + auto const &authority_id = + session_info->discovery_keys[attesting_data.from_validator]; + if (auto peer = query_audi_->get(authority_id)) { + requestPoV( + *peer, + candidate_hash, + [candidate{attesting_data.candidate}, + candidate_hash, + wself{weak_from_this()}, + relay_parent, + peer_id{peer->id}](auto &&pov_response_result) mutable { + if (auto self = wself.lock()) { + auto parachain_state = + self->tryGetStateByRelayParent(relay_parent); + if (!parachain_state) { + self->logger_->warn( + "After request pov no parachain state on relay_parent {}", + relay_parent); + return; + } - self->logger_->info("PoV received.(candidate hash={}, peer={})", - candidate_hash, - peer_id); - self->appendAsyncValidationTask( - std::move(candidate), - std::move(*p), - relay_parent, - peer_id, - parachain_state->get(), - candidate_hash, - parachain_state->get().table_context.validators.size()); - } - }); + if (!pov_response_result) { + self->logger_->warn("Request PoV on relay_parent {} failed {}", + relay_parent, + pov_response_result.error().message()); + return; + } + + network::ResponsePov &opt_pov = pov_response_result.value(); + auto p{boost::get(&opt_pov)}; + if (!p) { + self->logger_->warn("No PoV.(candidate={})", candidate_hash); + self->onAttestNoPoVComplete(relay_parent, candidate_hash); + return; + } + + self->logger_->info( + "PoV received.(relay_parent={}, candidate hash={}, peer={})", + relay_parent, + candidate_hash, + peer_id); + self->appendAsyncValidationTask( + std::move(candidate), + std::move(*p), + relay_parent, + peer_id, + parachain_state->get(), + candidate_hash, + parachain_state->get().table_context.validators.size()); + } + }); + } } outcome::result @@ -679,17 +720,18 @@ namespace kagome::parachain { if (auto result = importStatement(relay_parent, statement, parachain_state)) { if (result->imported.group_id != assignment) { - logger_->warn( + SL_TRACE( + logger_, "Registered statement from not our group(our: {}, registered: {}).", assignment, result->imported.group_id); return; } - logger_->trace( - "Registered incoming statement.(relay_parent={}, peer={}).", - relay_parent, - peer_id); + SL_TRACE(logger_, + "Registered incoming statement.(relay_parent={}, peer={}).", + relay_parent, + peer_id); std::optional> attesting_ref = visit_in_place( parachain::getPayload(statement).candidate_state, @@ -739,7 +781,7 @@ namespace kagome::parachain { if (attesting_ref) { kickOffValidationWork( - relay_parent, peer_id, attesting_ref->get(), parachain_state); + relay_parent, attesting_ref->get(), parachain_state); } } } @@ -749,7 +791,8 @@ namespace kagome::parachain { ParachainProcessorImpl::RelayParentState &relayParentState, primitives::BlockHash const &candidate_hash, network::SignedStatement const &statement) { - logger_->info("Import statement into table.(candidate={})", candidate_hash); + SL_TRACE( + logger_, "Import statement into table.(candidate={})", candidate_hash); if (auto r = backing_store_->put(relayParentState.table_context.groups, statement)) { @@ -894,23 +937,27 @@ namespace kagome::parachain { statement); if (import_result) { - logger_->info( - "Import result.(candidate={}, group id={}, validity votes={})", - import_result->imported.candidate, - import_result->imported.group_id, - import_result->imported.validity_votes); + SL_TRACE(logger_, + "Import result.(candidate={}, group id={}, validity votes={})", + import_result->imported.candidate, + import_result->imported.group_id, + import_result->imported.validity_votes); if (auto attested = attested_candidate(import_result->imported.candidate, relayParentState.table_context)) { - if (auto backed = table_attested_to_backed( - std::move(*attested), relayParentState.table_context)) { - SL_TRACE( - logger_, - "Candidate backed.(candidate={}, para id={}, relay_parent={})", - import_result->imported.candidate, - import_result->imported.group_id, - relay_parent); - backing_store_->add(relay_parent, std::move(*backed)); + if (relayParentState.backed_hashes + .insert(candidateHash(*hasher_, attested->candidate)) + .second) { + if (auto backed = table_attested_to_backed( + std::move(*attested), relayParentState.table_context)) { + SL_INFO( + logger_, + "Candidate backed.(candidate={}, para id={}, relay_parent={})", + import_result->imported.candidate, + import_result->imported.group_id, + relay_parent); + backing_store_->add(relay_parent, std::move(*backed)); + } } } } @@ -1332,7 +1379,7 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onAttestComplete( - libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result) { + libp2p::peer::PeerId const &, ValidateAndSecondResult &&result) { auto parachain_state = tryGetStateByRelayParent(result.relay_parent); if (!parachain_state) { logger_->warn( @@ -1362,18 +1409,19 @@ namespace kagome::parachain { } void ParachainProcessorImpl::onAttestNoPoVComplete( - libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result) { - auto parachain_state = tryGetStateByRelayParent(result.relay_parent); + network::RelayHash const &relay_parent, + CandidateHash const &candidate_hash) { + auto parachain_state = tryGetStateByRelayParent(relay_parent); if (!parachain_state) { logger_->warn( - "onAttestNoPoVComplete result based on unexpected relay_parent {}", - result.relay_parent); + "onAttestNoPoVComplete result based on unexpected relay_parent. " + "(relay_parent={}, candidate={})", + relay_parent, + candidate_hash); return; } - auto const candidate_hash = candidateHashFrom(result.candidate); auto it = parachain_state->get().fallbacks.find(candidate_hash); - if (it == parachain_state->get().fallbacks.end()) { logger_->error( "Internal error. Fallbacks doesn't contain candidate hash {}", @@ -1386,8 +1434,7 @@ namespace kagome::parachain { if (!attesting.backing.empty()) { attesting.from_validator = attesting.backing.front(); attesting.backing.pop(); - kickOffValidationWork( - result.relay_parent, peer_id, attesting, *parachain_state); + kickOffValidationWork(relay_parent, attesting, *parachain_state); } } diff --git a/core/parachain/validator/parachain_processor.hpp b/core/parachain/validator/parachain_processor.hpp index a40b2faa95..cc7820111c 100644 --- a/core/parachain/validator/parachain_processor.hpp +++ b/core/parachain/validator/parachain_processor.hpp @@ -19,6 +19,7 @@ #include #include "application/app_configuration.hpp" +#include "authority_discovery/query/query.hpp" #include "common/visitor.hpp" #include "crypto/hasher.hpp" #include "network/peer_manager.hpp" @@ -92,7 +93,8 @@ namespace kagome::parachain { const application::AppConfiguration &app_config, std::shared_ptr app_state_manager, primitives::events::BabeStateSubscriptionEnginePtr - babe_status_observable); + babe_status_observable, + std::shared_ptr query_audi); ~ParachainProcessorImpl() = default; bool start(); @@ -187,6 +189,7 @@ namespace kagome::parachain { std::unordered_set issued_statements; std::unordered_set peers_advertised; std::unordered_map fallbacks; + std::unordered_set backed_hashes; }; /* @@ -204,7 +207,7 @@ namespace kagome::parachain { primitives::BlockHash const &relay_parent, size_t n_validators); template - void requestPoV(libp2p::peer::PeerId const &peer_id, + void requestPoV(libp2p::peer::PeerInfo const &peer_info, CandidateHash const &candidate_hash, F &&callback); @@ -225,8 +228,8 @@ namespace kagome::parachain { ValidateAndSecondResult &&result); void onAttestComplete(libp2p::peer::PeerId const &peer_id, ValidateAndSecondResult &&result); - void onAttestNoPoVComplete(libp2p::peer::PeerId const &peer_id, - ValidateAndSecondResult &&result); + void onAttestNoPoVComplete(network::RelayHash const &relay_parent, + CandidateHash const &candidate_hash); template void appendAsyncValidationTask(network::CandidateReceipt &&candidate, @@ -237,9 +240,10 @@ namespace kagome::parachain { const primitives::BlockHash &candidate_hash, size_t n_validators); void kickOffValidationWork(RelayHash const &relay_parent, - libp2p::peer::PeerId const &peer_id, AttestingData &attesting_data, RelayParentState ¶chain_state); + std::optional retrieveSessionInfo( + RelayHash const &relay_parent); void handleFetchedCollation(network::CollationEvent &&pending_collation, network::CollationFetchingResponse &&response); template @@ -422,6 +426,7 @@ namespace kagome::parachain { const application::AppConfiguration &app_config_; primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable_; primitives::events::BabeStateEventSubscriberPtr babe_status_observer_; + std::shared_ptr query_audi_; std::shared_ptr chain_sub_; };