From f85e797ea95b9d41ae53d7af9887da4929b05a37 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 6 Sep 2019 17:43:03 +0200 Subject: [PATCH] core/authority-discovery: Enable authorities to discover each other (#3452) With the *authority-discovery* module an authoritative node makes itself discoverable and is able to discover other authorities. Once discovered, a node can directly connect to other authorities instead of multi-hop gossiping information. 1. **Making itself discoverable** 1. Retrieve its external addresses 2. Adds its network peer id to the addresses 3. Sign the above 4. Put the signature and the addresses on the libp2p Kademlia DHT 2. **Discovering other authorities** 1. Retrieve the current set of authorities 2. Start DHT queries for the ids of the authorities 3. Validate the signatures of the retrieved key value pairs 4. Add the retrieved external addresses as ~reserved~ priority nodes to the peerset * node/runtime: Add authority-discovery as session handler The srml/authority-discovery module implements the OneSessionHandler in order to keep its authority set in sync. This commit adds the module to the set of session handlers. * core/network: Make network worker return Dht events on poll Instead of network worker implement the Future trait, have it implement the Stream interface returning Dht events. For now these events are ignored in build_network_future but will be used by the core/authority-discovery module in subsequent commits. * *: Add scaffolding and integration for core/authority-discovery module * core/authority-discovery: Implement module logic itself --- Cargo.lock | 102 +++ Cargo.toml | 1 + core/authority-discovery/Cargo.toml | 32 + core/authority-discovery/build.rs | 3 + .../authority-discovery/primitives/src/lib.rs | 20 +- core/authority-discovery/src/error.rs | 47 ++ core/authority-discovery/src/lib.rs | 698 ++++++++++++++++++ core/authority-discovery/src/schema/dht.proto | 14 + core/network/src/lib.rs | 1 + core/network/src/protocol/event.rs | 2 + core/network/src/service.rs | 13 +- core/service/Cargo.toml | 2 + core/service/src/builder.rs | 43 +- core/service/src/lib.rs | 30 +- node/cli/Cargo.toml | 3 +- node/cli/src/service.rs | 17 + node/runtime/src/lib.rs | 38 +- srml/authority-discovery/src/lib.rs | 35 +- srml/im-online/src/lib.rs | 2 +- 19 files changed, 1041 insertions(+), 62 deletions(-) create mode 100644 core/authority-discovery/Cargo.toml create mode 100644 core/authority-discovery/build.rs create mode 100644 core/authority-discovery/src/error.rs create mode 100644 core/authority-discovery/src/lib.rs create mode 100644 core/authority-discovery/src/schema/dht.proto diff --git a/Cargo.lock b/Cargo.lock index b242c7337d920..335932c5956ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,6 +876,11 @@ dependencies = [ "static_assertions 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fixedbitset" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "flate2" version = "1.0.9" @@ -2236,6 +2241,11 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "multimap" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "multistream-select" version = "0.5.1" @@ -2324,6 +2334,7 @@ dependencies = [ "srml-system 2.0.0", "srml-timestamp 2.0.0", "structopt 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", + "substrate-authority-discovery 2.0.0", "substrate-basic-authorship 2.0.0", "substrate-cli 2.0.0", "substrate-client 2.0.0", @@ -2934,6 +2945,14 @@ name = "percent-encoding" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "petgraph" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "pin-utils" version = "0.1.0-alpha.4" @@ -3005,6 +3024,54 @@ dependencies = [ "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "prost" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-build" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "which 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-types" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "protobuf" version = "2.8.0" @@ -4440,6 +4507,32 @@ dependencies = [ "substrate-test-runtime-client 2.0.0", ] +[[package]] +name = "substrate-authority-discovery" +version = "2.0.0" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "libp2p 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-scale-codec 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 2.0.0", + "substrate-authority-discovery-primitives 2.0.0", + "substrate-client 2.0.0", + "substrate-keystore 2.0.0", + "substrate-network 2.0.0", + "substrate-peerset 2.0.0", + "substrate-primitives 2.0.0", + "substrate-test-runtime-client 2.0.0", + "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "substrate-authority-discovery-primitives" version = "2.0.0" @@ -5134,6 +5227,8 @@ dependencies = [ "sr-io 2.0.0", "sr-primitives 2.0.0", "substrate-application-crypto 2.0.0", + "substrate-authority-discovery 2.0.0", + "substrate-authority-discovery-primitives 2.0.0", "substrate-client 2.0.0", "substrate-client-db 2.0.0", "substrate-consensus-babe-primitives 2.0.0", @@ -6433,6 +6528,7 @@ dependencies = [ "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" "checksum finality-grandpa 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9681c1f75941ea47584573dd2bc10558b2067d460612945887e00744e43393be" "checksum fixed-hash 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "516877b7b9a1cc2d0293cbce23cd6203f0edbfd4090e6ca4489fecb5aa73050e" +"checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" "checksum flate2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "550934ad4808d5d39365e5d61727309bf18b3b02c6c56b729cb92e7dd84bc3d8" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" @@ -6561,6 +6657,7 @@ dependencies = [ "checksum mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "46e73a04c2fa6250b8d802134d56d554a9ec2922bf977777c805ea5def61ce40" "checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +"checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" "checksum multistream-select 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e8f3cb4c93f2d79811fc11fa01faab99d8b7b8cbe024b602c27434ff2b08a59d" "checksum names 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef320dab323286b50fb5cdda23f61c796a72a89998ab565ca32525c5c556f2da" "checksum native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b2df1a4c22fd44a62147fd8f13dd0f95c9d8ca7b2610299b2a2f9cf8964274e" @@ -6607,6 +6704,7 @@ dependencies = [ "checksum peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +"checksum petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f" "checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" "checksum pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c1d2cfa5a714db3b5f24f0915e74fcdf91d09d496ba61329705dda7774d2af" "checksum ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e3cbf9f658cdb5000fcf6f362b8ea2ba154b9f146a61c7a20d647034c6b6561b" @@ -6616,6 +6714,10 @@ dependencies = [ "checksum proc-macro-hack 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)" = "982a35d1194084ba319d65c4a68d24ca28f5fdb5b8bc20899e4eef8641ea5178" "checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" "checksum proc-macro2 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "175a40b9cf564ce9bf050654633dbf339978706b8ead1a907bb970b63185dd95" +"checksum prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23" +"checksum prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e" +"checksum prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5e7dc378b94ac374644181a2247cebf59a6ec1c88b49ac77f3a94b86b79d0e11" +"checksum prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1de482a366941c8d56d19b650fac09ca08508f2a696119ee7513ad590c8bac6f" "checksum protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8aefcec9f142b524d98fc81d07827743be89dd6586a1ba6ab21fa66a500b3fa5" "checksum pwasm-utils 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "efb0dcbddbb600f47a7098d33762a00552c671992171637f5bb310b37fe1f0e4" "checksum quick-error 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5fb6ccf8db7bbcb9c2eae558db5ab4f3da1c2a87e4e597ed394726bc8ea6ca1d" diff --git a/Cargo.toml b/Cargo.toml index aaa8c372fb182..29f5307610794 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ members = [ "core/utils/fork-tree", "core/utils/wasm-builder", "core/utils/wasm-builder-runner", + "core/authority-discovery", "srml/support", "srml/support/procedural", "srml/support/procedural/tools", diff --git a/core/authority-discovery/Cargo.toml b/core/authority-discovery/Cargo.toml new file mode 100644 index 0000000000000..ac7f8ac3685ee --- /dev/null +++ b/core/authority-discovery/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "substrate-authority-discovery" +version = "2.0.0" +authors = ["Parity Technologies "] +edition = "2018" +build = "build.rs" + +[build-dependencies] +prost-build = "0.5" + +[dependencies] +authority-discovery-primitives = { package = "substrate-authority-discovery-primitives", path = "./primitives", default-features = false } +bytes = "0.4" +client = { package = "substrate-client", path = "../../core/client" } +codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" } +derive_more = "0.14.0" +futures = "0.1" +keystore = { package = "substrate-keystore", path = "../../core/keystore" } +libp2p = { version = "0.12.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] } +log = "0.4" +network = { package = "substrate-network", path = "../../core/network" } +primitives = { package = "substrate-primitives", path = "../primitives" } +prost = "0.5" +serde_json = "1.0" +sr-primitives = { path = "../../core/sr-primitives" } +tokio-timer = "0.2" + +[dev-dependencies] +parking_lot = { version = "0.9.0" } +peerset = { package = "substrate-peerset", path = "../../core/peerset" } +test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } +tokio = { version = "0.1"} diff --git a/core/authority-discovery/build.rs b/core/authority-discovery/build.rs new file mode 100644 index 0000000000000..ed632575f3ba8 --- /dev/null +++ b/core/authority-discovery/build.rs @@ -0,0 +1,3 @@ +fn main() { + prost_build::compile_protos(&["src/schema/dht.proto"], &["src/schema"]).unwrap(); +} diff --git a/core/authority-discovery/primitives/src/lib.rs b/core/authority-discovery/primitives/src/lib.rs index 556b758aa61fc..13da4de020466 100644 --- a/core/authority-discovery/primitives/src/lib.rs +++ b/core/authority-discovery/primitives/src/lib.rs @@ -19,9 +19,15 @@ #![cfg_attr(not(feature = "std"), no_std)] use client::decl_runtime_apis; -use codec::Codec; use rstd::vec::Vec; +#[derive(codec::Encode, codec::Decode, Eq, PartialEq, Clone)] +#[cfg_attr(feature = "std", derive(Debug, Hash))] +pub struct Signature(pub Vec); +#[derive(codec::Encode, codec::Decode, Eq, PartialEq, Clone)] +#[cfg_attr(feature = "std", derive(Debug, Hash))] +pub struct AuthorityId(pub Vec); + decl_runtime_apis! { /// The authority discovery api. /// @@ -29,21 +35,15 @@ decl_runtime_apis! { /// own authority identifier, to retrieve identifiers of the current authority /// set, as well as sign and verify Kademlia Dht external address payloads /// from and to other authorities. - pub trait AuthorityDiscoveryApi { - /// Returns own authority identifier iff it is part of the current authority - /// set, otherwise this function returns None. The restriction might be - /// softened in the future in case a consumer needs to learn own authority - /// identifier. - fn authority_id() -> Option; - + pub trait AuthorityDiscoveryApi { /// Retrieve authority identifiers of the current authority set. fn authorities() -> Vec; /// Sign the given payload with the private key corresponding to the given authority id. - fn sign(payload: Vec, authority_id: AuthorityId) -> Option>; + fn sign(payload: &Vec) -> Option<(Signature, AuthorityId)>; /// Verify the given signature for the given payload with the given /// authority identifier. - fn verify(payload: Vec, signature: Vec, authority_id: AuthorityId) -> bool; + fn verify(payload: &Vec, signature: &Signature, authority_id: &AuthorityId) -> bool; } } diff --git a/core/authority-discovery/src/error.rs b/core/authority-discovery/src/error.rs new file mode 100644 index 0000000000000..e8c1ad9705f0c --- /dev/null +++ b/core/authority-discovery/src/error.rs @@ -0,0 +1,47 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Authority discovery errors. + +/// AuthorityDiscovery Result. +pub type Result = std::result::Result; + +/// Error type for the authority discovery module. +#[derive(Debug, derive_more::Display, derive_more::From)] +pub enum Error { + /// Failed to verify a dht payload with the given signature. + VerifyingDhtPayload, + /// Failed to hash the authority id to be used as a dht key. + HashingAuthorityId(libp2p::core::multiaddr::multihash::EncodeError), + /// Failed calling into the Substrate runtime. + CallingRuntime(client::error::Error), + /// Failed signing the dht payload via the Substrate runtime. + SigningDhtPayload, + /// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure it + /// is actually an authority, we match the hash against the hash of the authority id of all other authorities. This + /// error is the result of the above failing. + MatchingHashedAuthorityIdWithAuthorityId, + /// Failed to set the authority discovery peerset priority group in the peerset module. + SettingPeersetPriorityGroup(String), + /// Failed to encode a dht payload. + Encoding(prost::EncodeError), + /// Failed to decode a dht payload. + Decoding(prost::DecodeError), + /// Failed to parse a libp2p multi address. + ParsingMultiaddress(libp2p::core::multiaddr::Error), + /// Tokio timer error. + PollingTokioTimer(tokio_timer::Error) +} diff --git a/core/authority-discovery/src/lib.rs b/core/authority-discovery/src/lib.rs new file mode 100644 index 0000000000000..987169ead90b1 --- /dev/null +++ b/core/authority-discovery/src/lib.rs @@ -0,0 +1,698 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +#![warn(missing_docs)] + +//! Substrate authority discovery. +//! +//! This crate enables Substrate authorities to directly connect to other authorities. [`AuthorityDiscovery`] implements +//! the Future trait. By polling [`AuthorityDiscovery`] an authority: +//! +//! +//! 1. **Makes itself discoverable** +//! +//! 1. Retrieves its external addresses. +//! +//! 2. Adds its network peer id to the addresses. +//! +//! 3. Signs the above. +//! +//! 4. Puts the signature and the addresses on the libp2p Kademlia DHT. +//! +//! +//! 2. **Discovers other authorities** +//! +//! 1. Retrieves the current set of authorities. +//! +//! 2. Starts DHT queries for the ids of the authorities. +//! +//! 3. Validates the signatures of the retrieved key value pairs. +//! +//! 4. Adds the retrieved external addresses as priority nodes to the peerset. + +use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, Signature}; +use client::blockchain::HeaderBackend; +use error::{Error, Result}; +use futures::{prelude::*, sync::mpsc::Receiver}; +use log::{debug, error, log_enabled, warn}; +use network::specialization::NetworkSpecialization; +use network::{DhtEvent, ExHashT}; +use prost::Message; +use sr_primitives::generic::BlockId; +use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi}; +use std::collections::{HashMap, HashSet}; +use std::convert::TryInto; +use std::iter::FromIterator; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +mod error; +/// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs. +mod schema { + include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); +} + +/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities. +pub struct AuthorityDiscovery +where + Block: BlockT + 'static, + Network: NetworkProvider, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, +{ + client: Arc, + + network: Arc, + /// Channel we receive Dht events on. + dht_event_rx: Receiver, + + /// Interval to be proactive, publishing own addresses. + publish_interval: tokio_timer::Interval, + /// Interval on which to query for addresses of other authorities. + query_interval: tokio_timer::Interval, + + /// The network peerset interface for priority groups lets us only set an entire group, but we retrieve the + /// addresses of other authorities one by one from the network. To use the peerset interface we need to cache the + /// addresses and always overwrite the entire peerset priority group. To ensure this map doesn't grow indefinitely + /// `purge_old_authorities_from_cache` function is called each time we add a new entry. + address_cache: HashMap>, + + phantom: PhantomData, +} + +impl AuthorityDiscovery +where + Block: BlockT + 'static, + Network: NetworkProvider, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, +{ + /// Return a new authority discovery. + pub fn new( + client: Arc, + network: Arc, + dht_event_rx: futures::sync::mpsc::Receiver, + ) -> AuthorityDiscovery { + // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node + // could restart at any point in time, one can not depend on the republishing process, thus publishing own + // external addresses should happen on an interval < 36h. + let publish_interval = + tokio_timer::Interval::new(Instant::now(), Duration::from_secs(12 * 60 * 60)); + + // External addresses of other authorities can change at any given point in time. The interval on which to query + // for external addresses of other authorities is a trade off between efficiency and performance. + let query_interval = + tokio_timer::Interval::new(Instant::now(), Duration::from_secs(10 * 60)); + + let address_cache = HashMap::new(); + + AuthorityDiscovery { + client, + network, + dht_event_rx, + publish_interval, + query_interval, + address_cache, + phantom: PhantomData, + } + } + + fn publish_own_ext_addresses(&mut self) -> Result<()> { + let id = BlockId::hash(self.client.info().best_hash); + + let addresses = self + .network + .external_addresses() + .into_iter() + .map(|a| { + a.with(libp2p::core::multiaddr::Protocol::P2p( + self.network.local_peer_id().into(), + )) + }) + .map(|a| a.to_vec()) + .collect(); + + let mut serialized_addresses = vec![]; + schema::AuthorityAddresses { addresses } + .encode(&mut serialized_addresses) + .map_err(Error::Encoding)?; + + let (signature, authority_id) = self + .client + .runtime_api() + .sign(&id, &serialized_addresses) + .map_err(Error::CallingRuntime)? + .ok_or(Error::SigningDhtPayload)?; + + let mut signed_addresses = vec![]; + schema::SignedAuthorityAddresses { + addresses: serialized_addresses, + signature: signature.0, + } + .encode(&mut signed_addresses) + .map_err(Error::Encoding)?; + + self.network.put_value( + hash_authority_id(authority_id.0.as_ref())?, + signed_addresses, + ); + + Ok(()) + } + + fn request_addresses_of_others(&mut self) -> Result<()> { + let id = BlockId::hash(self.client.info().best_hash); + + let authorities = self + .client + .runtime_api() + .authorities(&id) + .map_err(Error::CallingRuntime)?; + + for authority_id in authorities.iter() { + self.network + .get_value(&hash_authority_id(authority_id.0.as_ref())?); + } + + Ok(()) + } + + fn handle_dht_events(&mut self) -> Result<()> { + while let Ok(Async::Ready(Some(event))) = self.dht_event_rx.poll() { + match event { + DhtEvent::ValueFound(v) => { + if log_enabled!(log::Level::Debug) { + let hashes = v.iter().map(|(hash, _value)| hash.clone()); + debug!(target: "sub-authority-discovery", "Value for hash '{:?}' found on Dht.", hashes); + } + + self.handle_dht_value_found_event(v)?; + } + DhtEvent::ValueNotFound(hash) => { + warn!(target: "sub-authority-discovery", "Value for hash '{:?}' not found on Dht.", hash) + } + DhtEvent::ValuePut(hash) => { + debug!(target: "sub-authority-discovery", "Successfully put hash '{:?}' on Dht.", hash) + } + DhtEvent::ValuePutFailed(hash) => { + warn!(target: "sub-authority-discovery", "Failed to put hash '{:?}' on Dht.", hash) + } + } + } + + Ok(()) + } + + fn handle_dht_value_found_event( + &mut self, + values: Vec<(libp2p::kad::record::Key, Vec)>, + ) -> Result<()> { + debug!(target: "sub-authority-discovery", "Got Dht value from network."); + + let id = BlockId::hash(self.client.info().best_hash); + + // From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure + // it is actually an authority, we match the hash against the hash of the authority id of all other authorities. + let authorities = self.client.runtime_api().authorities(&id)?; + self.purge_old_authorities_from_cache(&authorities); + + let authorities = authorities + .into_iter() + .map(|a| hash_authority_id(a.0.as_ref()).map(|h| (h, a))) + .collect::>>()?; + + for (key, value) in values.iter() { + // Check if the event origins from an authority in the current authority set. + let authority_id: &AuthorityId = authorities + .get(key) + .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; + + let schema::SignedAuthorityAddresses { + signature, + addresses, + } = schema::SignedAuthorityAddresses::decode(value).map_err(Error::Decoding)?; + let signature = Signature(signature); + + let is_verified = self + .client + .runtime_api() + .verify(&id, &addresses, &signature, &authority_id.clone()) + .map_err(Error::CallingRuntime)?; + + if !is_verified { + return Err(Error::VerifyingDhtPayload); + } + + let addresses: Vec = schema::AuthorityAddresses::decode(addresses) + .map(|a| a.addresses) + .map_err(Error::Decoding)? + .into_iter() + .map(|a| a.try_into()) + .collect::>() + .map_err(Error::ParsingMultiaddress)?; + + self.address_cache.insert(authority_id.clone(), addresses); + } + + // Let's update the peerset priority group with the all the addresses we have in our cache. + + let addresses = HashSet::from_iter( + self.address_cache + .iter() + .map(|(_peer_id, addresses)| addresses.clone()) + .flatten(), + ); + + debug!(target: "sub-authority-discovery", "Applying priority group {:#?} to peerset.", addresses); + self.network + .set_priority_group("authorities".to_string(), addresses) + .map_err(Error::SettingPeersetPriorityGroup)?; + + Ok(()) + } + + fn purge_old_authorities_from_cache(&mut self, current_authorities: &Vec) { + self.address_cache + .retain(|peer_id, _addresses| current_authorities.contains(peer_id)) + } +} + +impl futures::Future for AuthorityDiscovery +where + Block: BlockT + 'static, + Network: NetworkProvider, + Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend, + ::Api: AuthorityDiscoveryApi, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> futures::Poll { + let mut inner = || -> Result<()> { + // Process incoming events before triggering new ones. + self.handle_dht_events()?; + + if let Async::Ready(_) = self + .publish_interval + .poll() + .map_err(Error::PollingTokioTimer)? + { + // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the + // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the + // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval + // tick. + while let Async::Ready(_) = self + .publish_interval + .poll() + .map_err(Error::PollingTokioTimer)? + {} + + self.publish_own_ext_addresses()?; + } + + if let Async::Ready(_) = self + .query_interval + .poll() + .map_err(Error::PollingTokioTimer)? + { + // Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the + // function calls within this block do a `return`, we don't call `interval.poll` again and thereby the + // underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval + // tick. + while let Async::Ready(_) = self + .query_interval + .poll() + .map_err(Error::PollingTokioTimer)? + {} + + self.request_addresses_of_others()?; + } + + Ok(()) + }; + + match inner() { + Ok(()) => {} + Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e), + }; + + // Make sure to always return NotReady as this is a long running task with the same lifetime as the node itself. + Ok(futures::Async::NotReady) + } +} + +/// NetworkProvider provides AuthorityDiscovery with all necessary hooks into the underlying Substrate networking. Using +/// this trait abstraction instead of NetworkService directly is necessary to unit test AuthorityDiscovery. +pub trait NetworkProvider { + /// Returns the local external addresses. + fn external_addresses(&self) -> Vec; + + /// Returns the network identity of the node. + fn local_peer_id(&self) -> libp2p::PeerId; + + /// Modify a peerset priority group. + fn set_priority_group( + &self, + group_id: String, + peers: HashSet, + ) -> std::result::Result<(), String>; + + /// Start putting a value in the Dht. + fn put_value(&self, key: libp2p::kad::record::Key, value: Vec); + + /// Start getting a value from the Dht. + fn get_value(&self, key: &libp2p::kad::record::Key); +} + +impl NetworkProvider for network::NetworkService +where + B: BlockT + 'static, + S: NetworkSpecialization, + H: ExHashT, +{ + fn external_addresses(&self) -> Vec { + self.external_addresses() + } + fn local_peer_id(&self) -> libp2p::PeerId { + self.local_peer_id() + } + fn set_priority_group( + &self, + group_id: String, + peers: HashSet, + ) -> std::result::Result<(), String> { + self.set_priority_group(group_id, peers) + } + fn put_value(&self, key: libp2p::kad::record::Key, value: Vec) { + self.put_value(key, value) + } + fn get_value(&self, key: &libp2p::kad::record::Key) { + self.get_value(key) + } +} + +fn hash_authority_id(id: &[u8]) -> Result { + libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, id) + .map(|k| libp2p::kad::record::Key::new(&k)) + .map_err(Error::HashingAuthorityId) +} + +#[cfg(test)] +mod tests { + use super::*; + use client::runtime_api::{ApiExt, Core, RuntimeVersion}; + use futures::future::poll_fn; + use primitives::{ExecutionContext, NativeOrEncoded}; + use sr_primitives::traits::Zero; + use sr_primitives::traits::{ApiRef, Block as BlockT, NumberFor, ProvideRuntimeApi}; + use std::sync::{Arc, Mutex}; + use test_client::runtime::Block; + use tokio::runtime::current_thread; + + #[derive(Clone)] + struct TestApi {} + + impl ProvideRuntimeApi for TestApi { + type Api = RuntimeApi; + + fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> { + RuntimeApi {}.into() + } + } + + /// Blockchain database header backend. Does not perform any validation. + impl HeaderBackend for TestApi { + fn header( + &self, + _id: BlockId, + ) -> std::result::Result, client::error::Error> { + Ok(None) + } + + fn info(&self) -> client::blockchain::Info { + client::blockchain::Info { + best_hash: Default::default(), + best_number: Zero::zero(), + finalized_hash: Default::default(), + finalized_number: Zero::zero(), + genesis_hash: Default::default(), + } + } + + fn status( + &self, + _id: BlockId, + ) -> std::result::Result { + Ok(client::blockchain::BlockStatus::Unknown) + } + + fn number( + &self, + _hash: Block::Hash, + ) -> std::result::Result>, client::error::Error> { + Ok(None) + } + + fn hash( + &self, + _number: NumberFor, + ) -> std::result::Result, client::error::Error> { + Ok(None) + } + } + + struct RuntimeApi {} + + impl Core for RuntimeApi { + fn Core_version_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + unimplemented!("Not required for testing!") + } + + fn Core_execute_block_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<(Block)>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + unimplemented!("Not required for testing!") + } + + fn Core_initialize_block_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<&::Header>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + unimplemented!("Not required for testing!") + } + } + + impl ApiExt for RuntimeApi { + fn map_api_result std::result::Result, R, E>( + &self, + _: F, + ) -> std::result::Result { + unimplemented!("Not required for testing!") + } + + fn runtime_version_at( + &self, + _: &BlockId, + ) -> std::result::Result { + unimplemented!("Not required for testing!") + } + + fn record_proof(&mut self) { + unimplemented!("Not required for testing!") + } + + fn extract_proof(&mut self) -> Option>> { + unimplemented!("Not required for testing!") + } + } + + impl AuthorityDiscoveryApi for RuntimeApi { + fn AuthorityDiscoveryApi_authorities_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> std::result::Result>, client::error::Error> { + return Ok(NativeOrEncoded::Native(vec![ + AuthorityId("test-authority-id-1".as_bytes().to_vec()), + AuthorityId("test-authority-id-2".as_bytes().to_vec()), + ])); + } + fn AuthorityDiscoveryApi_sign_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<&std::vec::Vec>, + _: Vec, + ) -> std::result::Result< + NativeOrEncoded>, + client::error::Error, + > { + return Ok(NativeOrEncoded::Native(Some(( + Signature("test-signature-1".as_bytes().to_vec()), + AuthorityId("test-authority-id-1".as_bytes().to_vec()), + )))); + } + fn AuthorityDiscoveryApi_verify_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + args: Option<(&Vec, &Signature, &AuthorityId)>, + _: Vec, + ) -> std::result::Result, client::error::Error> { + if *args.unwrap().1 == Signature("test-signature-1".as_bytes().to_vec()) { + return Ok(NativeOrEncoded::Native(true)); + } + return Ok(NativeOrEncoded::Native(false)); + } + } + + #[derive(Default)] + struct TestNetwork { + // Whenever functions on `TestNetwork` are called, the function arguments are added to the vectors below. + pub put_value_call: Arc)>>>, + pub get_value_call: Arc>>, + pub set_priority_group_call: Arc)>>>, + } + + impl NetworkProvider for TestNetwork { + fn external_addresses(&self) -> Vec { + vec![] + } + fn local_peer_id(&self) -> libp2p::PeerId { + libp2p::PeerId::random() + } + fn set_priority_group( + &self, + group_id: String, + peers: HashSet, + ) -> std::result::Result<(), String> { + self.set_priority_group_call + .lock() + .unwrap() + .push((group_id, peers)); + Ok(()) + } + fn put_value(&self, key: libp2p::kad::record::Key, value: Vec) { + self.put_value_call.lock().unwrap().push((key, value)); + } + fn get_value(&self, key: &libp2p::kad::record::Key) { + self.get_value_call.lock().unwrap().push(key.clone()); + } + } + + #[test] + fn publish_own_ext_addresses_puts_record_on_dht() { + let (_dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let test_api = Arc::new(TestApi {}); + let network: Arc = Arc::new(Default::default()); + + let mut authority_discovery = + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + + authority_discovery.publish_own_ext_addresses().unwrap(); + + // Expect authority discovery to put a new record onto the dht. + assert_eq!(network.put_value_call.lock().unwrap().len(), 1); + } + + #[test] + fn request_addresses_of_others_triggers_dht_get_query() { + let (_dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let test_api = Arc::new(TestApi {}); + let network: Arc = Arc::new(Default::default()); + + let mut authority_discovery = + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + + authority_discovery.request_addresses_of_others().unwrap(); + + // Expect authority discovery to request new records from the dht. + assert_eq!(network.get_value_call.lock().unwrap().len(), 2); + } + + #[test] + fn handle_dht_events_with_value_found_should_call_set_priority_group() { + // Create authority discovery. + + let (mut dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000); + let test_api = Arc::new(TestApi {}); + let network: Arc = Arc::new(Default::default()); + + let mut authority_discovery = + AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx); + + // Create sample dht event. + + let authority_id_1 = hash_authority_id("test-authority-id-1".as_bytes()).unwrap(); + let address_1: libp2p::Multiaddr = "/ip6/2001:db8::".parse().unwrap(); + + let mut serialized_addresses = vec![]; + schema::AuthorityAddresses { + addresses: vec![address_1.to_vec()], + } + .encode(&mut serialized_addresses) + .unwrap(); + + let mut signed_addresses = vec![]; + schema::SignedAuthorityAddresses { + addresses: serialized_addresses, + signature: "test-signature-1".as_bytes().to_vec(), + } + .encode(&mut signed_addresses) + .unwrap(); + + let dht_event = network::DhtEvent::ValueFound(vec![(authority_id_1, signed_addresses)]); + dht_event_tx.try_send(dht_event).unwrap(); + + // Make authority discovery handle the event. + + let f = || { + authority_discovery.handle_dht_events().unwrap(); + + // Expect authority discovery to set the priority set. + assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1); + + assert_eq!( + network.set_priority_group_call.lock().unwrap()[0], + ( + "authorities".to_string(), + HashSet::from_iter(vec![address_1.clone()].into_iter()) + ) + ); + + Ok(Async::Ready(())) + }; + + let mut runtime = current_thread::Runtime::new().unwrap(); + runtime.block_on(poll_fn::<(), (), _>(f)).unwrap(); + } +} diff --git a/core/authority-discovery/src/schema/dht.proto b/core/authority-discovery/src/schema/dht.proto new file mode 100644 index 0000000000000..9dbe9d559f4b1 --- /dev/null +++ b/core/authority-discovery/src/schema/dht.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package authority_discovery; + +// First we need to serialize the addresses in order to be able to sign them. +message AuthorityAddresses { + repeated bytes addresses = 1; +} + +// Then we need to serialize addresses and signature to send them over the wire. +message SignedAuthorityAddresses { + bytes addresses = 1; + bytes signature = 2; +} diff --git a/core/network/src/lib.rs b/core/network/src/lib.rs index e797ffb208ec1..7e9fd51a41533 100644 --- a/core/network/src/lib.rs +++ b/core/network/src/lib.rs @@ -192,6 +192,7 @@ pub use service::{ NetworkStateInfo, }; pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization}; +pub use protocol::event::{Event, DhtEvent}; pub use protocol::sync::SyncState; pub use libp2p::{Multiaddr, PeerId}; #[doc(inline)] diff --git a/core/network/src/protocol/event.rs b/core/network/src/protocol/event.rs index c0c26da515f0b..c8bee5588c704 100644 --- a/core/network/src/protocol/event.rs +++ b/core/network/src/protocol/event.rs @@ -20,6 +20,7 @@ use libp2p::kad::record::Key; /// Events generated by DHT as a response to get_value and put_value requests. +#[derive(Debug, Clone)] pub enum DhtEvent { /// The value was found. ValueFound(Vec<(Key, Vec)>), @@ -35,6 +36,7 @@ pub enum DhtEvent { } /// Type for events generated by networking layer. +#[derive(Debug, Clone)] pub enum Event { /// Event generated by a DHT. Dht(DhtEvent), diff --git a/core/network/src/service.rs b/core/network/src/service.rs index c3f773e232e7a..ac6bd1ac05dd5 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -612,11 +612,11 @@ pub struct NetworkWorker, H: Ex light_client_rqs: Option>>, } -impl, H: ExHashT> Future for NetworkWorker { - type Item = (); +impl, H: ExHashT> Stream for NetworkWorker { + type Item = Event; type Error = io::Error; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> Poll, Self::Error> { // Poll the import queue for actions to perform. let _ = futures03::future::poll_fn(|cx| { self.import_queue.poll_actions(cx, &mut NetworkLink { @@ -636,7 +636,7 @@ impl, H: ExHashT> Future for Ne // Process the next message coming from the `NetworkService`. let msg = match self.from_worker.poll() { Ok(Async::Ready(Some(msg))) => msg, - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(None)), Ok(Async::NotReady) => break, }; @@ -677,8 +677,9 @@ impl, H: ExHashT> Future for Ne Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome, Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => { self.network_service.user_protocol_mut() - .on_event(Event::Dht(ev)); - CustomMessageOutcome::None + .on_event(Event::Dht(ev.clone())); + + return Ok(Async::Ready(Some(Event::Dht(ev)))); }, Ok(Async::Ready(None)) => CustomMessageOutcome::None, Err(err) => { diff --git a/core/service/Cargo.toml b/core/service/Cargo.toml index 7afd59ebc0679..7f17d83931564 100644 --- a/core/service/Cargo.toml +++ b/core/service/Cargo.toml @@ -31,12 +31,14 @@ client = { package = "substrate-client", path = "../../core/client" } client_db = { package = "substrate-client-db", path = "../../core/client/db", features = ["kvdb-rocksdb"] } codec = { package = "parity-scale-codec", version = "1.0.0" } substrate-executor = { path = "../../core/executor" } +substrate-authority-discovery = { path = "../../core/authority-discovery"} transaction_pool = { package = "substrate-transaction-pool", path = "../../core/transaction-pool" } rpc-servers = { package = "substrate-rpc-servers", path = "../../core/rpc-servers" } rpc = { package = "substrate-rpc", path = "../../core/rpc" } tel = { package = "substrate-telemetry", path = "../../core/telemetry" } offchain = { package = "substrate-offchain", path = "../../core/offchain" } parity-multiaddr = { package = "parity-multiaddr", version = "0.5.0" } +authority-discovery-primitives = { package = "substrate-authority-discovery-primitives", path = "../authority-discovery/primitives", default-features = false } [dev-dependencies] substrate-test-runtime-client = { path = "../test-runtime/client" } diff --git a/core/service/src/builder.rs b/core/service/src/builder.rs index c675710e540a0..53cec940d7cc5 100644 --- a/core/service/src/builder.rs +++ b/core/service/src/builder.rs @@ -28,7 +28,7 @@ use futures::{prelude::*, sync::mpsc}; use futures03::{FutureExt as _, compat::Compat, StreamExt as _, TryStreamExt as _}; use keystore::{Store as Keystore, KeyStorePtr}; use log::{info, warn}; -use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo}; +use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent}; use network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization}; use parking_lot::{Mutex, RwLock}; use primitives::{Blake2Hasher, H256, Hasher}; @@ -76,6 +76,7 @@ pub struct ServiceBuilder, rpc_extensions: TRpc, rpc_builder: TRpcB, + dht_event_tx: Option>, marker: PhantomData<(TBl, TRtApi)>, } @@ -197,6 +198,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { transaction_pool: Arc::new(()), rpc_extensions: Default::default(), rpc_builder, + dht_event_tx: None, marker: PhantomData, }) } @@ -266,6 +268,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { transaction_pool: Arc::new(()), rpc_extensions: Default::default(), rpc_builder, + dht_event_tx: None, marker: PhantomData, }) } @@ -312,6 +315,7 @@ impl, + ) -> Result, Error> { + Ok(ServiceBuilder { + config: self.config, + client: self.client, + backend: self.backend, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain: self.select_chain, + import_queue: self.import_queue, + finality_proof_request_builder: self.finality_proof_request_builder, + finality_proof_provider: self.finality_proof_provider, + network_protocol: self.network_protocol, + transaction_pool: self.transaction_pool, + rpc_extensions: self.rpc_extensions, + rpc_builder: self.rpc_builder, + dht_event_tx: Some(dht_event_tx), + marker: self.marker, + }) + } } /// RPC handlers builder. @@ -798,6 +834,7 @@ ServiceBuilder< network_protocol, transaction_pool, rpc_extensions, + dht_event_tx, rpc_builder, ) = ( self.client, @@ -811,6 +848,7 @@ ServiceBuilder< self.network_protocol, self.transaction_pool, self.rpc_extensions, + self.dht_event_tx, self.rpc_builder, ); @@ -829,7 +867,8 @@ ServiceBuilder< finality_proof_provider, network_protocol, transaction_pool, - rpc_extensions + rpc_extensions, + dht_event_tx, )) }, |h, c, tx| maintain_transaction_pool(h, c, tx), diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 2056c8a2f2da3..9fc305560f45a 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -39,7 +39,7 @@ use client::{runtime_api::BlockT, Client}; use exit_future::Signal; use futures::prelude::*; use futures03::stream::{StreamExt as _, TryStreamExt as _}; -use network::{NetworkService, NetworkState, specialization::NetworkSpecialization}; +use network::{NetworkService, NetworkState, specialization::NetworkSpecialization, Event, DhtEvent}; use log::{log, warn, debug, error, Level}; use codec::{Encode, Decode}; use primitives::{Blake2Hasher, H256}; @@ -154,7 +154,8 @@ macro_rules! new_impl { finality_proof_provider, network_protocol, transaction_pool, - rpc_extensions + rpc_extensions, + dht_event_tx, ) = $build_components(&$config)?; let import_queue = Box::new(import_queue); let chain_info = client.info().chain; @@ -357,12 +358,14 @@ macro_rules! new_impl { let rpc_handlers = gen_handler(); let rpc = start_rpc_servers(&$config, gen_handler)?; + let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future( network_mut, client.clone(), network_status_sinks.clone(), system_rpc_rx, - has_bootnodes + has_bootnodes, + dht_event_tx, ) .map_err(|_| ()) .select(exit.clone()) @@ -653,6 +656,7 @@ fn build_network_future< status_sinks: Arc, NetworkState)>>>>, rpc_rx: futures03::channel::mpsc::UnboundedReceiver>, should_have_peers: bool, + dht_event_tx: Option>, ) -> impl Future { // Compatibility shim while we're transitionning to stable Futures. // See https://github.com/paritytech/substrate/issues/3099 @@ -730,11 +734,21 @@ fn build_network_future< } // Main network polling. - match network.poll() { - Ok(Async::NotReady) => {} - Err(err) => warn!(target: "service", "Error in network: {:?}", err), - Ok(Async::Ready(())) => warn!(target: "service", "Network service finished"), - } + while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| { + warn!(target: "service", "Error in network: {:?}", err); + }) { + // Given that core/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht + // events are being passed on to the authority-discovery module. In the future there might be multiple + // consumers of these events. In that case this would need to be refactored to properly dispatch the events, + // e.g. via a subscriber model. + if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) { + if e.is_full() { + warn!(target: "service", "Dht event channel to authority discovery is full, dropping event."); + } else if e.is_disconnected() { + warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event."); + } + } + }; // Now some diagnostic for performances. let polling_dur = before_polling.elapsed(); diff --git a/node/cli/Cargo.toml b/node/cli/Cargo.toml index 1f35f7b86b41c..c28a517639654 100644 --- a/node/cli/Cargo.toml +++ b/node/cli/Cargo.toml @@ -46,7 +46,8 @@ system = { package = "srml-system", path = "../../srml/system" } balances = { package = "srml-balances", path = "../../srml/balances" } support = { package = "srml-support", path = "../../srml/support", default-features = false } im_online = { package = "srml-im-online", path = "../../srml/im-online", default-features = false } -authority-discovery = { package = "srml-authority-discovery", path = "../../srml/authority-discovery", default-features = false } +sr-authority-discovery = { package = "srml-authority-discovery", path = "../../srml/authority-discovery", default-features = false } +authority-discovery = { package = "substrate-authority-discovery", path = "../../core/authority-discovery"} [dev-dependencies] keystore = { package = "substrate-keystore", path = "../../core/keystore" } diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index bb1d77b285ca6..c982bd0ee6fde 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -103,6 +103,8 @@ macro_rules! new_full_start { macro_rules! new_full { ($config:expr) => {{ use futures::Future; + use futures::sync::mpsc; + use network::DhtEvent; let ( is_authority, @@ -118,10 +120,18 @@ macro_rules! new_full { let (builder, mut import_setup, inherent_data_providers, mut tasks_to_spawn) = new_full_start!($config); + // Dht event channel from the network to the authority discovery module. Use bounded channel to ensure + // back-pressure. Authority discovery is triggering one event per authority within the current authority set. + // This estimates the authority set size to be somewhere below 10 000 thereby setting the channel buffer size to + // 10 000. + let (dht_event_tx, dht_event_rx) = + mpsc::channel::(10000); + let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, client)) as _) )? + .with_dht_event_tx(dht_event_tx)? .build()?; let (block_import, link_half, babe_link) = import_setup.take() @@ -162,6 +172,13 @@ macro_rules! new_full { let babe = babe::start_babe(babe_config)?; let select = babe.select(service.on_exit()).then(|_| Ok(())); service.spawn_task(Box::new(select)); + + let authority_discovery = authority_discovery::AuthorityDiscovery::new( + service.client(), + service.network(), + dht_event_rx, + ); + service.spawn_task(Box::new(authority_discovery)); } let config = grandpa::Config { diff --git a/node/runtime/src/lib.rs b/node/runtime/src/lib.rs index fe8a6107bbbeb..d6becf06f45b8 100644 --- a/node/runtime/src/lib.rs +++ b/node/runtime/src/lib.rs @@ -47,7 +47,9 @@ use elections::VoteIndex; use version::NativeVersion; use primitives::OpaqueMetadata; use grandpa::{AuthorityId as GrandpaId, AuthorityWeight as GrandpaWeight}; -use im_online::sr25519::{AuthorityId as ImOnlineId}; +use im_online::sr25519::{AuthorityId as ImOnlineId, AuthoritySignature as ImOnlineSignature}; +use authority_discovery_primitives::{AuthorityId as EncodedAuthorityId, Signature as EncodedSignature}; +use codec::{Encode, Decode}; use system::offchain::TransactionSubmitter; #[cfg(any(feature = "std", test))] @@ -191,7 +193,7 @@ impl authorship::Trait for Runtime { type EventHandler = Staking; } -type SessionHandlers = (Grandpa, Babe, ImOnline); +type SessionHandlers = (Grandpa, Babe, ImOnline, AuthorityDiscovery); impl_opaque_keys! { pub struct SessionKeys { @@ -615,20 +617,32 @@ impl_runtime_apis! { } } - impl authority_discovery_primitives::AuthorityDiscoveryApi for Runtime { - fn authority_id() -> Option { - AuthorityDiscovery::authority_id() - } - fn authorities() -> Vec { - AuthorityDiscovery::authorities() + impl authority_discovery_primitives::AuthorityDiscoveryApi for Runtime { + fn authorities() -> Vec { + AuthorityDiscovery::authorities().into_iter() + .map(|id| id.encode()) + .map(EncodedAuthorityId) + .collect() } - fn sign(payload: Vec, authority_id: ImOnlineId) -> Option> { - AuthorityDiscovery::sign(payload, authority_id) + fn sign(payload: &Vec) -> Option<(EncodedSignature, EncodedAuthorityId)> { + AuthorityDiscovery::sign(payload).map(|(sig, id)| { + (EncodedSignature(sig.encode()), EncodedAuthorityId(id.encode())) + }) } - fn verify(payload: Vec, signature: Vec, public_key: ImOnlineId) -> bool { - AuthorityDiscovery::verify(payload, signature, public_key) + fn verify(payload: &Vec, signature: &EncodedSignature, authority_id: &EncodedAuthorityId) -> bool { + let signature = match ImOnlineSignature::decode(&mut &signature.0[..]) { + Ok(s) => s, + _ => return false, + }; + + let authority_id = match ImOnlineId::decode(&mut &authority_id.0[..]) { + Ok(id) => id, + _ => return false, + }; + + AuthorityDiscovery::verify(payload, signature, authority_id) } } diff --git a/srml/authority-discovery/src/lib.rs b/srml/authority-discovery/src/lib.rs index 76b0c93c4ffb5..1c46822dfef6f 100644 --- a/srml/authority-discovery/src/lib.rs +++ b/srml/authority-discovery/src/lib.rs @@ -29,13 +29,14 @@ #![cfg_attr(not(feature = "std"), no_std)] use app_crypto::RuntimeAppPublic; -use codec::{Decode, Encode}; use rstd::prelude::*; use support::{decl_module, decl_storage, StorageValue}; pub trait Trait: system::Trait + session::Trait + im_online::Trait {} type AuthorityIdFor = ::AuthorityId; +type AuthoritySignatureFor = + <::AuthorityId as RuntimeAppPublic>::Signature; decl_storage! { trait Store for Module as AuthorityDiscovery { @@ -58,7 +59,7 @@ impl Module { /// set, otherwise this function returns None. The restriction might be /// softened in the future in case a consumer needs to learn own authority /// identifier. - pub fn authority_id() -> Option> { + fn authority_id() -> Option> { let authorities = Keys::::get(); let local_keys = >::all(); @@ -78,20 +79,19 @@ impl Module { } /// Sign the given payload with the private key corresponding to the given authority id. - pub fn sign(payload: Vec, authority_id: AuthorityIdFor) -> Option> { - authority_id.sign(&payload).map(|s| s.encode()) + pub fn sign(payload: &Vec) -> Option<(AuthoritySignatureFor, AuthorityIdFor)> { + let authority_id = Module::::authority_id()?; + authority_id.sign(payload).map(|s| (s, authority_id)) } /// Verify the given signature for the given payload with the given /// authority identifier. pub fn verify( - payload: Vec, - signature: Vec, + payload: &Vec, + signature: AuthoritySignatureFor, authority_id: AuthorityIdFor, ) -> bool { - as RuntimeAppPublic>::Signature::decode(&mut &signature[..]) - .map(|s| authority_id.verify(&payload, &s)) - .unwrap_or(false) + authority_id.verify(payload, &signature) } fn initialize_keys(keys: &[AuthorityIdFor]) { @@ -158,10 +158,7 @@ mod tests { pub struct TestOnSessionEnding; impl session::OnSessionEnding for TestOnSessionEnding { - fn on_session_ending( - _: SessionIndex, - _: SessionIndex, - ) -> Option> { + fn on_session_ending(_: SessionIndex, _: SessionIndex) -> Option> { None } } @@ -351,19 +348,13 @@ mod tests { externalities.set_keystore(key_store); with_externalities(&mut externalities, || { - let authority_id = AuthorityDiscovery::authority_id().expect("authority id"); let payload = String::from("test payload").into_bytes(); - let sig = - AuthorityDiscovery::sign(payload.clone(), authority_id.clone()).expect("signature"); + let (sig, authority_id) = AuthorityDiscovery::sign(&payload).expect("signature"); - assert!(AuthorityDiscovery::verify( - payload, - sig.clone(), - authority_id.clone() - )); + assert!(AuthorityDiscovery::verify(&payload, sig.clone(), authority_id.clone(),)); assert!(!AuthorityDiscovery::verify( - String::from("other payload").into_bytes(), + &String::from("other payload").into_bytes(), sig, authority_id )) diff --git a/srml/im-online/src/lib.rs b/srml/im-online/src/lib.rs index d6ef6c2e9cb13..dc26f5b2710eb 100644 --- a/srml/im-online/src/lib.rs +++ b/srml/im-online/src/lib.rs @@ -67,7 +67,7 @@ // Ensure we're `no_std` when compiling for Wasm. #![cfg_attr(not(feature = "std"), no_std)] -use app_crypto::{AppPublic, RuntimeAppPublic}; +use app_crypto::{AppPublic, RuntimeAppPublic, AppSignature}; use codec::{Encode, Decode}; use primitives::offchain::{OpaqueNetworkState, StorageKind}; use rstd::prelude::*;