diff --git a/bin/node/src/service.rs b/bin/node/src/service.rs index 4d0ad4ed2f..48549f4df0 100644 --- a/bin/node/src/service.rs +++ b/bin/node/src/service.rs @@ -192,10 +192,6 @@ fn setup( ), ServiceError, > { - config - .network - .extra_sets - .push(finality_aleph::peers_set_config(Protocol::Generic)); config .network .extra_sets @@ -246,7 +242,7 @@ fn setup( /// Builds a new service for a full client. pub fn new_authority( - mut config: Configuration, + config: Configuration, aleph_config: AlephCli, ) -> Result { let sc_service::PartialComponents { @@ -259,10 +255,6 @@ pub fn new_authority( transaction_pool, other: (block_import, justification_tx, justification_rx, mut telemetry, metrics), } = new_partial(&config)?; - config - .network - .extra_sets - .push(finality_aleph::peers_set_config(Protocol::Validator)); let backup_path = get_backup_path( &aleph_config, diff --git a/docker/docker-compose.bridged.yml b/docker/docker-compose.bridged.yml index 2ee3ddc132..b1303e4ca4 100644 --- a/docker/docker-compose.bridged.yml +++ b/docker/docker-compose.bridged.yml @@ -6,6 +6,8 @@ services: networks: - main - Node0 + environment: + - PUBLIC_VALIDATOR_ADDRESS=Node0:30343 Node1: extends: @@ -15,6 +17,7 @@ services: - main - Node1 environment: + - PUBLIC_VALIDATOR_ADDRESS=Node1:30344 - BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID Node2: @@ -25,6 +28,7 @@ services: - main - Node2 environment: + - PUBLIC_VALIDATOR_ADDRESS=Node2:30345 - BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID Node3: @@ -35,6 +39,7 @@ services: - main - Node3 environment: + - PUBLIC_VALIDATOR_ADDRESS=Node3:30346 - BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID Node4: @@ -45,6 +50,7 @@ services: - main - Node4 environment: + - PUBLIC_VALIDATOR_ADDRESS=Node4:30347 - BOOT_NODES=/dns4/Node0/tcp/30333/p2p/$BOOTNODE_PEER_ID networks: diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index 8326fa7bc6..dab38bbbe9 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -83,14 +83,6 @@ pub fn peers_set_config(protocol: Protocol) -> sc_network::config::NonDefaultSet ); config.set_config = match protocol { - // No spontaneous connections, only reserved nodes added by the network logic. - Protocol::Validator => sc_network::config::SetConfig { - in_peers: 0, - out_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny, - }, - Protocol::Generic => sc_network::config::SetConfig::default(), Protocol::Authentication => sc_network::config::SetConfig::default(), }; config diff --git a/finality-aleph/src/network/manager/discovery.rs b/finality-aleph/src/network/manager/discovery.rs index 93bab16160..02b1a22f86 100644 --- a/finality-aleph/src/network/manager/discovery.rs +++ b/finality-aleph/src/network/manager/discovery.rs @@ -5,12 +5,12 @@ use std::{ }; use codec::{Decode, Encode}; -use log::{debug, info, trace, warn}; +use log::{debug, info, trace}; use crate::{ network::{ manager::{Authentication, SessionHandler}, - DataCommand, Multiaddress, Protocol, + DataCommand, Multiaddress, }, NodeIndex, SessionId, }; @@ -33,7 +33,7 @@ impl DiscoveryMessage { } } -/// Handles creating and responding to discovery messages. +/// Handles creating and rebroadcasting discovery messages. pub struct Discovery { cooldown: Duration, last_broadcast: HashMap, @@ -54,16 +54,6 @@ fn authentication_broadcast( ) } -fn response( - authentication: Authentication, - peer_id: M::PeerId, -) -> DiscoveryCommand { - ( - DiscoveryMessage::Authentication(authentication), - DataCommand::SendTo(peer_id, Protocol::Generic), - ) -} - impl Discovery { /// Create a new discovery handler with the given response/broadcast cooldown. pub fn new(cooldown: Duration) -> Self { @@ -122,16 +112,6 @@ impl Discovery { } let node_id = authentication.0.creator(); let mut messages = Vec::new(); - match handler.peer_id(&node_id) { - Some(peer_id) => { - if let Some(handler_authentication) = handler.authentication() { - messages.push(response(handler_authentication, peer_id)); - } - } - None => { - warn!(target: "aleph-network", "Id of correctly authenticated peer not present.") - } - } if self.should_rebroadcast(&node_id) { trace!(target: "aleph-network", "Rebroadcasting {:?}.", authentication); self.last_broadcast.insert(node_id, Instant::now()); @@ -256,7 +236,7 @@ mod tests { } #[tokio::test] - async fn rebroadcasts_responds_and_accepts_addresses() { + async fn rebroadcasts_and_accepts_addresses() { let (mut discovery, mut handlers, _) = build().await; let authentication = handlers[1].authentication().unwrap(); let handler = &mut handlers[0]; @@ -265,19 +245,15 @@ mod tests { handler, ); assert_eq!(addresses, authentication.0.addresses()); - assert_eq!(commands.len(), 2); + assert_eq!(commands.len(), 1); assert!(commands.iter().any(|command| matches!(command, ( DiscoveryMessage::AuthenticationBroadcast(rebroadcast_authentication), DataCommand::Broadcast, ) if rebroadcast_authentication == &authentication))); - assert!(commands.iter().any(|command| matches!(command, ( - DiscoveryMessage::Authentication(authentication), - DataCommand::SendTo(_, _), - ) if *authentication == handler.authentication().unwrap()))); } #[tokio::test] - async fn non_validators_rebroadcasts_responds() { + async fn non_validators_rebroadcasts() { let (mut discovery, handlers, mut non_validator) = build().await; let authentication = handlers[1].authentication().unwrap(); let (addresses, commands) = discovery.handle_message( @@ -293,7 +269,7 @@ mod tests { } #[tokio::test] - async fn does_not_rebroadcast_nor_respond_to_wrong_authentications() { + async fn does_not_rebroadcast_wrong_authentications() { let (mut discovery, mut handlers, _) = build().await; let (auth_data, _) = handlers[1].authentication().unwrap(); let (_, signature) = handlers[2].authentication().unwrap(); @@ -307,31 +283,6 @@ mod tests { assert!(commands.is_empty()); } - #[tokio::test] - async fn does_not_rebroadcast_quickly_but_still_responds() { - let (mut discovery, mut handlers, _) = build().await; - let authentication = handlers[1].authentication().unwrap(); - let handler = &mut handlers[0]; - discovery.handle_message( - DiscoveryMessage::AuthenticationBroadcast(authentication.clone()), - handler, - ); - let (addresses, commands) = discovery.handle_message( - DiscoveryMessage::AuthenticationBroadcast(authentication.clone()), - handler, - ); - assert_eq!(addresses.len(), authentication.0.addresses().len()); - assert_eq!( - addresses[0].encode(), - authentication.0.addresses()[0].encode() - ); - assert_eq!(commands.len(), 1); - assert!(matches!(&commands[0], ( - DiscoveryMessage::Authentication(authentication), - DataCommand::SendTo(_, _), - ) if *authentication == handler.authentication().unwrap())); - } - #[tokio::test] async fn rebroadcasts_after_cooldown() { let (mut discovery, mut handlers, _) = build().await; diff --git a/finality-aleph/src/network/manager/mod.rs b/finality-aleph/src/network/manager/mod.rs index e2c7b3a83c..d7ff87df50 100644 --- a/finality-aleph/src/network/manager/mod.rs +++ b/finality-aleph/src/network/manager/mod.rs @@ -52,7 +52,7 @@ pub type Authentication = (AuthData, Signature); /// The order of the data and session_id is fixed in encode and the decode expects it to be data, session_id. /// Since data is versioned, i.e. it's encoding starts with a version number in the standardized way, /// this will allow us to retrofit versioning here if we ever need to change this structure. -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct DataInSession { pub data: D, pub session_id: SessionId, diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index da1bdb597b..ea8bdadd8f 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -19,7 +19,7 @@ use crate::{ Connections, Discovery, DiscoveryMessage, NetworkData, SessionHandler, SessionHandlerError, }, - ConnectionCommand, Data, DataCommand, Multiaddress, NetworkIdentity, PeerId, Protocol, + ConnectionCommand, Data, DataCommand, Multiaddress, NetworkIdentity, PeerId, }, MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL, }; @@ -448,22 +448,12 @@ impl Service { Recipient::Everyone => (0..handler.node_count().0) .map(NodeIndex) .flat_map(|node_id| handler.peer_id(&node_id)) - .map(|peer_id| { - ( - to_send.clone(), - DataCommand::SendTo(peer_id, Protocol::Validator), - ) - }) + .map(|peer_id| (to_send.clone(), DataCommand::SendTo(peer_id))) .collect(), Recipient::Node(node_id) => handler .peer_id(&node_id) .into_iter() - .map(|peer_id| { - ( - to_send.clone(), - DataCommand::SendTo(peer_id, Protocol::Validator), - ) - }) + .map(|peer_id| (to_send.clone(), DataCommand::SendTo(peer_id))) .collect(), } } else { @@ -782,7 +772,7 @@ mod tests { network::{ manager::{DiscoveryMessage, NetworkData}, mock::{crypto_basics, MockNetworkIdentity}, - ConnectionCommand, DataCommand, Protocol, + ConnectionCommand, DataCommand, }, Recipient, SessionId, }; @@ -934,13 +924,10 @@ mod tests { addresses.into_iter().collect() )) ); - assert_eq!(data.len(), 2); + assert_eq!(data.len(), 1); assert!(data .iter() .any(|(_, command)| command == &DataCommand::Broadcast)); - assert!(data - .iter() - .any(|(_, command)| matches!(command, &DataCommand::SendTo(_, _)))); } #[tokio::test] @@ -975,10 +962,7 @@ mod tests { let messages = service.on_user_message(2137, session_id, Recipient::Everyone); assert_eq!(messages.len(), 1); let (network_data, data_command) = &messages[0]; - assert!(matches!( - data_command, - DataCommand::SendTo(_, Protocol::Validator) - )); + assert!(matches!(data_command, DataCommand::SendTo(_))); assert_eq!(network_data, &NetworkData::Data(2137, session_id)); } } diff --git a/finality-aleph/src/network/mock.rs b/finality-aleph/src/network/mock.rs index 6817de8251..2ac4283397 100644 --- a/finality-aleph/src/network/mock.rs +++ b/finality-aleph/src/network/mock.rs @@ -2,6 +2,7 @@ use std::{ collections::{HashSet, VecDeque}, fmt, sync::Arc, + time::Duration, }; use aleph_primitives::KEY_TYPE; @@ -14,6 +15,7 @@ use futures::{ use parking_lot::Mutex; use rand::random; use sp_keystore::{testing::KeyStore, CryptoStore}; +use tokio::time::timeout; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, @@ -106,6 +108,8 @@ pub struct Channel( pub Arc>>, ); +const TIMEOUT_FAIL: Duration = Duration::from_secs(10); + impl Channel { pub fn new() -> Self { let (tx, rx) = mpsc::unbounded(); @@ -117,7 +121,19 @@ impl Channel { } pub async fn next(&mut self) -> Option { - self.1.lock().await.next().await + timeout(TIMEOUT_FAIL, self.1.lock().await.next()) + .await + .ok() + .flatten() + } + + pub async fn take(&mut self, n: usize) -> Vec { + timeout( + TIMEOUT_FAIL, + self.1.lock().await.by_ref().take(n).collect::>(), + ) + .await + .unwrap_or(Vec::new()) } pub async fn try_next(&self) -> Option { @@ -142,38 +158,24 @@ pub type MockData = Vec; type MessageForUser = (NetworkData, DataCommand<::PeerId>); type NetworkServiceIO = NetworkIO, M>; -pub struct MockIO { +pub struct MockIO { pub messages_for_user: mpsc::UnboundedSender>, pub messages_from_user: mpsc::UnboundedReceiver>, pub commands_for_manager: mpsc::UnboundedSender>, - pub legacy_messages_for_user: mpsc::UnboundedSender>, - pub legacy_messages_from_user: mpsc::UnboundedReceiver>, - pub legacy_commands_for_manager: mpsc::UnboundedSender>, } -impl MockIO { - pub fn new() -> (MockIO, NetworkServiceIO, NetworkServiceIO) { +impl MockIO { + pub fn new() -> (MockIO, NetworkServiceIO) { let (mock_messages_for_user, messages_from_user) = mpsc::unbounded(); let (messages_for_user, mock_messages_from_user) = mpsc::unbounded(); let (mock_commands_for_manager, commands_from_manager) = mpsc::unbounded(); - let (legacy_mock_messages_for_user, legacy_messages_from_user) = mpsc::unbounded(); - let (legacy_messages_for_user, legacy_mock_messages_from_user) = mpsc::unbounded(); - let (legacy_mock_commands_for_manager, legacy_commands_from_manager) = mpsc::unbounded(); ( MockIO { messages_for_user: mock_messages_for_user, messages_from_user: mock_messages_from_user, commands_for_manager: mock_commands_for_manager, - legacy_messages_for_user: legacy_mock_messages_for_user, - legacy_messages_from_user: legacy_mock_messages_from_user, - legacy_commands_for_manager: legacy_mock_commands_for_manager, }, NetworkServiceIO::new(messages_from_user, messages_for_user, commands_from_manager), - NetworkServiceIO::new( - legacy_messages_from_user, - legacy_messages_for_user, - legacy_commands_from_manager, - ), ) } } diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 91dc680378..5862a55341 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -70,13 +70,9 @@ pub trait Multiaddress: Debug + Hash + Codec + Clone + Eq + Send + Sync { fn add_matching_peer_id(self, peer_id: Self::PeerId) -> Option; } -/// The Generic protocol is used for validator discovery. -/// The Validator protocol is used for validator-specific messages, i.e. ones needed for -/// finalization. +/// The Authentication protocol is used for validator discovery. #[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)] pub enum Protocol { - Generic, - Validator, Authentication, } @@ -165,7 +161,7 @@ pub trait RequestBlocks: Clone + Send + Sync + 'static { #[derive(Debug, PartialEq, Eq, Clone)] pub enum DataCommand { Broadcast, - SendTo(PID, Protocol), + SendTo(PID), } /// Commands for manipulating the reserved peers set. diff --git a/finality-aleph/src/network/service.rs b/finality-aleph/src/network/service.rs index ca7bf71e10..6028945cbd 100644 --- a/finality-aleph/src/network/service.rs +++ b/finality-aleph/src/network/service.rs @@ -17,7 +17,7 @@ use crate::{ network::{ manager::{NetworkData, VersionedAuthentication}, ConnectionCommand, Data, DataCommand, Event, EventStream, Multiaddress, Network, - NetworkSender, PeerId, Protocol, + NetworkSender, Protocol, }, validator_network::Network as ValidatorNetwork, STATUS_REPORT_INTERVAL, @@ -37,7 +37,6 @@ type MessageFromUser = (NetworkData, DataCommand< pub struct Service< N: Network, D: Data, - LD: Data, A: Data + Multiaddress, VN: ValidatorNetwork>, > { @@ -46,20 +45,9 @@ pub struct Service< messages_from_user: mpsc::UnboundedReceiver>, messages_for_user: mpsc::UnboundedSender>, commands_from_manager: mpsc::UnboundedReceiver>, - // In future these legacy senders and receiver will be removed - legacy_messages_from_user: mpsc::UnboundedReceiver<(LD, DataCommand)>, - legacy_messages_for_user: mpsc::UnboundedSender, - legacy_commands_from_manager: mpsc::UnboundedReceiver>, - legacy_generic_connected_peers: HashSet, - legacy_validator_connected_peers: HashSet, authentication_connected_peers: HashSet, - // For now we need to use `Vec` here. - // This is needed for backward compatibility with old network. - // This can be changed back to `Data` once Legacy Network is removed. - // In future this will be changed to somethig like `AuthenticationData`. - legacy_generic_peer_senders: HashMap>>, - legacy_validator_peer_senders: HashMap>>, - authentication_peer_senders: HashMap>>, + authentication_peer_senders: + HashMap>>, spawn_handle: SpawnTaskHandle, } @@ -90,42 +78,27 @@ enum SendError { SendingFailed, } -#[derive(Debug)] -enum SendToUserError { - LegacySender, - LatestSender, -} - impl< N: Network, D: Data, - LD: Data, A: Data + Multiaddress, VN: ValidatorNetwork>, - > Service + > Service { pub fn new( network: N, validator_network: VN, spawn_handle: SpawnTaskHandle, io: IO, A>, - legacy_io: IO, - ) -> Service { + ) -> Service { Service { network, validator_network, messages_from_user: io.messages_from_user, messages_for_user: io.messages_for_user, commands_from_manager: io.commands_from_manager, - legacy_messages_from_user: legacy_io.messages_from_user, - legacy_messages_for_user: legacy_io.messages_for_user, - legacy_commands_from_manager: legacy_io.commands_from_manager, spawn_handle, - legacy_generic_connected_peers: HashSet::new(), - legacy_validator_connected_peers: HashSet::new(), authentication_connected_peers: HashSet::new(), - legacy_generic_peer_senders: HashMap::new(), - legacy_validator_peer_senders: HashMap::new(), authentication_peer_senders: HashMap::new(), } } @@ -134,10 +107,8 @@ impl< &mut self, peer: &N::PeerId, protocol: Protocol, - ) -> Option<&mut TracingUnboundedSender>> { + ) -> Option<&mut TracingUnboundedSender>> { match protocol { - Protocol::Generic => self.legacy_generic_peer_senders.get_mut(peer), - Protocol::Validator => self.legacy_validator_peer_senders.get_mut(peer), Protocol::Authentication => self.authentication_peer_senders.get_mut(peer), } } @@ -145,7 +116,7 @@ impl< fn peer_sender( &self, peer_id: N::PeerId, - mut receiver: TracingUnboundedReceiver>, + mut receiver: TracingUnboundedReceiver>, protocol: Protocol, ) -> impl Future + Send + 'static { let network = self.network.clone(); @@ -164,7 +135,7 @@ impl< } } }; - if let Err(e) = s.send(data).await { + if let Err(e) = s.send(data.encode()).await { debug!(target: "aleph-network", "Failed sending data to peer. Dropping sender and message: {}", e); sender = None; } @@ -178,7 +149,7 @@ impl< fn send_to_peer( &mut self, - data: Vec, + data: VersionedAuthentication, peer: N::PeerId, protocol: Protocol, ) -> Result<(), SendError> { @@ -200,11 +171,9 @@ impl< } } - fn broadcast(&mut self, data: Vec, protocol: Protocol) { + fn broadcast(&mut self, data: VersionedAuthentication, protocol: Protocol) { let peers = match protocol { // Validator protocol will never broadcast. - Protocol::Validator => HashSet::new(), - Protocol::Generic => self.legacy_generic_connected_peers.clone(), Protocol::Authentication => self.authentication_connected_peers.clone(), }; for peer in peers { @@ -219,41 +188,22 @@ impl< fn handle_network_event( &mut self, event: Event, - ) -> Result<(), SendToUserError> { + ) -> Result<(), mpsc::TrySendError>> { use Event::*; match event { Connected(multiaddress) => { trace!(target: "aleph-network", "Connected event from address {:?}", multiaddress); - self.network.add_reserved( - iter::once(multiaddress.clone()).collect(), - Protocol::Generic, - ); self.network .add_reserved(iter::once(multiaddress).collect(), Protocol::Authentication); } Disconnected(peer) => { trace!(target: "aleph-network", "Disconnected event for peer {:?}", peer); - self.network - .remove_reserved(iter::once(peer.clone()).collect(), Protocol::Generic); self.network .remove_reserved(iter::once(peer).collect(), Protocol::Authentication); } StreamOpened(peer, protocol) => { trace!(target: "aleph-network", "StreamOpened event for peer {:?} and the protocol {:?}.", peer, protocol); let rx = match &protocol { - Protocol::Generic => { - let (tx, rx) = tracing_unbounded("mpsc_notification_stream_legacy_generic"); - self.legacy_generic_connected_peers.insert(peer.clone()); - self.legacy_generic_peer_senders.insert(peer.clone(), tx); - rx - } - Protocol::Validator => { - let (tx, rx) = - tracing_unbounded("mpsc_notification_stream_legacy_validator"); - self.legacy_validator_connected_peers.insert(peer.clone()); - self.legacy_validator_peer_senders.insert(peer.clone(), tx); - rx - } Protocol::Authentication => { let (tx, rx) = tracing_unbounded("mpsc_notification_stream_authentication"); self.authentication_connected_peers.insert(peer.clone()); @@ -270,14 +220,6 @@ impl< StreamClosed(peer, protocol) => { trace!(target: "aleph-network", "StreamClosed event for peer {:?} and protocol {:?}", peer, protocol); match protocol { - Protocol::Generic => { - self.legacy_generic_connected_peers.remove(&peer); - self.legacy_generic_peer_senders.remove(&peer); - } - Protocol::Validator => { - self.legacy_validator_connected_peers.remove(&peer); - self.legacy_validator_peer_senders.remove(&peer); - } Protocol::Authentication => { self.authentication_connected_peers.remove(&peer); self.authentication_peer_senders.remove(&peer); @@ -287,32 +229,11 @@ impl< Messages(messages) => { for (protocol, data) in messages.into_iter() { match protocol { - Protocol::Generic => match LD::decode(&mut &data[..]) { - Ok(data) => self - .legacy_messages_for_user - .unbounded_send(data) - .map_err(|_| SendToUserError::LegacySender)?, - Err(e) => { - warn!(target: "aleph-network", "Error decoding legacy generic protocol message: {}", e) - } - }, - Protocol::Validator => match LD::decode(&mut &data[..]) { - Ok(data) => self - .legacy_messages_for_user - .unbounded_send(data) - .map_err(|_| SendToUserError::LegacySender)?, - Err(e) => { - warn!(target: "aleph-network", "Error decoding legacy validator protocol message: {}", e) - } - }, Protocol::Authentication => { match VersionedAuthentication::::decode(&mut &data[..]) .map(|a| a.try_into()) { - Ok(Ok(data)) => self - .messages_for_user - .unbounded_send(data) - .map_err(|_| SendToUserError::LatestSender)?, + Ok(Ok(data)) => self.messages_for_user.unbounded_send(data)?, Ok(Err(e)) => { warn!(target: "aleph-network", "Error decoding authentication protocol message: {}", e) } @@ -353,17 +274,6 @@ impl< } } - /// This will be removed in the future - fn legacy_on_manager_command(&mut self, command: ConnectionCommand) { - use ConnectionCommand::*; - match command { - AddReserved(addresses) => { - self.network.add_reserved(addresses, Protocol::Validator); - } - DelReserved(peers) => self.network.remove_reserved(peers, Protocol::Validator), - } - } - fn on_user_message(&mut self, data: NetworkData, command: DataCommand) { use DataCommand::*; @@ -371,8 +281,8 @@ impl< NetworkData::Meta(discovery_message) => { let data: VersionedAuthentication = discovery_message.into(); match command { - Broadcast => self.broadcast(data.encode(), Protocol::Authentication), - SendTo(_, _) => { + Broadcast => self.broadcast(data, Protocol::Authentication), + SendTo(_) => { // We ignore this for now. Sending Meta messages to peer is an optimization done for the sake of tests. } } @@ -382,7 +292,7 @@ impl< Broadcast => { // We ignore this for now. AlephBFT does not broadcast data. } - SendTo(peer, _) => self + SendTo(peer) => self .validator_network .send(DataInSession { data, session_id }, peer), } @@ -390,19 +300,6 @@ impl< } } - /// This will be removed in the future - fn legacy_on_user_message(&mut self, data: LD, command: DataCommand) { - use DataCommand::*; - match command { - Broadcast => self.broadcast(data.encode(), Protocol::Generic), - SendTo(peer, protocol) => { - if let Err(e) = self.send_to_peer(data.encode(), peer.clone(), protocol) { - trace!(target: "aleph-network", "Failed to send data to peer{:?} via protocol {:?}, {:?}", peer, protocol, e); - } - } - } - } - fn status_report(&self) { let mut status = String::from("Network status report: "); @@ -411,23 +308,6 @@ impl< self.authentication_connected_peers.len() )); - status.push_str(&format!( - "generic connected peers - {:?}; ", - self.legacy_generic_connected_peers.len() - )); - - let peer_ids = self - .legacy_validator_connected_peers - .iter() - .map(|peer_id| peer_id.to_short_string()) - .collect::>() - .join(", "); - status.push_str(&format!( - "validator connected peers - {:?} [{}]; ", - self.legacy_validator_connected_peers.len(), - peer_ids, - )); - info!(target: "aleph-network", "{}", status); } @@ -439,10 +319,7 @@ impl< tokio::select! { maybe_event = events_from_network.next_event() => match maybe_event { Some(event) => if let Err(e) = self.handle_network_event(event) { - match e { - SendToUserError::LegacySender => error!(target: "aleph-network", "Cannot forward messages to user through legacy sender: {:?}", e), - SendToUserError::LatestSender => error!(target: "aleph-network", "Cannot forward messages to user: {:?}", e), - }; + error!(target: "aleph-network", "Cannot forward messages to user: {:?}", e); return; }, None => { @@ -473,20 +350,6 @@ impl< return; } }, - maybe_message = self.legacy_messages_from_user.next() => match maybe_message { - Some((data, command)) => self.legacy_on_user_message(data, command), - None => { - error!(target: "aleph-network", "Legacy user message stream ended."); - return; - } - }, - maybe_command = self.legacy_commands_from_manager.next() => match maybe_command { - Some(command) => self.legacy_on_manager_command(command), - None => { - error!(target: "aleph-network", "Legacy manager command stream ended."); - return; - } - }, _ = status_ticker.tick() => { self.status_report(); }, @@ -507,18 +370,20 @@ mod tests { use super::{ConnectionCommand, DataCommand, Service}; use crate::{ network::{ - manager::DataInSession, + manager::{DataInSession, SessionHandler, VersionedAuthentication}, mock::{ - MockData, MockEvent, MockIO, MockMultiaddress as LegacyMockMultiaddress, - MockNetwork, MockNetworkIdentity, MockPeerId, MockSenderError, + crypto_basics, MockData, MockEvent, MockIO, + MockMultiaddress as MockAuthMultiaddress, MockNetwork, + MockPeerId as MockAuthPeerId, MockSenderError, }, - testing::NetworkData, + testing::{DiscoveryMessage, NetworkData}, NetworkIdentity, Protocol, }, - session::SessionId, + // session::SessionId, testing::mocks::validator_network::{ - MockMultiaddress, MockNetwork as MockValidatorNetwork, + random_authority_id, MockMultiaddress, MockNetwork as MockValidatorNetwork, }, + SessionId, }; pub struct TestData { @@ -526,7 +391,7 @@ mod tests { pub exit_tx: oneshot::Sender<()>, pub network: MockNetwork, pub validator_network: MockValidatorNetwork>, - pub mock_io: MockIO, + pub mock_io: MockIO, // `TaskManager` can't be dropped for `SpawnTaskHandle` to work _task_manager: TaskManager, } @@ -536,7 +401,7 @@ mod tests { let task_manager = TaskManager::new(Handle::current(), None).unwrap(); // Prepare communication with service - let (mock_io, io, legacy_io) = MockIO::new(); + let (mock_io, io) = MockIO::new(); // Prepare service let (event_stream_oneshot_tx, event_stream_oneshot_rx) = oneshot::channel(); let network = MockNetwork::new(event_stream_oneshot_tx); @@ -546,7 +411,6 @@ mod tests { validator_network.clone(), task_manager.spawn_handle(), io, - legacy_io, ); let (exit_tx, exit_rx) = oneshot::channel(); let task_handle = async move { @@ -574,11 +438,12 @@ mod tests { self.exit_tx.send(()).unwrap(); self.service_handle.await.unwrap(); self.network.close_channels().await; + self.validator_network.close_channels().await; } // We do this only to make sure that NotificationStreamOpened/NotificationStreamClosed events are handled async fn wait_for_events_handled(&mut self) { - let address = LegacyMockMultiaddress::random_with_id(MockPeerId::random()); + let address = MockAuthMultiaddress::random_with_id(MockAuthPeerId::random()); self.network .emit_event(MockEvent::Connected(address.clone())); assert_eq!( @@ -587,25 +452,43 @@ mod tests { .next() .await .expect("Should receive message"), - (iter::once(address).collect(), Protocol::Generic,) + (iter::once(address).collect(), Protocol::Authentication,) ); } } - fn message(i: u8) -> NetworkData { - NetworkData::Data(vec![i, i + 1, i + 2], SessionId(1)) + fn message(i: u8) -> DataInSession { + DataInSession { + data: vec![i, i + 1, i + 2], + session_id: SessionId(1), + } + } + + async fn authentication( + multiaddresses: Vec, + ) -> DiscoveryMessage { + let crypto_basics = crypto_basics(1).await; + let handler = SessionHandler::new( + Some(crypto_basics.0[0].clone()), + crypto_basics.1.clone(), + SessionId(43), + multiaddresses, + ) + .await + .unwrap(); + DiscoveryMessage::AuthenticationBroadcast(handler.authentication().unwrap()) } #[tokio::test] async fn test_sync_connected() { let mut test_data = TestData::prepare().await; - let address = LegacyMockMultiaddress::random_with_id(MockPeerId::random()); + let address = MockAuthMultiaddress::random_with_id(MockAuthPeerId::random()); test_data .network .emit_event(MockEvent::Connected(address.clone())); - let expected = (iter::once(address).collect(), Protocol::Generic); + let expected = (iter::once(address).collect(), Protocol::Authentication); assert_eq!( test_data @@ -624,13 +507,13 @@ mod tests { async fn test_sync_disconnected() { let mut test_data = TestData::prepare().await; - let peer_id = MockPeerId::random(); + let peer_id = MockAuthPeerId::random(); test_data .network .emit_event(MockEvent::Disconnected(peer_id)); - let expected = (iter::once(peer_id).collect(), Protocol::Generic); + let expected = (iter::once(peer_id).collect(), Protocol::Authentication); assert_eq!( test_data @@ -649,43 +532,40 @@ mod tests { async fn test_notification_stream_opened() { let mut test_data = TestData::prepare().await; - let peer_ids: Vec<_> = (0..3).map(|_| MockPeerId::random()).collect(); + let peer_ids: Vec<_> = (0..3).map(|_| MockAuthPeerId::random()).collect(); peer_ids.iter().for_each(|peer_id| { test_data .network - .emit_event(MockEvent::StreamOpened(*peer_id, Protocol::Generic)); + .emit_event(MockEvent::StreamOpened(*peer_id, Protocol::Authentication)); }); // We do this only to make sure that NotificationStreamOpened events are handled test_data.wait_for_events_handled().await; - let message = message(1); + let message = authentication(test_data.validator_network.identity().0).await; test_data .mock_io - .legacy_messages_for_user - .unbounded_send((message.clone(), DataCommand::Broadcast)) + .messages_for_user + .unbounded_send((NetworkData::Meta(message.clone()), DataCommand::Broadcast)) .unwrap(); let broadcasted_messages = HashSet::<_>::from_iter( test_data .network .send_message - .1 - .lock() - .await - .by_ref() .take(peer_ids.len()) - .collect::>() .await .into_iter(), ); - let expected_messages = HashSet::from_iter( - peer_ids - .into_iter() - .map(|peer_id| (message.encode(), peer_id, Protocol::Generic)), - ); + let expected_messages = HashSet::from_iter(peer_ids.into_iter().map(|peer_id| { + ( + VersionedAuthentication::V1(message.clone()).encode(), + peer_id, + Protocol::Authentication, + ) + })); assert_eq!(broadcasted_messages, expected_messages); @@ -696,13 +576,13 @@ mod tests { async fn test_notification_stream_closed() { let mut test_data = TestData::prepare().await; - let peer_ids: Vec<_> = (0..4).map(|_| MockPeerId::random()).collect(); + let peer_ids: Vec<_> = (0..3).map(|_| MockAuthPeerId::random()).collect(); let opened_authorities_n = 2; peer_ids.iter().for_each(|peer_id| { test_data .network - .emit_event(MockEvent::StreamOpened(*peer_id, Protocol::Generic)); + .emit_event(MockEvent::StreamOpened(*peer_id, Protocol::Authentication)); }); peer_ids @@ -711,190 +591,24 @@ mod tests { .for_each(|peer_id| { test_data .network - .emit_event(MockEvent::StreamClosed(*peer_id, Protocol::Generic)); + .emit_event(MockEvent::StreamClosed(*peer_id, Protocol::Authentication)); }); // We do this only to make sure that NotificationStreamClosed events are handled test_data.wait_for_events_handled().await; - let messages: Vec<_> = vec![message(1), message(2)]; - messages.iter().for_each(|m| { - test_data - .mock_io - .legacy_messages_for_user - .unbounded_send((m.clone(), DataCommand::Broadcast)) - .unwrap(); - }); - - let broadcasted_messages = HashSet::<_>::from_iter( - test_data - .network - .send_message - .1 - .lock() - .await - .by_ref() - .take(opened_authorities_n * messages.len()) - .collect::>() - .await - .into_iter(), - ); - - let expected_messages = - HashSet::from_iter(peer_ids.into_iter().take(opened_authorities_n).flat_map( - |peer_id| { - messages - .iter() - .map(move |m| (m.encode(), peer_id, Protocol::Generic)) - }, - )); - - assert_eq!(broadcasted_messages, expected_messages); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_validator_data_command_send_to() { - let mut test_data = TestData::prepare().await; - - let peer_id = MockPeerId::random(); - - let message = message(1); - - test_data - .network - .emit_event(MockEvent::StreamOpened(peer_id, Protocol::Validator)); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; - + let message = authentication(test_data.validator_network.identity().0).await; test_data .mock_io - .legacy_messages_for_user - .unbounded_send(( - message.clone(), - DataCommand::SendTo(peer_id, Protocol::Validator), - )) + .messages_for_user + .unbounded_send((NetworkData::Meta(message.clone()), DataCommand::Broadcast)) .unwrap(); - let expected = (message.encode(), peer_id, Protocol::Validator); - - assert_eq!( - test_data - .network - .send_message - .next() - .await - .expect("Should receive message"), - expected, - ); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_validator_create_sender_error_one_peer() { - let mut test_data = TestData::prepare().await; - - test_data - .network - .create_sender_errors - .lock() - .push_back(MockSenderError::SomeError); - - let peer_id = MockPeerId::random(); - - let message_1 = message(1); - let message_2 = message(2); - - test_data - .network - .emit_event(MockEvent::StreamOpened(peer_id, Protocol::Validator)); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; - - test_data - .mock_io - .legacy_messages_for_user - .unbounded_send(( - message_1.clone(), - DataCommand::SendTo(peer_id, Protocol::Validator), - )) - .unwrap(); - - test_data - .mock_io - .legacy_messages_for_user - .unbounded_send(( - message_2.clone(), - DataCommand::SendTo(peer_id, Protocol::Validator), - )) - .unwrap(); - - let expected = (message_2.encode(), peer_id, Protocol::Validator); - - assert_eq!( - test_data - .network - .send_message - .next() - .await - .expect("Should receive message"), - expected, - ); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_validator_create_sender_error_many_peers() { - let mut test_data = TestData::prepare().await; - - let all_authorities_n = 4; - let closed_authorities_n = 2; - for _ in 0..closed_authorities_n { - test_data - .network - .create_sender_errors - .lock() - .push_back(MockSenderError::SomeError); - } - - let peer_ids: Vec<_> = (0..4).map(|_| MockPeerId::random()).collect(); - let message = message(1); - - peer_ids.iter().for_each(|peer_id| { - test_data - .network - .emit_event(MockEvent::StreamOpened(*peer_id, Protocol::Validator)); - }); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; - - peer_ids.iter().for_each(|peer_id| { - test_data - .mock_io - .legacy_messages_for_user - .unbounded_send(( - message.clone(), - DataCommand::SendTo(*peer_id, Protocol::Validator), - )) - .unwrap(); - }); - let broadcasted_messages = HashSet::<_>::from_iter( test_data .network .send_message - .1 - .lock() - .await - .by_ref() - .take(all_authorities_n - closed_authorities_n) - .collect::>() + .take(opened_authorities_n) .await .into_iter(), ); @@ -902,8 +616,14 @@ mod tests { let expected_messages = HashSet::from_iter( peer_ids .into_iter() - .skip(closed_authorities_n) - .map(|peer_id| (message.encode(), peer_id, Protocol::Validator)), + .take(opened_authorities_n) + .map(|peer_id| { + ( + VersionedAuthentication::V1(message.clone()).encode(), + peer_id, + Protocol::Authentication, + ) + }), ); assert_eq!(broadcasted_messages, expected_messages); @@ -912,51 +632,25 @@ mod tests { } #[tokio::test] - async fn test_validator_data_command_send_to_error_one_peer() { + async fn test_validator_data_command_send_to() { let mut test_data = TestData::prepare().await; - test_data - .network - .send_errors - .lock() - .push_back(MockSenderError::SomeError); - - let peer_id = MockPeerId::random(); + let peer_id = random_authority_id().await; - let message_1 = message(1); - let message_2 = message(2); - - test_data - .network - .emit_event(MockEvent::StreamOpened(peer_id, Protocol::Validator)); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; - - test_data - .mock_io - .legacy_messages_for_user - .unbounded_send(( - message_1.clone(), - DataCommand::SendTo(peer_id, Protocol::Validator), - )) - .unwrap(); + let message = message(1); test_data .mock_io - .legacy_messages_for_user - .unbounded_send(( - message_2.clone(), - DataCommand::SendTo(peer_id, Protocol::Validator), - )) + .messages_for_user + .unbounded_send((message.clone().into(), DataCommand::SendTo(peer_id.clone()))) .unwrap(); - let expected = (message_2.encode(), peer_id, Protocol::Validator); + let expected = (message, peer_id); assert_eq!( test_data - .network - .send_message + .validator_network + .send .next() .await .expect("Should receive message"), @@ -967,98 +661,19 @@ mod tests { } #[tokio::test] - async fn test_validator_data_command_send_to_error_many_peers() { + async fn test_receives_validator_data() { let mut test_data = TestData::prepare().await; - let all_authorities_n = 4; - let closed_authorities_n = 2; - for _ in 0..closed_authorities_n { - test_data - .network - .send_errors - .lock() - .push_back(MockSenderError::SomeError); - } - - let peer_ids: Vec<_> = (0..4).map(|_| MockPeerId::random()).collect(); let message = message(1); - peer_ids.iter().for_each(|peer_id| { - test_data - .network - .emit_event(MockEvent::StreamOpened(*peer_id, Protocol::Validator)); - }); + test_data.validator_network.next.send(message.clone()); - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; - - peer_ids.iter().for_each(|peer_id| { - test_data - .mock_io - .legacy_messages_for_user - .unbounded_send(( - message.clone(), - DataCommand::SendTo(*peer_id, Protocol::Validator), - )) - .unwrap(); - }); - - let broadcasted_messages = HashSet::<_>::from_iter( - test_data - .network - .send_message - .1 - .lock() - .await - .by_ref() - .take(all_authorities_n - closed_authorities_n) - .collect::>() - .await - .into_iter(), - ); - - let expected_messages = HashSet::from_iter( - peer_ids - .into_iter() - .skip(closed_authorities_n) - .map(|peer_id| (message.encode(), peer_id, Protocol::Validator)), - ); - - assert_eq!(broadcasted_messages, expected_messages); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_generic_data_command_send_to() { - let mut test_data = TestData::prepare().await; - - let peer_id = MockPeerId::random(); - - let message = message(1); - - test_data - .network - .emit_event(MockEvent::StreamOpened(peer_id, Protocol::Generic)); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; - - test_data - .mock_io - .legacy_messages_for_user - .unbounded_send(( - message.clone(), - DataCommand::SendTo(peer_id, Protocol::Generic), - )) - .unwrap(); - - let expected = (message.encode(), peer_id, Protocol::Generic); + let expected: NetworkData<_, _> = message.into(); assert_eq!( test_data - .network - .send_message + .mock_io + .messages_from_user .next() .await .expect("Should receive message"), @@ -1069,7 +684,7 @@ mod tests { } #[tokio::test] - async fn test_generic_create_sender_error_one_peer() { + async fn test_create_sender_error() { let mut test_data = TestData::prepare().await; test_data @@ -1078,37 +693,37 @@ mod tests { .lock() .push_back(MockSenderError::SomeError); - let peer_id = MockPeerId::random(); + let peer_id = MockAuthPeerId::random(); - let message_1 = message(1); - let message_2 = message(2); + let message_1 = + authentication(vec![(random_authority_id().await, String::from("other_1"))]).await; + let message_2 = + authentication(vec![(random_authority_id().await, String::from("other_2"))]).await; test_data .network - .emit_event(MockEvent::StreamOpened(peer_id, Protocol::Generic)); + .emit_event(MockEvent::StreamOpened(peer_id, Protocol::Authentication)); // We do this only to make sure that NotificationStreamOpened events are handled test_data.wait_for_events_handled().await; test_data .mock_io - .legacy_messages_for_user - .unbounded_send(( - message_1.clone(), - DataCommand::SendTo(peer_id, Protocol::Generic), - )) + .messages_for_user + .unbounded_send((NetworkData::Meta(message_1), DataCommand::Broadcast)) .unwrap(); test_data .mock_io - .legacy_messages_for_user - .unbounded_send(( - message_2.clone(), - DataCommand::SendTo(peer_id, Protocol::Generic), - )) + .messages_for_user + .unbounded_send((NetworkData::Meta(message_2.clone()), DataCommand::Broadcast)) .unwrap(); - let expected = (message_2.encode(), peer_id, Protocol::Generic); + let expected = ( + VersionedAuthentication::V1(message_2).encode(), + peer_id, + Protocol::Authentication, + ); assert_eq!( test_data @@ -1124,70 +739,7 @@ mod tests { } #[tokio::test] - async fn test_generic_create_sender_error_many_peers() { - let mut test_data = TestData::prepare().await; - - let all_authorities_n = 4; - let closed_authorities_n = 2; - for _ in 0..closed_authorities_n { - test_data - .network - .create_sender_errors - .lock() - .push_back(MockSenderError::SomeError); - } - - let peer_ids: Vec<_> = (0..4).map(|_| MockPeerId::random()).collect(); - let message = message(1); - - peer_ids.iter().for_each(|peer_id| { - test_data - .network - .emit_event(MockEvent::StreamOpened(*peer_id, Protocol::Generic)); - }); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; - - peer_ids.iter().for_each(|peer_id| { - test_data - .mock_io - .legacy_messages_for_user - .unbounded_send(( - message.clone(), - DataCommand::SendTo(*peer_id, Protocol::Generic), - )) - .unwrap(); - }); - - let broadcasted_messages = HashSet::<_>::from_iter( - test_data - .network - .send_message - .1 - .lock() - .await - .by_ref() - .take(all_authorities_n - closed_authorities_n) - .collect::>() - .await - .into_iter(), - ); - - let expected_messages = HashSet::from_iter( - peer_ids - .into_iter() - .skip(closed_authorities_n) - .map(|peer_id| (message.encode(), peer_id, Protocol::Generic)), - ); - - assert_eq!(broadcasted_messages, expected_messages); - - test_data.cleanup().await - } - - #[tokio::test] - async fn test_generic_data_command_send_to_error_one_peer() { + async fn test_send_error() { let mut test_data = TestData::prepare().await; test_data @@ -1196,37 +748,37 @@ mod tests { .lock() .push_back(MockSenderError::SomeError); - let peer_id = MockPeerId::random(); + let peer_id = MockAuthPeerId::random(); - let message_1 = message(1); - let message_2 = message(2); + let message_1 = + authentication(vec![(random_authority_id().await, String::from("other_1"))]).await; + let message_2 = + authentication(vec![(random_authority_id().await, String::from("other_2"))]).await; test_data .network - .emit_event(MockEvent::StreamOpened(peer_id, Protocol::Generic)); + .emit_event(MockEvent::StreamOpened(peer_id, Protocol::Authentication)); // We do this only to make sure that NotificationStreamOpened events are handled test_data.wait_for_events_handled().await; test_data .mock_io - .legacy_messages_for_user - .unbounded_send(( - message_1.clone(), - DataCommand::SendTo(peer_id, Protocol::Generic), - )) + .messages_for_user + .unbounded_send((NetworkData::Meta(message_1), DataCommand::Broadcast)) .unwrap(); test_data .mock_io - .legacy_messages_for_user - .unbounded_send(( - message_2.clone(), - DataCommand::SendTo(peer_id, Protocol::Generic), - )) + .messages_for_user + .unbounded_send((NetworkData::Meta(message_2.clone()), DataCommand::Broadcast)) .unwrap(); - let expected = (message_2.encode(), peer_id, Protocol::Generic); + let expected = ( + VersionedAuthentication::V1(message_2).encode(), + peer_id, + Protocol::Authentication, + ); assert_eq!( test_data @@ -1241,88 +793,31 @@ mod tests { test_data.cleanup().await } - #[tokio::test] - async fn test_generic_data_command_send_to_error_many_peers() { - let mut test_data = TestData::prepare().await; - - let all_authorities_n = 4; - let closed_authorities_n = 2; - for _ in 0..closed_authorities_n { - test_data - .network - .send_errors - .lock() - .push_back(MockSenderError::SomeError); - } - - let peer_ids: Vec<_> = (0..4).map(|_| MockPeerId::random()).collect(); - let message = message(1); - - peer_ids.iter().for_each(|peer_id| { - test_data - .network - .emit_event(MockEvent::StreamOpened(*peer_id, Protocol::Generic)); - }); - - // We do this only to make sure that NotificationStreamOpened events are handled - test_data.wait_for_events_handled().await; - - peer_ids.iter().for_each(|peer_id| { - test_data - .mock_io - .legacy_messages_for_user - .unbounded_send(( - message.clone(), - DataCommand::SendTo(*peer_id, Protocol::Generic), - )) - .unwrap(); - }); - - let broadcasted_messages = HashSet::<_>::from_iter( - test_data - .network - .send_message - .1 - .lock() - .await - .by_ref() - .take(all_authorities_n - closed_authorities_n) - .collect::>() - .await - .into_iter(), - ); - - let expected_messages = HashSet::from_iter( - peer_ids - .into_iter() - .skip(closed_authorities_n) - .map(|peer_id| (message.encode(), peer_id, Protocol::Generic)), - ); - - assert_eq!(broadcasted_messages, expected_messages); - - test_data.cleanup().await - } - #[tokio::test] async fn test_notification_received() { let mut test_data = TestData::prepare().await; - let message = message(1); + let message = authentication(vec![( + random_authority_id().await, + String::from("other_addr"), + )]) + .await; test_data.network.emit_event(MockEvent::Messages(vec![( - Protocol::Validator, - NetworkData::encode(&message).into(), + Protocol::Authentication, + VersionedAuthentication::V1(message.clone()).encode().into(), )])); + let expected = NetworkData::Meta(message); + assert_eq!( test_data .mock_io - .legacy_messages_from_user + .messages_from_user .next() .await .expect("Should receive message"), - message + expected ); test_data.cleanup().await @@ -1332,22 +827,23 @@ mod tests { async fn test_command_add_reserved() { let mut test_data = TestData::prepare().await; - let (addresses, _) = MockNetworkIdentity::new().identity(); + let multiaddress: MockMultiaddress = + (random_authority_id().await, String::from("other_addr")); test_data .mock_io - .legacy_commands_for_manager + .commands_for_manager .unbounded_send(ConnectionCommand::AddReserved( - addresses.clone().into_iter().collect(), + iter::once(multiaddress.clone()).collect(), )) .unwrap(); - let expected = (addresses.into_iter().collect(), Protocol::Validator); + let expected = (multiaddress.0.clone(), vec![multiaddress]); assert_eq!( test_data - .network - .add_reserved + .validator_network + .add_connection .next() .await .expect("Should receive message"), @@ -1361,26 +857,24 @@ mod tests { async fn test_command_remove_reserved() { let mut test_data = TestData::prepare().await; - let peer_id = MockPeerId::random(); + let peer_id = random_authority_id().await; test_data .mock_io - .legacy_commands_for_manager + .commands_for_manager .unbounded_send(ConnectionCommand::DelReserved( - iter::once(peer_id).collect(), + iter::once(peer_id.clone()).collect(), )) .unwrap(); - let expected = (iter::once(peer_id).collect(), Protocol::Validator); - assert_eq!( test_data - .network - .remove_reserved + .validator_network + .remove_connection .next() .await .expect("Should receive message"), - expected + peer_id ); test_data.cleanup().await diff --git a/finality-aleph/src/network/session.rs b/finality-aleph/src/network/session.rs index 5d7fedb648..4b835e1bc4 100644 --- a/finality-aleph/src/network/session.rs +++ b/finality-aleph/src/network/session.rs @@ -4,7 +4,7 @@ use super::SimpleNetwork; use crate::{ abft::Recipient, crypto::{AuthorityPen, AuthorityVerifier}, - network::{Data, ReceiverComponent, SendError, SenderComponent, SessionCommand}, + network::{Data, SendError, SenderComponent, SessionCommand}, NodeIndex, SessionId, }; @@ -13,44 +13,23 @@ use crate::{ pub struct Sender { session_id: SessionId, messages_for_network: mpsc::UnboundedSender<(D, SessionId, Recipient)>, - legacy_messages_for_network: mpsc::UnboundedSender<(D, SessionId, Recipient)>, } impl SenderComponent for Sender { fn send(&self, data: D, recipient: Recipient) -> Result<(), SendError> { self.messages_for_network - .unbounded_send((data.clone(), self.session_id, recipient.clone())) - .map_err(|_| SendError::SendFailed)?; - self.legacy_messages_for_network .unbounded_send((data, self.session_id, recipient)) .map_err(|_| SendError::SendFailed) } } -pub struct Receiver { - data_from_network: mpsc::UnboundedReceiver, - legacy_data_from_network: mpsc::UnboundedReceiver, -} - -#[async_trait::async_trait] -impl ReceiverComponent for Receiver { - async fn next(&mut self) -> Option { - tokio::select! { - maybe_next = self.data_from_network.next() => maybe_next, - maybe_next = self.legacy_data_from_network.next() => maybe_next, - } - } -} - /// Sends and receives data within a single session. -type Network = SimpleNetwork, Sender>; +type Network = SimpleNetwork, Sender>; /// Manages sessions for which the network should be active. pub struct Manager { commands_for_service: mpsc::UnboundedSender>, messages_for_service: mpsc::UnboundedSender<(D, SessionId, Recipient)>, - legacy_commands_for_service: mpsc::UnboundedSender>, - legacy_messages_for_service: mpsc::UnboundedSender<(D, SessionId, Recipient)>, } /// What went wrong during a session management operation. @@ -79,12 +58,10 @@ impl IO { impl Manager { /// Create a new manager with the given channels to the service. - pub fn new(io: IO, legacy_io: IO) -> Self { + pub fn new(io: IO) -> Self { Manager { commands_for_service: io.commands_for_service, messages_for_service: io.messages_for_service, - legacy_commands_for_service: legacy_io.commands_for_service, - legacy_messages_for_service: legacy_io.messages_for_service, } } @@ -96,12 +73,6 @@ impl Manager { verifier: AuthorityVerifier, ) -> Result<(), ManagerError> { self.commands_for_service - .unbounded_send(SessionCommand::StartNonvalidator( - session_id, - verifier.clone(), - )) - .map_err(|_| ManagerError::CommandSendFailed)?; - self.legacy_commands_for_service .unbounded_send(SessionCommand::StartNonvalidator(session_id, verifier)) .map_err(|_| ManagerError::CommandSendFailed) } @@ -118,23 +89,12 @@ impl Manager { ) -> Result, ManagerError> { let (result_for_us, result_from_service) = oneshot::channel(); self.commands_for_service - .unbounded_send(SessionCommand::StartValidator( - session_id, - verifier.clone(), - node_id, - pen.clone(), - Some(result_for_us), - )) - .map_err(|_| ManagerError::CommandSendFailed)?; - - let (legacy_result_for_us, legacy_result_from_service) = oneshot::channel(); - self.legacy_commands_for_service .unbounded_send(SessionCommand::StartValidator( session_id, verifier, node_id, pen, - Some(legacy_result_for_us), + Some(result_for_us), )) .map_err(|_| ManagerError::CommandSendFailed)?; @@ -143,20 +103,11 @@ impl Manager { .map_err(|_| ManagerError::NetworkReceiveFailed)?; let messages_for_network = self.messages_for_service.clone(); - let legacy_data_from_network = legacy_result_from_service - .await - .map_err(|_| ManagerError::NetworkReceiveFailed)?; - let legacy_messages_for_network = self.legacy_messages_for_service.clone(); - Ok(Network::new( - Receiver { - data_from_network, - legacy_data_from_network, - }, + data_from_network, Sender { session_id, messages_for_network, - legacy_messages_for_network, }, )) } @@ -172,15 +123,6 @@ impl Manager { pen: AuthorityPen, ) -> Result<(), ManagerError> { self.commands_for_service - .unbounded_send(SessionCommand::StartValidator( - session_id, - verifier.clone(), - node_id, - pen.clone(), - None, - )) - .map_err(|_| ManagerError::CommandSendFailed)?; - self.legacy_commands_for_service .unbounded_send(SessionCommand::StartValidator( session_id, verifier, node_id, pen, None, )) @@ -190,9 +132,6 @@ impl Manager { /// Stop participating in the given session. pub fn stop_session(&self, session_id: SessionId) -> Result<(), ManagerError> { self.commands_for_service - .unbounded_send(SessionCommand::Stop(session_id)) - .map_err(|_| ManagerError::CommandSendFailed)?; - self.legacy_commands_for_service .unbounded_send(SessionCommand::Stop(session_id)) .map_err(|_| ManagerError::CommandSendFailed) } diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index 53697bbccf..b6ed7b619b 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -121,27 +121,12 @@ where .expect("Failed to run connection manager") }; - let (legacy_connection_io, legacy_network_io, legacy_session_io) = setup_io(); - - let legacy_connection_manager = ConnectionManager::new( - network.clone(), - ConnectionManagerConfig::with_session_period(&session_period, &millisecs_per_block), - ); - - let legacy_connection_manager_task = async move { - legacy_connection_io - .run(legacy_connection_manager) - .await - .expect("Failed to legacy connection manager") - }; - - let session_manager = SessionManager::new(session_io, legacy_session_io); + let session_manager = SessionManager::new(session_io); let network = NetworkService::new( network.clone(), validator_network, spawn_handle.clone(), network_io, - legacy_network_io, ); let network_task = async move { network.run().await }; @@ -149,11 +134,6 @@ where debug!(target: "aleph-party", "JustificationHandler has started."); spawn_handle.spawn("aleph/connection_manager", None, connection_manager_task); - spawn_handle.spawn( - "aleph/legacy_connection_manager", - None, - legacy_connection_manager_task, - ); spawn_handle.spawn("aleph/network", None, network_task); debug!(target: "aleph-party", "Network has started."); diff --git a/finality-aleph/src/substrate_network.rs b/finality-aleph/src/substrate_network.rs index fc8d3a5aef..9638520b29 100644 --- a/finality-aleph/src/substrate_network.rs +++ b/finality-aleph/src/substrate_network.rs @@ -184,27 +184,16 @@ impl MultiaddressT for Multiaddress { } } -/// Name of the network protocol used by Aleph Zero. This is how messages -/// are subscribed to ensure that we are gossiping and communicating with our -/// own network. -const LEGACY_ALEPH_PROTOCOL_NAME: &str = "/cardinals/aleph/2"; - /// Name of the network protocol used by Aleph Zero. This is how messages /// are subscribed to ensure that we are gossiping and communicating with our /// own network. const AUTHENTICATION_PROTOCOL_NAME: &str = "/aleph/1"; -/// Name of the network protocol used by Aleph Zero validators. Similar to -/// ALEPH_PROTOCOL_NAME, but only used by validators that authenticated to each other. -const LEGACY_ALEPH_VALIDATOR_PROTOCOL_NAME: &str = "/cardinals/aleph_validator/1"; - /// Returns the canonical name of the protocol. pub fn protocol_name(protocol: &Protocol) -> Cow<'static, str> { use Protocol::*; match protocol { Authentication => Cow::Borrowed(AUTHENTICATION_PROTOCOL_NAME), - Generic => Cow::Borrowed(LEGACY_ALEPH_PROTOCOL_NAME), - Validator => Cow::Borrowed(LEGACY_ALEPH_VALIDATOR_PROTOCOL_NAME), } } @@ -212,8 +201,6 @@ pub fn protocol_name(protocol: &Protocol) -> Cow<'static, str> { fn to_protocol(protocol_name: &str) -> Result { match protocol_name { AUTHENTICATION_PROTOCOL_NAME => Ok(Protocol::Authentication), - LEGACY_ALEPH_PROTOCOL_NAME => Ok(Protocol::Generic), - LEGACY_ALEPH_VALIDATOR_PROTOCOL_NAME => Ok(Protocol::Validator), _ => Err(()), } } diff --git a/finality-aleph/src/testing/mocks/validator_network.rs b/finality-aleph/src/testing/mocks/validator_network.rs index de78b23ec5..b9efbbbf1a 100644 --- a/finality-aleph/src/testing/mocks/validator_network.rs +++ b/finality-aleph/src/testing/mocks/validator_network.rs @@ -86,14 +86,23 @@ impl NetworkIdentity for MockNetwork { } } +pub async fn random_authority_id() -> AuthorityId { + let key_store = Arc::new(KeyStore::new()); + key_store + .ed25519_generate_new(KEY_TYPE, None) + .await + .unwrap() + .into() +} + +pub async fn random_identity(address: String) -> (Vec, AuthorityId) { + let id = random_authority_id().await; + (vec![(id.clone(), address)], id) +} + impl MockNetwork { pub async fn new(address: &str) -> Self { - let key_store = Arc::new(KeyStore::new()); - let id: AuthorityId = key_store - .ed25519_generate_new(KEY_TYPE, None) - .await - .unwrap() - .into(); + let id = random_authority_id().await; let addresses = vec![(id.clone(), String::from(address))]; MockNetwork { add_connection: Channel::new(), @@ -105,8 +114,19 @@ impl MockNetwork { } } + pub fn from(addresses: Vec, id: AuthorityId) -> Self { + MockNetwork { + add_connection: Channel::new(), + remove_connection: Channel::new(), + send: Channel::new(), + next: Channel::new(), + addresses, + id, + } + } + // Consumes the network asserting there are no unreceived messages in the channels. - pub async fn _close_channels(self) { + pub async fn close_channels(self) { assert!(self.add_connection.close().await.is_none()); assert!(self.remove_connection.close().await.is_none()); assert!(self.send.close().await.is_none()); diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index c1672ad2b7..d1f6b36648 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -4,6 +4,7 @@ use std::{ time::Duration, }; +use aleph_primitives::AuthorityId as MockPeerId; use codec::{Decode, Encode}; use futures::channel::oneshot; use sc_service::TaskManager; @@ -12,16 +13,15 @@ use tokio::{runtime::Handle, task::JoinHandle, time::timeout}; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, network::{ - mock::{ - crypto_basics, MockData, MockEvent, MockMultiaddress, MockNetwork, MockNetworkIdentity, - MockPeerId, - }, + mock::{crypto_basics, MockData, MockEvent, MockNetwork, MockPeerId as MockAuthPeerId}, setup_io, - testing::{Authentication, DataInSession, DiscoveryMessage, NetworkData, SessionHandler}, + testing::{DataInSession, DiscoveryMessage, SessionHandler, VersionedAuthentication}, ConnectionManager, ConnectionManagerConfig, DataNetwork, NetworkIdentity, Protocol, Service as NetworkService, SessionManager, }, - testing::mocks::validator_network::MockNetwork as MockValidatorNetwork, + testing::mocks::validator_network::{ + random_identity, MockMultiaddress, MockNetwork as MockValidatorNetwork, + }, MillisecsPerBlock, NodeIndex, Recipient, SessionId, SessionPeriod, }; @@ -35,6 +35,7 @@ struct Authority { pen: AuthorityPen, addresses: Vec, peer_id: MockPeerId, + auth_peer_id: MockAuthPeerId, } impl Authority { @@ -47,7 +48,11 @@ impl Authority { } fn peer_id(&self) -> MockPeerId { - self.peer_id + self.peer_id.clone() + } + + fn auth_peer_id(&self) -> MockAuthPeerId { + self.auth_peer_id } } @@ -56,23 +61,19 @@ impl NetworkIdentity for Authority { type Multiaddress = MockMultiaddress; fn identity(&self) -> (Vec, Self::PeerId) { - (self.addresses.clone(), self.peer_id) + (self.addresses.clone(), self.peer_id.clone()) } } -type MockNetworkData = NetworkData; - struct TestData { pub authorities: Vec, pub authority_verifier: AuthorityVerifier, pub session_manager: SessionManager, pub network: MockNetwork, - pub _validator_network: MockValidatorNetwork>, + pub validator_network: MockValidatorNetwork>, network_manager_exit_tx: oneshot::Sender<()>, - legacy_network_manager_exit_tx: oneshot::Sender<()>, network_service_exit_tx: oneshot::Sender<()>, network_manager_handle: JoinHandle<()>, - legacy_network_manager_handle: JoinHandle<()>, network_service_handle: JoinHandle<()>, // `TaskManager` can't be dropped for `SpawnTaskHandle` to work _task_manager: TaskManager, @@ -81,47 +82,40 @@ struct TestData { async fn prepare_one_session_test_data() -> TestData { let task_manager = TaskManager::new(Handle::current(), None).unwrap(); let (authority_pens, authority_verifier) = crypto_basics(NODES_N).await; - let authorities: Vec<_> = authority_pens - .into_iter() - .map(|(_, p)| { - let identity = MockNetworkIdentity::new().identity(); - Authority { - pen: p, - addresses: identity.0, - peer_id: identity.1, - } - }) - .collect(); + let mut authorities = Vec::new(); + for (index, p) in authority_pens { + let identity = random_identity(index.0.to_string()).await; + let auth_peer_id = MockAuthPeerId::random(); + authorities.push(Authority { + pen: p, + addresses: identity.0, + peer_id: identity.1, + auth_peer_id, + }); + } // Prepare Network let (event_stream_tx, event_stream_rx) = oneshot::channel(); let (network_manager_exit_tx, network_manager_exit_rx) = oneshot::channel(); - let (legacy_network_manager_exit_tx, legacy_network_manager_exit_rx) = oneshot::channel(); let (network_service_exit_tx, network_service_exit_rx) = oneshot::channel(); let network = MockNetwork::new(event_stream_tx); - let validator_network = MockValidatorNetwork::new("address").await; + let validator_network = + MockValidatorNetwork::from(authorities[0].addresses(), authorities[0].peer_id()); let (connection_io, network_io, session_io) = setup_io(); - let (legacy_connection_io, legacy_network_io, legacy_session_io) = setup_io(); let connection_manager = ConnectionManager::new( validator_network.clone(), ConnectionManagerConfig::with_session_period(&SESSION_PERIOD, &MILLISECS_PER_BLOCK), ); - let legacy_connection_manager = ConnectionManager::::new( - authorities[0].clone(), - ConnectionManagerConfig::with_session_period(&SESSION_PERIOD, &MILLISECS_PER_BLOCK), - ); - - let session_manager = SessionManager::new(session_io, legacy_session_io); + let session_manager = SessionManager::new(session_io); let network_service = NetworkService::new( network.clone(), validator_network.clone(), task_manager.spawn_handle(), network_io, - legacy_network_io, ); let network_manager_task = async move { @@ -132,14 +126,6 @@ async fn prepare_one_session_test_data() -> TestData { }; }; - let legacy_network_manager_task = async move { - tokio::select! { - _ = legacy_connection_io - .run(legacy_connection_manager) => { }, - _ = legacy_network_manager_exit_rx => { }, - }; - }; - let network_service_task = async move { tokio::select! { _ = network_service.run() => { }, @@ -147,7 +133,6 @@ async fn prepare_one_session_test_data() -> TestData { }; }; let network_manager_handle = tokio::spawn(network_manager_task); - let legacy_network_manager_handle = tokio::spawn(legacy_network_manager_task); let network_service_handle = tokio::spawn(network_service_task); event_stream_rx.await.unwrap(); @@ -157,19 +142,17 @@ async fn prepare_one_session_test_data() -> TestData { authority_verifier, session_manager, network, - _validator_network: validator_network, + validator_network, network_manager_exit_tx, - legacy_network_manager_exit_tx, network_service_exit_tx, network_manager_handle, - legacy_network_manager_handle, network_service_handle, _task_manager: task_manager, } } impl TestData { - fn connect_identity_to_network(&mut self, peer_id: MockPeerId, protocol: Protocol) { + fn connect_identity_to_network(&mut self, peer_id: MockAuthPeerId, protocol: Protocol) { self.network .emit_event(MockEvent::StreamOpened(peer_id, protocol)); } @@ -216,15 +199,15 @@ impl TestData { .unwrap() } - async fn check_sends_add_reserved_node(&mut self) { + async fn check_add_connection(&mut self) { let mut reserved_addresses = HashSet::new(); for _ in self.authorities.iter().skip(1) { - let (addresses, protocol) = timeout(DEFAULT_TIMEOUT, self.network.add_reserved.next()) + let (_, addresses) = self + .validator_network + .add_connection + .next() .await - .ok() - .flatten() .expect("Should add reserved nodes"); - assert_eq!(protocol, Protocol::Validator); reserved_addresses.extend(addresses.into_iter()); } @@ -236,45 +219,15 @@ impl TestData { assert_eq!(reserved_addresses, expected_addresses); } - async fn check_sends_authentication( - &mut self, - authentication: Authentication, - ) { - let mut sent_auth = HashMap::new(); - while sent_auth.len() < NODES_N - 1 { - if let Some(( - MockNetworkData::Meta(DiscoveryMessage::Authentication(auth_data)), - peer_id, - _, - )) = timeout( - DEFAULT_TIMEOUT, - self.next_sent_authentication(Protocol::Generic), - ) - .await - .expect("Should send authentication") - { - sent_auth.insert(peer_id, auth_data); - } - } - - let mut expected_auth = HashMap::new(); - for authority in self.authorities.iter().skip(1) { - expected_auth.insert(authority.peer_id(), authentication.clone()); - } - - assert_eq!(sent_auth, expected_auth); - } - async fn connect_session_authorities(&mut self, session_id: u32) { for (index, authority) in self.authorities.clone().into_iter().enumerate().skip(1) { let handler = self.get_session_handler(index, session_id).await; - self.connect_identity_to_network(authority.peer_id(), Protocol::Generic); - self.connect_identity_to_network(authority.peer_id(), Protocol::Validator); + self.connect_identity_to_network(authority.auth_peer_id(), Protocol::Authentication); self.network.emit_event(MockEvent::Messages(vec![( - Protocol::Generic, - MockNetworkData::Meta(DiscoveryMessage::AuthenticationBroadcast( + Protocol::Authentication, + VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast( handler.authentication().unwrap(), )) .encode() @@ -286,42 +239,27 @@ impl TestData { async fn start_session(&mut self, session_id: u32) -> impl DataNetwork { let data_network = self.start_validator_session(0, session_id).await; self.connect_session_authorities(session_id).await; - self.check_sends_add_reserved_node().await; - self.check_sends_authentication( - self.get_session_handler(0, session_id) - .await - .authentication() - .unwrap(), - ) - .await; + self.check_add_connection().await; data_network } - fn emit_notifications_received(&mut self, messages: Vec) { - self.network.emit_event(MockEvent::Messages( - messages - .iter() - .map(|m| (Protocol::Generic, m.encode().into())) - .collect(), - )); - } - - async fn next_sent( + async fn next_sent_auth( &mut self, - p: Protocol, ) -> Option<( - NetworkData, - MockPeerId, + VersionedAuthentication, + MockAuthPeerId, Protocol, )> { loop { match self.network.send_message.next().await { Some((data, peer_id, protocol)) => { - if protocol == p { + if protocol == Protocol::Authentication { return Some(( - NetworkData::::decode(&mut data.as_slice()) - .expect("should decode"), + VersionedAuthentication::::decode( + &mut data.as_slice(), + ) + .expect("should decode"), peer_id, protocol, )); @@ -332,97 +270,14 @@ impl TestData { } } - async fn next_sent_authentication_broadcast( - &mut self, - p: Protocol, - ) -> Option<( - NetworkData, - MockPeerId, - Protocol, - )> { - loop { - match self.next_sent(p).await { - Some(( - MockNetworkData::Meta(DiscoveryMessage::AuthenticationBroadcast(auth_data)), - peer_id, - protocol, - )) => { - return Some(( - MockNetworkData::Meta(DiscoveryMessage::AuthenticationBroadcast(auth_data)), - peer_id, - protocol, - )); - } - None => return None, - _ => {} - } - } - } - - async fn next_sent_authentication( - &mut self, - p: Protocol, - ) -> Option<( - NetworkData, - MockPeerId, - Protocol, - )> { - loop { - match self.next_sent(p).await { - Some(( - MockNetworkData::Meta(DiscoveryMessage::Authentication(auth_data)), - peer_id, - protocol, - )) => { - return Some(( - MockNetworkData::Meta(DiscoveryMessage::Authentication(auth_data)), - peer_id, - protocol, - )); - } - None => return None, - _ => {} - } - } - } - - async fn next_sent_data_message( - &mut self, - p: Protocol, - ) -> Option<( - NetworkData, - MockPeerId, - Protocol, - )> { - loop { - match self.next_sent(p).await { - Some((MockNetworkData::Data(data, session_id), peer_id, protocol)) => { - return Some((MockNetworkData::Data(data, session_id), peer_id, protocol)) - } - None => return None, - _ => {} - } - } - } - async fn cleanup(self) { self.network_manager_exit_tx.send(()).unwrap(); - self.legacy_network_manager_exit_tx.send(()).unwrap(); self.network_service_exit_tx.send(()).unwrap(); self.network_manager_handle.await.unwrap(); - self.legacy_network_manager_handle.await.unwrap(); self.network_service_handle.await.unwrap(); - while let Some((data, peer_id, protocol)) = self.network.send_message.try_next().await { - if protocol == Protocol::Validator { - if let Ok(MockNetworkData::Data(data, session_id)) = - MockNetworkData::decode(&mut data.as_slice()) - { - panic!("No Data messages to validators should be sent during cleanup. All data messages should be handled before.\ - Got: {:?} in {:?} to {:?} with protocol {:?}", data, session_id, peer_id, protocol); - } - } - } + while let Some(_) = self.network.send_message.try_next().await {} self.network.close_channels().await; + self.validator_network.close_channels().await; } } @@ -430,88 +285,26 @@ impl TestData { async fn test_sends_discovery_message() { let session_id = 43; let mut test_data = prepare_one_session_test_data().await; - let connected_peer_id = test_data.authorities[1].peer_id(); - test_data.connect_identity_to_network(connected_peer_id, Protocol::Generic); + let connected_peer_id = test_data.authorities[1].auth_peer_id(); + test_data.connect_identity_to_network(connected_peer_id, Protocol::Authentication); let mut data_network = test_data.start_validator_session(0, session_id).await; let handler = test_data.get_session_handler(0, session_id).await; - let mut sent_legacy_authentications = 0; - for _ in 0..10 { - match timeout( - DEFAULT_TIMEOUT, - test_data.next_sent_authentication_broadcast(Protocol::Generic), - ) - .await - .ok() - .flatten() - { + for _ in 0..4 { + match test_data.next_sent_auth().await { Some(( - MockNetworkData::Meta(DiscoveryMessage::AuthenticationBroadcast(auth_data)), + VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast(auth_data)), peer_id, _, )) => { assert_eq!(peer_id, connected_peer_id); assert_eq!(auth_data, handler.authentication().unwrap()); - sent_legacy_authentications += 1; } - Some(_) => {} - None => panic!("Should broadcast authentication"), + None => panic!("Not sending authentications"), + _ => panic!("Should broadcast own authentication, nothing else"), } } - if sent_legacy_authentications < 3 { - panic!("Should broadcast legacy authentications") - } - test_data.cleanup().await; - assert_eq!( - timeout(DEFAULT_TIMEOUT, data_network.next()).await, - Ok(None) - ); -} - -#[tokio::test] -async fn test_sends_authentication_on_receiving_broadcast() { - let session_id = 43; - let mut test_data = prepare_one_session_test_data().await; - let mut data_network = test_data.start_validator_session(0, session_id).await; - let handler = test_data.get_session_handler(0, session_id).await; - let sending_peer_handler = test_data.get_session_handler(1, session_id).await; - let sending_peer = test_data.authorities[1].clone(); - test_data.connect_identity_to_network(sending_peer.peer_id(), Protocol::Generic); - - test_data.network.emit_event(MockEvent::Messages(vec![( - Protocol::Generic, - MockNetworkData::Meta(DiscoveryMessage::AuthenticationBroadcast( - sending_peer_handler.authentication().unwrap(), - )) - .encode() - .into(), - )])); - - assert_eq!( - timeout(DEFAULT_TIMEOUT, test_data.network.add_reserved.next()) - .await - .ok() - .flatten() - .expect("Should add reserved nodes"), - ( - sending_peer.addresses().into_iter().collect(), - Protocol::Validator - ), - ); - - if let Some((MockNetworkData::Meta(DiscoveryMessage::Authentication(auth_data)), peer_id, _)) = - timeout( - DEFAULT_TIMEOUT, - test_data.next_sent_authentication(Protocol::Generic), - ) - .await - .expect("Should send authentication") - { - assert_eq!(peer_id, sending_peer.peer_id()); - assert_eq!(auth_data, handler.authentication().unwrap()); - } - test_data.cleanup().await; assert_eq!( timeout(DEFAULT_TIMEOUT, data_network.next()).await, @@ -529,12 +322,12 @@ async fn test_forwards_authentication_broadcast() { let sending_peer_handler = test_data.get_session_handler(1, session_id).await; for authority in test_data.authorities.clone().iter().skip(1) { - test_data.connect_identity_to_network(authority.peer_id(), Protocol::Generic); + test_data.connect_identity_to_network(authority.auth_peer_id(), Protocol::Authentication); } test_data.network.emit_event(MockEvent::Messages(vec![( - Protocol::Generic, - MockNetworkData::Meta(DiscoveryMessage::AuthenticationBroadcast( + Protocol::Authentication, + VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast( sending_peer_handler.authentication().unwrap(), )) .encode() @@ -542,21 +335,19 @@ async fn test_forwards_authentication_broadcast() { )])); assert_eq!( - timeout(DEFAULT_TIMEOUT, test_data.network.add_reserved.next()) + test_data + .validator_network + .add_connection + .next() .await - .ok() - .flatten() .expect("Should add reserved nodes"), - ( - sending_peer.addresses().into_iter().collect(), - Protocol::Validator - ), + (sending_peer.peer_id(), sending_peer.addresses()), ); let mut expected_authentication = HashMap::new(); for authority in test_data.authorities.iter().skip(1) { expected_authentication.insert( - authority.peer_id(), + authority.auth_peer_id(), sending_peer_handler.authentication().unwrap(), ); } @@ -564,15 +355,10 @@ async fn test_forwards_authentication_broadcast() { let mut sent_authentication = HashMap::new(); while sent_authentication.len() < NODES_N - 1 { if let Some(( - MockNetworkData::Meta(DiscoveryMessage::AuthenticationBroadcast(auth_data)), + VersionedAuthentication::V1(DiscoveryMessage::AuthenticationBroadcast(auth_data)), peer_id, _, - )) = timeout( - DEFAULT_TIMEOUT, - test_data.next_sent_authentication_broadcast(Protocol::Generic), - ) - .await - .expect("Should send Authentication Broadcast") + )) = test_data.next_sent_auth().await { if auth_data != handler.authentication().unwrap() { sent_authentication.insert(peer_id, auth_data); @@ -596,10 +382,11 @@ async fn test_connects_to_others() { let mut data_network = test_data.start_session(session_id).await; let data = vec![1, 2, 3]; - test_data.emit_notifications_received(vec![MockNetworkData::Data( - data.clone(), - SessionId(session_id), - )]); + test_data.validator_network.next.send(DataInSession { + data: data.clone(), + session_id: SessionId(session_id), + }); + assert_eq!( timeout(DEFAULT_TIMEOUT, data_network.next()).await, Ok(Some(data)) @@ -615,23 +402,15 @@ async fn test_connects_to_others_early_validator() { let mut test_data = prepare_one_session_test_data().await; test_data.early_start_validator_session(0, session_id); test_data.connect_session_authorities(session_id).await; - test_data.check_sends_add_reserved_node().await; - test_data - .check_sends_authentication( - test_data - .get_session_handler(0, session_id) - .await - .authentication() - .unwrap(), - ) - .await; + test_data.check_add_connection().await; + let mut data_network = test_data.start_validator_session(0, session_id).await; let data = vec![1, 2, 3]; - test_data.emit_notifications_received(vec![MockNetworkData::Data( - data.clone(), - SessionId(session_id), - )]); + test_data.validator_network.next.send(DataInSession { + data: data.clone(), + session_id: SessionId(session_id), + }); assert_eq!( timeout(DEFAULT_TIMEOUT, data_network.next()).await, Ok(Some(data.clone())) @@ -651,15 +430,18 @@ async fn test_stops_session() { .session_manager .stop_session(SessionId(session_id)) .unwrap(); - assert_eq!( - timeout(DEFAULT_TIMEOUT, test_data.network.remove_reserved.next()) + + let removed = HashSet::<_>::from_iter( + test_data + .validator_network + .remove_connection + .take(NODES_N - 1) .await - .ok() - .flatten(), - Some(( - HashSet::from_iter(test_data.authorities.iter().skip(1).map(|a| a.peer_id())), - Protocol::Validator - )) + .into_iter(), + ); + assert_eq!( + removed, + HashSet::from_iter(test_data.authorities.iter().skip(1).map(|a| a.peer_id())), ); // This assert should be before cleanup. We want to check whether `session_manager.stop_session(...)` @@ -686,14 +468,23 @@ async fn test_receives_data_in_correct_session() { let data_1_2 = vec![4, 5, 6]; let data_2_1 = vec![7, 8, 9]; let data_2_2 = vec![10, 11, 12]; - test_data.emit_notifications_received(vec![ - MockNetworkData::Data(data_1_1.clone(), SessionId(session_id_1)), - MockNetworkData::Data(data_2_1.clone(), SessionId(session_id_2)), - ]); - test_data.emit_notifications_received(vec![ - MockNetworkData::Data(data_2_2.clone(), SessionId(session_id_2)), - MockNetworkData::Data(data_1_2.clone(), SessionId(session_id_1)), - ]); + test_data.validator_network.next.send(DataInSession { + data: data_1_1.clone(), + session_id: SessionId(session_id_1), + }); + test_data.validator_network.next.send(DataInSession { + data: data_2_1.clone(), + session_id: SessionId(session_id_2), + }); + + test_data.validator_network.next.send(DataInSession { + data: data_2_2.clone(), + session_id: SessionId(session_id_2), + }); + test_data.validator_network.next.send(DataInSession { + data: data_1_2.clone(), + session_id: SessionId(session_id_1), + }); assert_eq!( timeout(DEFAULT_TIMEOUT, data_network_1.next()).await, @@ -758,14 +549,9 @@ async fn test_sends_data_to_correct_session() { let mut sent_data = HashSet::new(); while sent_data.len() < 2 * (NODES_N - 1) { - if let Some((MockNetworkData::Data(data, session_id), peer_id, _)) = timeout( - DEFAULT_TIMEOUT, - test_data.next_sent_data_message(Protocol::Validator), - ) - .await - .expect("Should send data") + if let Some((DataInSession { data, session_id }, peer_id)) = + test_data.validator_network.send.next().await { - println!("{:?} {:?}", data, peer_id); sent_data.insert((data, session_id, peer_id)); } } @@ -808,12 +594,8 @@ async fn test_broadcasts_data_to_correct_session() { let mut sent_data = HashSet::new(); while sent_data.len() < 2 * (NODES_N - 1) { - if let Some((MockNetworkData::Data(data, session_id), peer_id, _)) = timeout( - DEFAULT_TIMEOUT, - test_data.next_sent_data_message(Protocol::Validator), - ) - .await - .expect("Should send data") + if let Some((DataInSession { data, session_id }, peer_id)) = + test_data.validator_network.send.next().await { sent_data.insert((data, session_id, peer_id)); }