Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parachains. PeerManagerImpl refactoring. #1400

Merged
merged 5 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 136 additions & 98 deletions core/network/impl/peer_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,25 @@ OUTCOME_CPP_DEFINE_CATEGORY(kagome::network, PeerManagerImpl::Error, e) {
return "Unknown error in ChainSpecImpl";
}

namespace {

template <typename P, typename F>
bool openOutgoing(std::shared_ptr<kagome::network::StreamEngine> &se,
std::shared_ptr<P> const &protocol,
kagome::network::PeerManager::PeerInfo const &pi,
F &&func) {
BOOST_ASSERT(se);
BOOST_ASSERT(protocol);

if (se->reserveOutgoing(pi.id, protocol)) {
protocol->newOutgoingStream(pi, std::forward<F>(func));
return true;
}
return false;
}

} // namespace

namespace kagome::network {
PeerManagerImpl::PeerManagerImpl(
std::shared_ptr<application::AppStateManager> app_state_manager,
Expand Down Expand Up @@ -75,6 +94,7 @@ namespace kagome::network {
BOOST_ASSERT(router_ != nullptr);
BOOST_ASSERT(storage_ != nullptr);
BOOST_ASSERT(hasher_ != nullptr);
BOOST_ASSERT(peer_view_);
BOOST_ASSERT(reputation_repository_ != nullptr);
BOOST_ASSERT(peer_view_ != nullptr);

Expand Down Expand Up @@ -248,13 +268,16 @@ namespace kagome::network {

outcome::result<
std::pair<network::CollatorPublicKey const &, network::ParachainId>>
PeerManagerImpl::insert_advertisement(PeerState &peer_state,
ParachainState &parachain_state,
primitives::BlockHash para_hash) {
PeerManagerImpl::insertAdvertisement(PeerState &peer_state,
primitives::BlockHash para_hash) {
if (!peer_state.collator_state) return Error::UNDECLARED_COLLATOR;

if (parachain_state.our_view.count(para_hash) == 0)
auto my_view = peer_view_->getMyView();
BOOST_ASSERT(my_view);

if (!my_view->get().contains(para_hash)) {
return Error::OUT_OF_VIEW;
}

if (peer_state.collator_state.value().advertisements.count(para_hash) != 0)
return Error::DUPLICATE;
Expand Down Expand Up @@ -512,10 +535,6 @@ namespace kagome::network {
it->second.best_block = status.best_block;
}

ParachainState &PeerManagerImpl::parachainState() {
return parachain_state_;
}

void PeerManagerImpl::updatePeerState(const PeerId &peer_id,
const BlockAnnounce &announce) {
auto hash = hasher_->blake2b_256(scale::encode(announce.header).value());
Expand Down Expand Up @@ -570,6 +589,105 @@ namespace kagome::network {
queue_to_connect_.size());
}

template <typename F>
void PeerManagerImpl::openBlockAnnounceProtocol(
PeerInfo const &peer_info,
libp2p::network::ConnectionManager::ConnectionSPtr const &connection,
F &&opened_callback) {
auto block_announce_protocol = router_->getBlockAnnounceProtocol();
BOOST_ASSERT_MSG(block_announce_protocol,
"Router did not provide block announce protocol");

if (!openOutgoing(
stream_engine_,
block_announce_protocol,
peer_info,
[wp = weak_from_this(),
peer_info,
protocol = block_announce_protocol,
connection,
opened_callback{std::forward<F>(opened_callback)}](
auto &&stream_res) mutable {
auto self = wp.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;

self->stream_engine_->dropReserveOutgoing(peer_id, protocol);
if (not stream_res.has_value()) {
self->log_->warn("Unable to create stream {} with {}: {}",
protocol->protocolName(),
peer_id,
stream_res.error());
self->connecting_peers_.erase(peer_id);
self->disconnectFromPeer(peer_id);
return;
}
PeerType peer_type = connection->isInitiator()
? PeerType::PEER_TYPE_OUT
: PeerType::PEER_TYPE_IN;

// Add to active peer list
if (auto [ap_it, added] = self->active_peers_.emplace(
peer_id, PeerDescriptor{peer_type, self->clock_->now()});
added) {
self->recently_active_peers_.insert(peer_id);

// And remove from queue
if (auto piq_it = self->peers_in_queue_.find(peer_id);
piq_it != self->peers_in_queue_.end()) {
auto qtc_it =
std::find_if(self->queue_to_connect_.cbegin(),
self->queue_to_connect_.cend(),
[&peer_id = peer_id](const auto &item) {
return peer_id == item.get();
});
self->queue_to_connect_.erase(qtc_it);
self->peers_in_queue_.erase(piq_it);
BOOST_ASSERT(self->queue_to_connect_.size()
== self->peers_in_queue_.size());

SL_DEBUG(self->log_,
"Remained peers in queue for connect: {}",
self->peers_in_queue_.size());
}
self->sync_peer_num_->set(self->active_peers_.size());
}

self->connecting_peers_.erase(peer_id);

self->reserveStreams(peer_id);
self->startPingingPeer(peer_id);

/// Process callback when opened successfully
std::forward<F>(opened_callback)(
self, peer_info, self->getPeerState(peer_id));
})) {
SL_DEBUG(log_,
"Stream {} with {} is alive or connecting",
block_announce_protocol->protocolName(),
peer_info.id);
}
}

void PeerManagerImpl::tryOpenGrandpaProtocol(PeerInfo const &peer_info,
PeerState &r_info) {
if (auto o_info_opt = getPeerState(own_peer_info_.id);
o_info_opt.has_value()) {
auto &o_info = o_info_opt.value();

// Establish outgoing grandpa stream if node synced
if (r_info.best_block.number <= o_info.get().best_block.number) {
auto grandpa_protocol = router_->getGrandpaProtocol();
BOOST_ASSERT_MSG(grandpa_protocol,
"Router did not provide grandpa protocol");
grandpa_protocol->newOutgoingStream(peer_info, [](const auto &...) {});
iceseer marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

void PeerManagerImpl::processFullyConnectedPeer(const PeerId &peer_id) {
// Skip connection to itself
if (isSelfPeer(peer_id)) {
Expand Down Expand Up @@ -625,96 +743,16 @@ namespace kagome::network {
}

PeerInfo peer_info{.id = peer_id, .addresses = {}};

auto block_announce_protocol = router_->getBlockAnnounceProtocol();
BOOST_ASSERT_MSG(block_announce_protocol,
"Router did not provide block announce protocol");

if (stream_engine_->reserveOutgoing(peer_info.id,
block_announce_protocol)) {
block_announce_protocol->newOutgoingStream(
peer_info,
[wp = weak_from_this(),
peer_info,
protocol = block_announce_protocol,
connection](auto &&stream_res) {
auto self = wp.lock();
if (not self) {
return;
}

auto &peer_id = peer_info.id;

self->stream_engine_->dropReserveOutgoing(peer_id, protocol);
if (not stream_res.has_value()) {
self->log_->warn("Unable to create stream {} with {}: {}",
protocol->protocolName(),
peer_id,
stream_res.error());
self->connecting_peers_.erase(peer_id);
self->disconnectFromPeer(peer_id);
return;
}
PeerType peer_type = connection->isInitiator()
? PeerType::PEER_TYPE_OUT
: PeerType::PEER_TYPE_IN;

// Add to active peer list
if (auto [ap_it, added] = self->active_peers_.emplace(
peer_id, PeerDescriptor{peer_type, self->clock_->now()});
added) {
self->recently_active_peers_.insert(peer_id);

// And remove from queue
if (auto piq_it = self->peers_in_queue_.find(peer_id);
piq_it != self->peers_in_queue_.end()) {
auto qtc_it =
std::find_if(self->queue_to_connect_.cbegin(),
self->queue_to_connect_.cend(),
[&peer_id = peer_id](const auto &item) {
return peer_id == item.get();
});
self->queue_to_connect_.erase(qtc_it);
self->peers_in_queue_.erase(piq_it);
BOOST_ASSERT(self->queue_to_connect_.size()
== self->peers_in_queue_.size());

SL_DEBUG(self->log_,
"Remained peers in queue for connect: {}",
self->peers_in_queue_.size());
}
self->sync_peer_num_->set(self->active_peers_.size());
}

self->connecting_peers_.erase(peer_id);

self->reserveStreams(peer_id);
self->startPingingPeer(peer_id);

// Establish outgoing grandpa stream if node synced
auto r_info_opt = self->getPeerState(peer_id);
auto o_info_opt = self->getPeerState(self->own_peer_info_.id);
if (r_info_opt.has_value() and o_info_opt.has_value()) {
auto &r_info = r_info_opt.value();
auto &o_info = o_info_opt.value();

if (r_info.get().best_block.number
<= o_info.get().best_block.number) {
auto grandpa_protocol = self->router_->getGrandpaProtocol();
BOOST_ASSERT_MSG(grandpa_protocol,
"Router did not provide grandpa protocol");
grandpa_protocol->newOutgoingStream(peer_info,
[](const auto &...) {});
}
}
});
} else {
SL_DEBUG(log_,
"Stream {} with {} is alive",
block_announce_protocol->protocolName(),
peer_id);
connecting_peers_.erase(peer_id);
}
openBlockAnnounceProtocol(
peer_info,
connection,
[](std::shared_ptr<PeerManagerImpl> &self,
PeerInfo const &peer_info,
std::optional<std::reference_wrapper<PeerState>> peer_state) {
if (peer_state.has_value()) {
self->tryOpenGrandpaProtocol(peer_info, peer_state.value().get());
}
});

auto addresses_res =
host_.getPeerRepository().getAddressRepository().getAddresses(peer_id);
Expand Down
19 changes: 11 additions & 8 deletions core/network/impl/peer_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,10 @@ namespace kagome::network {
network::CollatorPublicKey const &collator_id,
network::ParachainId para_id) override;

/** @see PeerManager::parachainState */
ParachainState &parachainState() override;

outcome::result<
std::pair<network::CollatorPublicKey const &, network::ParachainId>>
insert_advertisement(PeerState &peer_state,
ParachainState &parachain_state,
primitives::BlockHash para_hash) override;
insertAdvertisement(PeerState &peer_state,
primitives::BlockHash para_hash) override;

/** @see PeerManager::forEachPeer */
void forEachPeer(std::function<void(const PeerId &)> func) const override;
Expand Down Expand Up @@ -149,6 +145,14 @@ namespace kagome::network {

void processFullyConnectedPeer(const PeerId &peer_id);

template <typename F>
void openBlockAnnounceProtocol(
PeerInfo const &peer_info,
libp2p::network::ConnectionManager::ConnectionSPtr const &connection,
F &&opened_callback);
void tryOpenGrandpaProtocol(PeerInfo const &peer_info,
PeerState &peer_state);

/// Opens streams set for special peer (i.e. new-discovered)
void connectToPeer(const PeerId &peer_id);

Expand Down Expand Up @@ -196,8 +200,7 @@ namespace kagome::network {
metrics::Gauge *sync_peer_num_;

// parachain
ParachainState parachain_state_;
std::shared_ptr<PeerView> peer_view_;
std::shared_ptr<network::PeerView> peer_view_;

log::Logger log_;
};
Expand Down
18 changes: 4 additions & 14 deletions core/network/peer_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ namespace kagome::network {
libp2p::peer::PeerId const &peer_id;
};

/*
* Parachain state view.
*/
struct ParachainState {
std::unordered_map<BlockHash, bool> our_view;
};
using OurView = network::View;

struct PeerState {
clock::SteadyClock::TimePoint time;
Expand All @@ -49,6 +44,7 @@ namespace kagome::network {
std::optional<VoterSetId> set_id = std::nullopt;
BlockNumber last_finalized = 0;
std::optional<CollatorState> collator_state = std::nullopt;
std::optional<View> view;
};

struct StreamEngine;
Expand Down Expand Up @@ -112,14 +108,8 @@ namespace kagome::network {
*/
virtual outcome::result<
std::pair<network::CollatorPublicKey const &, network::ParachainId>>
insert_advertisement(PeerState &peer_state,
ParachainState &parachain_state,
primitives::BlockHash para_hash) = 0;

/**
* Allows to update parachains states.
*/
virtual ParachainState &parachainState() = 0;
insertAdvertisement(PeerState &peer_state,
primitives::BlockHash para_hash) = 0;

/**
* Updates collation state and stores parachain id. Should be called once
Expand Down
29 changes: 1 addition & 28 deletions core/parachain/validator/impl/parachain_observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,7 @@ namespace kagome::observers {

void onAdvertise(libp2p::peer::PeerId const &peer_id,
primitives::BlockHash relay_parent) override {
auto &parachain_state = pm_->parachainState();
bool const contains_para_hash =
(parachain_state.our_view.count(relay_parent) != 0);

if (!contains_para_hash) {
logger_->warn("Advertise collation out of view from peer {}", peer_id);
return;
}

auto const peer_state = pm_->getPeerState(peer_id);
if (!peer_state) {
logger_->warn("Received collation advertise from unknown peer {}",
peer_id);
return;
}

auto result = pm_->insert_advertisement(
peer_state->get(), parachain_state, std::move(relay_parent));
if (!result) {
logger_->warn(
"Insert advertisement from {} failed: {}", peer_id, result.error());
return;
}

processor_->requestCollations(
network::PendingCollation{.para_id = result.value().second,
.relay_parent = relay_parent,
.peer_id = peer_id});
/// TODO(iceseer): removed because of merge
}

void onDeclare(libp2p::peer::PeerId const &peer_id,
Expand Down
Loading