diff --git a/core/blockchain/impl/block_tree_impl.cpp b/core/blockchain/impl/block_tree_impl.cpp index 1a3d1a176b..4328303248 100644 --- a/core/blockchain/impl/block_tree_impl.cpp +++ b/core/blockchain/impl/block_tree_impl.cpp @@ -352,7 +352,7 @@ namespace kagome::blockchain { SL_DEBUG(log, "Last finalized block #{}", tree->depth); auto meta = std::make_shared(tree, last_finalized_justification); - auto *block_tree = + std::shared_ptr block_tree( new BlockTreeImpl(std::move(header_repo), std::move(storage), std::make_unique(tree, meta), @@ -364,7 +364,7 @@ namespace kagome::blockchain { std::move(runtime_core), std::move(changes_tracker), std::move(babe_util), - std::move(justification_storage_policy)); + std::move(justification_storage_policy))); // Add non-finalized block to the block tree for (auto &e : collected) { @@ -382,7 +382,7 @@ namespace kagome::blockchain { log, "Existing non-finalized block {} is added to block tree", block); } - return std::shared_ptr(block_tree); + return block_tree; } outcome::result BlockTreeImpl::recover( diff --git a/core/blockchain/impl/block_tree_impl.hpp b/core/blockchain/impl/block_tree_impl.hpp index 82d7475eef..4c127abc81 100644 --- a/core/blockchain/impl/block_tree_impl.hpp +++ b/core/blockchain/impl/block_tree_impl.hpp @@ -49,7 +49,8 @@ namespace kagome::blockchain { class TreeNode; class CachedTree; - class BlockTreeImpl : public BlockTree { + class BlockTreeImpl : public BlockTree, + public std::enable_shared_from_this { public: /// Create an instance of block tree static outcome::result> create( diff --git a/core/common/visitor.hpp b/core/common/visitor.hpp index 99789aeb98..c9d35268d9 100644 --- a/core/common/visitor.hpp +++ b/core/common/visitor.hpp @@ -9,6 +9,7 @@ #include // for std::decay #include // for std::forward +#include #include // for boost::apply_visitor namespace kagome { @@ -85,6 +86,13 @@ namespace kagome { std::forward(variant)); } + template + constexpr std::optional> if_type( + TVariant &&variant) { + if (auto ptr = boost::get(&variant)) return *ptr; + return std::nullopt; + } + /// apply Matcher to optional T template constexpr decltype(auto) match(T &&t, Matcher &&m) { diff --git a/core/injector/application_injector.cpp b/core/injector/application_injector.cpp index 7c0dc4673b..814bc0de06 100644 --- a/core/injector/application_injector.cpp +++ b/core/injector/application_injector.cpp @@ -103,6 +103,7 @@ #include "network/impl/sync_protocol_observer_impl.hpp" #include "network/impl/synchronizer_impl.hpp" #include "network/impl/transactions_transmitter_impl.hpp" +#include "network/peer_view.hpp" #include "network/sync_protocol_observer.hpp" #include "offchain/impl/offchain_local_storage.hpp" #include "offchain/impl/offchain_persistent_storage.hpp" @@ -112,6 +113,7 @@ #include "outcome/outcome.hpp" #include "parachain/availability/bitfield/store_impl.hpp" #include "parachain/availability/store/store_impl.hpp" +#include "parachain/thread_pool.hpp" #include "parachain/validator/parachain_observer.hpp" #include "parachain/validator/parachain_processor.hpp" #include "runtime/binaryen/binaryen_memory_provider.hpp" @@ -724,6 +726,35 @@ namespace { return instance; } + template + sptr get_peer_view(const Injector &injector) { + auto get_instance = [&]() { + return std::make_shared( + injector.template create< + primitives::events::ChainSubscriptionEnginePtr>(), + injector + .template create>(), + injector.template create()); + }; + + static auto instance = get_instance(); + return instance; + } + + template + sptr get_thread_pool(const Injector &injector) { + auto get_instance = [&]() { + auto ptr = std::make_shared( + injector + .template create>(), + 10ull); + return ptr; + }; + + static auto instance = get_instance(); + return instance; + } + template sptr get_parachain_processor_impl( const Injector &injector) { @@ -1219,6 +1250,10 @@ namespace { [](auto const &injector) { return get_parachain_processor_impl(injector); }), + di::bind.to( + [](auto const &injector) { return get_thread_pool(injector); }), + di::bind.to( + [](auto const &injector) { return get_peer_view(injector); }), di::bind.to( [](auto const &injector) { auto storage = @@ -1594,6 +1629,10 @@ namespace kagome::injector { return pimpl_->injector_.create>(); } + std::shared_ptr KagomeNodeInjector::injectThreadPool() { + return pimpl_->injector_.create>(); + } + std::shared_ptr KagomeNodeInjector::injectBabe() { return pimpl_->injector_.create>(); } diff --git a/core/injector/application_injector.hpp b/core/injector/application_injector.hpp index 606a3ba56a..b1248fdf94 100644 --- a/core/injector/application_injector.hpp +++ b/core/injector/application_injector.hpp @@ -18,6 +18,10 @@ namespace soralog { } namespace kagome { + namespace thread { + struct ThreadPool; + } + namespace application { class AppConfiguration; class ChainSpec; @@ -50,7 +54,6 @@ namespace kagome { class Executor; } - namespace api { class ApiService; } @@ -66,7 +69,7 @@ namespace kagome { namespace blockchain { class BlockStorage; class BlockTree; - } + } // namespace blockchain namespace storage::trie { class TrieStorage; @@ -103,6 +106,7 @@ namespace kagome::injector { std::shared_ptr injectParachainObserver(); std::shared_ptr injectParachainProcessor(); + std::shared_ptr injectThreadPool(); std::shared_ptr injectGrandpa(); std::shared_ptr injectLoggingSystem(); std::shared_ptr injectTrieStorage(); diff --git a/core/network/impl/CMakeLists.txt b/core/network/impl/CMakeLists.txt index 595a44a272..a399d7ff5a 100644 --- a/core/network/impl/CMakeLists.txt +++ b/core/network/impl/CMakeLists.txt @@ -79,6 +79,7 @@ add_library(peer_manager target_link_libraries(peer_manager logger scale_libp2p_types + peer_view ) add_library(reputation_repository @@ -88,3 +89,10 @@ target_link_libraries(reputation_repository p2p::p2p_peer_id p2p::p2p_basic_scheduler ) + +add_library(peer_view + peer_view.cpp + ) +target_link_libraries(peer_view + p2p::p2p_peer_id + ) diff --git a/core/network/impl/peer_view.cpp b/core/network/impl/peer_view.cpp new file mode 100644 index 0000000000..90e01a3e7a --- /dev/null +++ b/core/network/impl/peer_view.cpp @@ -0,0 +1,99 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "network/peer_view.hpp" +#include "blockchain/block_tree.hpp" +#include "common/visitor.hpp" + +namespace kagome::network { + + PeerView::PeerView( + const primitives::events::ChainSubscriptionEnginePtr &chain_events_engine, + std::shared_ptr asmgr, + std::shared_ptr block_tree) + : chain_events_engine_{chain_events_engine}, + my_view_update_observable_{ + std::make_shared()}, + remote_view_update_observable_{ + std::make_shared()}, + block_tree_{std::move(block_tree)} { + BOOST_ASSERT(chain_events_engine_); + BOOST_ASSERT(block_tree_); + asmgr->takeControl(*this); + } + + bool PeerView::start() { + return true; + } + + void PeerView::stop() { + if (chain_sub_) { + chain_sub_->unsubscribe(); + } + chain_sub_.reset(); + } + + bool PeerView::prepare() { + chain_sub_ = std::make_shared( + chain_events_engine_); + chain_sub_->subscribe(chain_sub_->generateSubscriptionSetId(), + primitives::events::ChainEventType::kNewHeads); + chain_sub_->setCallback( + [wptr{weak_from_this()}]( + auto /*set_id*/, + auto && /*internal_obj*/, + auto /*event_type*/, + const primitives::events::ChainEventParams &event) { + if (auto self = wptr.lock()) { + if (auto const value = + if_type( + event)) { + self->updateMyView( + View{.heads_ = self->block_tree_->getLeaves(), + .finalized_number_ = + self->block_tree_->getLastFinalized().number}); + } + } + }); + return true; + } + + PeerView::MyViewSubscriptionEnginePtr PeerView::getMyViewObservable() { + BOOST_ASSERT(my_view_update_observable_); + return my_view_update_observable_; + } + + PeerView::PeerViewSubscriptionEnginePtr PeerView::getRemoteViewObservable() { + BOOST_ASSERT(remote_view_update_observable_); + return remote_view_update_observable_; + } + + void PeerView::updateMyView(network::View &&view) { + BOOST_ASSERT(my_view_update_observable_); + std::sort(view.heads_.begin(), view.heads_.end()); + if (my_view_ != view) { + my_view_ = std::move(view); + + BOOST_ASSERT(my_view_); + my_view_update_observable_->notify(EventType::kViewUpdated, *my_view_); + } + } + + void PeerView::updateRemoteView(const PeerId &peer_id, network::View &&view) { + auto it = remote_view_.find(peer_id); + if (it == remote_view_.end() || it->second != view) { + auto &ref = remote_view_[peer_id]; + ref = std::move(view); + remote_view_update_observable_->notify( + EventType::kViewUpdated, peer_id, ref); + } + } + + std::optional> PeerView::getMyView() + const { + return my_view_; + } + +} // namespace kagome::network diff --git a/core/network/peer_view.hpp b/core/network/peer_view.hpp new file mode 100644 index 0000000000..966f675ab6 --- /dev/null +++ b/core/network/peer_view.hpp @@ -0,0 +1,80 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ +#ifndef KAGOME_PEER_VIEW +#define KAGOME_PEER_VIEW + +#include +#include +#include + +#include "application/app_state_manager.hpp" +#include "blockchain/block_tree.hpp" +#include "network/types/collator_messages.hpp" +#include "outcome/outcome.hpp" +#include "primitives/event_types.hpp" +#include "subscription/subscriber.hpp" +#include "subscription/subscription_engine.hpp" +#include "utils/non_copyable.hpp" + +namespace kagome::network { + + class PeerView : public NonCopyable, + public NonMovable, + public std::enable_shared_from_this { + public: + enum struct EventType : uint32_t { kViewUpdated }; + + using PeerId = libp2p::peer::PeerId; + + using MyViewSubscriptionEngine = + subscription::SubscriptionEngine; + using MyViewSubscriptionEnginePtr = + std::shared_ptr; + using MyViewSubscriber = MyViewSubscriptionEngine::SubscriberType; + using MyViewSubscriberPtr = std::shared_ptr; + + using PeerViewSubscriptionEngine = subscription:: + SubscriptionEngine; + using PeerViewSubscriptionEnginePtr = + std::shared_ptr; + using PeerViewSubscriber = PeerViewSubscriptionEngine::SubscriberType; + using PeerViewSubscriberPtr = std::shared_ptr; + + PeerView(const primitives::events::ChainSubscriptionEnginePtr + &chain_events_engine, + std::shared_ptr asmgr, + std::shared_ptr block_tree); + virtual ~PeerView() = default; + + /** + * Object lifetime control subsystem. + */ + bool start(); + void stop(); + bool prepare(); + + MyViewSubscriptionEnginePtr getMyViewObservable(); + PeerViewSubscriptionEnginePtr getRemoteViewObservable(); + + void updateRemoteView(const PeerId &peer_id, network::View &&view); + std::optional> getMyView() const; + + private: + void updateMyView(network::View &&view); + + primitives::events::ChainSubscriptionEnginePtr chain_events_engine_; + std::shared_ptr chain_sub_; + + MyViewSubscriptionEnginePtr my_view_update_observable_; + PeerViewSubscriptionEnginePtr remote_view_update_observable_; + + std::optional my_view_; + std::unordered_map remote_view_; + std::shared_ptr block_tree_; + }; + +} // namespace kagome::network + +#endif // KAGOME_PEER_VIEW diff --git a/core/network/types/collator_messages.hpp b/core/network/types/collator_messages.hpp index e918097b85..85eb5f02ce 100644 --- a/core/network/types/collator_messages.hpp +++ b/core/network/types/collator_messages.hpp @@ -219,6 +219,29 @@ namespace kagome::network { /// Signed availability bitfield. using SignedBitfield = Signed; + /// A succinct representation of a peer's view. This consists of a bounded + /// amount of chain heads + /// and the highest known finalized block number. + /// + /// Up to `N` (5?) chain heads. + /// The rust representation: + /// https://github.com/paritytech/polkadot/blob/master/node/network/protocol/src/lib.rs#L160 + struct View { + SCALE_TIE(2); + + /// A bounded amount of chain heads. + /// Invariant: Sorted. + std::vector heads_; + + /// The highest known finalized block number. + primitives::BlockNumber finalized_number_; + + bool contains(const primitives::BlockHash &hash) const { + auto const it = std::lower_bound(heads_.begin(), heads_.end(), hash); + return it != heads_.end() && *it == hash; + } + }; + /** * Collator -> Validator and Validator -> Collator if statement message. * Type of the appropriate message. diff --git a/core/parachain/CMakeLists.txt b/core/parachain/CMakeLists.txt index 92d8c2e899..acd8328333 100644 --- a/core/parachain/CMakeLists.txt +++ b/core/parachain/CMakeLists.txt @@ -13,4 +13,5 @@ target_link_libraries(validator_parachain req_collation_protocol collation_protocol protocol_error + peer_view ) diff --git a/core/parachain/thread_pool.hpp b/core/parachain/thread_pool.hpp new file mode 100644 index 0000000000..3ce1f16b1a --- /dev/null +++ b/core/parachain/thread_pool.hpp @@ -0,0 +1,251 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef KAGOME_THREAD_POOL_HPP +#define KAGOME_THREAD_POOL_HPP + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "application/app_state_manager.hpp" + +namespace kagome::thread { + + template + struct ThreadQueueContext { + template + [[maybe_unused]] ThreadQueueContext(D &&); + template + [[maybe_unused]] void make_call(F &&func); + }; + + template + inline auto createThreadQueueContext(T &&t) { + return ThreadQueueContext>{std::forward(t)}; + } + + template <> + struct ThreadQueueContext> { + using Type = std::weak_ptr; + Type t; + + template + ThreadQueueContext(D &&arg) : t{std::forward(arg)} {} + + template + void make_call(F &&func) { + if (auto call_context = t.lock()) { + boost::asio::post(*call_context, std::forward(func)); + } + } + }; + + template <> + struct ThreadQueueContext> { + using Type = ThreadQueueContext>; + Type t; + + template + ThreadQueueContext(D &&arg) : t{std::forward(arg)} {} + + template + void make_call(F &&func) { + return t.template make_call(std::forward(func)); + } + }; + + struct ThreadPool final : std::enable_shared_from_this { + using WorkersContext = boost::asio::io_context; + using WorkGuard = boost::asio::executor_work_guard< + boost::asio::io_context::executor_type>; + + ThreadPool() = delete; + ThreadPool(const ThreadPool &) = delete; + ThreadPool(ThreadPool &&) = delete; + ThreadPool &operator=(const ThreadPool &) = delete; + ThreadPool &operator=(ThreadPool &&) = delete; + + ThreadPool(std::shared_ptr asmgr, + size_t thread_count = 5ull) + : thread_count_{thread_count} { + BOOST_ASSERT(thread_count_ > 0); + asmgr->takeControl(*this); + } + + ThreadPool(size_t thread_count = 5ull) : thread_count_{thread_count} { + BOOST_ASSERT(thread_count_ > 0); + } + + ~ThreadPool() { + /// check that all workers are stopped. + BOOST_ASSERT(workers_.empty()); + } + + bool prepare() { + context_ = std::make_shared(); + work_guard_ = std::make_shared(context_->get_executor()); + workers_.reserve(thread_count_); + return true; + } + + bool start() { + BOOST_ASSERT(context_); + BOOST_ASSERT(work_guard_); + for (size_t ix = 0; ix < thread_count_; ++ix) { + workers_.emplace_back( + [wptr{this->weak_from_this()}, context{context_}]() { + if (auto self = wptr.lock()) { + self->logger_->debug("Started thread worker with id: {}", + std::this_thread::get_id()); + } + context->run(); + }); + } + return true; + } + + void stop() { + work_guard_.reset(); + if (context_) { + context_->stop(); + } + for (auto &worker : workers_) { + if (worker.joinable()) { + worker.join(); + } + } + workers_.clear(); + } + + template + void execute(std::pair &&t, Args &&...args) { + contextCall( + std::move(t.first), + [func{std::move(t.second)}, + forwarding_func{bindArgs(std::forward(args)...)}]() mutable { + forwarding_func(func()); + }); + } + + template + void execute(std::pair &&t) { + contextCall(std::move(t.first), std::move(t.second)); + } + + template + void execute(F &&func, Args &&...args) { + contextCall( + [func{std::forward(func)}, + forwarding_func{bindArgs(std::forward(args)...)}]() mutable { + forwarding_func(func()); + }); + } + + template + void execute(F &&func) { + contextCall(std::forward(func)); + } + + private: + friend struct ThreadQueueContext; + + template + void contextCall(ThreadQueueContext &&t, F &&f) { + t.make_call(std::forward(f)); + } + + template + void contextCall(F &&f) { + ThreadQueueContext>(context_) + .template make_call(std::forward(f)); + } + + template + void executeI(outcome::result &&r, std::pair &&t) { + if (r.has_value()) { + contextCall( + std::move(t.first), + [r{std::move(r.value())}, func{std::move(t.second)}]() mutable { + func(std::move(r)); + }); + } + } + + template + void executeI(outcome::result &&r, F &&func) { + if (r.has_value()) { + contextCall( + [r{std::move(r.value())}, func{std::forward(func)}]() mutable { + std::forward(func)(std::move(r)); + }); + } + } + + template + void executeI(outcome::result &&r, std::pair &&t, Args &&...args) { + if (r.has_value()) { + contextCall( + std::move(t.first), + [func{std::move(t.second)}, + r{std::move(r.value())}, + forwarding_func{bindArgs(std::forward(args)...)}]() mutable { + forwarding_func(func(std::move(r))); + }); + } + } + + template + void executeI(outcome::result &&r, F &&func, Args &&...args) { + if (r.has_value()) { + contextCall( + [func{std::forward(func)}, + r{std::move(r.value())}, + forwarding_func{bindArgs(std::forward(args)...)}]() mutable { + forwarding_func(std::forward(func)(std::move(r))); + }); + } + } + + template + auto bindArgs(Args &&...args) { + return std::bind( + [wptr{weak_from_this()}](auto &&...args) mutable { + if (auto self = wptr.lock()) self->executeI(std::move(args)...); + }, + std::placeholders::_1, + std::move(args)...); + } + + const size_t thread_count_; + std::shared_ptr context_; + std::shared_ptr work_guard_; + std::vector workers_; + log::Logger logger_ = log::createLogger("ThreadPool", "thread"); + }; + + template <> + struct ThreadQueueContext { + using Type = ThreadQueueContext>; + Type t; + + ThreadQueueContext(const ThreadPool &arg) : t{arg.context_} {} + + template + void make_call(F &&func) { + return t.template make_call(std::forward(func)); + } + }; + +} // namespace kagome::thread + +#endif // KAGOME_THREAD_POOL_HPP diff --git a/core/primitives/math.hpp b/core/primitives/math.hpp index 535dfece50..8f0cbd8004 100644 --- a/core/primitives/math.hpp +++ b/core/primitives/math.hpp @@ -6,6 +6,7 @@ #ifndef KAGOME_MATH_HPP #define KAGOME_MATH_HPP +#include #include namespace kagome::math { @@ -24,6 +25,16 @@ namespace kagome::math { return (t + (X - 1)) & ~(X - 1); } + template + inline constexpr T sat_sub_unsigned(T x, T y) { + static_assert(std::numeric_limits::is_integer + && !std::numeric_limits::is_signed, + "Value must be integer and unsigned!"); + auto res = x - y; + res &= -(res <= x); + return res; + } + } // namespace kagome::math #endif // KAGOME_MATH_HPP diff --git a/core/primitives/transcript.hpp b/core/primitives/transcript.hpp index b73d39614e..03897d6ac7 100644 --- a/core/primitives/transcript.hpp +++ b/core/primitives/transcript.hpp @@ -10,6 +10,21 @@ namespace kagome::primitives { + template + inline void decompose(const T &value, uint8_t (&dst)[sizeof(value)]) { + static_assert(std::is_pod_v, "T must be pod!"); + static_assert(!std::is_reference_v, "T must not be a reference!"); + + for (size_t i = 0; i < sizeof(value); ++i) { +#if __BYTE_ORDER__ != __ORDER_LITTLE_ENDIAN__ + dst[sizeof(value) - i - 1] = +#else + dst[i] = +#endif + static_cast((value >> (8 * i)) & 0xff); + } + } + /** * C++ implementation of * https://github.com/dalek-cryptography/merlin @@ -17,21 +32,6 @@ namespace kagome::primitives { class Transcript final { Strobe strobe_; - template - inline void decompose(const T &value, uint8_t (&dst)[sizeof(value)]) { - static_assert(std::is_pod_v, "T must be pod!"); - static_assert(!std::is_reference_v, "T must not be a reference!"); - - for (size_t i = 0; i < sizeof(value); ++i) { -#if __BYTE_ORDER__ != __ORDER_LITTLE_ENDIAN__ - dst[sizeof(value) - i - 1] = -#else - dst[i] = -#endif - static_cast((value >> (8 * i)) & 0xff); - } - } - public: Transcript() = default; @@ -67,6 +67,24 @@ namespace kagome::primitives { append_message(label, tmp); } + /// Fill the supplied buffer with the verifier's challenge bytes. + /// + /// The `label` parameter is metadata about the challenge, and is + /// also appended to the transcript. See the [Transcript + /// Protocols](https://merlin.cool/use/protocol.html) section of + /// the Merlin website for details on labels. + template + void challenge_bytes(const T (&label)[N], K (&dest)[M]) { + const uint32_t data_len = sizeof(dest); + strobe_.metaAd(label); + + uint8_t tmp[sizeof(data_len)]; + decompose(data_len, tmp); + + strobe_.metaAd(tmp); + strobe_.template prf(dest); + } + auto data() { return strobe_.data(); } diff --git a/core/subscription/subscriber.hpp b/core/subscription/subscriber.hpp index b9eba4b6e6..b990f66c0b 100644 --- a/core/subscription/subscriber.hpp +++ b/core/subscription/subscriber.hpp @@ -64,9 +64,9 @@ namespace kagome::subscription { public: template - explicit Subscriber(SubscriptionEnginePtr &ptr, + explicit Subscriber(SubscriptionEnginePtr ptr, SubscriberConstructorArgs &&...args) - : engine_(ptr), + : engine_(std::move(ptr)), object_(std::forward(args)...) {} ~Subscriber() {