Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

[WIP] Enable validators to find and connect to other validators #3247

Closed
wants to merge 11 commits into from
Closed
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/client/src/runtime_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,3 @@ decl_runtime_apis! {
fn validate_transaction(tx: <Block as BlockT>::Extrinsic) -> TransactionValidity;
}
}

1 change: 1 addition & 0 deletions core/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub use service::{
NetworkStateInfo,
};
pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization};
pub use protocol::event::{Event, DhtEvent};
pub use protocol::sync::SyncState;
pub use libp2p::{Multiaddr, PeerId};
#[doc(inline)]
Expand Down
2 changes: 2 additions & 0 deletions core/network/src/protocol/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use libp2p::multihash::Multihash;

/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
pub enum DhtEvent {
/// The value was found.
ValueFound(Vec<(Multihash, Vec<u8>)>),
Expand All @@ -35,6 +36,7 @@ pub enum DhtEvent {
}

/// Type for events generated by networking layer.
#[derive(Debug, Clone)]
pub enum Event {
/// Event generated by a DHT.
Dht(DhtEvent),
Expand Down
13 changes: 7 additions & 6 deletions core/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,11 +589,11 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
on_demand_in: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
}

impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for NetworkWorker<B, S, H> {
type Item = ();
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for NetworkWorker<B, S, H> {
type Item = Event;
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Poll the import queue for actions to perform.
let _ = futures03::future::poll_fn(|cx| {
self.import_queue.poll_actions(cx, &mut NetworkLink {
Expand All @@ -613,7 +613,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
// Process the next message coming from the `NetworkService`.
let msg = match self.from_worker.poll() {
Ok(Async::Ready(Some(msg))) => msg,
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::NotReady),
Ok(Async::NotReady) => break,
};

Expand Down Expand Up @@ -654,8 +654,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome,
Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => {
self.network_service.user_protocol_mut()
.on_event(Event::Dht(ev));
CustomMessageOutcome::None
.on_event(Event::Dht(ev.clone()));

return Ok(Async::Ready(Some(Event::Dht(ev))));
},
Ok(Async::Ready(None)) => CustomMessageOutcome::None,
Err(err) => {
Expand Down
3 changes: 3 additions & 0 deletions core/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ rpc = { package = "substrate-rpc-servers", path = "../../core/rpc-servers" }
tel = { package = "substrate-telemetry", path = "../../core/telemetry" }
offchain = { package = "substrate-offchain", path = "../../core/offchain" }
parity-multiaddr = { package = "parity-multiaddr", version = "0.5.0" }
libp2p = { version = "0.11.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
consensus_common_primitives = { package = "substrate-consensus-common-primitives", path = "../../core/consensus/common/primitives", default-features = false }
consensus_aura_primitives = { package = "substrate-consensus-aura-primitives", path = "../../core/consensus/aura/primitives", default-features = false }

[dev-dependencies]
substrate-test-runtime-client = { path = "../test-runtime/client" }
Expand Down
263 changes: 262 additions & 1 deletion core/service/src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,28 @@
use std::{sync::Arc, ops::Deref, ops::DerefMut};
use serde::{Serialize, de::DeserializeOwned};
use crate::chain_spec::ChainSpec;
use std::time::{Duration, Instant};
use log::{log, warn, Level};
use libp2p::Multiaddr;
use parking_lot::Mutex;
use client::{BlockchainEvents};
use futures03::stream::{StreamExt as _, TryStreamExt as _};
use offchain::AuthorityKeyProvider as _;
use consensus_common_primitives::ConsensusApi;

use network::{Event, DhtEvent};
use client_db;
use client::{self, Client, runtime_api};
use crate::{error, Service, AuthorityKeyProvider};
use crate::{error, Service, AuthorityKeyProvider, NetworkStatus};
use consensus_common::{import_queue::ImportQueue, SelectChain};
use network::{self, OnDemand, FinalityProofProvider, NetworkStateInfo, config::BoxFinalityProofRequestBuilder};
use substrate_executor::{NativeExecutor, NativeExecutionDispatch};
use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as TransactionPool};
use sr_primitives::{
BuildStorage, traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, generic::BlockId
};

use network::NetworkState;
use crate::config::Configuration;
use primitives::{Blake2Hasher, H256, Pair};
use rpc::{self, apis::system::SystemInfo};
Expand Down Expand Up @@ -268,12 +280,257 @@ impl<C: Components> OffchainWorker<Self> for C where
}
}

pub trait NetworkFutureBuilder<C: Components> {
fn build_network_future<H, S>(
network: network::NetworkWorker<ComponentBlock<C>, S, H >,
client: Arc<ComponentClient<C>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<(NetworkStatus<ComponentBlock<C>>, NetworkState)>>>>,
rpc_rx: mpsc::UnboundedReceiver<rpc::apis::system::Request<ComponentBlock<C>>>,
should_have_peers: bool,
authority_key_provider: ComponentAuthorityKeyProvider<C>,
)-> Box<dyn Future<Item = (), Error = ()>+ Send>
where
H: network::ExHashT,
S:network::specialization::NetworkSpecialization<ComponentBlock<C>>;
}

impl<C: Components> NetworkFutureBuilder<Self> for C
where
ComponentClient<C>: ProvideRuntimeApi,
<ComponentClient<C> as ProvideRuntimeApi>::Api: ConsensusApi<ComponentBlock<C>, <C::Factory as ServiceFactory>::AuthorityId>,
<<<C as Components>::Factory as ServiceFactory>::ConsensusPair as primitives::crypto::Pair>::Public : std::string::ToString,
{
fn build_network_future<H, S>(
mut network: network::NetworkWorker<ComponentBlock<C>, S, H>,
client: Arc<ComponentClient<C>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<(NetworkStatus<ComponentBlock<C>>, NetworkState)>>>>,
mut rpc_rx: mpsc::UnboundedReceiver<rpc::apis::system::Request<ComponentBlock<C>>>,
should_have_peers: bool,
authority_key_provider: ComponentAuthorityKeyProvider<C>,
)-> Box<dyn Future<Item = (), Error = ()> + Send>
where
H: network::ExHashT,
S:network::specialization::NetworkSpecialization<ComponentBlock<C>>,
{
// Interval at which we send status updates on the status stream.
const STATUS_INTERVAL: Duration = Duration::from_millis(5000);
let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL);

let mut report_ext_addresses_interval = tokio_timer::Interval::new_interval(Duration::from_secs(5));

let mut imported_blocks_stream = client.import_notification_stream().fuse()
.map(|v| Ok::<_, ()>(v)).compat();
let mut finality_notification_stream = client.finality_notification_stream().fuse()
.map(|v| Ok::<_, ()>(v)).compat();

Box::new(futures::future::poll_fn(move || {
let before_polling = Instant::now();

// We poll `imported_blocks_stream`.
while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() {
network.on_block_imported(notification.hash, notification.header);
}

while let Ok(Async::Ready(_)) = report_ext_addresses_interval.poll() {
println!("==== We are connected to {} nodes", network.service().num_connected());
let id = BlockId::hash( client.info().chain.best_hash);

// Put our addresses on the DHT if we are a validator.
if let Some(authority_key) = authority_key_provider.authority_key( &id) {
let public_key = authority_key.public().to_string();

let hashed_public_key = libp2p::multihash::encode(
libp2p::multihash::Hash::SHA2256,
&public_key.as_bytes(),
).expect("public key hashing not to fail");

let addresses: Vec<Multiaddr> = network.service().external_addresses()
.iter()
.map(|a| {
let mut a = a.clone();
a.push(libp2p::core::multiaddr::Protocol::P2p(network.service().peer_id().into()));
a
})
.collect();
println!("==== external addresses: {:?}", addresses);

// TODO: Remove unwrap.
let signature = authority_key.sign(
&serde_json::to_string(&addresses)
.map(|s| s.into_bytes())
.expect("enriched_address marshaling not to fail")
).as_ref().to_vec();

// TODO: Remove unwrap.
let payload = serde_json::to_string(&(addresses, signature)).expect("payload marshaling not to fail");

network.service().put_value(hashed_public_key, payload.into_bytes());
}

// Query addresses of other validators.
// TODO: Should non-validators also do this? Probably not a good default.
match client.runtime_api().authorities(&id) {
Ok(authorities) => {
for authority in authorities.iter() {
println!("==== querying dht for authority: {}", authority.to_string());
// TODO: Remove unwrap.
let hashed_public_key = libp2p::multihash::encode(
libp2p::multihash::Hash::SHA2256,
authority.to_string().as_bytes(),
).expect("public key hashing not to fail");

network.service().get_value(&hashed_public_key.clone());
}
},
Err(e) => {
println!("==== Got no authorities, but an error: {:?}", e);
}
}
}

// We poll `finality_notification_stream`, but we only take the last event.
let mut last = None;
while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() {
last = Some(item);
}
if let Some(notification) = last {
network.on_block_finalized(notification.hash, notification.header);
}

// Poll the RPC requests and answer them.
while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() {
match request {
rpc::apis::system::Request::Health(sender) => {
let _ = sender.send(rpc::apis::system::Health {
peers: network.peers_debug_info().len(),
is_syncing: network.service().is_major_syncing(),
should_have_peers,
});
},
rpc::apis::system::Request::Peers(sender) => {
let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)|
rpc::apis::system::PeerInfo {
peer_id: peer_id.to_base58(),
roles: format!("{:?}", p.roles),
protocol_version: p.protocol_version,
best_hash: p.best_hash,
best_number: p.best_number,
}
).collect());
}
rpc::apis::system::Request::NetworkState(sender) => {
let _ = sender.send(network.network_state());
}
};
}

// Interval report for the external API.
while let Ok(Async::Ready(_)) = status_interval.poll() {
let status = NetworkStatus {
sync_state: network.sync_state(),
best_seen_block: network.best_seen_block(),
num_sync_peers: network.num_sync_peers(),
num_connected_peers: network.num_connected_peers(),
num_active_peers: network.num_active_peers(),
average_download_per_sec: network.average_download_per_sec(),
average_upload_per_sec: network.average_upload_per_sec(),
};
let state = network.network_state();

status_sinks.lock().retain(|sink| sink.unbounded_send((status.clone(), state.clone())).is_ok());
}

let authorities = client.runtime_api().authorities(&BlockId::hash(client.info().chain.best_hash));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For GRANDPA we need to check the active authority set from the node state and not the runtime, it is tracked here. Maybe we can just pass a Fn() -> Vec<Authority> into this function and then build up the authorities externally (babe authorities + grandpa authorities).

let valid_authority = |a: &libp2p::multihash::Multihash| {
match &authorities {
Ok(authorities) => {
for authority in authorities.iter() {
let hashed_public_key = libp2p::multihash::encode(
libp2p::multihash::Hash::SHA2256,
authority.to_string().as_bytes(),
).expect("public key hashing not to fail");

// TODO: Comparing two pointers is safe, right? Given they are not fat-pointers.
if a == &hashed_public_key {
return Some(authority.clone());
}
}
},
// TODO: Should we handle the error here?
Err(_e) => {},
}

return None;
};

// TODO: Can we do this nicer?
let network_service = network.service().clone();
let add_reserved_peer = |values: Vec<(libp2p::multihash::Multihash, Vec<u8>)>| {
for (key, value) in values.iter() {
// TODO: Should we log if it is not a valid one?
if let Some(authority_pub_key) = valid_authority(key) {
println!("===== adding other node");
let value = std::str::from_utf8(value).expect("value to string not to fail");

let (addresses, signature): (Vec<Multiaddr>, Vec<u8>) = serde_json::from_str(value).expect("payload unmarshaling not to fail");

// TODO: is using verify-weak a problem here?
if <<C as Components>::Factory as ServiceFactory>::ConsensusPair::verify_weak(
&signature,
&serde_json::to_string(&addresses)
.map(|s| s.into_bytes())
.expect("address marshaling not to fail"),
authority_pub_key,
) {
for address in addresses.iter() {
// TODO: Why does add_reserved_peer take a string?
// TODO: Remove unwrap.
network_service.add_reserved_peer(address.to_string()).expect("adding reserved peer not to fail");
}
} else {
// TODO: Log, don't print.
println!("==== signature not valid");
}
} else {
println!("==== Did not find a match for the key");
}
}
};

// Main network polling.
while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| {
warn!(target: "service", "Error in network: {:?}", err);
}) {
match event {
DhtEvent::ValueFound(values) => add_reserved_peer(values),
DhtEvent::ValueNotFound(_h) => println!("==== Didn't find hash"),
DhtEvent::ValuePut(_h) => {},
DhtEvent::ValuePutFailed(_h) => println!("==== failed to put value on DHT"),
}
};

// Now some diagnostic for performances.
let polling_dur = before_polling.elapsed();
log!(
target: "service",
if polling_dur >= Duration::from_millis(50) { Level::Warn } else { Level::Trace },
"Polling the network future took {:?}",
polling_dur
);

Ok(Async::NotReady)
}))
}

}

/// The super trait that combines all required traits a `Service` needs to implement.
pub trait ServiceTrait<C: Components>:
Deref<Target = Service<C>>
+ Send
+ 'static
+ StartRPC<C>
+ NetworkFutureBuilder<C>
+ MaintainTransactionPool<C>
+ OffchainWorker<C>
{}
Expand All @@ -282,6 +539,7 @@ impl<C: Components, T> ServiceTrait<C> for T where
+ Send
+ 'static
+ StartRPC<C>
+ NetworkFutureBuilder<C>
+ MaintainTransactionPool<C>
+ OffchainWorker<C>
{}
Expand Down Expand Up @@ -321,6 +579,9 @@ pub trait ServiceFactory: 'static + Sized {
type LightImportQueue: ImportQueue<Self::Block> + 'static;
/// The Fork Choice Strategy for the chain
type SelectChain: SelectChain<Self::Block> + 'static;
///
// TODO: Are all of these trait bounds necessary?
type AuthorityId: primitives::crypto::Public + std::hash::Hash + parity_codec::Codec + std::string::ToString;

//TODO: replace these with a constructor trait. that TransactionPool implements. (#1242)
/// Extrinsic pool constructor for the full client.
Expand Down
Loading