Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel backing with validation in pool #1544

Merged
merged 16 commits into from
Mar 31, 2023
22 changes: 19 additions & 3 deletions core/blockchain/impl/block_tree_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "blockchain/block_tree_error.hpp"
#include "blockchain/impl/cached_tree.hpp"
#include "blockchain/impl/justification_storage_policy.hpp"
#include "blockchain/impl/storage_util.hpp"
#include "consensus/babe/impl/babe_digests_util.hpp"
#include "consensus/babe/is_primary.hpp"
#include "crypto/blake2/blake2b.h"
Expand Down Expand Up @@ -693,7 +694,19 @@ namespace kagome::blockchain {
node->finalized = true;
node->has_justification = true;

OUTCOME_TRY(prune(node));
OUTCOME_TRY(retired_hashes, prune(node));
for (primitives::BlockNumber n = last_finalized_block_info.number;
n < node->getBlockInfo().number;
++n) {
if (auto result = storage_->getBlockHash(n);
result.has_value() && result.value()) {
retired_hashes.emplace_back(std::move(*result.value()));
}
}

chain_events_engine_->notify(
primitives::events::ChainEventType::kDeactivateAfterFinalization,
retired_hashes);

tree_->updateTreeRoot(node, justification);

Expand Down Expand Up @@ -1159,7 +1172,7 @@ namespace kagome::blockchain {
}
}

outcome::result<void> BlockTreeImpl::prune(
outcome::result<std::vector<primitives::BlockHash>> BlockTreeImpl::prune(
const std::shared_ptr<TreeNode> &lastFinalizedNode) {
std::deque<std::shared_ptr<TreeNode>> to_remove;

Expand Down Expand Up @@ -1191,9 +1204,12 @@ namespace kagome::blockchain {
}

std::vector<primitives::Extrinsic> extrinsics;
std::vector<primitives::BlockHash> retired_hashes;

// remove from storage
retired_hashes.reserve(to_remove.size());
for (const auto &node : to_remove) {
retired_hashes.emplace_back(node->block_hash);
OUTCOME_TRY(block_body_res, storage_->getBlockBody(node->block_hash));
if (block_body_res.has_value()) {
extrinsics.reserve(extrinsics.size() + block_body_res.value().size());
Expand Down Expand Up @@ -1223,7 +1239,7 @@ namespace kagome::blockchain {
}
}

return outcome::success();
return retired_hashes;
}

outcome::result<void> BlockTreeImpl::reorganize() {
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain/impl/block_tree_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ namespace kagome::blockchain {
*/
std::vector<primitives::BlockHash> getLeavesSorted() const;

outcome::result<void> prune(
outcome::result<std::vector<primitives::BlockHash>> prune(
const std::shared_ptr<TreeNode> &lastFinalizedNode);

outcome::result<void> reorganize();
Expand Down
6 changes: 4 additions & 2 deletions core/consensus/babe/impl/babe_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,10 @@ namespace kagome::consensus::babe {
bitfield_store_->getBitfields(relay_parent);

paras_inherent_data.backed_candidates = backing_store_->get(relay_parent);
log_->trace("Get backed candidates from store.(count={})",
paras_inherent_data.backed_candidates.size());
SL_TRACE(log_,
"Get backed candidates from store.(count={}, relay_parent={})",
paras_inherent_data.backed_candidates.size(),
relay_parent);

auto best_block_header_res =
block_tree_->getBlockHeader(best_block_.hash);
Expand Down
5 changes: 4 additions & 1 deletion core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
#include "outcome/outcome.hpp"
#include "parachain/approval/approval_distribution.hpp"
#include "parachain/availability/bitfield/store_impl.hpp"
#include "parachain/availability/fetch/fetch_impl.hpp"
#include "parachain/availability/recovery/recovery_impl.hpp"
#include "parachain/availability/store/store_impl.hpp"
#include "parachain/backing/store_impl.hpp"
Expand Down Expand Up @@ -509,7 +510,8 @@ namespace {
injector
.template create<std::shared_ptr<application::AppStateManager>>(),
injector.template create<
primitives::events::BabeStateSubscriptionEnginePtr>());
primitives::events::BabeStateSubscriptionEnginePtr>(),
injector.template create<sptr<authority_discovery::Query>>());

auto protocol_factory =
injector.template create<std::shared_ptr<network::ProtocolFactory>>();
Expand Down Expand Up @@ -939,6 +941,7 @@ namespace {
bind_by_lambda<network::StateProtocolObserver>(get_state_observer_impl),
bind_by_lambda<network::SyncProtocolObserver>(get_sync_observer_impl),
di::bind<parachain::AvailabilityStore>.template to<parachain::AvailabilityStoreImpl>(),
di::bind<parachain::Fetch>.template to<parachain::FetchImpl>(),
di::bind<parachain::Recovery>.template to<parachain::RecoveryImpl>(),
di::bind<parachain::BitfieldStore>.template to<parachain::BitfieldStoreImpl>(),
di::bind<parachain::BackingStore>.template to<parachain::BackingStoreImpl>(),
Expand Down
4 changes: 2 additions & 2 deletions core/network/impl/protocols/protocol_req_pov.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ namespace kagome::network {
}

void ReqPovProtocol::request(
const PeerId &peer_id,
const PeerInfo &peer_info,
RequestPov request,
std::function<void(outcome::result<ResponsePov>)> &&response_handler) {
BOOST_ASSERT(impl_ && !!"ReqPovProtocolImpl must be initialized!");
return impl_->doRequest(
peer_id, std::move(request), std::move(response_handler));
peer_info, std::move(request), std::move(response_handler));
}

} // namespace kagome::network
2 changes: 1 addition & 1 deletion core/network/impl/protocols/protocol_req_pov.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace kagome::network {
std::function<void(outcome::result<std::shared_ptr<Stream>>)> &&cb)
override;

void request(const PeerId &peer_id,
void request(const PeerInfo &peer_info,
RequestPov,
std::function<void(outcome::result<ResponsePov>)>
&&response_handler) override;
Expand Down
2 changes: 1 addition & 1 deletion core/network/protocols/req_pov_protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace kagome::network {

class IReqPovProtocol : public ProtocolBase {
public:
virtual void request(const PeerId &peer_id,
virtual void request(const PeerInfo &peer_info,
RequestPov,
std::function<void(outcome::result<ResponsePov>)>
&&response_handler) = 0;
Expand Down
1 change: 1 addition & 0 deletions core/parachain/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ add_library(validator_parachain
availability/bitfield/signer.cpp
availability/bitfield/store_impl.cpp
availability/erasure_coding_error.cpp
availability/fetch/fetch_impl.cpp
availability/recovery/recovery_impl.cpp
availability/store/store_impl.cpp
backing/store_impl.cpp
Expand Down
82 changes: 56 additions & 26 deletions core/parachain/approval/approval_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,30 @@ namespace kagome::parachain {
}
});

chain_sub_ = std::make_shared<primitives::events::ChainEventSubscriber>(
peer_view_->intoChainEventsEngine());
chain_sub_->subscribe(
chain_sub_->generateSubscriptionSetId(),
primitives::events::ChainEventType::kDeactivateAfterFinalization);
chain_sub_->setCallback(
[wptr{weak_from_this()}](
auto /*set_id*/,
auto && /*internal_obj*/,
auto /*event_type*/,
const primitives::events::ChainEventParams &event) {
if (auto self = wptr.lock()) {
if (auto const value = if_type<
const primitives::events::RemoveAfterFinalizationParams>(
event)) {
for (auto const &lost : value->get()) {
self->logger_->trace(
"Cleaning up stale pending messages.(block hash={})", lost);
self->pending_known_.erase(lost);
}
}
}
});

return true;
}

Expand Down Expand Up @@ -1071,16 +1095,6 @@ namespace kagome::parachain {
[[maybe_unused]] auto &_ = pending_known_[result.value()];
}

for (auto it = pending_known_.begin(); it != pending_known_.end();) {
if (updated.view.contains(it->first)) {
++it;
} else {
logger_->trace("Cleaning up stale pending messages.(block hash={})",
it->first);
it = pending_known_.erase(it);
}
}

handle_new_head(
result.value(),
updated,
Expand Down Expand Up @@ -1397,7 +1411,31 @@ namespace kagome::parachain {
source ? source->get().toBase58() : "our",
block_hash,
validator_index);

auto &entry = opt_entry->get();
if (claimed_candidate_index >= entry.candidates.size()) {
logger_->warn(
"Unexpected candidate entry. (candidate index={}, block hash={})",
claimed_candidate_index,
block_hash);
return;
}

auto &candidate_entry = entry.candidates[claimed_candidate_index];
if (auto it = candidate_entry.messages.find(validator_index);
it != candidate_entry.messages.end()) {
if (auto state{boost::get<DistribApprovalStateApproved>(
&it->second.approval_state)}) {
logger_->warn(
"Already have approved state. (candidate index={}, "
"block hash={}, validator index={})",
claimed_candidate_index,
block_hash,
validator_index);
return;
}
}

if (source) {
/// TODO(iceseer): vector-clock for knowledge
switch (
Expand Down Expand Up @@ -1430,15 +1468,6 @@ namespace kagome::parachain {
}

auto const local = !source;
if (claimed_candidate_index >= entry.candidates.size()) {
logger_->warn(
"Unexpected candidate entry. (candidate index={}, block hash={})",
claimed_candidate_index,
block_hash);
return;
}

auto &candidate_entry = entry.candidates[claimed_candidate_index];
[[maybe_unused]] auto &message_state =
candidate_entry.messages
.emplace(validator_index,
Expand Down Expand Up @@ -1867,12 +1896,12 @@ namespace kagome::parachain {
auto const block_number = block_entry.block_number;
auto const tick_now = ::tickNow();

logger_->info(
"Advance approval state.(candidate {}, block {}, "
"validator {})",
candidate_hash,
block_hash,
validator_index);
SL_TRACE(logger_,
"Advance approval state.(candidate {}, block {}, "
"validator {})",
candidate_hash,
block_hash,
validator_index);

auto result = approval_status(block_entry, candidate_entry);
if (!result) {
Expand Down Expand Up @@ -1978,7 +2007,8 @@ namespace kagome::parachain {
primitives::BlockNumber block_number,
CandidateHash const &candidate_hash,
Tick tick) {
logger_->info(
SL_TRACE(
logger_,
"Scheduling wakeup. Block hash {}, candidate hash {}, block number {}, "
"tick {}",
block_hash,
Expand Down
2 changes: 2 additions & 0 deletions core/parachain/approval/approval_distribution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ namespace kagome::parachain {
const ApprovalVotingSubsystem config_;
std::shared_ptr<network::PeerView> peer_view_;
network::PeerView::MyViewSubscriberPtr my_view_sub_;
std::shared_ptr<primitives::events::ChainEventSubscriber> chain_sub_;

Store<StorePair<primitives::BlockNumber, std::unordered_set<Hash>>,
StorePair<CandidateHash, CandidateEntry>,
StorePair<RelayHash, BlockEntry>,
Expand Down
44 changes: 30 additions & 14 deletions core/parachain/availability/bitfield/signer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ namespace kagome::parachain {
std::shared_ptr<libp2p::basic::Scheduler> scheduler,
std::shared_ptr<runtime::ParachainHost> parachain_api,
std::shared_ptr<AvailabilityStore> store,
std::shared_ptr<Fetch> fetch,
std::shared_ptr<BitfieldStore> bitfield_store)
: hasher_{std::move(hasher)},
signer_factory_{std::move(signer_factory)},
scheduler_{std::move(scheduler)},
parachain_api_{std::move(parachain_api)},
store_{std::move(store)},
fetch_{std::move(fetch)},
bitfield_store_{std::move(bitfield_store)} {}

void BitfieldSigner::start(
Expand Down Expand Up @@ -62,31 +64,28 @@ namespace kagome::parachain {
broadcast_ = std::move(callback);
}

outcome::result<void> BitfieldSigner::sign(const ValidatorSigner &signer) {
outcome::result<void> BitfieldSigner::sign(const ValidatorSigner &signer,
const Candidates &candidates) {
BlockHash const &relay_parent = signer.relayParent();
scale::BitVec bitfield;
OUTCOME_TRY(cores, parachain_api_->availability_cores(relay_parent));
bitfield.bits.reserve(cores.size());
for (auto &core : cores) {
auto occupied = boost::get<runtime::OccupiedCore>(&core);
if (occupied) {
bitfield.bits.reserve(candidates.size());
for (auto &candidate : candidates) {
bitfield.bits.push_back(
candidate && store_->hasChunk(*candidate, signer.validatorIndex()));
if (candidate) {
SL_TRACE(logger_,
"Signing bitfields.(relay_parent={}, validator index={}, has "
"chunk={})",
relay_parent,
signer.validatorIndex(),
store_->hasChunk(occupied->candidate_hash,
signer.validatorIndex()));
bitfield.bits.back());
} else {
SL_TRACE(logger_,
"Signing bitfields.(relay_parent={}, validator index={}, NOT "
"OCCUPIED)",
relay_parent,
signer.validatorIndex());
}
bitfield.bits.push_back(occupied != nullptr
&& store_->hasChunk(occupied->candidate_hash,
signer.validatorIndex()));
}

OUTCOME_TRY(signed_bitfield, signer.sign(bitfield));
Expand All @@ -104,11 +103,28 @@ namespace kagome::parachain {
if (not signer.has_value()) {
return outcome::success();
}
// TODO(turuslan): fetch_chunks(candidates, signer.validatorIndex());
Candidates candidates;
OUTCOME_TRY(cores, parachain_api_->availability_cores(relay_parent));
OUTCOME_TRY(
session,
parachain_api_->session_info(relay_parent, signer->getSessionIndex()));
candidates.reserve(cores.size());
for (auto &core : cores) {
if (auto occupied = boost::get<runtime::OccupiedCore>(&core)) {
candidates.emplace_back(occupied->candidate_hash);
fetch_->fetch(
relay_parent, signer->validatorIndex(), *occupied, *session);
} else {
candidates.emplace_back(std::nullopt);
}
}

scheduler_->schedule(
[weak = weak_from_this(), signer{std::move(*signer)}]() mutable {
[weak = weak_from_this(),
signer{std::move(*signer)},
candidates{std::move(candidates)}]() mutable {
if (auto self = weak.lock()) {
auto r = self->sign(signer);
auto r = self->sign(signer, candidates);
if (r.has_error()) {
SL_WARN(log(), "sign error {}", r.error());
}
Expand Down
Loading