From 66db77ca36c76d186381730f090626eeedea8104 Mon Sep 17 00:00:00 2001 From: timorl Date: Tue, 13 Dec 2022 18:32:27 +0100 Subject: [PATCH 1/5] Move clique network into network --- finality-aleph/src/lib.rs | 2 - .../clique}/crypto.rs | 0 .../clique}/incoming.rs | 16 +- .../clique}/io.rs | 2 +- .../clique}/manager/direction.rs | 4 +- .../clique}/manager/legacy.rs | 10 +- .../clique}/manager/mod.rs | 7 +- .../clique/mock.rs} | 392 ++++++++---------- .../clique}/mod.rs | 11 +- .../clique}/outgoing.rs | 24 +- .../clique}/protocols/handshake.rs | 4 +- .../clique}/protocols/mod.rs | 2 +- .../clique}/protocols/negotiation.rs | 4 +- .../clique}/protocols/v0/heartbeat.rs | 4 +- .../clique}/protocols/v0/mod.rs | 30 +- .../clique}/protocols/v1/mod.rs | 30 +- .../clique}/service.rs | 46 +- finality-aleph/src/network/gossip/mock.rs | 10 +- finality-aleph/src/network/gossip/service.rs | 18 +- finality-aleph/src/network/io.rs | 18 +- .../src/network/manager/compatibility.rs | 6 +- .../src/network/manager/connections.rs | 2 +- .../src/network/manager/discovery.rs | 2 +- finality-aleph/src/network/manager/service.rs | 22 +- finality-aleph/src/network/manager/session.rs | 2 +- finality-aleph/src/network/mock.rs | 60 ++- finality-aleph/src/network/mod.rs | 4 +- .../src/{tcp_network.rs => network/tcp.rs} | 10 +- finality-aleph/src/nodes/validator_node.rs | 7 +- finality-aleph/src/testing/clique_network.rs | 310 ++++++++++++++ finality-aleph/src/testing/mocks/mod.rs | 1 - finality-aleph/src/testing/mod.rs | 2 +- finality-aleph/src/testing/network.rs | 32 +- .../src/testing/validator_network.rs | 131 ------ finality-aleph/src/validator_network/mock.rs | 155 ------- scripts/run_nodes.sh | 2 +- 36 files changed, 718 insertions(+), 664 deletions(-) rename finality-aleph/src/{validator_network => network/clique}/crypto.rs (100%) rename finality-aleph/src/{validator_network => network/clique}/incoming.rs (85%) rename finality-aleph/src/{validator_network => network/clique}/io.rs (99%) rename finality-aleph/src/{validator_network => network/clique}/manager/direction.rs (98%) rename finality-aleph/src/{validator_network => network/clique}/manager/legacy.rs (98%) rename finality-aleph/src/{validator_network => network/clique}/manager/mod.rs (98%) rename finality-aleph/src/{testing/mocks/validator_network.rs => network/clique/mock.rs} (57%) rename finality-aleph/src/{validator_network => network/clique}/mod.rs (92%) rename finality-aleph/src/{validator_network => network/clique}/outgoing.rs (84%) rename finality-aleph/src/{validator_network => network/clique}/protocols/handshake.rs (99%) rename finality-aleph/src/{validator_network => network/clique}/protocols/mod.rs (99%) rename finality-aleph/src/{validator_network => network/clique}/protocols/negotiation.rs (98%) rename finality-aleph/src/{validator_network => network/clique}/protocols/v0/heartbeat.rs (94%) rename finality-aleph/src/{validator_network => network/clique}/protocols/v0/mod.rs (95%) rename finality-aleph/src/{validator_network => network/clique}/protocols/v1/mod.rs (95%) rename finality-aleph/src/{validator_network => network/clique}/service.rs (86%) rename finality-aleph/src/{tcp_network.rs => network/tcp.rs} (96%) create mode 100644 finality-aleph/src/testing/clique_network.rs delete mode 100644 finality-aleph/src/testing/validator_network.rs delete mode 100644 finality-aleph/src/validator_network/mock.rs diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index 84c083d5b3..4793919d65 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -44,10 +44,8 @@ mod nodes; mod party; mod session; mod session_map; -mod tcp_network; #[cfg(test)] pub mod testing; -mod validator_network; pub use abft::{Keychain, NodeCount, NodeIndex, Recipient, SignatureSet, SpawnHandle}; pub use aleph_primitives::{AuthorityId, AuthorityPair, AuthoritySignature}; diff --git a/finality-aleph/src/validator_network/crypto.rs b/finality-aleph/src/network/clique/crypto.rs similarity index 100% rename from finality-aleph/src/validator_network/crypto.rs rename to finality-aleph/src/network/clique/crypto.rs diff --git a/finality-aleph/src/validator_network/incoming.rs b/finality-aleph/src/network/clique/incoming.rs similarity index 85% rename from finality-aleph/src/validator_network/incoming.rs rename to finality-aleph/src/network/clique/incoming.rs index 719092759e..cab609af98 100644 --- a/finality-aleph/src/validator_network/incoming.rs +++ b/finality-aleph/src/network/clique/incoming.rs @@ -3,9 +3,9 @@ use std::fmt::{Display, Error as FmtError, Formatter}; use futures::channel::mpsc; use log::{debug, info}; -use crate::validator_network::{ +use crate::network::clique::{ protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, - Data, PublicKey, SecretKey, Splittable, + Data, PublicKey, SecretKey, Splittable, LOG_TARGET, }; enum IncomingError { @@ -41,9 +41,12 @@ async fn manage_incoming( result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), IncomingError> { - debug!(target: "validator-network", "Performing incoming protocol negotiation."); + debug!( + target: LOG_TARGET, + "Performing incoming protocol negotiation." + ); let (stream, protocol) = protocol(stream).await?; - debug!(target: "validator-network", "Negotiated protocol, running."); + debug!(target: LOG_TARGET, "Negotiated protocol, running."); Ok(protocol .manage_incoming(stream, secret_key, result_for_parent, data_for_user) .await?) @@ -62,6 +65,9 @@ pub async fn incoming( ) { let addr = stream.peer_address_info(); if let Err(e) = manage_incoming(secret_key, stream, result_for_parent, data_for_user).await { - info!(target: "validator-network", "Incoming connection from {} failed: {}.", addr, e); + info!( + target: LOG_TARGET, + "Incoming connection from {} failed: {}.", addr, e + ); } } diff --git a/finality-aleph/src/validator_network/io.rs b/finality-aleph/src/network/clique/io.rs similarity index 99% rename from finality-aleph/src/validator_network/io.rs rename to finality-aleph/src/network/clique/io.rs index c8c676055f..fee7f09187 100644 --- a/finality-aleph/src/validator_network/io.rs +++ b/finality-aleph/src/network/clique/io.rs @@ -6,7 +6,7 @@ use std::{ use codec::DecodeAll; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use crate::validator_network::Data; +use crate::network::Data; // We allow sending up to 16MiB, that should be enough forever. pub const MAX_DATA_SIZE: u32 = 16 * 1024 * 1024; diff --git a/finality-aleph/src/validator_network/manager/direction.rs b/finality-aleph/src/network/clique/manager/direction.rs similarity index 98% rename from finality-aleph/src/validator_network/manager/direction.rs rename to finality-aleph/src/network/clique/manager/direction.rs index de69416ba6..7ef675a7f6 100644 --- a/finality-aleph/src/validator_network/manager/direction.rs +++ b/finality-aleph/src/network/clique/manager/direction.rs @@ -3,7 +3,7 @@ use std::{ ops::BitXor, }; -use crate::validator_network::{Data, PublicKey}; +use crate::network::{clique::PublicKey, Data}; /// Data about peers we know and whether we should connect to them or they to us. For the former /// case also keeps the peers' addresses. @@ -85,7 +85,7 @@ impl DirectedPeers { #[cfg(test)] mod tests { use super::DirectedPeers; - use crate::validator_network::mock::{key, MockPublicKey}; + use crate::network::clique::mock::{key, MockPublicKey}; type Address = String; diff --git a/finality-aleph/src/validator_network/manager/legacy.rs b/finality-aleph/src/network/clique/manager/legacy.rs similarity index 98% rename from finality-aleph/src/validator_network/manager/legacy.rs rename to finality-aleph/src/network/clique/manager/legacy.rs index cac850d189..c65285d2a3 100644 --- a/finality-aleph/src/validator_network/manager/legacy.rs +++ b/finality-aleph/src/network/clique/manager/legacy.rs @@ -5,12 +5,12 @@ use std::{ use futures::channel::mpsc; -use crate::{ - network::PeerId, - validator_network::{ +use crate::network::{ + clique::{ manager::{AddResult, SendError}, - Data, PublicKey, + PublicKey, }, + Data, PeerId, }; /// Network component responsible for holding the list of peers that we @@ -218,7 +218,7 @@ mod tests { use futures::{channel::mpsc, StreamExt}; use super::{AddResult::*, Manager, SendError}; - use crate::validator_network::mock::{key, MockPublicKey}; + use crate::network::clique::mock::{key, MockPublicKey}; type Data = String; type Address = String; diff --git a/finality-aleph/src/validator_network/manager/mod.rs b/finality-aleph/src/network/clique/manager/mod.rs similarity index 98% rename from finality-aleph/src/validator_network/manager/mod.rs rename to finality-aleph/src/network/clique/manager/mod.rs index 2a5d094bd3..cb15c3db81 100644 --- a/finality-aleph/src/validator_network/manager/mod.rs +++ b/finality-aleph/src/network/clique/manager/mod.rs @@ -5,10 +5,7 @@ use std::{ use futures::channel::mpsc; -use crate::{ - network::PeerId, - validator_network::{Data, PublicKey}, -}; +use crate::network::{clique::PublicKey, Data, PeerId}; mod direction; mod legacy; @@ -242,7 +239,7 @@ mod tests { use futures::{channel::mpsc, StreamExt}; use super::{AddResult::*, Manager, SendError}; - use crate::validator_network::mock::{key, MockPublicKey}; + use crate::network::clique::mock::{key, MockPublicKey}; type Data = String; type Address = String; diff --git a/finality-aleph/src/testing/mocks/validator_network.rs b/finality-aleph/src/network/clique/mock.rs similarity index 57% rename from finality-aleph/src/testing/mocks/validator_network.rs rename to finality-aleph/src/network/clique/mock.rs index ba2e7acf5b..1fecd877cb 100644 --- a/finality-aleph/src/testing/mocks/validator_network.rs +++ b/finality-aleph/src/network/clique/mock.rs @@ -1,33 +1,169 @@ use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::HashMap, + fmt::{Display, Error as FmtError, Formatter}, io::Result as IoResult, pin::Pin, task::{Context, Poll}, }; -use codec::{Decode, Encode, Output}; +use codec::{Decode, Encode}; use futures::{ channel::{mpsc, oneshot}, StreamExt, }; use log::info; -use rand::{thread_rng, Rng}; -use sc_service::{SpawnTaskHandle, TaskManager}; -use tokio::{ - io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf}, - runtime::Handle, - time::{error::Elapsed, interval, timeout, Duration}, -}; +use rand::Rng; +use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf}; -use crate::{ - network::{mock::Channel, AddressingInformation, Data, NetworkIdentity}, - validator_network::{ - mock::{key, random_keys, MockPublicKey, MockSecretKey}, - ConnectionInfo, Dialer as DialerT, Listener as ListenerT, Network, PeerAddressInfo, - SecretKey, Service, Splittable, +use crate::network::{ + clique::{ + ConnectionInfo, Dialer, Listener, Network, PeerAddressInfo, PublicKey, SecretKey, + Splittable, LOG_TARGET, }, + mock::Channel, + AddressingInformation, Data, NetworkIdentity, PeerId, }; +/// A mock secret key that is able to sign messages. +#[derive(Debug, PartialEq, Eq, Clone, Hash)] +pub struct MockSecretKey([u8; 4]); + +/// A mock public key for verifying signatures. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Encode, Decode)] +pub struct MockPublicKey([u8; 4]); + +impl Display for MockPublicKey { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + write!(f, "PublicKey({:?})", self.0) + } +} + +impl AsRef<[u8]> for MockPublicKey { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +/// A mock signature, able to discern whether the correct key has been used to sign a specific +/// message. +#[derive(Debug, PartialEq, Eq, Clone, Hash, Encode, Decode)] +pub struct MockSignature { + message: Vec, + key_id: [u8; 4], +} + +impl PublicKey for MockPublicKey { + type Signature = MockSignature; + + fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool { + (message == signature.message.as_slice()) && (self.0 == signature.key_id) + } +} + +impl PeerId for MockPublicKey {} + +#[async_trait::async_trait] +impl SecretKey for MockSecretKey { + type Signature = MockSignature; + type PublicKey = MockPublicKey; + + async fn sign(&self, message: &[u8]) -> Self::Signature { + MockSignature { + message: message.to_vec(), + key_id: self.0, + } + } + + fn public_key(&self) -> Self::PublicKey { + MockPublicKey(self.0) + } +} + +/// Create a random key pair. +pub fn key() -> (MockPublicKey, MockSecretKey) { + let secret_key = MockSecretKey(rand::random()); + (secret_key.public_key(), secret_key) +} + +/// Create a HashMap with public keys as keys and secret keys as values. +pub fn random_keys(n_peers: usize) -> HashMap { + let mut result = HashMap::with_capacity(n_peers); + while result.len() < n_peers { + let (pk, sk) = key(); + result.insert(pk, sk); + } + result +} + +/// A mock that can be split into two streams. +pub struct MockSplittable { + incoming_data: DuplexStream, + outgoing_data: DuplexStream, +} + +impl MockSplittable { + /// Create a pair of mock splittables connected to each other. + pub fn new(max_buf_size: usize) -> (Self, Self) { + let (in_a, out_b) = duplex(max_buf_size); + let (in_b, out_a) = duplex(max_buf_size); + ( + MockSplittable { + incoming_data: in_a, + outgoing_data: out_a, + }, + MockSplittable { + incoming_data: in_b, + outgoing_data: out_b, + }, + ) + } +} + +impl AsyncRead for MockSplittable { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().incoming_data).poll_read(cx, buf) + } +} + +impl AsyncWrite for MockSplittable { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + Pin::new(&mut self.get_mut().outgoing_data).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().outgoing_data).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().outgoing_data).poll_shutdown(cx) + } +} + +impl ConnectionInfo for MockSplittable { + fn peer_address_info(&self) -> PeerAddressInfo { + String::from("MOCK_ADDRESS") + } +} + +impl ConnectionInfo for DuplexStream { + fn peer_address_info(&self) -> PeerAddressInfo { + String::from("MOCK_ADDRESS") + } +} + +impl Splittable for MockSplittable { + type Sender = DuplexStream; + type Receiver = DuplexStream; + + fn split(self) -> (Self::Sender, Self::Receiver) { + (self.outgoing_data, self.incoming_data) + } +} + #[derive(Clone, Debug, PartialEq, Eq, Hash, Encode, Decode)] pub struct MockAddressingInformation { peer_id: MockPublicKey, @@ -292,12 +428,10 @@ impl Splittable for UnreliableSplittable { } type Address = u32; -type Addresses = HashMap; +pub type Addresses = HashMap; type Callers = HashMap; type Connection = UnreliableSplittable; -const TWICE_MAX_DATA_SIZE: usize = 32 * 1024 * 1024; - #[derive(Clone)] pub struct MockDialer { // used for logging @@ -306,7 +440,7 @@ pub struct MockDialer { } #[async_trait::async_trait] -impl DialerT
for MockDialer { +impl Dialer
for MockDialer { type Connection = Connection; type Error = std::io::Error; @@ -324,7 +458,7 @@ pub struct MockListener { } #[async_trait::async_trait] -impl ListenerT for MockListener { +impl Listener for MockListener { type Connection = Connection; type Error = std::io::Error; @@ -374,17 +508,26 @@ impl UnreliableConnectionMaker { pub async fn run(&mut self, connections_end_after: Option) { loop { - info!(target: "validator-network", "UnreliableConnectionMaker: waiting for new request..."); + info!( + target: LOG_TARGET, + "UnreliableConnectionMaker: waiting for new request..." + ); let (dialer_address, listener_address, c) = self.dialers.next().await.expect("should receive"); - info!(target: "validator-network", "UnreliableConnectionMaker: received request"); + info!( + target: LOG_TARGET, + "UnreliableConnectionMaker: received request" + ); let (dialer_stream, listener_stream) = Connection::new( 4096, connections_end_after, dialer_address, listener_address, ); - info!(target: "validator-network", "UnreliableConnectionMaker: sending stream"); + info!( + target: LOG_TARGET, + "UnreliableConnectionMaker: sending stream" + ); c.send(dialer_stream).expect("should send"); self.listeners[listener_address as usize] .unbounded_send(listener_stream) @@ -392,206 +535,3 @@ impl UnreliableConnectionMaker { } } } - -#[derive(Clone)] -struct MockData { - data: u32, - filler: Vec, - decodes: bool, -} - -impl MockData { - fn new(data: u32, filler_size: usize, decodes: bool) -> MockData { - MockData { - data, - filler: vec![0; filler_size], - decodes, - } - } -} - -impl Encode for MockData { - fn size_hint(&self) -> usize { - self.data.size_hint() + self.filler.size_hint() + self.decodes.size_hint() - } - - fn encode_to(&self, dest: &mut T) { - // currently this is exactly the default behaviour, but we still - // need it here to make sure that decode works in the future - self.data.encode_to(dest); - self.filler.encode_to(dest); - self.decodes.encode_to(dest); - } -} - -impl Decode for MockData { - fn decode(value: &mut I) -> Result { - let data = u32::decode(value)?; - let filler = Vec::::decode(value)?; - let decodes = bool::decode(value)?; - if !decodes { - return Err("Simulated decode failure.".into()); - } - Ok(Self { - data, - filler, - decodes, - }) - } -} - -#[allow(clippy::too_many_arguments)] -fn spawn_peer( - secret_key: MockSecretKey, - addr: Addresses, - n_msg: usize, - large_message_interval: Option, - corrupted_message_interval: Option, - dialer: MockDialer, - listener: MockListener, - report: mpsc::UnboundedSender<(MockPublicKey, usize)>, - spawn_handle: SpawnTaskHandle, -) { - let our_id = secret_key.public_key(); - let (service, mut interface) = Service::new(dialer, listener, secret_key, spawn_handle); - // run the service - tokio::spawn(async { - let (_exit, rx) = oneshot::channel(); - service.run(rx).await; - }); - // start connecting with the peers - let mut peer_ids = Vec::with_capacity(addr.len()); - for (id, addrs) in addr.into_iter() { - interface.add_connection(id.clone(), addrs); - peer_ids.push(id); - } - // peer main loop - // we send random messages to random peers - // a message is a number in range 0..n_msg - // we also keep a list of messages received at least once - // on receiving a message we report the total number of distinct messages received so far - // the goal is to receive every message at least once - tokio::spawn(async move { - let mut received: HashSet = HashSet::with_capacity(n_msg); - let mut send_ticker = tokio::time::interval(Duration::from_millis(5)); - let mut counter: usize = 0; - loop { - tokio::select! { - _ = send_ticker.tick() => { - counter += 1; - // generate random message - let filler_size = match large_message_interval { - Some(lmi) if counter % lmi == 0 => TWICE_MAX_DATA_SIZE, - _ => 0, - }; - let decodes = match corrupted_message_interval { - Some(cmi) if counter % cmi == 0 => false, - _ => true, - }; - let data: MockData = MockData::new(thread_rng().gen_range(0..n_msg) as u32, filler_size, decodes); - // choose a peer - let peer: MockPublicKey = peer_ids[thread_rng().gen_range(0..peer_ids.len())].clone(); - // send - interface.send(data, peer); - }, - data = interface.next() => { - // receive the message - let data: MockData = data.expect("next should not be closed"); - // mark the message as received, we do not care about sender's identity - received.insert(data.data as usize); - // report the number of received messages - report.unbounded_send((our_id.clone(), received.len())).expect("should send"); - }, - }; - } - }); -} - -/// Takes O(n log n) rounds to finish, where n = n_peers * n_msg. -pub async fn scenario( - n_peers: usize, - n_msg: usize, - broken_connection_interval: Option, - large_message_interval: Option, - corrupted_message_interval: Option, - status_report_interval: Duration, -) { - // create spawn_handle, we need to keep the task_manager - let task_manager = - TaskManager::new(Handle::current(), None).expect("should create TaskManager"); - let spawn_handle = task_manager.spawn_handle(); - // create peer identities - info!(target: "validator-network", "generating keys..."); - let keys = random_keys(n_peers); - info!(target: "validator-network", "done"); - // prepare and run the manager - let (mut connection_manager, mut callers, addr) = - UnreliableConnectionMaker::new(keys.keys().cloned().collect()); - tokio::spawn(async move { - connection_manager.run(broken_connection_interval).await; - }); - // channel for receiving status updates from spawned peers - let (tx_report, mut rx_report) = mpsc::unbounded::<(MockPublicKey, usize)>(); - let mut reports: BTreeMap = - keys.keys().cloned().map(|id| (id, 0)).collect(); - // spawn peers - for (id, secret_key) in keys.into_iter() { - let mut addr = addr.clone(); - // do not connect with itself - addr.remove(&secret_key.public_key()); - let (dialer, listener) = callers.remove(&id).expect("should contain all ids"); - spawn_peer( - secret_key, - addr, - n_msg, - large_message_interval, - corrupted_message_interval, - dialer, - listener, - tx_report.clone(), - spawn_handle.clone(), - ); - } - let mut status_ticker = interval(status_report_interval); - loop { - tokio::select! { - maybe_report = rx_report.next() => match maybe_report { - Some((peer_id, peer_n_msg)) => { - reports.insert(peer_id, peer_n_msg); - if reports.values().all(|&x| x == n_msg) { - info!(target: "validator-network", "Peers received {:?} messages out of {}, finishing.", reports.values(), n_msg); - return; - } - }, - None => panic!("should receive"), - }, - _ = status_ticker.tick() => { - info!(target: "validator-network", "Peers received {:?} messages out of {}.", reports.values(), n_msg); - } - }; - } -} - -/// Takes O(n log n) rounds to finish, where n = n_peers * n_msg. -pub async fn scenario_with_timeout( - n_peers: usize, - n_msg: usize, - broken_connection_interval: Option, - large_message_interval: Option, - corrupted_message_interval: Option, - status_report_interval: Duration, - scenario_timeout: Duration, -) -> Result<(), Elapsed> { - timeout( - scenario_timeout, - scenario( - n_peers, - n_msg, - broken_connection_interval, - large_message_interval, - corrupted_message_interval, - status_report_interval, - ), - ) - .await -} diff --git a/finality-aleph/src/validator_network/mod.rs b/finality-aleph/src/network/clique/mod.rs similarity index 92% rename from finality-aleph/src/validator_network/mod.rs rename to finality-aleph/src/network/clique/mod.rs index 9880754996..34ecd52a55 100644 --- a/finality-aleph/src/validator_network/mod.rs +++ b/finality-aleph/src/network/clique/mod.rs @@ -1,8 +1,10 @@ +//! A network for maintaining direct connections between all nodes. use std::fmt::Display; -use codec::Codec; use tokio::io::{AsyncRead, AsyncWrite}; +use crate::network::Data; + mod crypto; mod incoming; mod io; @@ -16,12 +18,9 @@ mod service; pub use crypto::{PublicKey, SecretKey}; pub use service::Service; -/// What the data sent using the network has to satisfy. -pub trait Data: Clone + Codec + Send + Sync + 'static {} - -impl Data for D {} +pub const LOG_TARGET: &str = "clique-network"; -/// Network represents an interface for opening and closing connections with other Validators, +/// Network represents an interface for opening and closing connections with other nodes, /// and sending direct messages between them. /// /// Note on Network reliability and security: it is neither assumed that the sent messages must be diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/network/clique/outgoing.rs similarity index 84% rename from finality-aleph/src/validator_network/outgoing.rs rename to finality-aleph/src/network/clique/outgoing.rs index 946aa9e32e..fe100a0e46 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/network/clique/outgoing.rs @@ -4,11 +4,11 @@ use futures::channel::mpsc; use log::{debug, info}; use tokio::time::{sleep, timeout, Duration}; -use crate::validator_network::{ +use crate::network::clique::{ protocols::{ protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService, }, - ConnectionInfo, Data, Dialer, PeerAddressInfo, PublicKey, SecretKey, + ConnectionInfo, Data, Dialer, PeerAddressInfo, PublicKey, SecretKey, LOG_TARGET, }; enum OutgoingError> { @@ -49,17 +49,20 @@ async fn manage_outgoing>( result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), OutgoingError> { - debug!(target: "validator-network", "Trying to connect to {}.", public_key); + debug!(target: LOG_TARGET, "Trying to connect to {}.", public_key); let stream = timeout(DIAL_TIMEOUT, dialer.connect(address)) .await .map_err(|_| OutgoingError::TimedOut)? .map_err(OutgoingError::Dial)?; let peer_address_info = stream.peer_address_info(); - debug!(target: "validator-network", "Performing outgoing protocol negotiation."); + debug!( + target: LOG_TARGET, + "Performing outgoing protocol negotiation." + ); let (stream, protocol) = protocol(stream) .await .map_err(|e| OutgoingError::ProtocolNegotiation(peer_address_info.clone(), e))?; - debug!(target: "validator-network", "Negotiated protocol, running."); + debug!(target: LOG_TARGET, "Negotiated protocol, running."); protocol .manage_outgoing( stream, @@ -95,7 +98,14 @@ pub async fn outgoing>( ) .await { - info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", public_key, address, e, RETRY_DELAY.as_secs()); + info!( + target: LOG_TARGET, + "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", + public_key, + address, + e, + RETRY_DELAY.as_secs() + ); sleep(RETRY_DELAY).await; // we send the "new" connection type, because we always assume it's new until proven // otherwise, and here we did not even get the chance to attempt negotiating a protocol @@ -103,7 +113,7 @@ pub async fn outgoing>( .unbounded_send((public_key, None, ConnectionType::New)) .is_err() { - debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service."); + debug!(target: LOG_TARGET, "Could not send the closing message, we've probably been terminated by the parent service."); } } } diff --git a/finality-aleph/src/validator_network/protocols/handshake.rs b/finality-aleph/src/network/clique/protocols/handshake.rs similarity index 99% rename from finality-aleph/src/validator_network/protocols/handshake.rs rename to finality-aleph/src/network/clique/protocols/handshake.rs index d436fca91b..e7720a8880 100644 --- a/finality-aleph/src/validator_network/protocols/handshake.rs +++ b/finality-aleph/src/network/clique/protocols/handshake.rs @@ -4,7 +4,7 @@ use codec::{Decode, Encode}; use rand::Rng; use tokio::time::{timeout, Duration}; -use crate::validator_network::{ +use crate::network::clique::{ io::{receive_data, send_data, ReceiveError, SendError}, PublicKey, SecretKey, Splittable, }; @@ -188,7 +188,7 @@ mod tests { execute_v0_handshake_incoming, execute_v0_handshake_outgoing, Challenge, HandshakeError, Response, }; - use crate::validator_network::{ + use crate::network::clique::{ io::{receive_data, send_data}, mock::{key, MockPublicKey, MockSecretKey, MockSplittable}, SecretKey, Splittable, diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/network/clique/protocols/mod.rs similarity index 99% rename from finality-aleph/src/validator_network/protocols/mod.rs rename to finality-aleph/src/network/clique/protocols/mod.rs index c37e2d4253..eb056caca8 100644 --- a/finality-aleph/src/validator_network/protocols/mod.rs +++ b/finality-aleph/src/network/clique/protocols/mod.rs @@ -2,7 +2,7 @@ use std::fmt::{Display, Error as FmtError, Formatter}; use futures::channel::mpsc; -use crate::validator_network::{ +use crate::network::clique::{ io::{ReceiveError, SendError}, Data, PublicKey, SecretKey, Splittable, }; diff --git a/finality-aleph/src/validator_network/protocols/negotiation.rs b/finality-aleph/src/network/clique/protocols/negotiation.rs similarity index 98% rename from finality-aleph/src/validator_network/protocols/negotiation.rs rename to finality-aleph/src/network/clique/protocols/negotiation.rs index 52f0f1203a..5519a94c7a 100644 --- a/finality-aleph/src/validator_network/protocols/negotiation.rs +++ b/finality-aleph/src/network/clique/protocols/negotiation.rs @@ -8,7 +8,7 @@ use tokio::{ time::{timeout, Duration}, }; -use crate::validator_network::protocols::{Protocol, Version}; +use crate::network::clique::protocols::{Protocol, Version}; const PROTOCOL_NEGOTIATION_TIMEOUT: Duration = Duration::from_secs(5); @@ -141,7 +141,7 @@ mod tests { use tokio::io::duplex; use super::{negotiate_protocol_version, supported_protocol_range, ProtocolNegotiationError}; - use crate::validator_network::protocols::Protocol; + use crate::network::clique::protocols::Protocol; fn correct_negotiation(result: Result<(S, Protocol), ProtocolNegotiationError>) { match result { diff --git a/finality-aleph/src/validator_network/protocols/v0/heartbeat.rs b/finality-aleph/src/network/clique/protocols/v0/heartbeat.rs similarity index 94% rename from finality-aleph/src/validator_network/protocols/v0/heartbeat.rs rename to finality-aleph/src/network/clique/protocols/v0/heartbeat.rs index ba8fb183e0..2e83e5e60c 100644 --- a/finality-aleph/src/validator_network/protocols/v0/heartbeat.rs +++ b/finality-aleph/src/network/clique/protocols/v0/heartbeat.rs @@ -4,7 +4,7 @@ use tokio::{ time::{sleep, timeout, Duration}, }; -use crate::validator_network::io::{receive_data, send_data}; +use crate::network::clique::io::{receive_data, send_data}; const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); const MAX_MISSED_HEARTBEATS: u32 = 4; @@ -54,7 +54,7 @@ mod tests { }; use super::{heartbeat_receiver, heartbeat_sender}; - use crate::validator_network::mock::MockSplittable; + use crate::network::clique::mock::MockSplittable; #[tokio::test] async fn sender_closed_on_broken_connection() { diff --git a/finality-aleph/src/validator_network/protocols/v0/mod.rs b/finality-aleph/src/network/clique/protocols/v0/mod.rs similarity index 95% rename from finality-aleph/src/validator_network/protocols/v0/mod.rs rename to finality-aleph/src/network/clique/protocols/v0/mod.rs index 7510f71b6e..7c17c39795 100644 --- a/finality-aleph/src/validator_network/protocols/v0/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v0/mod.rs @@ -2,13 +2,13 @@ use futures::{channel::mpsc, StreamExt}; use log::{debug, info, trace}; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::validator_network::{ +use crate::network::clique::{ io::{receive_data, send_data}, protocols::{ handshake::{v0_handshake_incoming, v0_handshake_outgoing}, ConnectionType, ProtocolError, ResultForService, }, - Data, PublicKey, SecretKey, Splittable, + Data, PublicKey, SecretKey, Splittable, LOG_TARGET, }; mod heartbeat; @@ -38,9 +38,12 @@ pub async fn outgoing( public_key: SK::PublicKey, result_for_parent: mpsc::UnboundedSender>, ) -> Result<(), ProtocolError> { - trace!(target: "validator-network", "Extending hand to {}.", public_key); + trace!(target: LOG_TARGET, "Extending hand to {}.", public_key); let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, public_key.clone()).await?; - info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", public_key); + info!( + target: LOG_TARGET, + "Outgoing handshake with {} finished successfully.", public_key + ); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent .unbounded_send(( @@ -53,7 +56,10 @@ pub async fn outgoing( let sending = sending(sender, data_from_user); let heartbeat = heartbeat_receiver(receiver); - debug!(target: "validator-network", "Starting worker for sending to {}.", public_key); + debug!( + target: LOG_TARGET, + "Starting worker for sending to {}.", public_key + ); loop { tokio::select! { _ = heartbeat => return Err(ProtocolError::CardiacArrest), @@ -85,9 +91,12 @@ pub async fn incoming( result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { - trace!(target: "validator-network", "Waiting for extended hand..."); + trace!(target: LOG_TARGET, "Waiting for extended hand..."); let (sender, receiver, public_key) = v0_handshake_incoming(stream, secret_key).await?; - info!(target: "validator-network", "Incoming handshake with {} finished successfully.", public_key); + info!( + target: LOG_TARGET, + "Incoming handshake with {} finished successfully.", public_key + ); let (tx_exit, mut exit) = mpsc::unbounded(); result_for_parent @@ -101,7 +110,10 @@ pub async fn incoming( let receiving = receiving(receiver, data_for_user); let heartbeat = heartbeat_sender(sender); - debug!(target: "validator-network", "Starting worker for receiving from {}.", public_key); + debug!( + target: LOG_TARGET, + "Starting worker for receiving from {}.", public_key + ); loop { tokio::select! { _ = heartbeat => return Err(ProtocolError::CardiacArrest), @@ -119,7 +131,7 @@ mod tests { }; use super::{incoming, outgoing, ProtocolError}; - use crate::validator_network::{ + use crate::network::clique::{ mock::{key, MockPublicKey, MockSecretKey, MockSplittable}, protocols::{ConnectionType, ResultForService}, Data, diff --git a/finality-aleph/src/validator_network/protocols/v1/mod.rs b/finality-aleph/src/network/clique/protocols/v1/mod.rs similarity index 95% rename from finality-aleph/src/validator_network/protocols/v1/mod.rs rename to finality-aleph/src/network/clique/protocols/v1/mod.rs index ca138f9170..978dc3ba8e 100644 --- a/finality-aleph/src/validator_network/protocols/v1/mod.rs +++ b/finality-aleph/src/network/clique/protocols/v1/mod.rs @@ -6,13 +6,13 @@ use tokio::{ time::{timeout, Duration}, }; -use crate::validator_network::{ +use crate::network::clique::{ io::{receive_data, send_data}, protocols::{ handshake::{v0_handshake_incoming, v0_handshake_outgoing}, ConnectionType, ProtocolError, ResultForService, }, - Data, PublicKey, SecretKey, Splittable, + Data, PublicKey, SecretKey, Splittable, LOG_TARGET, }; const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); @@ -92,9 +92,12 @@ pub async fn outgoing( result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { - trace!(target: "validator-network", "Extending hand to {}.", public_key); + trace!(target: LOG_TARGET, "Extending hand to {}.", public_key); let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, public_key.clone()).await?; - info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", public_key); + info!( + target: LOG_TARGET, + "Outgoing handshake with {} finished successfully.", public_key + ); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent .unbounded_send(( @@ -103,7 +106,10 @@ pub async fn outgoing( ConnectionType::New, )) .map_err(|_| ProtocolError::NoParentConnection)?; - debug!(target: "validator-network", "Starting worker for communicating with {}.", public_key); + debug!( + target: LOG_TARGET, + "Starting worker for communicating with {}.", public_key + ); manage_connection(sender, receiver, data_from_user, data_for_user).await } @@ -116,9 +122,12 @@ pub async fn incoming( result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { - trace!(target: "validator-network", "Waiting for extended hand..."); + trace!(target: LOG_TARGET, "Waiting for extended hand..."); let (sender, receiver, public_key) = v0_handshake_incoming(stream, secret_key).await?; - info!(target: "validator-network", "Incoming handshake with {} finished successfully.", public_key); + info!( + target: LOG_TARGET, + "Incoming handshake with {} finished successfully.", public_key + ); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent .unbounded_send(( @@ -127,7 +136,10 @@ pub async fn incoming( ConnectionType::New, )) .map_err(|_| ProtocolError::NoParentConnection)?; - debug!(target: "validator-network", "Starting worker for communicating with {}.", public_key); + debug!( + target: LOG_TARGET, + "Starting worker for communicating with {}.", public_key + ); manage_connection(sender, receiver, data_from_user, data_for_user).await } @@ -139,7 +151,7 @@ mod tests { }; use super::{incoming, outgoing, ProtocolError}; - use crate::validator_network::{ + use crate::network::clique::{ mock::{key, MockPublicKey, MockSecretKey, MockSplittable}, protocols::{ConnectionType, ResultForService}, Data, diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/network/clique/service.rs similarity index 86% rename from finality-aleph/src/validator_network/service.rs rename to finality-aleph/src/network/clique/service.rs index 320e78c62d..a2ec583dff 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/network/clique/service.rs @@ -8,13 +8,15 @@ use log::{debug, info, trace, warn}; use tokio::time; use crate::{ - network::PeerId, - validator_network::{ - incoming::incoming, - manager::{AddResult, LegacyManager, Manager}, - outgoing::outgoing, - protocols::{ConnectionType, ResultForService}, - Data, Dialer, Listener, Network, PublicKey, SecretKey, + network::{ + clique::{ + incoming::incoming, + manager::{AddResult, LegacyManager, Manager}, + outgoing::outgoing, + protocols::{ConnectionType, ResultForService}, + Dialer, Listener, Network, PublicKey, SecretKey, LOG_TARGET, + }, + Data, PeerId, }, SpawnTaskHandle, STATUS_REPORT_INTERVAL, }; @@ -39,7 +41,7 @@ impl Network for ServiceInterface Network for ServiceInterface Network for ServiceInterface match maybe_stream { Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone()), - Err(e) => warn!(target: "validator-network", "Listener failed to accept connection: {}", e), + Err(e) => warn!(target: LOG_TARGET, "Listener failed to accept connection: {}", e), }, // got a new command from the interface Some(command) = self.commands_from_interface.next() => match command { @@ -264,12 +266,12 @@ where SendData(data, public_key) => { match self.legacy_connected.contains(&public_key) { true => match self.legacy_manager.send_to(&public_key, data) { - Ok(_) => trace!(target: "validator-network", "Sending data to {} through legacy.", public_key), - Err(e) => trace!(target: "validator-network", "Failed sending to {} through legacy: {}", public_key, e), + Ok(_) => trace!(target: LOG_TARGET, "Sending data to {} through legacy.", public_key), + Err(e) => trace!(target: LOG_TARGET, "Failed sending to {} through legacy: {}", public_key, e), }, false => match self.manager.send_to(&public_key, data) { - Ok(_) => trace!(target: "validator-network", "Sending data to {}.", public_key), - Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", public_key, e), + Ok(_) => trace!(target: LOG_TARGET, "Sending data to {}.", public_key), + Err(e) => trace!(target: LOG_TARGET, "Failed sending to {}: {}", public_key, e), }, } }, @@ -290,9 +292,9 @@ where use AddResult::*; match maybe_data_for_network { Some(data_for_network) => match self.add_connection(public_key.clone(), data_for_network, connection_type) { - Uninterested => warn!(target: "validator-network", "Established connection with peer {} for unknown reasons.", public_key), - Added => info!(target: "validator-network", "New connection with peer {}.", public_key), - Replaced => info!(target: "validator-network", "Replaced connection with peer {}.", public_key), + Uninterested => warn!(target: LOG_TARGET, "Established connection with peer {} for unknown reasons.", public_key), + Added => info!(target: LOG_TARGET, "New connection with peer {}.", public_key), + Replaced => info!(target: LOG_TARGET, "Replaced connection with peer {}.", public_key), }, None => if let Some(address) = self.peer_address(&public_key) { self.spawn_new_outgoing(public_key, address, result_for_parent.clone()); @@ -301,8 +303,8 @@ where }, // periodically reporting what we are trying to do _ = status_ticker.tick() => { - info!(target: "validator-network", "Validator Network status: {}", self.manager.status_report()); - debug!(target: "validator-network", "Validator Network legacy status: {}", self.legacy_manager.status_report()); + info!(target: LOG_TARGET, "Validator Network status: {}", self.manager.status_report()); + debug!(target: LOG_TARGET, "Validator Network legacy status: {}", self.legacy_manager.status_report()); } // received exit signal, stop the network // all workers will be killed automatically after the manager gets dropped diff --git a/finality-aleph/src/network/gossip/mock.rs b/finality-aleph/src/network/gossip/mock.rs index 076dc644f8..9bac21124d 100644 --- a/finality-aleph/src/network/gossip/mock.rs +++ b/finality-aleph/src/network/gossip/mock.rs @@ -7,12 +7,10 @@ use futures::{ }; use parking_lot::Mutex; -use crate::{ - network::{ - gossip::{Event, EventStream, NetworkSender, Protocol, RawNetwork}, - mock::Channel, - }, - validator_network::mock::MockPublicKey, +use crate::network::{ + clique::mock::MockPublicKey, + gossip::{Event, EventStream, NetworkSender, Protocol, RawNetwork}, + mock::Channel, }; pub type MockEvent = Event; diff --git a/finality-aleph/src/network/gossip/service.rs b/finality-aleph/src/network/gossip/service.rs index 6b5ae91f98..7f94c31d6c 100644 --- a/finality-aleph/src/network/gossip/service.rs +++ b/finality-aleph/src/network/gossip/service.rs @@ -280,16 +280,14 @@ mod tests { use tokio::runtime::Handle; use super::{Error, Service}; - use crate::{ - network::{ - gossip::{ - mock::{MockEvent, MockRawNetwork, MockSenderError}, - Network, - }, - mock::MockData, - Protocol, + use crate::network::{ + clique::mock::random_peer_id, + gossip::{ + mock::{MockEvent, MockRawNetwork, MockSenderError}, + Network, }, - testing::mocks::validator_network::random_peer_id, + mock::MockData, + Protocol, }; const PROTOCOL: Protocol = Protocol::Authentication; @@ -343,7 +341,7 @@ mod tests { } fn message(i: u8) -> MockData { - vec![i, i + 1, i + 2] + MockData::new(i.into(), 3) } #[tokio::test] diff --git a/finality-aleph/src/network/io.rs b/finality-aleph/src/network/io.rs index 60bed0de91..d86b6b3bfe 100644 --- a/finality-aleph/src/network/io.rs +++ b/finality-aleph/src/network/io.rs @@ -2,26 +2,24 @@ use std::fmt::Debug; use futures::channel::mpsc; -use crate::{ - network::{ - manager::{DataInSession, VersionedAuthentication}, - AddressingInformation, ConnectionManagerIO, Data, GossipNetwork, SessionManagerIO, - }, - validator_network::{Network as ValidatorNetwork, PublicKey}, +use crate::network::{ + clique::{Network as CliqueNetwork, PublicKey}, + manager::{DataInSession, VersionedAuthentication}, + AddressingInformation, ConnectionManagerIO, Data, GossipNetwork, SessionManagerIO, }; -type FullIO = (ConnectionManagerIO, SessionManagerIO); +type FullIO = (ConnectionManagerIO, SessionManagerIO); pub fn setup< D: Data, M: Data + Debug, A: AddressingInformation + TryFrom> + Into>, - VN: ValidatorNetwork>, + CN: CliqueNetwork>, GN: GossipNetwork>, >( - validator_network: VN, + validator_network: CN, gossip_network: GN, -) -> FullIO +) -> FullIO where A::PeerId: PublicKey, { diff --git a/finality-aleph/src/network/manager/compatibility.rs b/finality-aleph/src/network/manager/compatibility.rs index 314ecae5c1..bb368716c8 100644 --- a/finality-aleph/src/network/manager/compatibility.rs +++ b/finality-aleph/src/network/manager/compatibility.rs @@ -252,17 +252,15 @@ mod test { use crate::{ crypto::AuthorityVerifier, network::{ + clique::mock::MockAddressingInformation, manager::{ compatibility::{PeerAuthentications, MAX_AUTHENTICATION_SIZE}, LegacyDiscoveryMessage, SessionHandler, }, + tcp::{testing::new_identity, LegacyTcpMultiaddress, SignedTcpAddressingInformation}, NetworkIdentity, }, nodes::testing::new_pen, - tcp_network::{ - testing::new_identity, LegacyTcpMultiaddress, SignedTcpAddressingInformation, - }, - testing::mocks::validator_network::MockAddressingInformation, NodeIndex, SessionId, Version, }; diff --git a/finality-aleph/src/network/manager/connections.rs b/finality-aleph/src/network/manager/connections.rs index 4023a54ca4..4db59a51b7 100644 --- a/finality-aleph/src/network/manager/connections.rs +++ b/finality-aleph/src/network/manager/connections.rs @@ -57,7 +57,7 @@ mod tests { use super::Connections; use crate::{ - validator_network::mock::{random_keys, MockPublicKey}, + network::clique::mock::{random_keys, MockPublicKey}, SessionId, }; diff --git a/finality-aleph/src/network/manager/discovery.rs b/finality-aleph/src/network/manager/discovery.rs index f1b3d2bf3b..4e36513b69 100644 --- a/finality-aleph/src/network/manager/discovery.rs +++ b/finality-aleph/src/network/manager/discovery.rs @@ -122,11 +122,11 @@ mod tests { use super::Discovery; use crate::{ network::{ + clique::mock::{random_address, MockAddressingInformation}, manager::{compatibility::PeerAuthentications, SessionHandler}, mock::crypto_basics, testing::{authentication, legacy_authentication}, }, - testing::mocks::validator_network::{random_address, MockAddressingInformation}, SessionId, }; diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index 1b390bd316..7aeb4f3740 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -17,6 +17,7 @@ use crate::{ abft::Recipient, crypto::{AuthorityPen, AuthorityVerifier}, network::{ + clique::{Network as CliqueNetwork, PublicKey}, manager::{ compatibility::PeerAuthentications, Connections, DataInSession, Discovery, DiscoveryMessage, SessionHandler, SessionHandlerError, VersionedAuthentication, @@ -24,7 +25,6 @@ use crate::{ AddressedData, AddressingInformation, ConnectionCommand, Data, GossipNetwork, NetworkIdentity, PeerId, }, - validator_network::{Network as ValidatorNetwork, PublicKey}, MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL, }; @@ -558,14 +558,14 @@ pub struct IO< D: Data, M: Data, A: AddressingInformation + TryFrom> + Into>, - VN: ValidatorNetwork>, + CN: CliqueNetwork>, GN: GossipNetwork>, > where A::PeerId: PublicKey, { commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - validator_network: VN, + validator_network: CN, gossip_network: GN, _phantom: PhantomData<(M, A)>, } @@ -575,7 +575,7 @@ pub struct IO< pub enum Error { CommandsChannel, MessageChannel, - ValidatorNetwork, + CliqueNetwork, GossipNetwork(GE), } @@ -585,7 +585,7 @@ impl Display for Error { match self { CommandsChannel => write!(f, "commands channel unexpectedly closed"), MessageChannel => write!(f, "message channel unexpectedly closed"), - ValidatorNetwork => write!(f, "validator network unexpectedly done"), + CliqueNetwork => write!(f, "validator network unexpectedly done"), GossipNetwork(e) => write!(f, "gossip network unexpectedly done: {}", e), } } @@ -595,18 +595,18 @@ impl< D: Data, M: Data + Debug, A: AddressingInformation + TryFrom> + Into>, - VN: ValidatorNetwork>, + CN: CliqueNetwork>, GN: GossipNetwork>, - > IO + > IO where A::PeerId: PublicKey, { pub fn new( commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - validator_network: VN, + validator_network: CN, gossip_network: GN, - ) -> IO { + ) -> IO { IO { commands_from_user, messages_from_user, @@ -708,7 +708,7 @@ where SendError::NoSession => trace!(target: "aleph-network", "Received message for unknown session."), } }, - None => return Err(Error::ValidatorNetwork), + None => return Err(Error::CliqueNetwork), } }, maybe_authentication = self.gossip_network.next() => { @@ -742,11 +742,11 @@ mod tests { use super::{Config, SendError, Service, ServiceActions, SessionCommand}; use crate::{ network::{ + clique::mock::{random_address, MockAddressingInformation}, manager::{compatibility::PeerAuthentications, DataInSession, DiscoveryMessage}, mock::crypto_basics, ConnectionCommand, }, - testing::mocks::validator_network::{random_address, MockAddressingInformation}, Recipient, SessionId, }; diff --git a/finality-aleph/src/network/manager/session.rs b/finality-aleph/src/network/manager/session.rs index cf5b204ada..009786f946 100644 --- a/finality-aleph/src/network/manager/session.rs +++ b/finality-aleph/src/network/manager/session.rs @@ -273,11 +273,11 @@ mod tests { use super::{Handler, HandlerError}; use crate::{ network::{ + clique::mock::{random_address, random_invalid_address}, mock::crypto_basics, testing::{authentication, legacy_authentication}, AddressingInformation, }, - testing::mocks::validator_network::{random_address, random_invalid_address}, NodeIndex, SessionId, }; diff --git a/finality-aleph/src/network/mock.rs b/finality-aleph/src/network/mock.rs index 30c3aa56c9..dff81e2e46 100644 --- a/finality-aleph/src/network/mock.rs +++ b/finality-aleph/src/network/mock.rs @@ -1,6 +1,7 @@ use std::{sync::Arc, time::Duration}; use aleph_primitives::KEY_TYPE; +use codec::{Decode, Encode, Output}; use futures::{channel::mpsc, StreamExt}; use sp_keystore::{testing::KeyStore, CryptoStore}; use tokio::time::timeout; @@ -10,7 +11,64 @@ use crate::{ AuthorityId, NodeIndex, }; -pub type MockData = Vec; +#[derive(Hash, Debug, Clone, PartialEq, Eq)] +pub struct MockData { + data: u32, + filler: Vec, + decodes: bool, +} + +impl MockData { + pub fn new(data: u32, filler_size: usize) -> MockData { + MockData { + data, + filler: vec![0; filler_size], + decodes: true, + } + } + + pub fn new_undecodable(data: u32, filler_size: usize) -> MockData { + MockData { + data, + filler: vec![0; filler_size], + decodes: false, + } + } + + pub fn data(&self) -> u32 { + self.data + } +} + +impl Encode for MockData { + fn size_hint(&self) -> usize { + self.data.size_hint() + self.filler.size_hint() + self.decodes.size_hint() + } + + fn encode_to(&self, dest: &mut T) { + // currently this is exactly the default behaviour, but we still + // need it here to make sure that decode works in the future + self.data.encode_to(dest); + self.filler.encode_to(dest); + self.decodes.encode_to(dest); + } +} + +impl Decode for MockData { + fn decode(value: &mut I) -> Result { + let data = u32::decode(value)?; + let filler = Vec::::decode(value)?; + let decodes = bool::decode(value)?; + if !decodes { + return Err("Simulated decode failure.".into()); + } + Ok(Self { + data, + filler, + decodes, + }) + } +} #[derive(Clone)] pub struct Channel( diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index dda72e4486..6111f2e4b7 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -8,6 +8,7 @@ use codec::Codec; use sp_api::NumberFor; use sp_runtime::traits::Block; +pub mod clique; pub mod data; mod gossip; mod io; @@ -16,6 +17,7 @@ mod manager; pub mod mock; mod session; mod substrate; +pub mod tcp; pub use gossip::{Network as GossipNetwork, Protocol, Service as GossipService}; pub use io::setup as setup_io; @@ -29,13 +31,13 @@ pub use substrate::protocol_name; pub mod testing { use super::manager::LegacyAuthentication; pub use super::{ + clique::mock::MockAddressingInformation, gossip::mock::{MockEvent, MockRawNetwork}, manager::{ Authentication, DataInSession, DiscoveryMessage, LegacyDiscoveryMessage, PeerAuthentications, SessionHandler, VersionedAuthentication, }, }; - use crate::testing::mocks::validator_network::MockAddressingInformation; pub fn legacy_authentication( handler: &SessionHandler, diff --git a/finality-aleph/src/tcp_network.rs b/finality-aleph/src/network/tcp.rs similarity index 96% rename from finality-aleph/src/tcp_network.rs rename to finality-aleph/src/network/tcp.rs index e5a77727de..3c720c2038 100644 --- a/finality-aleph/src/tcp_network.rs +++ b/finality-aleph/src/network/tcp.rs @@ -11,8 +11,10 @@ use tokio::net::{ use crate::{ crypto::{verify, AuthorityPen, Signature}, - network::{AddressingInformation, NetworkIdentity, PeerId}, - validator_network::{ConnectionInfo, Dialer, Listener, PublicKey, SecretKey, Splittable}, + network::{ + clique::{ConnectionInfo, Dialer, Listener, PublicKey, SecretKey, Splittable, LOG_TARGET}, + AddressingInformation, NetworkIdentity, PeerId, + }, }; pub const KEY_TYPE: KeyTypeId = KeyTypeId(*b"a0vn"); @@ -62,7 +64,7 @@ impl Listener for TcpListener { async fn accept(&mut self) -> Result { let stream = TcpListener::accept(self).await.map(|(stream, _)| stream)?; if stream.set_linger(None).is_err() { - info!(target: "validator-network", "stream.set_linger(None) failed."); + info!(target: LOG_TARGET, "stream.set_linger(None) failed."); }; Ok(stream) } @@ -269,7 +271,7 @@ impl Dialer for TcpDialer { .collect(); let stream = TcpStream::connect(&parsed_addresses[..]).await?; if stream.set_linger(None).is_err() { - info!(target: "validator-network", "stream.set_linger(None) failed."); + info!(target: LOG_TARGET, "stream.set_linger(None) failed."); }; Ok(stream) } diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index 81b651b44e..50fb08f597 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -12,7 +12,10 @@ use sp_runtime::traits::Block; use crate::{ crypto::AuthorityPen, network::{ - setup_io, ConnectionManager, ConnectionManagerConfig, GossipService, SessionManager, + clique::Service, + setup_io, + tcp::{new_tcp_network, KEY_TYPE}, + ConnectionManager, ConnectionManagerConfig, GossipService, SessionManager, }, nodes::{setup_justification_handler, JustificationParams}, party::{ @@ -21,8 +24,6 @@ use crate::{ ConsensusParty, ConsensusPartyParams, }, session_map::{AuthorityProviderImpl, FinalityNotificatorImpl, SessionMapUpdater}, - tcp_network::{new_tcp_network, KEY_TYPE}, - validator_network::Service, AlephConfig, BlockchainBackend, }; diff --git a/finality-aleph/src/testing/clique_network.rs b/finality-aleph/src/testing/clique_network.rs new file mode 100644 index 0000000000..1fef226324 --- /dev/null +++ b/finality-aleph/src/testing/clique_network.rs @@ -0,0 +1,310 @@ +use std::{ + collections::{BTreeMap, HashSet}, + sync::Once, +}; + +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; +use log::info; +use rand::{thread_rng, Rng}; +use sc_service::{SpawnTaskHandle, TaskManager}; +use tokio::{ + runtime::Handle, + time::{error::Elapsed, interval, timeout, Duration}, +}; + +use crate::network::{ + clique::{ + mock::{ + random_keys, Addresses, MockDialer, MockListener, MockPublicKey, MockSecretKey, + UnreliableConnectionMaker, + }, + Network, SecretKey, Service, LOG_TARGET, + }, + mock::MockData, +}; + +const TWICE_MAX_DATA_SIZE: usize = 32 * 1024 * 1024; + +#[allow(clippy::too_many_arguments)] +fn spawn_peer( + secret_key: MockSecretKey, + addr: Addresses, + n_msg: usize, + large_message_interval: Option, + corrupted_message_interval: Option, + dialer: MockDialer, + listener: MockListener, + report: mpsc::UnboundedSender<(MockPublicKey, usize)>, + spawn_handle: SpawnTaskHandle, +) { + let our_id = secret_key.public_key(); + let (service, mut interface) = Service::new(dialer, listener, secret_key, spawn_handle); + // run the service + tokio::spawn(async { + let (_exit, rx) = oneshot::channel(); + service.run(rx).await; + }); + // start connecting with the peers + let mut peer_ids = Vec::with_capacity(addr.len()); + for (id, addrs) in addr.into_iter() { + interface.add_connection(id.clone(), addrs); + peer_ids.push(id); + } + // peer main loop + // we send random messages to random peers + // a message is a number in range 0..n_msg + // we also keep a list of messages received at least once + // on receiving a message we report the total number of distinct messages received so far + // the goal is to receive every message at least once + tokio::spawn(async move { + let mut received: HashSet = HashSet::with_capacity(n_msg); + let mut send_ticker = tokio::time::interval(Duration::from_millis(5)); + let mut counter: usize = 0; + loop { + tokio::select! { + _ = send_ticker.tick() => { + counter += 1; + // generate random message + let filler_size = match large_message_interval { + Some(lmi) if counter % lmi == 0 => TWICE_MAX_DATA_SIZE, + _ => 0, + }; + let data = match corrupted_message_interval { + Some(cmi) if counter % cmi == 0 => MockData::new_undecodable(thread_rng().gen_range(0..n_msg) as u32, filler_size), + _ => MockData::new(thread_rng().gen_range(0..n_msg) as u32, filler_size), + }; + // choose a peer + let peer: MockPublicKey = peer_ids[thread_rng().gen_range(0..peer_ids.len())].clone(); + // send + interface.send(data, peer); + }, + data = interface.next() => { + // receive the message + let data: MockData = data.expect("next should not be closed"); + // mark the message as received, we do not care about sender's identity + received.insert(data.data() as usize); + // report the number of received messages + report.unbounded_send((our_id.clone(), received.len())).expect("should send"); + }, + }; + } + }); +} + +/// Takes O(n log n) rounds to finish, where n = n_peers * n_msg. +async fn scenario( + n_peers: usize, + n_msg: usize, + broken_connection_interval: Option, + large_message_interval: Option, + corrupted_message_interval: Option, + status_report_interval: Duration, +) { + // create spawn_handle, we need to keep the task_manager + let task_manager = + TaskManager::new(Handle::current(), None).expect("should create TaskManager"); + let spawn_handle = task_manager.spawn_handle(); + // create peer identities + info!(target: LOG_TARGET, "generating keys..."); + let keys = random_keys(n_peers); + info!(target: LOG_TARGET, "done"); + // prepare and run the manager + let (mut connection_manager, mut callers, addr) = + UnreliableConnectionMaker::new(keys.keys().cloned().collect()); + tokio::spawn(async move { + connection_manager.run(broken_connection_interval).await; + }); + // channel for receiving status updates from spawned peers + let (tx_report, mut rx_report) = mpsc::unbounded::<(MockPublicKey, usize)>(); + let mut reports: BTreeMap = + keys.keys().cloned().map(|id| (id, 0)).collect(); + // spawn peers + for (id, secret_key) in keys.into_iter() { + let mut addr = addr.clone(); + // do not connect with itself + addr.remove(&secret_key.public_key()); + let (dialer, listener) = callers.remove(&id).expect("should contain all ids"); + spawn_peer( + secret_key, + addr, + n_msg, + large_message_interval, + corrupted_message_interval, + dialer, + listener, + tx_report.clone(), + spawn_handle.clone(), + ); + } + let mut status_ticker = interval(status_report_interval); + loop { + tokio::select! { + maybe_report = rx_report.next() => match maybe_report { + Some((peer_id, peer_n_msg)) => { + reports.insert(peer_id, peer_n_msg); + if reports.values().all(|&x| x == n_msg) { + info!(target: LOG_TARGET, "Peers received {:?} messages out of {}, finishing.", reports.values(), n_msg); + return; + } + }, + None => panic!("should receive"), + }, + _ = status_ticker.tick() => { + info!(target: LOG_TARGET, "Peers received {:?} messages out of {}.", reports.values(), n_msg); + } + }; + } +} + +/// Takes O(n log n) rounds to finish, where n = n_peers * n_msg. +async fn scenario_with_timeout( + n_peers: usize, + n_msg: usize, + broken_connection_interval: Option, + large_message_interval: Option, + corrupted_message_interval: Option, + status_report_interval: Duration, + scenario_timeout: Duration, +) -> Result<(), Elapsed> { + timeout( + scenario_timeout, + scenario( + n_peers, + n_msg, + broken_connection_interval, + large_message_interval, + corrupted_message_interval, + status_report_interval, + ), + ) + .await +} + +static INIT: Once = Once::new(); + +/// Required to capture logs from the tests e.g. by running +/// `RUST_LOG=info cargo test -- --nocapture testing::validator_network` +fn setup() { + // env_logger::init can be called at most once + INIT.call_once(|| { + env_logger::init(); + }); +} + +#[tokio::test(flavor = "multi_thread")] +async fn normal_conditions() { + setup(); + let n_peers: usize = 10; + let n_msg: usize = 30; + let broken_connection_interval: Option = None; + let large_message_interval: Option = None; + let corrupted_message_interval: Option = None; + let status_report_interval: Duration = Duration::from_secs(1); + let timeout: Duration = Duration::from_secs(300); + scenario_with_timeout( + n_peers, + n_msg, + broken_connection_interval, + large_message_interval, + corrupted_message_interval, + status_report_interval, + timeout, + ) + .await + .expect("timeout"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn connections_break() { + setup(); + let n_peers: usize = 10; + let n_msg: usize = 30; + let broken_connection_interval: Option = Some(10); + let large_message_interval: Option = None; + let corrupted_message_interval: Option = None; + let status_report_interval: Duration = Duration::from_secs(1); + let timeout: Duration = Duration::from_secs(300); + scenario_with_timeout( + n_peers, + n_msg, + broken_connection_interval, + large_message_interval, + corrupted_message_interval, + status_report_interval, + timeout, + ) + .await + .expect("timeout"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn large_messages_being_sent() { + setup(); + let n_peers: usize = 10; + let n_msg: usize = 30; + let broken_connection_interval: Option = None; + let large_message_interval: Option = Some(10); + let corrupted_message_interval: Option = None; + let status_report_interval: Duration = Duration::from_secs(1); + let timeout: Duration = Duration::from_secs(300); + scenario_with_timeout( + n_peers, + n_msg, + broken_connection_interval, + large_message_interval, + corrupted_message_interval, + status_report_interval, + timeout, + ) + .await + .expect("timeout"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn corrupted_messages_being_sent() { + setup(); + let n_peers: usize = 10; + let n_msg: usize = 30; + let broken_connection_interval: Option = None; + let large_message_interval: Option = None; + let corrupted_message_interval: Option = Some(10); + let status_report_interval: Duration = Duration::from_secs(1); + let timeout: Duration = Duration::from_secs(300); + scenario_with_timeout( + n_peers, + n_msg, + broken_connection_interval, + large_message_interval, + corrupted_message_interval, + status_report_interval, + timeout, + ) + .await + .expect("timeout"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn everything_fails_all_the_time() { + setup(); + let n_peers: usize = 3; + let n_msg: usize = 10; + let broken_connection_interval: Option = Some(5); + let large_message_interval: Option = Some(7); + let corrupted_message_interval: Option = Some(8); + let status_report_interval: Duration = Duration::from_secs(1); + let timeout: Duration = Duration::from_secs(600); + scenario_with_timeout( + n_peers, + n_msg, + broken_connection_interval, + large_message_interval, + corrupted_message_interval, + status_report_interval, + timeout, + ) + .await + .expect("timeout"); +} diff --git a/finality-aleph/src/testing/mocks/mod.rs b/finality-aleph/src/testing/mocks/mod.rs index dee1ad4e89..fdd4d2a098 100644 --- a/finality-aleph/src/testing/mocks/mod.rs +++ b/finality-aleph/src/testing/mocks/mod.rs @@ -21,4 +21,3 @@ mod justification_handler_config; mod proposal; mod session_info; mod single_action_mock; -pub mod validator_network; diff --git a/finality-aleph/src/testing/mod.rs b/finality-aleph/src/testing/mod.rs index 2c5cc9680d..a247ab5ca1 100644 --- a/finality-aleph/src/testing/mod.rs +++ b/finality-aleph/src/testing/mod.rs @@ -1,6 +1,6 @@ pub mod client_chain_builder; +mod clique_network; mod data_store; mod justification; pub mod mocks; mod network; -mod validator_network; diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 9c2c1bea0c..07806ef495 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -12,6 +12,10 @@ use tokio::{runtime::Handle, task::JoinHandle, time::timeout}; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, network::{ + clique::mock::{ + key, random_address_from, MockAddressingInformation, MockNetwork as MockCliqueNetwork, + MockPublicKey, + }, data::Network, mock::{crypto_basics, MockData}, setup_io, @@ -22,10 +26,6 @@ use crate::{ AddressingInformation, ConnectionManager, ConnectionManagerConfig, GossipService, NetworkIdentity, Protocol, SessionManager, }, - testing::mocks::validator_network::{ - random_address_from, MockAddressingInformation, MockNetwork as MockValidatorNetwork, - }, - validator_network::mock::{key, MockPublicKey}, MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod, }; @@ -74,7 +74,7 @@ struct TestData { pub authority_verifier: AuthorityVerifier, pub session_manager: SessionManager, pub network: MockRawNetwork, - pub validator_network: MockValidatorNetwork>, + pub validator_network: MockCliqueNetwork>, network_manager_exit_tx: oneshot::Sender<()>, gossip_service_exit_tx: oneshot::Sender<()>, network_manager_handle: JoinHandle<()>, @@ -103,7 +103,7 @@ async fn prepare_one_session_test_data() -> TestData { let (network_manager_exit_tx, network_manager_exit_rx) = oneshot::channel(); let (gossip_service_exit_tx, gossip_service_exit_rx) = oneshot::channel(); let network = MockRawNetwork::new(event_stream_tx); - let validator_network = MockValidatorNetwork::new(); + let validator_network = MockCliqueNetwork::new(); let (gossip_service, gossip_network) = GossipService::new(network.clone(), task_manager.spawn_handle()); @@ -414,7 +414,7 @@ async fn test_connects_to_others() { let mut test_data = prepare_one_session_test_data().await; let mut data_network = test_data.start_session(session_id).await; - let data = vec![1, 2, 3]; + let data = MockData::new(43, 3); test_data.validator_network.next.send(DataInSession { data: data.clone(), session_id: SessionId(session_id), @@ -439,7 +439,7 @@ async fn test_connects_to_others_early_validator() { let mut data_network = test_data.start_validator_session(0, session_id).await; - let data = vec![1, 2, 3]; + let data = MockData::new(43, 3); test_data.validator_network.next.send(DataInSession { data: data.clone(), session_id: SessionId(session_id), @@ -497,10 +497,10 @@ async fn test_receives_data_in_correct_session() { let mut data_network_2 = test_data.start_session(session_id_2).await; - let data_1_1 = vec![1, 2, 3]; - let data_1_2 = vec![4, 5, 6]; - let data_2_1 = vec![7, 8, 9]; - let data_2_2 = vec![10, 11, 12]; + let data_1_1 = MockData::new(43, 3); + let data_1_2 = MockData::new(44, 3); + let data_2_1 = MockData::new(45, 3); + let data_2_2 = MockData::new(46, 3); test_data.validator_network.next.send(DataInSession { data: data_1_1.clone(), session_id: SessionId(session_id_1), @@ -558,8 +558,8 @@ async fn test_sends_data_to_correct_session() { let mut expected_data = HashSet::new(); for node_id in 1..NODES_N { - let data_1 = vec![2 * node_id as u8 - 1]; - let data_2 = vec![2 * node_id as u8]; + let data_1 = MockData::new((node_id - 1) as u32, 1); + let data_2 = MockData::new(node_id as u32, 1); expected_data.insert(( data_1.clone(), @@ -610,8 +610,8 @@ async fn test_broadcasts_data_to_correct_session() { let mut data_network_1 = test_data.start_session(session_id_1).await; let mut data_network_2 = test_data.start_session(session_id_2).await; - let data_1 = vec![1, 2, 3]; - let data_2 = vec![4, 5, 6]; + let data_1 = MockData::new(43, 3); + let data_2 = MockData::new(44, 3); data_network_1 .send(data_1.clone(), Recipient::Everyone) .expect("Should send"); diff --git a/finality-aleph/src/testing/validator_network.rs b/finality-aleph/src/testing/validator_network.rs deleted file mode 100644 index 68a8dcfdca..0000000000 --- a/finality-aleph/src/testing/validator_network.rs +++ /dev/null @@ -1,131 +0,0 @@ -use std::sync::Once; - -use tokio::time::Duration; - -use crate::testing::mocks::validator_network::scenario_with_timeout; - -static INIT: Once = Once::new(); - -/// Required to capture logs from the tests e.g. by running -/// `RUST_LOG=info cargo test -- --nocapture testing::validator_network` -fn setup() { - // env_logger::init can be called at most once - INIT.call_once(|| { - env_logger::init(); - }); -} - -#[tokio::test(flavor = "multi_thread")] -async fn normal_conditions() { - setup(); - let n_peers: usize = 10; - let n_msg: usize = 30; - let broken_connection_interval: Option = None; - let large_message_interval: Option = None; - let corrupted_message_interval: Option = None; - let status_report_interval: Duration = Duration::from_secs(1); - let timeout: Duration = Duration::from_secs(300); - scenario_with_timeout( - n_peers, - n_msg, - broken_connection_interval, - large_message_interval, - corrupted_message_interval, - status_report_interval, - timeout, - ) - .await - .expect("timeout"); -} - -#[tokio::test(flavor = "multi_thread")] -async fn connections_break() { - setup(); - let n_peers: usize = 10; - let n_msg: usize = 30; - let broken_connection_interval: Option = Some(10); - let large_message_interval: Option = None; - let corrupted_message_interval: Option = None; - let status_report_interval: Duration = Duration::from_secs(1); - let timeout: Duration = Duration::from_secs(300); - scenario_with_timeout( - n_peers, - n_msg, - broken_connection_interval, - large_message_interval, - corrupted_message_interval, - status_report_interval, - timeout, - ) - .await - .expect("timeout"); -} - -#[tokio::test(flavor = "multi_thread")] -async fn large_messages_being_sent() { - setup(); - let n_peers: usize = 10; - let n_msg: usize = 30; - let broken_connection_interval: Option = None; - let large_message_interval: Option = Some(10); - let corrupted_message_interval: Option = None; - let status_report_interval: Duration = Duration::from_secs(1); - let timeout: Duration = Duration::from_secs(300); - scenario_with_timeout( - n_peers, - n_msg, - broken_connection_interval, - large_message_interval, - corrupted_message_interval, - status_report_interval, - timeout, - ) - .await - .expect("timeout"); -} - -#[tokio::test(flavor = "multi_thread")] -async fn corrupted_messages_being_sent() { - setup(); - let n_peers: usize = 10; - let n_msg: usize = 30; - let broken_connection_interval: Option = None; - let large_message_interval: Option = None; - let corrupted_message_interval: Option = Some(10); - let status_report_interval: Duration = Duration::from_secs(1); - let timeout: Duration = Duration::from_secs(300); - scenario_with_timeout( - n_peers, - n_msg, - broken_connection_interval, - large_message_interval, - corrupted_message_interval, - status_report_interval, - timeout, - ) - .await - .expect("timeout"); -} - -#[tokio::test(flavor = "multi_thread")] -async fn everything_fails_all_the_time() { - setup(); - let n_peers: usize = 3; - let n_msg: usize = 10; - let broken_connection_interval: Option = Some(5); - let large_message_interval: Option = Some(7); - let corrupted_message_interval: Option = Some(8); - let status_report_interval: Duration = Duration::from_secs(1); - let timeout: Duration = Duration::from_secs(600); - scenario_with_timeout( - n_peers, - n_msg, - broken_connection_interval, - large_message_interval, - corrupted_message_interval, - status_report_interval, - timeout, - ) - .await - .expect("timeout"); -} diff --git a/finality-aleph/src/validator_network/mock.rs b/finality-aleph/src/validator_network/mock.rs deleted file mode 100644 index 37858f5858..0000000000 --- a/finality-aleph/src/validator_network/mock.rs +++ /dev/null @@ -1,155 +0,0 @@ -use std::{ - collections::HashMap, - fmt::{Display, Error as FmtError, Formatter}, - io::Result as IoResult, - pin::Pin, - task::{Context, Poll}, -}; - -use codec::{Decode, Encode}; -use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf}; - -use crate::{ - network::PeerId, - validator_network::{ConnectionInfo, PeerAddressInfo, PublicKey, SecretKey, Splittable}, -}; - -/// A mock secret key that is able to sign messages. -#[derive(Debug, PartialEq, Eq, Clone, Hash)] -pub struct MockSecretKey([u8; 4]); - -/// A mock public key for verifying signatures. -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Encode, Decode)] -pub struct MockPublicKey([u8; 4]); - -impl Display for MockPublicKey { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { - write!(f, "PublicKey({:?})", self.0) - } -} - -impl AsRef<[u8]> for MockPublicKey { - fn as_ref(&self) -> &[u8] { - self.0.as_ref() - } -} - -/// A mock signature, able to discern whether the correct key has been used to sign a specific -/// message. -#[derive(Debug, PartialEq, Eq, Clone, Hash, Encode, Decode)] -pub struct MockSignature { - message: Vec, - key_id: [u8; 4], -} - -impl PublicKey for MockPublicKey { - type Signature = MockSignature; - - fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool { - (message == signature.message.as_slice()) && (self.0 == signature.key_id) - } -} - -impl PeerId for MockPublicKey {} - -#[async_trait::async_trait] -impl SecretKey for MockSecretKey { - type Signature = MockSignature; - type PublicKey = MockPublicKey; - - async fn sign(&self, message: &[u8]) -> Self::Signature { - MockSignature { - message: message.to_vec(), - key_id: self.0, - } - } - - fn public_key(&self) -> Self::PublicKey { - MockPublicKey(self.0) - } -} - -/// Create a random key pair. -pub fn key() -> (MockPublicKey, MockSecretKey) { - let secret_key = MockSecretKey(rand::random()); - (secret_key.public_key(), secret_key) -} - -/// Create a HashMap with public keys as keys and secret keys as values. -pub fn random_keys(n_peers: usize) -> HashMap { - let mut result = HashMap::with_capacity(n_peers); - while result.len() < n_peers { - let (pk, sk) = key(); - result.insert(pk, sk); - } - result -} - -/// A mock that can be split into two streams. -pub struct MockSplittable { - incoming_data: DuplexStream, - outgoing_data: DuplexStream, -} - -impl MockSplittable { - /// Create a pair of mock splittables connected to each other. - pub fn new(max_buf_size: usize) -> (Self, Self) { - let (in_a, out_b) = duplex(max_buf_size); - let (in_b, out_a) = duplex(max_buf_size); - ( - MockSplittable { - incoming_data: in_a, - outgoing_data: out_a, - }, - MockSplittable { - incoming_data: in_b, - outgoing_data: out_b, - }, - ) - } -} - -impl AsyncRead for MockSplittable { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().incoming_data).poll_read(cx, buf) - } -} - -impl AsyncWrite for MockSplittable { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - Pin::new(&mut self.get_mut().outgoing_data).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().outgoing_data).poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().outgoing_data).poll_shutdown(cx) - } -} - -impl ConnectionInfo for MockSplittable { - fn peer_address_info(&self) -> PeerAddressInfo { - String::from("MOCK_ADDRESS") - } -} - -impl ConnectionInfo for DuplexStream { - fn peer_address_info(&self) -> PeerAddressInfo { - String::from("MOCK_ADDRESS") - } -} - -impl Splittable for MockSplittable { - type Sender = DuplexStream; - type Receiver = DuplexStream; - - fn split(self) -> (Self::Sender, Self::Receiver) { - (self.outgoing_data, self.incoming_data) - } -} diff --git a/scripts/run_nodes.sh b/scripts/run_nodes.sh index f7885ea417..ac3e9d94f0 100755 --- a/scripts/run_nodes.sh +++ b/scripts/run_nodes.sh @@ -101,7 +101,7 @@ run_node() { --validator-port ${validator_port} \ -laleph-party=debug \ -laleph-network=debug \ - -lvalidator-network=debug \ + -lclique-network=debug \ -laleph-finality=debug \ -laleph-justification=debug \ -laleph-data-store=debug \ From 302f1d5af60c7d04f5b22bc49bdbdf93e17fed33 Mon Sep 17 00:00:00 2001 From: timorl Date: Wed, 14 Dec 2022 10:59:06 +0100 Subject: [PATCH 2/5] For manager it is actually the validator network --- finality-aleph/src/network/manager/service.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index 7aeb4f3740..d0d0978bc8 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -575,7 +575,7 @@ pub struct IO< pub enum Error { CommandsChannel, MessageChannel, - CliqueNetwork, + ValidatorNetwork, GossipNetwork(GE), } @@ -585,7 +585,7 @@ impl Display for Error { match self { CommandsChannel => write!(f, "commands channel unexpectedly closed"), MessageChannel => write!(f, "message channel unexpectedly closed"), - CliqueNetwork => write!(f, "validator network unexpectedly done"), + ValidatorNetwork => write!(f, "validator network unexpectedly done"), GossipNetwork(e) => write!(f, "gossip network unexpectedly done: {}", e), } } @@ -708,7 +708,7 @@ where SendError::NoSession => trace!(target: "aleph-network", "Received message for unknown session."), } }, - None => return Err(Error::CliqueNetwork), + None => return Err(Error::ValidatorNetwork), } }, maybe_authentication = self.gossip_network.next() => { From aa3b481e2c6797b25edc21bd33d8f1d047abe35a Mon Sep 17 00:00:00 2001 From: timorl Date: Wed, 14 Dec 2022 11:02:33 +0100 Subject: [PATCH 3/5] Stray bad use of validator_network --- finality-aleph/src/testing/clique_network.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/finality-aleph/src/testing/clique_network.rs b/finality-aleph/src/testing/clique_network.rs index 1fef226324..2eecbb9f37 100644 --- a/finality-aleph/src/testing/clique_network.rs +++ b/finality-aleph/src/testing/clique_network.rs @@ -186,7 +186,7 @@ async fn scenario_with_timeout( static INIT: Once = Once::new(); /// Required to capture logs from the tests e.g. by running -/// `RUST_LOG=info cargo test -- --nocapture testing::validator_network` +/// `RUST_LOG=info cargo test -- --nocapture testing::clique_network` fn setup() { // env_logger::init can be called at most once INIT.call_once(|| { From 3099ece2385d36055d63522e509065715435f27c Mon Sep 17 00:00:00 2001 From: timorl Date: Wed, 14 Dec 2022 11:33:06 +0100 Subject: [PATCH 4/5] Stray uses of 'validator' in clique --- finality-aleph/src/network/clique/service.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/finality-aleph/src/network/clique/service.rs b/finality-aleph/src/network/clique/service.rs index a2ec583dff..470ca0898c 100644 --- a/finality-aleph/src/network/clique/service.rs +++ b/finality-aleph/src/network/clique/service.rs @@ -74,7 +74,7 @@ impl Network for ServiceInterface, NL: Listener> where SK::PublicKey: PeerId, @@ -95,7 +95,7 @@ impl, NL: Listener> Servi where SK::PublicKey: PeerId, { - /// Create a new validator network service plus an interface for interacting with it. + /// Create a new clique network service plus an interface for interacting with it. pub fn new( dialer: ND, listener: NL, @@ -303,8 +303,8 @@ where }, // periodically reporting what we are trying to do _ = status_ticker.tick() => { - info!(target: LOG_TARGET, "Validator Network status: {}", self.manager.status_report()); - debug!(target: LOG_TARGET, "Validator Network legacy status: {}", self.legacy_manager.status_report()); + info!(target: LOG_TARGET, "Clique Network status: {}", self.manager.status_report()); + debug!(target: LOG_TARGET, "Clique Network legacy status: {}", self.legacy_manager.status_report()); } // received exit signal, stop the network // all workers will be killed automatically after the manager gets dropped From ed1b0d05db30d595305aa3e043bf744c800e1f56 Mon Sep 17 00:00:00 2001 From: timorl Date: Fri, 16 Dec 2022 09:26:00 +0100 Subject: [PATCH 5/5] Better logging targets --- finality-aleph/src/network/clique/mod.rs | 2 +- finality-aleph/src/network/tcp.rs | 4 +++- finality-aleph/src/testing/clique_network.rs | 4 +++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/finality-aleph/src/network/clique/mod.rs b/finality-aleph/src/network/clique/mod.rs index 34ecd52a55..02e30db490 100644 --- a/finality-aleph/src/network/clique/mod.rs +++ b/finality-aleph/src/network/clique/mod.rs @@ -18,7 +18,7 @@ mod service; pub use crypto::{PublicKey, SecretKey}; pub use service::Service; -pub const LOG_TARGET: &str = "clique-network"; +const LOG_TARGET: &str = "clique-network"; /// Network represents an interface for opening and closing connections with other nodes, /// and sending direct messages between them. diff --git a/finality-aleph/src/network/tcp.rs b/finality-aleph/src/network/tcp.rs index 3c720c2038..98346ff3ed 100644 --- a/finality-aleph/src/network/tcp.rs +++ b/finality-aleph/src/network/tcp.rs @@ -12,11 +12,13 @@ use tokio::net::{ use crate::{ crypto::{verify, AuthorityPen, Signature}, network::{ - clique::{ConnectionInfo, Dialer, Listener, PublicKey, SecretKey, Splittable, LOG_TARGET}, + clique::{ConnectionInfo, Dialer, Listener, PublicKey, SecretKey, Splittable}, AddressingInformation, NetworkIdentity, PeerId, }, }; +const LOG_TARGET: &str = "tcp-network"; + pub const KEY_TYPE: KeyTypeId = KeyTypeId(*b"a0vn"); impl ConnectionInfo for TcpStream { diff --git a/finality-aleph/src/testing/clique_network.rs b/finality-aleph/src/testing/clique_network.rs index 2eecbb9f37..a30c840f3e 100644 --- a/finality-aleph/src/testing/clique_network.rs +++ b/finality-aleph/src/testing/clique_network.rs @@ -21,11 +21,13 @@ use crate::network::{ random_keys, Addresses, MockDialer, MockListener, MockPublicKey, MockSecretKey, UnreliableConnectionMaker, }, - Network, SecretKey, Service, LOG_TARGET, + Network, SecretKey, Service, }, mock::MockData, }; +pub const LOG_TARGET: &str = "clique-network-test"; + const TWICE_MAX_DATA_SIZE: usize = 32 * 1024 * 1024; #[allow(clippy::too_many_arguments)]