Skip to content

Commit

Permalink
Merge 4a7f53c into e47d231
Browse files Browse the repository at this point in the history
  • Loading branch information
iceseer authored Nov 8, 2022
2 parents e47d231 + 4a7f53c commit 1f5948a
Show file tree
Hide file tree
Showing 14 changed files with 566 additions and 23 deletions.
6 changes: 3 additions & 3 deletions core/blockchain/impl/block_tree_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ namespace kagome::blockchain {
SL_DEBUG(log, "Last finalized block #{}", tree->depth);
auto meta = std::make_shared<TreeMeta>(tree, last_finalized_justification);

auto *block_tree =
std::shared_ptr<BlockTreeImpl> block_tree(
new BlockTreeImpl(std::move(header_repo),
std::move(storage),
std::make_unique<CachedTree>(tree, meta),
Expand All @@ -364,7 +364,7 @@ namespace kagome::blockchain {
std::move(runtime_core),
std::move(changes_tracker),
std::move(babe_util),
std::move(justification_storage_policy));
std::move(justification_storage_policy)));

// Add non-finalized block to the block tree
for (auto &e : collected) {
Expand All @@ -382,7 +382,7 @@ namespace kagome::blockchain {
log, "Existing non-finalized block {} is added to block tree", block);
}

return std::shared_ptr<BlockTreeImpl>(block_tree);
return block_tree;
}

outcome::result<void> BlockTreeImpl::recover(
Expand Down
3 changes: 2 additions & 1 deletion core/blockchain/impl/block_tree_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ namespace kagome::blockchain {
class TreeNode;
class CachedTree;

class BlockTreeImpl : public BlockTree {
class BlockTreeImpl : public BlockTree,
public std::enable_shared_from_this<BlockTreeImpl> {
public:
/// Create an instance of block tree
static outcome::result<std::shared_ptr<BlockTreeImpl>> create(
Expand Down
8 changes: 8 additions & 0 deletions core/common/visitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <type_traits> // for std::decay
#include <utility> // for std::forward

#include <boost/variant.hpp>
#include <boost/variant/apply_visitor.hpp> // for boost::apply_visitor

namespace kagome {
Expand Down Expand Up @@ -85,6 +86,13 @@ namespace kagome {
std::forward<TVariant>(variant));
}

template <typename TReturn, typename TVariant>
constexpr std::optional<std::reference_wrapper<TReturn>> if_type(
TVariant &&variant) {
if (auto ptr = boost::get<TReturn>(&variant)) return *ptr;
return std::nullopt;
}

/// apply Matcher to optional T
template <typename T, typename Matcher>
constexpr decltype(auto) match(T &&t, Matcher &&m) {
Expand Down
39 changes: 39 additions & 0 deletions core/injector/application_injector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
#include "network/impl/sync_protocol_observer_impl.hpp"
#include "network/impl/synchronizer_impl.hpp"
#include "network/impl/transactions_transmitter_impl.hpp"
#include "network/peer_view.hpp"
#include "network/sync_protocol_observer.hpp"
#include "offchain/impl/offchain_local_storage.hpp"
#include "offchain/impl/offchain_persistent_storage.hpp"
Expand All @@ -112,6 +113,7 @@
#include "outcome/outcome.hpp"
#include "parachain/availability/bitfield/store_impl.hpp"
#include "parachain/availability/store/store_impl.hpp"
#include "parachain/thread_pool.hpp"
#include "parachain/validator/parachain_observer.hpp"
#include "parachain/validator/parachain_processor.hpp"
#include "runtime/binaryen/binaryen_memory_provider.hpp"
Expand Down Expand Up @@ -724,6 +726,35 @@ namespace {
return instance;
}

template <typename Injector>
sptr<network::PeerView> get_peer_view(const Injector &injector) {
auto get_instance = [&]() {
return std::make_shared<network::PeerView>(
injector.template create<
primitives::events::ChainSubscriptionEnginePtr>(),
injector
.template create<std::shared_ptr<application::AppStateManager>>(),
injector.template create<blockchain::BlockTree>());
};

static auto instance = get_instance();
return instance;
}

template <typename Injector>
sptr<thread::ThreadPool> get_thread_pool(const Injector &injector) {
auto get_instance = [&]() {
auto ptr = std::make_shared<thread::ThreadPool>(
injector
.template create<std::shared_ptr<application::AppStateManager>>(),
10ull);
return ptr;
};

static auto instance = get_instance();
return instance;
}

template <typename Injector>
sptr<parachain::ParachainProcessorImpl> get_parachain_processor_impl(
const Injector &injector) {
Expand Down Expand Up @@ -1219,6 +1250,10 @@ namespace {
[](auto const &injector) {
return get_parachain_processor_impl(injector);
}),
di::bind<thread::ThreadPool>.to(
[](auto const &injector) { return get_thread_pool(injector); }),
di::bind<network::PeerView>.to(
[](auto const &injector) { return get_peer_view(injector); }),
di::bind<storage::trie::TrieStorageBackend>.to(
[](auto const &injector) {
auto storage =
Expand Down Expand Up @@ -1594,6 +1629,10 @@ namespace kagome::injector {
return pimpl_->injector_.create<sptr<parachain::ParachainProcessorImpl>>();
}

std::shared_ptr<thread::ThreadPool> KagomeNodeInjector::injectThreadPool() {
return pimpl_->injector_.create<sptr<thread::ThreadPool>>();
}

std::shared_ptr<consensus::babe::Babe> KagomeNodeInjector::injectBabe() {
return pimpl_->injector_.create<sptr<consensus::babe::Babe>>();
}
Expand Down
8 changes: 6 additions & 2 deletions core/injector/application_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ namespace soralog {
}

namespace kagome {
namespace thread {
struct ThreadPool;
}

namespace application {
class AppConfiguration;
class ChainSpec;
Expand Down Expand Up @@ -50,7 +54,6 @@ namespace kagome {
class Executor;
}


namespace api {
class ApiService;
}
Expand All @@ -66,7 +69,7 @@ namespace kagome {
namespace blockchain {
class BlockStorage;
class BlockTree;
}
} // namespace blockchain

namespace storage::trie {
class TrieStorage;
Expand Down Expand Up @@ -103,6 +106,7 @@ namespace kagome::injector {
std::shared_ptr<parachain::ParachainObserverImpl> injectParachainObserver();
std::shared_ptr<parachain::ParachainProcessorImpl>
injectParachainProcessor();
std::shared_ptr<thread::ThreadPool> injectThreadPool();
std::shared_ptr<consensus::grandpa::Grandpa> injectGrandpa();
std::shared_ptr<soralog::LoggingSystem> injectLoggingSystem();
std::shared_ptr<storage::trie::TrieStorage> injectTrieStorage();
Expand Down
8 changes: 8 additions & 0 deletions core/network/impl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ add_library(peer_manager
target_link_libraries(peer_manager
logger
scale_libp2p_types
peer_view
)

add_library(reputation_repository
Expand All @@ -88,3 +89,10 @@ target_link_libraries(reputation_repository
p2p::p2p_peer_id
p2p::p2p_basic_scheduler
)

add_library(peer_view
peer_view.cpp
)
target_link_libraries(peer_view
p2p::p2p_peer_id
)
99 changes: 99 additions & 0 deletions core/network/impl/peer_view.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "network/peer_view.hpp"
#include "blockchain/block_tree.hpp"
#include "common/visitor.hpp"

namespace kagome::network {

PeerView::PeerView(
const primitives::events::ChainSubscriptionEnginePtr &chain_events_engine,
std::shared_ptr<application::AppStateManager> asmgr,
std::shared_ptr<blockchain::BlockTree> block_tree)
: chain_events_engine_{chain_events_engine},
my_view_update_observable_{
std::make_shared<MyViewSubscriptionEngine>()},
remote_view_update_observable_{
std::make_shared<PeerViewSubscriptionEngine>()},
block_tree_{std::move(block_tree)} {
BOOST_ASSERT(chain_events_engine_);
BOOST_ASSERT(block_tree_);
asmgr->takeControl(*this);
}

bool PeerView::start() {
return true;
}

void PeerView::stop() {
if (chain_sub_) {
chain_sub_->unsubscribe();
}
chain_sub_.reset();
}

bool PeerView::prepare() {
chain_sub_ = std::make_shared<primitives::events::ChainEventSubscriber>(
chain_events_engine_);
chain_sub_->subscribe(chain_sub_->generateSubscriptionSetId(),
primitives::events::ChainEventType::kNewHeads);
chain_sub_->setCallback(
[wptr{weak_from_this()}](
auto /*set_id*/,
auto && /*internal_obj*/,
auto /*event_type*/,
const primitives::events::ChainEventParams &event) {
if (auto self = wptr.lock()) {
if (auto const value =
if_type<const primitives::events::HeadsEventParams>(
event)) {
self->updateMyView(
View{.heads_ = self->block_tree_->getLeaves(),
.finalized_number_ =
self->block_tree_->getLastFinalized().number});
}
}
});
return true;
}

PeerView::MyViewSubscriptionEnginePtr PeerView::getMyViewObservable() {
BOOST_ASSERT(my_view_update_observable_);
return my_view_update_observable_;
}

PeerView::PeerViewSubscriptionEnginePtr PeerView::getRemoteViewObservable() {
BOOST_ASSERT(remote_view_update_observable_);
return remote_view_update_observable_;
}

void PeerView::updateMyView(network::View &&view) {
BOOST_ASSERT(my_view_update_observable_);
std::sort(view.heads_.begin(), view.heads_.end());
if (my_view_ != view) {
my_view_ = std::move(view);

BOOST_ASSERT(my_view_);
my_view_update_observable_->notify(EventType::kViewUpdated, *my_view_);
}
}

void PeerView::updateRemoteView(const PeerId &peer_id, network::View &&view) {
auto it = remote_view_.find(peer_id);
if (it == remote_view_.end() || it->second != view) {
auto &ref = remote_view_[peer_id];
ref = std::move(view);
remote_view_update_observable_->notify(
EventType::kViewUpdated, peer_id, ref);
}
}

std::optional<std::reference_wrapper<const View>> PeerView::getMyView()
const {
return my_view_;
}

} // namespace kagome::network
80 changes: 80 additions & 0 deletions core/network/peer_view.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef KAGOME_PEER_VIEW
#define KAGOME_PEER_VIEW

#include <libp2p/peer/peer_id.hpp>
#include <memory>
#include <unordered_map>

#include "application/app_state_manager.hpp"
#include "blockchain/block_tree.hpp"
#include "network/types/collator_messages.hpp"
#include "outcome/outcome.hpp"
#include "primitives/event_types.hpp"
#include "subscription/subscriber.hpp"
#include "subscription/subscription_engine.hpp"
#include "utils/non_copyable.hpp"

namespace kagome::network {

class PeerView : public NonCopyable,
public NonMovable,
public std::enable_shared_from_this<PeerView> {
public:
enum struct EventType : uint32_t { kViewUpdated };

using PeerId = libp2p::peer::PeerId;

using MyViewSubscriptionEngine =
subscription::SubscriptionEngine<EventType, bool, network::View>;
using MyViewSubscriptionEnginePtr =
std::shared_ptr<MyViewSubscriptionEngine>;
using MyViewSubscriber = MyViewSubscriptionEngine::SubscriberType;
using MyViewSubscriberPtr = std::shared_ptr<MyViewSubscriber>;

using PeerViewSubscriptionEngine = subscription::
SubscriptionEngine<EventType, bool, PeerId, network::View>;
using PeerViewSubscriptionEnginePtr =
std::shared_ptr<PeerViewSubscriptionEngine>;
using PeerViewSubscriber = PeerViewSubscriptionEngine::SubscriberType;
using PeerViewSubscriberPtr = std::shared_ptr<PeerViewSubscriber>;

PeerView(const primitives::events::ChainSubscriptionEnginePtr
&chain_events_engine,
std::shared_ptr<application::AppStateManager> asmgr,
std::shared_ptr<blockchain::BlockTree> block_tree);
virtual ~PeerView() = default;

/**
* Object lifetime control subsystem.
*/
bool start();
void stop();
bool prepare();

MyViewSubscriptionEnginePtr getMyViewObservable();
PeerViewSubscriptionEnginePtr getRemoteViewObservable();

void updateRemoteView(const PeerId &peer_id, network::View &&view);
std::optional<std::reference_wrapper<const View>> getMyView() const;

private:
void updateMyView(network::View &&view);

primitives::events::ChainSubscriptionEnginePtr chain_events_engine_;
std::shared_ptr<primitives::events::ChainEventSubscriber> chain_sub_;

MyViewSubscriptionEnginePtr my_view_update_observable_;
PeerViewSubscriptionEnginePtr remote_view_update_observable_;

std::optional<View> my_view_;
std::unordered_map<PeerId, View> remote_view_;
std::shared_ptr<blockchain::BlockTree> block_tree_;
};

} // namespace kagome::network

#endif // KAGOME_PEER_VIEW
23 changes: 23 additions & 0 deletions core/network/types/collator_messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,29 @@ namespace kagome::network {
/// Signed availability bitfield.
using SignedBitfield = Signed<scale::BitVec>;

/// A succinct representation of a peer's view. This consists of a bounded
/// amount of chain heads
/// and the highest known finalized block number.
///
/// Up to `N` (5?) chain heads.
/// The rust representation:
/// https://github.com/paritytech/polkadot/blob/master/node/network/protocol/src/lib.rs#L160
struct View {
SCALE_TIE(2);

/// A bounded amount of chain heads.
/// Invariant: Sorted.
std::vector<primitives::BlockHash> heads_;

/// The highest known finalized block number.
primitives::BlockNumber finalized_number_;

bool contains(const primitives::BlockHash &hash) const {
auto const it = std::lower_bound(heads_.begin(), heads_.end(), hash);
return it != heads_.end() && *it == hash;
}
};

/**
* Collator -> Validator and Validator -> Collator if statement message.
* Type of the appropriate message.
Expand Down
Loading

0 comments on commit 1f5948a

Please sign in to comment.