Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor #1934

Merged
merged 3 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions core/authority_discovery/query/query_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "common/buffer_view.hpp"
#include "common/bytestr.hpp"
#include "crypto/sha/sha256.hpp"
#include "utils/retain_if.hpp"

OUTCOME_CPP_DEFINE_CATEGORY(kagome::authority_discovery, QueryImpl::Error, e) {
using E = decltype(e);
Expand Down Expand Up @@ -102,15 +103,10 @@ namespace kagome::authority_discovery {
OUTCOME_TRY(local_keys,
crypto_store_->getSr25519PublicKeys(
crypto::KeyTypes::AUTHORITY_DISCOVERY));
authorities.erase(
std::remove_if(authorities.begin(),
authorities.end(),
[&](const primitives::AuthorityDiscoveryId &id) {
return std::find(
local_keys.begin(), local_keys.end(), id)
!= local_keys.end();
}),
authorities.end());
retain_if(authorities, [&](const primitives::AuthorityDiscoveryId &id) {
return std::find(local_keys.begin(), local_keys.end(), id)
!= local_keys.end();
});
for (auto it = auth_to_peer_cache_.begin();
it != auth_to_peer_cache_.end();) {
if (std::find(authorities.begin(), authorities.end(), it->first)
Expand Down
9 changes: 4 additions & 5 deletions core/common/lru_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/assert.hpp>

#include "outcome/outcome.hpp"
#include "utils/retain_if.hpp"

namespace kagome {

Expand Down Expand Up @@ -172,11 +173,9 @@ namespace kagome {
void erase_if(const std::function<bool(const Key &key, const Value &value)>
&predicate) {
LockGuard lg(*this);
auto it =
std::remove_if(cache_.begin(), cache_.end(), [&](const auto &item) {
return predicate(item.key, *item.value);
});
cache_.erase(it, cache_.end());
retain_if(cache_, [&](const CacheEntry &item) {
return not predicate(item.key, *item.value);
turuslan marked this conversation as resolved.
Show resolved Hide resolved
});
}

private:
Expand Down
137 changes: 71 additions & 66 deletions core/consensus/grandpa/impl/grandpa_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
#include "consensus/grandpa/impl/voting_round_impl.hpp"
#include "consensus/grandpa/vote_graph/vote_graph_impl.hpp"
#include "consensus/grandpa/voting_round_error.hpp"
#include "consensus/timeline/timeline.hpp"
#include "crypto/crypto_store/session_keys.hpp"
#include "network/peer_manager.hpp"
#include "network/reputation_repository.hpp"
#include "network/synchronizer.hpp"
#include "utils/retain_if.hpp"

namespace {
constexpr auto highestGrandpaRoundMetricName =
Expand Down Expand Up @@ -69,7 +71,8 @@ namespace kagome::consensus::grandpa {
std::shared_ptr<network::PeerManager> peer_manager,
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<network::ReputationRepository> reputation_repository,
primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable,
LazySPtr<Timeline> timeline,
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine,
std::shared_ptr<Watchdog> watchdog,
WeakIoContext main_thread)
: round_time_factor_{kGossipDuration},
Expand All @@ -82,7 +85,8 @@ namespace kagome::consensus::grandpa {
peer_manager_(std::move(peer_manager)),
block_tree_(std::move(block_tree)),
reputation_repository_(std::move(reputation_repository)),
babe_status_observable_(std::move(babe_status_observable)),
timeline_{std::move(timeline)},
chain_sub_{chain_sub_engine},
execution_thread_pool_{
std::make_shared<ThreadPool>(std::move(watchdog), "grandpa", 1ull)},
internal_thread_context_{execution_thread_pool_->handler()},
Expand All @@ -98,7 +102,6 @@ namespace kagome::consensus::grandpa {
BOOST_ASSERT(synchronizer_ != nullptr);
BOOST_ASSERT(peer_manager_ != nullptr);
BOOST_ASSERT(block_tree_ != nullptr);
BOOST_ASSERT(babe_status_observable_ != nullptr);
BOOST_ASSERT(reputation_repository_ != nullptr);

BOOST_ASSERT(app_state_manager != nullptr);
Expand All @@ -117,25 +120,6 @@ namespace kagome::consensus::grandpa {
}

bool GrandpaImpl::prepare() {
babe_status_observer_ =
std::make_shared<primitives::events::BabeStateEventSubscriber>(
babe_status_observable_, false);
babe_status_observer_->subscribe(
babe_status_observer_->generateSubscriptionSetId(),
primitives::events::SyncStateEventType::kSyncState);
babe_status_observer_->setCallback(
[wself{weak_from_this()}](
auto /*set_id*/,
bool &synchronized,
auto /*event_type*/,
const primitives::events::SyncStateEventParams &event) {
if (auto self = wself.lock()) {
if (event == SyncState::SYNCHRONIZED) {
self->synchronized_once_.store(true);
}
}
});

internal_thread_context_->start();
main_thread_.start();
return true;
Expand Down Expand Up @@ -215,6 +199,13 @@ namespace kagome::consensus::grandpa {
std::chrono::minutes(1));

tryExecuteNextRound(current_round_);

chain_sub_.onHead(
[weak{weak_from_this()}](const primitives::BlockHeader &block) {
if (auto self = weak.lock()) {
self->onHead(block.blockInfo());
}
});
return true;
}

Expand Down Expand Up @@ -484,7 +475,7 @@ namespace kagome::consensus::grandpa {
}
}

if (not synchronized_once_.load()) {
if (not timeline_.get()->wasSynchronized()) {
return;
}

Expand Down Expand Up @@ -1483,12 +1474,35 @@ namespace kagome::consensus::grandpa {
if (not gc.peer_id.has_value() || gc.missing_blocks.empty()) {
return;
}
if (not timeline_.get()->wasSynchronized()) {
return;
}
post(main_thread_,
[s{synchronizer_}, blocks{gc.missing_blocks}, peer{*gc.peer_id}] {
for (auto &block : blocks) {
s->syncByBlockInfo(block, peer, nullptr, false);
}
});
waiting_blocks_.emplace_back(std::move(gc));
pruneWaitingBlocks();
}

auto grandpa_context = std::make_shared<GrandpaContext>(std::move(gc));
main_thread_.execute([wself{weak_from_this()}, grandpa_context]() mutable {
auto final = [wp{wself}](
std::shared_ptr<GrandpaContext> grandpa_context) {
if (auto self = wp.lock()) {
void GrandpaImpl::onHead(const primitives::BlockInfo &block) {
if (not timeline_.get()->wasSynchronized()) {
return;
}
REINVOKE(*internal_thread_context_, onHead, block);
auto f = [&](GrandpaContext &gc) {
if (gc.missing_blocks.erase(block) == 0) {
return true;
}
if (not gc.missing_blocks.empty()) {
return true;
}
auto f = [weak{weak_from_this()},
grandpa_context{
std::make_shared<GrandpaContext>(std::move(gc))}] {
if (auto self = weak.lock()) {
if (grandpa_context->vote.has_value()) {
auto const &peer_id = grandpa_context->peer_id.value();
auto const &vote = grandpa_context->vote.value();
Expand All @@ -1510,44 +1524,35 @@ namespace kagome::consensus::grandpa {
}
}
};
post(*internal_thread_context_, std::move(f));
return false;
};
retain_if(waiting_blocks_, f);
pruneWaitingBlocks();
}

auto do_request_ptr = std::make_shared<
std::function<void(std::shared_ptr<GrandpaContext>)>>();
auto &do_request = *do_request_ptr;

do_request =
[wp{wself},
do_request_ptr = std::move(do_request_ptr),
final = std::move(final)](
std::shared_ptr<GrandpaContext> grandpa_context) mutable {
BOOST_ASSERT(grandpa_context);
if (auto self = wp.lock()) {
auto &peer_id = grandpa_context->peer_id.value();
auto &blocks = grandpa_context->missing_blocks;
if (not blocks.empty()) {
auto it = blocks.rbegin();
auto node = blocks.extract((++it).base());
auto block = node.value();
self->synchronizer_->syncByBlockInfo(
block,
peer_id,
[wp,
grandpa_context{std::move(grandpa_context)},
do_request_ptr =
std::move(do_request_ptr)](auto res) mutable {
if (do_request_ptr != nullptr) {
auto do_request = std::move(*do_request_ptr);
do_request(std::move(grandpa_context));
}
},
true);
return;
}
final(std::move(grandpa_context));
do_request_ptr.reset();
}
};
do_request(std::move(grandpa_context));
});
void GrandpaImpl::pruneWaitingBlocks() {
auto round = [&](VoterSetId set, RoundNumber round) {
for (auto p = current_round_; p; p = p->getPreviousRound()) {
if (p->voterSetId() == set and p->roundNumber() == round) {
return true;
}
}
return false;
};
auto f = [&](const GrandpaContext &gc) {
if (gc.catch_up_response) {
return round(gc.catch_up_response->voter_set_id,
gc.catch_up_response->round_number);
}
if (gc.commit) {
return round(gc.commit->set_id, gc.commit->round);
}
if (gc.vote) {
return round(gc.vote->counter, gc.vote->round_number);
}
return false;
};
retain_if(waiting_blocks_, f);
}
} // namespace kagome::consensus::grandpa
25 changes: 13 additions & 12 deletions core/consensus/grandpa/impl/grandpa_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
#include "consensus/grandpa/grandpa.hpp"
#include "consensus/grandpa/grandpa_observer.hpp"

#include <atomic>
#include <boost/asio/io_context.hpp>
#include <libp2p/basic/scheduler.hpp>

#include "consensus/grandpa/impl/votes_cache.hpp"
#include "injector/lazy.hpp"
#include "log/logger.hpp"
#include "metrics/metrics.hpp"
#include "primitives/event_types.hpp"
Expand All @@ -28,6 +27,10 @@ namespace kagome::blockchain {
class BlockTree;
}

namespace kagome::consensus {
class Timeline;
} // namespace kagome::consensus

namespace kagome::consensus::grandpa {
class AuthorityManager;
class Environment;
Expand Down Expand Up @@ -100,8 +103,8 @@ namespace kagome::consensus::grandpa {
std::shared_ptr<network::PeerManager> peer_manager,
std::shared_ptr<blockchain::BlockTree> block_tree,
std::shared_ptr<network::ReputationRepository> reputation_repository,
primitives::events::BabeStateSubscriptionEnginePtr
babe_status_observable,
LazySPtr<Timeline> timeline,
primitives::events::ChainSubscriptionEnginePtr chain_sub_engine,
std::shared_ptr<Watchdog> watchdog,
WeakIoContext main_thread);

Expand Down Expand Up @@ -297,6 +300,8 @@ namespace kagome::consensus::grandpa {
* cannot accept precommit when there is no corresponding block)
*/
void loadMissingBlocks(GrandpaContext &&grandpa_context);
void onHead(const primitives::BlockInfo &block);
void pruneWaitingBlocks();

const size_t kVotesCacheSize = 5;

Expand All @@ -312,14 +317,8 @@ namespace kagome::consensus::grandpa {
std::shared_ptr<blockchain::BlockTree> block_tree_;
VotesCache votes_cache_{kVotesCacheSize};
std::shared_ptr<network::ReputationRepository> reputation_repository_;
primitives::events::BabeStateSubscriptionEnginePtr babe_status_observable_;
primitives::events::BabeStateEventSubscriberPtr babe_status_observer_;

std::atomic_bool synchronized_once_ =
false; // declares if initial sync was done, does not
// necessarily mean that node is currently synced.
// Needed for enabling neighbor message processing.
// By default is false
LazySPtr<Timeline> timeline_;
primitives::events::ChainSub chain_sub_;

std::shared_ptr<ThreadPool> execution_thread_pool_;
std::shared_ptr<ThreadHandler> internal_thread_context_;
Expand All @@ -333,6 +332,8 @@ namespace kagome::consensus::grandpa {
libp2p::basic::Scheduler::Handle catchup_request_timer_handle_;
libp2p::basic::Scheduler::Handle fallback_timer_handle_;

std::vector<GrandpaContext> waiting_blocks_;

// Metrics
metrics::RegistryPtr metrics_registry_ = metrics::createRegistry();
metrics::Gauge *metric_highest_round_;
Expand Down
4 changes: 4 additions & 0 deletions core/consensus/grandpa/impl/verified_justification_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ namespace kagome::consensus::grandpa {
if (set < expected_) {
return;
}
if (justification.block_info.number
<= block_tree_->getLastFinalized().number) {
return;
}
auto block_res = block_tree_->getBlockHeader(justification.block_info.hash);
if (not block_res) {
return;
Expand Down
6 changes: 0 additions & 6 deletions core/consensus/grandpa/structs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,6 @@ namespace kagome::consensus::grandpa {
std::vector<primitives::BlockHeader> votes_ancestries{};
};

/// A commit message which is an aggregate of precommits.
struct Commit {
primitives::BlockInfo vote;
GrandpaJustification justification;
};

// either prevote, precommit or primary propose
struct VoteMessage {
SCALE_TIE(3);
Expand Down
16 changes: 4 additions & 12 deletions core/consensus/grandpa/vote_graph/vote_graph_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,11 @@
#include "consensus/grandpa/chain.hpp"
#include "consensus/grandpa/vote_graph/vote_graph_error.hpp"
#include "consensus/grandpa/voting_round_error.hpp"
#include "utils/retain_if.hpp"

namespace kagome::consensus::grandpa {

namespace {
/// filters vector in-place - effectively removes all elements for which
/// func returned false.
template <typename Vector, typename Func>
inline void filter_if(Vector &&v, Func &&func) {
v.erase(std::remove_if(
v.begin(), v.end(), std::not_fn(std::forward<Func>(func))),
v.end());
}

/// check if collection contains a given item.
template <typename Collection, typename Item>
inline bool contains(const Collection &collection, const Item &item) {
Expand Down Expand Up @@ -250,7 +242,7 @@ namespace kagome::consensus::grandpa {
new_entry.descendants.end()};

// filter descendants
filter_if(prev_ancestor_entry.descendants,
retain_if(prev_ancestor_entry.descendants,
[&set](const BlockHash &hash) { return !contains(set, hash); });
prev_ancestor_entry.descendants.push_back(ancestor.hash);
}
Expand Down Expand Up @@ -348,7 +340,7 @@ namespace kagome::consensus::grandpa {
const std::optional<BlockInfo> &force_constrain,
const VoteGraph::Condition &condition) const {
auto descendants = active_node.descendants;
filter_if(descendants, [&](const BlockHash &hash) {
retain_if(descendants, [&](const BlockHash &hash) {
if (not force_constrain) {
return true;
}
Expand Down Expand Up @@ -403,7 +395,7 @@ namespace kagome::consensus::grandpa {

++best_number;
descendant_blocks.clear();
filter_if(descendants, [&](const BlockHash &hash) {
retain_if(descendants, [&](const BlockHash &hash) {
return inDirectAncestry(entries_.at(hash), *new_best, best_number);
});

Expand Down
Loading
Loading