From 0ebb7c086f42712a219c920e560646837d0ee579 Mon Sep 17 00:00:00 2001 From: bochaco Date: Tue, 8 Jun 2021 13:08:08 -0300 Subject: [PATCH] refactor(bootstrap): changes to new messaging flow for peers joining the network BREAKING CHANGE: new node Join messaging is not backward compatible. --- Cargo.toml | 2 +- src/error.rs | 4 +- src/event.rs | 3 +- src/message_filter.rs | 7 +- src/messages/mod.rs | 72 +- src/node.rs | 2 +- src/relocation.rs | 7 +- src/routing/bootstrap.rs | 977 +++++++----------- src/routing/comm.rs | 72 +- src/routing/command.rs | 12 +- src/routing/core/anti_entropy.rs | 2 +- src/routing/core/{public_api.rs => api.rs} | 34 +- .../core/messaging/handling/agreement.rs | 3 +- .../core/messaging/handling/decisions.rs | 26 +- src/routing/core/messaging/handling/mod.rs | 163 ++- .../core/messaging/handling/resource_proof.rs | 6 +- src/routing/core/messaging/sending.rs | 8 +- src/routing/core/mod.rs | 2 +- src/routing/dispatcher.rs | 21 +- src/routing/tests/mod.rs | 106 +- src/section/mod.rs | 2 +- 21 files changed, 607 insertions(+), 924 deletions(-) rename src/routing/core/{public_api.rs => api.rs} (91%) diff --git a/Cargo.toml b/Cargo.toml index 8a895639bb..bb93b8e08d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ qp2p = "~0.12.0" rand = "~0.7.3" rand_chacha = "~0.2.2" resource_proof = "0.8.0" -sn_messaging = { git = "https://github.com/maqi/sn_messaging.git", branch = "refactory_sap" } +sn_messaging = "33.0.0" sn_data_types = "~0.18.3" thiserror = "1.0.23" tokio = "1.3.0" diff --git a/src/error.rs b/src/error.rs index 74d55339b4..9bd23479d8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -78,6 +78,6 @@ pub enum Error { NoMatchingSection, #[error("No matching Elder")] NoMatchingElder, - #[error("Cannot start node since it is not externally reachable")] - NodeNotReachable, + #[error("Node cannot join the network since it is not externally reachable: {0}")] + NodeNotReachable(SocketAddr), } diff --git a/src/event.rs b/src/event.rs index 95a7579b3a..9f57cb7e3e 100644 --- a/src/event.rs +++ b/src/event.rs @@ -10,8 +10,7 @@ use bytes::Bytes; use ed25519_dalek::Keypair; use hex_fmt::HexFmt; pub use qp2p::{RecvStream, SendStream}; -use sn_messaging::Signed; -use sn_messaging::{client::ClientMsg, DstLocation, EndUser, SrcLocation}; +use sn_messaging::{client::ClientMsg, node::Signed, DstLocation, EndUser, SrcLocation}; use std::{ collections::BTreeSet, fmt::{self, Debug, Formatter}, diff --git a/src/message_filter.rs b/src/message_filter.rs index af13e68902..9fd13ea372 100644 --- a/src/message_filter.rs +++ b/src/message_filter.rs @@ -7,7 +7,6 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::cache::Cache; -use crate::messages::RoutingMsgUtils; use sn_messaging::{node::RoutingMsg, DstLocation, MessageId}; use std::time::Duration; use xor_name::XorName; @@ -59,17 +58,17 @@ impl MessageFilter { // pub async fn filter_outgoing(&self, msg: &RoutingMsg, pub_id: &XorName) -> FilteringResult { // Not filtering direct messages. - if let DstLocation::DirectAndUnrouted = msg.dst() { + if let DstLocation::DirectAndUnrouted = msg.dst { return FilteringResult::NewMessage; } if self .outgoing - .set((msg.id(), *pub_id), (), None) + .set((msg.id, *pub_id), (), None) .await .is_some() { - trace!("Outgoing message filtered: {:?}", msg.id()); + trace!("Outgoing message filtered: {:?}", msg.id); FilteringResult::KnownMessage } else { FilteringResult::NewMessage diff --git a/src/messages/mod.rs b/src/messages/mod.rs index 5266bd5b42..8de96af511 100644 --- a/src/messages/mod.rs +++ b/src/messages/mod.rs @@ -19,11 +19,11 @@ use crate::{ }; use secured_linked_list::{error::Error as SecuredLinkedListError, SecuredLinkedList}; use serde::Serialize; +use sn_messaging::node::{Signed, SignedShare}; use sn_messaging::{ - node::{PlainMessage, RoutingMsg, SrcAuthority, Variant}, + node::{JoinResponse, PlainMessage, RoutingMsg, SrcAuthority, Variant}, Aggregation, DstLocation, MessageId, }; -use sn_messaging::{Signed, SignedShare}; use std::fmt::Debug; use thiserror::Error; use xor_name::XorName; @@ -81,17 +81,6 @@ pub trait RoutingMsgUtils { /// Getter fn signed(&self) -> Option; - /// Getter - fn dst(&self) -> &DstLocation; - - fn id(&self) -> &MessageId; - - /// Getter - fn variant(&self) -> &Variant; - - /// Getter - fn src(&self) -> &SrcAuthority; - fn verify_variant<'a, I: IntoIterator>( &self, trusted_keys: I, @@ -127,7 +116,7 @@ impl RoutingMsgUtils for RoutingMsg { return Err(Error::CreateError(CreateError::FailedSignature)); } - if signed_share.public_key_set.public_key() != msg.section_pk() { + if signed_share.public_key_set.public_key() != msg.section_pk { error!( "Signed share public key doesn't match signed chain last key: {:?}", msg @@ -368,49 +357,34 @@ impl RoutingMsgUtils for RoutingMsg { } } - /// Getter - fn dst(&self) -> &DstLocation { - &self.dst - } - - /// Get the MessageId - fn id(&self) -> &MessageId { - &self.id - } - - /// Getter - fn variant(&self) -> &Variant { - &self.variant - } - - /// Getter - fn src(&self) -> &SrcAuthority { - &self.src - } - fn verify_variant<'a, I>(&self, trusted_keys: I) -> Result where I: IntoIterator, { let proof_chain = match &self.variant { - Variant::NodeApproval { - section_auth, - member_info, - section_chain, - .. - } => { - if !section_auth.verify(section_chain) { - return Err(Error::InvalidMessage); - } - - if !member_info.verify(section_chain) { - return Err(Error::InvalidMessage); + Variant::JoinResponse(resp) => { + if let JoinResponse::Approval { + ref section_auth, + ref member_info, + ref section_chain, + .. + } = **resp + { + if !section_auth.verify(section_chain) { + return Err(Error::InvalidMessage); + } + + if !member_info.verify(section_chain) { + return Err(Error::InvalidMessage); + } + + section_chain + } else { + return Ok(VerifyStatus::Full); } - - section_chain } Variant::SectionKnowledge { - src_info: (_, chain), + src_info: (_, ref chain), .. } => chain, Variant::Sync { section, .. } => section.chain(), diff --git a/src/node.rs b/src/node.rs index 3fde93406b..8cccfb9f09 100644 --- a/src/node.rs +++ b/src/node.rs @@ -20,7 +20,7 @@ use xor_name::{XorName, XOR_NAME_LEN}; /// Information and state of our node #[derive(Clone)] pub struct Node { - // Keep the secret key in Box to allow Clone while also preventing multiple copies to exist in + // Keep the secret key in Arc to allow Clone while also preventing multiple copies to exist in // memory which might be insecure. // TODO: find a way to not require `Clone`. pub keypair: Arc, diff --git a/src/relocation.rs b/src/relocation.rs index 5607ffbf29..9df6d94491 100644 --- a/src/relocation.rs +++ b/src/relocation.rs @@ -11,7 +11,6 @@ use crate::{ crypto::{self, Keypair, Verifier}, error::Error, - messages::RoutingMsgUtils, network::NetworkUtils, peer::PeerUtils, section::{SectionPeersUtils, SectionUtils}, @@ -120,7 +119,7 @@ pub trait SignedRelocateDetailsUtils { impl SignedRelocateDetailsUtils for SignedRelocateDetails { fn new(signed_msg: RoutingMsg) -> Result { - if let Variant::Relocate(_) = signed_msg.variant() { + if let Variant::Relocate(_) = signed_msg.variant { Ok(Self { signed_msg }) } else { Err(Error::InvalidMessage) @@ -128,8 +127,8 @@ impl SignedRelocateDetailsUtils for SignedRelocateDetails { } fn relocate_details(&self) -> Result<&RelocateDetails, Error> { - if let Variant::Relocate(details) = &self.signed_msg.variant() { - Ok(&details) + if let Variant::Relocate(details) = &self.signed_msg.variant { + Ok(details) } else { error!("SignedRelocateDetails does not contain Variant::Relocate"); Err(Error::InvalidMessage) diff --git a/src/routing/bootstrap.rs b/src/routing/bootstrap.rs index 64a6c64982..28b861d7f4 100644 --- a/src/routing/bootstrap.rs +++ b/src/routing/bootstrap.rs @@ -8,7 +8,7 @@ use super::{comm::ConnectionEvent, Comm}; use crate::{ - crypto::{self, Signature}, + crypto::{self}, error::{Error, Result}, messages::{RoutingMsgUtils, VerifyStatus}, node::Node, @@ -19,27 +19,23 @@ use crate::{ FIRST_SECTION_MAX_AGE, FIRST_SECTION_MIN_AGE, }; use futures::future; -use itertools::Itertools; use rand::seq::IteratorRandom; use resource_proof::ResourceProof; -use secured_linked_list::SecuredLinkedList; use sn_data_types::PublicKey; use sn_messaging::{ node::{ - JoinRequest, Proven, RelocatePayload, ResourceProofResponse, RoutingMsg, Section, - SectionAuthorityProvider, SignedRelocateDetails, Variant, + JoinRejectionReason, JoinRequest, JoinResponse, RelocatePayload, ResourceProofResponse, + RoutingMsg, Section, SignedRelocateDetails, Variant, }, - section_info::{GetSectionResponse, Message as SectionInfoMsg, SectionInfo}, DestInfo, DstLocation, MessageType, WireMsg, }; use std::{ - collections::{BTreeMap, HashSet, VecDeque}, - mem, + collections::{HashSet, VecDeque}, net::SocketAddr, }; use tokio::sync::mpsc; use tracing::Instrument; -use xor_name::{Prefix, XorName, XOR_NAME_LEN}; +use xor_name::{Prefix, XorName}; const BACKLOG_CAPACITY: usize = 100; @@ -121,200 +117,29 @@ impl<'a> State<'a> { } async fn run( - mut self, + self, bootstrap_addrs: Vec, genesis_key: Option, relocate_details: Option, ) -> Result<(Node, Section, Vec<(RoutingMsg, SocketAddr, DestInfo)>)> { - let (prefix, section_key, elders) = self - .bootstrap(bootstrap_addrs, relocate_details.as_ref()) - .await?; - - // For the first section, using age random among 6 to 100 to avoid relocating too many nodes - // at the same time. - if prefix.is_empty() && self.node.name()[XOR_NAME_LEN - 1] < FIRST_SECTION_MIN_AGE { - let age: u8 = (FIRST_SECTION_MIN_AGE..FIRST_SECTION_MAX_AGE) - .choose(&mut rand::thread_rng()) - .unwrap_or(FIRST_SECTION_MAX_AGE); - - let new_keypair = crypto::gen_keypair(&Prefix::default().range_inclusive(), age); - let new_name = crypto::name(&new_keypair.public); - - info!("Setting name to {}", new_name); - self.node = Node::new(new_keypair, self.node.addr); - } - - let relocate_payload = if let Some(details) = relocate_details { - Some(self.process_relocation(&prefix, details)?) - } else { - None - }; - - self.join(section_key, elders, genesis_key, relocate_payload) - .await - } - - // Send a `GetSectionQuery` and waits for the response. If the response is `Redirect`, - // repeat with the new set of contacts. If it is `Success`, proceeed to the `join` phase. - async fn bootstrap( - &mut self, - mut bootstrap_addrs: Vec, - relocate_details: Option<&SignedRelocateDetails>, - ) -> Result<(Prefix, bls::PublicKey, BTreeMap)> { - // Avoid sending more than one request to the same peer. - let mut used_addrs = HashSet::new(); - - loop { - used_addrs.extend(bootstrap_addrs.iter().copied()); - self.send_get_section_request(mem::take(&mut bootstrap_addrs), relocate_details) - .await?; - - let (response, sender, _dest_info) = - self.receive_get_section_response(relocate_details).await?; - - match response { - GetSectionResponse::Success(SectionInfo { - prefix, - pk_set, - elders, - joins_allowed, - }) => { - if !joins_allowed { - error!( - "Network is set to not taking any new joining node, try join later." - ); - return Err(Error::TryJoinLater); - } - let key = pk_set.public_key(); - info!( - "Joining a section ({:b}), key: {:?}, elders: {:?} (given by {:?})", - prefix, key, elders, sender - ); - return Ok((prefix, key, elders)); - } - GetSectionResponse::Redirect(mut new_bootstrap_addrs) => { - // Ignore already used addresses - new_bootstrap_addrs.retain(|addr| !used_addrs.contains(&addr.1)); - - if new_bootstrap_addrs.is_empty() { - debug!("Bootstrapping redirected to the same set of peers we already contacted - ignoring"); - } else { - info!( - "Bootstrapping redirected to another set of peers: {:?}", - new_bootstrap_addrs, - ); - bootstrap_addrs = new_bootstrap_addrs - .iter() - .map(|(_, addr)| addr) - .cloned() - .collect(); - } - } - GetSectionResponse::SectionInfoUpdate(error) => { - error!("Infrastructure error: {:?}", error); - } - GetSectionResponse::NodeNotReachable => { - return Err(Error::NodeNotReachable); - } - } - } - } - - async fn send_get_section_request( - &mut self, - recipients: Vec, - relocate_details: Option<&SignedRelocateDetails>, - ) -> Result<()> { - if recipients.is_empty() { - return Ok(()); - } - - debug!("Sending GetSectionQuery to {:?}", recipients); - let (dest_pk, dest_xorname) = match relocate_details { - Some(details) => ( - PublicKey::from(details.relocate_details()?.destination_key), + Some(ref details) => ( + details.relocate_details()?.destination_key, *details.destination()?, ), - None => (PublicKey::from(self.node.keypair.public), self.node.name()), - }; - - let message = SectionInfoMsg::GetSectionQuery { - public_key: PublicKey::from(self.node.keypair.public), - is_node: true, + None => { + // Use our XorName as we do not know their name or section key yet. + (bls::SecretKey::random().public_key(), self.node.name()) + } }; - // Group up with our XorName as we do not know their name yet. - let recipients = recipients + let elders = bootstrap_addrs .iter() .map(|addr| (dest_xorname, *addr)) .collect(); - let dest_info = DestInfo { - dest: dest_xorname, - dest_section_pk: PublicKey::bls(&dest_pk).unwrap_or_else(|| { - // Create a random PK as we'll be getting the right one with the response - bls::SecretKey::random().public_key() - }), - }; - let _ = self - .send_tx - .send(( - MessageType::SectionInfo { - msg: message, - dest_info, - }, - recipients, - )) - .await; - - Ok(()) - } - - async fn receive_get_section_response( - &mut self, - relocate_details: Option<&SignedRelocateDetails>, - ) -> Result<(GetSectionResponse, SocketAddr, DestInfo)> { - let destination = match relocate_details { - Some(details) => *details.destination()?, - None => self.node.name(), - }; - - while let Some((message, sender)) = self.recv_rx.next().await { - match message { - MessageType::SectionInfo { - msg: SectionInfoMsg::GetSectionResponse(response), - dest_info, - } => match response { - GetSectionResponse::Redirect(addrs) if addrs.is_empty() => { - error!("Invalid GetSectionResponse::Redirect: missing peers"); - continue; - } - GetSectionResponse::Success(SectionInfo { prefix, .. }) - if !prefix.matches(&destination) => - { - error!("Invalid GetSectionResponse::Success: bad prefix"); - continue; - } - GetSectionResponse::Redirect(_) - | GetSectionResponse::NodeNotReachable - | GetSectionResponse::Success { .. } - | GetSectionResponse::SectionInfoUpdate(_) => { - return Ok((response, sender, dest_info)) - } - }, - MessageType::Routing { msg, dest_info } => { - self.backlog_message(msg, sender, dest_info) - } - MessageType::Node { .. } - | MessageType::SectionInfo { .. } - | MessageType::Client { .. } => {} - } - } - - error!("RoutingMsg sender unexpectedly closed"); - // TODO: consider more specific error here (e.g. `BootstrapInterrupted`) - Err(Error::InvalidState) + self.join(dest_pk, elders, genesis_key, relocate_details) + .await } // Change our name to fit the destination section and apply the new age. @@ -343,38 +168,56 @@ impl<'a> State<'a> { Ok(relocate_payload) } - // Send `JoinRequest` and wait for the response. If the response is `Rejoin`, repeat with the - // new info. If it is `Approval`, returns the initial `Section` value to use by this node, - // completing the bootstrap. If it is a `Challenge`, carry out resource signed calculation. + // Send `JoinRequest` and wait for the response. If the response is: + // - `Retry`: repeat with the new info. + // - `Redirect`: repeat with the new set of addresses. + // - `ResourceChallenge`: carry out resource proof calculation. + // - `Approval`: returns the initial `Section` value to use by this node, + // completing the bootstrap. async fn join( mut self, mut section_key: bls::PublicKey, - elders: BTreeMap, + mut recipients: Vec<(XorName, SocketAddr)>, genesis_key: Option, - relocate_payload: Option, + relocate_details: Option, ) -> Result<(Node, Section, Vec<(RoutingMsg, SocketAddr, DestInfo)>)> { let join_request = JoinRequest { section_key, - relocate_payload: relocate_payload.clone(), + relocate_payload: None, resource_proof_response: None, }; - let recipients = elders - .into_iter() - .map(|(name, addr)| (name, addr)) - .collect_vec(); - self.send_join_requests(join_request, recipients, section_key) + + // Avoid sending more than one request to the same peer. + let mut used_recipient = HashSet::::new(); + + self.send_join_requests(join_request, &recipients, section_key) .await?; + let mut relocate_payload = None; loop { + used_recipient.extend(recipients.iter().map(|(_, addr)| addr)); + let (response, sender, dest_info) = self .receive_join_response(genesis_key.as_ref(), relocate_payload.as_ref()) .await?; match response { + JoinResponse::Rejected(JoinRejectionReason::NodeNotReachable(addr)) => { + error!( + "Node cannot join the network since it is not externally reachable: {}", + addr + ); + return Err(Error::NodeNotReachable(addr)); + } + JoinResponse::Rejected(JoinRejectionReason::JoinsDisallowed) => { + error!("Network is set to not taking any new joining node, try join later."); + return Err(Error::TryJoinLater); + } JoinResponse::Approval { section_auth, genesis_key, section_chain, + .. } => { return Ok(( self.node, @@ -382,31 +225,104 @@ impl<'a> State<'a> { self.backlog.into_iter().collect(), )); } - JoinResponse::Retry { - section_auth, - section_key: new_section_key, - } => { - if new_section_key == section_key { + JoinResponse::Retry(section_auth) => { + if section_auth.section_key() == section_key { continue; } - if section_auth.prefix().matches(&self.node.name()) { + let new_recipients: Vec<(XorName, SocketAddr)> = section_auth + .elders + .iter() + .map(|(name, addr)| (*name, *addr)) + .collect(); + + let prefix = section_auth.prefix; + + // For the first section, using age random among 6 to 100 to avoid + // relocating too many nodes at the same time. + if prefix.is_empty() && self.node.age() < FIRST_SECTION_MIN_AGE { + let age: u8 = (FIRST_SECTION_MIN_AGE..FIRST_SECTION_MAX_AGE) + .choose(&mut rand::thread_rng()) + .unwrap_or(FIRST_SECTION_MAX_AGE); + + let new_keypair = + crypto::gen_keypair(&Prefix::default().range_inclusive(), age); + let new_name = crypto::name(&new_keypair.public); + + info!("Setting Node name to {}", new_name); + self.node = Node::new(new_keypair, self.node.addr); + } + + // if we are relocating, and we didn't generate + // the relocation payload yet, we do it now + if relocate_payload.is_none() { + if let Some(ref details) = relocate_details { + relocate_payload = + Some(self.process_relocation(&prefix, details.clone())?); + } + } + + if relocate_payload.is_some() || prefix.matches(&self.node.name()) { + info!( + "Newer Join response for our prefix {:?} from {:?}", + section_auth, sender + ); + section_key = section_auth.section_key(); + let join_request = JoinRequest { + section_key, + relocate_payload: relocate_payload.clone(), + resource_proof_response: None, + }; + + recipients = new_recipients; + self.send_join_requests(join_request, &recipients, section_key) + .await?; + } else { + warn!( + "Newer Join response not for our prefix {:?} from {:?}", + section_auth, sender, + ); + } + } + JoinResponse::Redirect(section_auth) => { + if section_auth.section_key() == section_key { + continue; + } + + // Ignore already used recipients + let new_recipients: Vec<(XorName, SocketAddr)> = section_auth + .elders + .iter() + .filter(|(_, addr)| !used_recipient.contains(addr)) + .map(|(name, addr)| (*name, *addr)) + .collect(); + + if new_recipients.is_empty() { + debug!("Joining redirected to the same set of peers we already contacted - ignoring response"); + continue; + } else { + info!( + "Joining redirected to another set of peers: {:?}", + new_recipients, + ); + } + + let prefix = section_auth.prefix; + + if relocate_payload.is_some() || prefix.matches(&self.node.name()) { info!( "Newer Join response for our prefix {:?} from {:?}", section_auth, sender ); - section_key = new_section_key; + section_key = section_auth.section_key(); let join_request = JoinRequest { section_key, relocate_payload: relocate_payload.clone(), resource_proof_response: None, }; - let recipients = section_auth - .elders() - .iter() - .map(|(name, addr)| (*name, *addr)) - .collect(); - self.send_join_requests(join_request, recipients, section_key) + + recipients = new_recipients; + self.send_join_requests(join_request, &recipients, section_key) .await?; } else { warn!( @@ -436,7 +352,7 @@ impl<'a> State<'a> { nonce_signature, }), }; - let recipients = vec![(dest_info.dest, sender)]; + let recipients = &[(dest_info.dest, sender)]; self.send_join_requests(join_request, recipients, section_key) .await?; } @@ -447,7 +363,7 @@ impl<'a> State<'a> { async fn send_join_requests( &mut self, join_request: JoinRequest, - recipients: Vec<(XorName, SocketAddr)>, + recipients: &[(XorName, SocketAddr)], section_key: bls::PublicKey, ) -> Result<()> { info!("Sending {:?} to {:?}", join_request, recipients); @@ -466,12 +382,11 @@ impl<'a> State<'a> { MessageType::Routing { msg: message, dest_info: DestInfo { - // Will be overridden while sending to multiple elders - dest: XorName::random(), + dest: recipients[0].0, dest_section_pk: section_key, }, }, - recipients, + recipients.to_vec(), )) .await; @@ -483,63 +398,71 @@ impl<'a> State<'a> { expected_genesis_key: Option<&bls::PublicKey>, relocate_payload: Option<&RelocatePayload>, ) -> Result<(JoinResponse, SocketAddr, DestInfo)> { + let destination = match relocate_payload { + Some(payload) => *payload.details.destination()?, + None => self.node.name(), + }; + while let Some((message, sender)) = self.recv_rx.next().await { - let (message, dest_info) = match message { - MessageType::Routing { msg, dest_info } => (msg, dest_info), + // we are interested only in `JoinResponse` type of messages + let (routing_msg, dest_info, join_response) = match message { MessageType::Node { .. } | MessageType::Client { .. } | MessageType::SectionInfo { .. } => continue, + MessageType::Routing { msg, dest_info } => { + if let Variant::JoinResponse(resp) = &msg.variant { + let join_response = resp.clone(); + (msg, dest_info, *join_response) + } else { + self.backlog_message(msg, sender, dest_info); + continue; + } + } }; - match message.variant() { - Variant::JoinRetry { - section_auth, - section_key, - } => { - if !self.verify_message(&message, None) { + match join_response { + JoinResponse::Rejected(JoinRejectionReason::NodeNotReachable(_)) + | JoinResponse::Rejected(JoinRejectionReason::JoinsDisallowed) => { + return Ok((join_response, sender, dest_info)); + } + JoinResponse::Retry(ref section_auth) + | JoinResponse::Redirect(ref section_auth) => { + if !section_auth.prefix.matches(&destination) { + error!("Invalid JoinResponse bad prefix: {:?}", join_response); continue; } - return Ok(( - JoinResponse::Retry { - section_auth: section_auth.clone(), - section_key: *section_key, - }, - sender, - dest_info, - )); + if section_auth.elders.is_empty() { + error!( + "Invalid JoinResponse, empty list of Elders: {:?}", + join_response + ); + continue; + } + + if !self.verify_message(&routing_msg, None) { + continue; + } + + return Ok((join_response, sender, dest_info)); } - Variant::ResourceChallenge { - data_size, - difficulty, - nonce, - nonce_signature, - } => { + JoinResponse::ResourceChallenge { .. } => { if relocate_payload.is_some() { - trace!("Ignore ResourceChallenge when relocating"); + warn!("Ignoring ResourceChallenge received when relocating"); continue; } - if !self.verify_message(&message, None) { + if !self.verify_message(&routing_msg, None) { continue; } - return Ok(( - JoinResponse::ResourceChallenge { - data_size: *data_size, - difficulty: *difficulty, - nonce: *nonce, - nonce_signature: *nonce_signature, - }, - sender, - dest_info, - )); + return Ok((join_response, sender, dest_info)); } - Variant::NodeApproval { + JoinResponse::Approval { genesis_key, - section_auth, - member_info, - section_chain, + ref section_auth, + ref member_info, + .. } => { if member_info.value.peer.name() != &self.node.name() { trace!("Ignore NodeApproval not for us"); @@ -547,8 +470,8 @@ impl<'a> State<'a> { } if let Some(expected_genesis_key) = expected_genesis_key { - if expected_genesis_key != genesis_key { - trace!("Unexpected Genesis key"); + if expected_genesis_key != &genesis_key { + trace!("Genesis key doesn't match"); continue; } } @@ -559,7 +482,7 @@ impl<'a> State<'a> { None }; - if !self.verify_message(&message, trusted_key) { + if !self.verify_message(&routing_msg, trusted_key) { continue; } @@ -568,18 +491,8 @@ impl<'a> State<'a> { section_auth.value.prefix, ); - return Ok(( - JoinResponse::Approval { - section_auth: section_auth.clone(), - genesis_key: *genesis_key, - section_chain: section_chain.clone(), - }, - sender, - dest_info, - )); + return Ok((join_response, sender, dest_info)); } - - _ => self.backlog_message(message, sender, dest_info), } } @@ -613,24 +526,6 @@ impl<'a> State<'a> { } } -enum JoinResponse { - Approval { - section_auth: Proven, - genesis_key: bls::PublicKey, - section_chain: SecuredLinkedList, - }, - Retry { - section_auth: SectionAuthorityProvider, - section_key: bls::PublicKey, - }, - ResourceChallenge { - data_size: usize, - difficulty: u8, - nonce: [u8; 32], - nonce_signature: Signature, - }, -} - // Receiver of incoming messages that can be backed either by a raw `qp2p::ConnectionEvent` receiver // or by receiver of deserialized `RoutingMsg` and provides a unified interface on top of them. enum MessageReceiver<'a> { @@ -685,9 +580,12 @@ async fn send_messages( mod tests { use super::*; use crate::{ - agreement::test_utils::*, error::Error as RoutingError, messages::RoutingMsgUtils, - routing::tests::SecretKeySet, section::test_utils::*, section::MemberInfoUtils, ELDER_SIZE, - MIN_ADULT_AGE, MIN_AGE, + agreement::test_utils::*, + error::Error as RoutingError, + messages::RoutingMsgUtils, + section::test_utils::*, + section::{MemberInfoUtils, SectionAuthorityProviderUtils}, + ELDER_SIZE, MIN_ADULT_AGE, MIN_AGE, }; use anyhow::{anyhow, Error, Result}; use assert_matches::assert_matches; @@ -695,22 +593,22 @@ mod tests { future::{self, Either}, pin_mut, }; - use sn_messaging::{node::MemberInfo, section_info::SectionInfo}; + use secured_linked_list::SecuredLinkedList; + use sn_messaging::node::{MemberInfo, SectionAuthorityProvider}; + use std::collections::BTreeMap; use tokio::task; #[tokio::test] - async fn bootstrap_as_adult() -> Result<()> { + async fn join_as_adult() -> Result<()> { let (send_tx, mut send_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); let recv_rx = MessageReceiver::Deserialized(recv_rx); - let (section_auth, mut nodes, _) = + let (section_auth, mut nodes, sk_set) = gen_section_authority_provider(Prefix::default(), ELDER_SIZE); let bootstrap_node = nodes.remove(0); let bootstrap_addr = bootstrap_node.addr; - let sk_set = SecretKeySet::random(); - let pk_set = sk_set.public_keys(); let sk = sk_set.secret_key(); let pk = sk.public_key(); @@ -734,44 +632,35 @@ mod tests { // Create the task that executes the body of the test, but don't run it either. let others = async { - // Receive GetSectionQuery + // Receive JoinRequest let (message, recipients) = send_rx .recv() .await - .ok_or_else(|| anyhow!("GetSectionQuery was not received"))?; + .ok_or_else(|| anyhow!("JoinRequest was not received"))?; let bootstrap_addrs: Vec = recipients.iter().map(|(_name, addr)| *addr).collect(); assert_eq!(bootstrap_addrs, [bootstrap_addr]); - assert_matches!(message, MessageType::SectionInfo{ msg: SectionInfoMsg::GetSectionQuery { public_key, is_node: true }, .. } => { - assert_eq!(XorName::from(public_key), *peer.name()); + + let (message, dest_info) = assert_matches!(message, MessageType::Routing { msg, dest_info } => + (msg, dest_info)); + + assert_eq!(dest_info.dest, *peer.name()); + assert_matches!(message.variant, Variant::JoinRequest(request) => { + assert!(request.resource_proof_response.is_none()); + assert!(request.relocate_payload.is_none()); }); - let infrastructure_info = SectionInfo { - prefix: section_auth.prefix, - pk_set, - elders: section_auth - .peers() - .map(|peer| (*peer.name(), *peer.addr())) - .collect(), - joins_allowed: true, - }; - // Send GetSectionResponse::Success - let message = SectionInfoMsg::GetSectionResponse(GetSectionResponse::Success( - infrastructure_info, - )); - recv_tx.try_send(( - MessageType::SectionInfo { - msg: message, - dest_info: DestInfo { - dest: *peer.name(), - dest_section_pk: pk, - }, - }, - bootstrap_addr, - ))?; + // Send JoinResponse::Retry with section auth provider info + send_response( + &recv_tx, + Variant::JoinResponse(Box::new(JoinResponse::Retry(section_auth.clone()))), + &bootstrap_node, + section_auth.section_key(), + *peer.name(), + )?; - // Receive JoinRequest + // Receive the second JoinRequest with correct section info let (message, recipients) = send_rx .recv() .await @@ -788,38 +677,28 @@ mod tests { .map(|(name, addr)| (*name, *addr)) .collect::>(), ); - assert_matches!(message.variant(), Variant::JoinRequest(request) => { + assert_matches!(message.variant, Variant::JoinRequest(request) => { assert_eq!(request.section_key, pk); assert!(request.relocate_payload.is_none()); }); - // Send NodeApproval + // Send JoinResponse::Approval let section_auth = proven(sk, section_auth.clone())?; let member_info = proven(sk, MemberInfo::joined(peer))?; let proof_chain = SecuredLinkedList::new(pk); - let message = RoutingMsg::single_src( - &bootstrap_node, - DstLocation::DirectAndUnrouted, - Variant::NodeApproval { + send_response( + &recv_tx, + Variant::JoinResponse(Box::new(JoinResponse::Approval { genesis_key: pk, section_auth: section_auth.clone(), member_info, section_chain: proof_chain, - }, + })), + &bootstrap_node, section_auth.value.section_key(), + *peer.name(), )?; - recv_tx.try_send(( - MessageType::Routing { - msg: message, - dest_info: DestInfo { - dest: *peer.name(), - dest_section_pk: pk, - }, - }, - bootstrap_addr, - ))?; - Ok(()) }; @@ -834,30 +713,30 @@ mod tests { } #[tokio::test] - async fn receive_get_section_response_redirect() -> Result<()> { + async fn join_receive_redirect_response() -> Result<()> { let (send_tx, mut send_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); let recv_rx = MessageReceiver::Deserialized(recv_rx); - let bootstrap_node = Node::new( - crypto::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE), - gen_addr(), - ); + let (section_auth, mut nodes, sk_set) = + gen_section_authority_provider(Prefix::default(), ELDER_SIZE); + let bootstrap_node = nodes.remove(0); + let pk_set = sk_set.public_keys(); let node = Node::new( crypto::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE), gen_addr(), ); let name = node.name(); - let mut state = State::new(node, send_tx, recv_rx); + let state = State::new(node, send_tx, recv_rx); - let bootstrap_task = state.bootstrap(vec![bootstrap_node.addr], None); + let bootstrap_task = state.run(vec![bootstrap_node.addr], None, None); let test_task = async move { - // Receive GetSectionQuery + // Receive JoinRequest let (message, recipients) = send_rx .recv() .await - .ok_or_else(|| anyhow!("GetSectionQuery was not received"))?; + .ok_or_else(|| anyhow!("JoinRequest was not received"))?; assert_eq!( recipients @@ -866,39 +745,33 @@ mod tests { .collect::>(), vec![bootstrap_node.addr] ); - assert_matches!( - message, - MessageType::SectionInfo { - msg: SectionInfoMsg::GetSectionQuery { .. }, - .. - } - ); - // Send GetSectionResponse::Redirect - let new_bootstrap_addrs: Vec<_> = (0..ELDER_SIZE) + assert_matches!(message, MessageType::Routing { msg, .. } => + assert_matches!(msg.variant, Variant::JoinRequest{..})); + + // Send JoinResponse::Redirect + let new_bootstrap_addrs: BTreeMap<_, _> = (0..ELDER_SIZE) .map(|_| (XorName::random(), gen_addr())) .collect(); - let message = SectionInfoMsg::GetSectionResponse(GetSectionResponse::Redirect( - new_bootstrap_addrs.clone(), - )); - recv_tx.try_send(( - MessageType::SectionInfo { - msg: message, - dest_info: DestInfo { - dest: name, - dest_section_pk: bls::SecretKey::random().public_key(), - }, - }, - bootstrap_node.addr, - ))?; + send_response( + &recv_tx, + Variant::JoinResponse(Box::new(JoinResponse::Redirect(SectionAuthorityProvider { + prefix: Prefix::default(), + public_key_set: pk_set.clone(), + elders: new_bootstrap_addrs.clone(), + }))), + &bootstrap_node, + section_auth.section_key(), + name, + )?; task::yield_now().await; - // Receive new GetSectionQuery + // Receive new JoinRequest with redirected bootstrap contacts let (message, recipients) = send_rx .recv() .await - .ok_or_else(|| anyhow!("GetSectionQuery was not received"))?; + .ok_or_else(|| anyhow!("JoinRequest was not received"))?; assert_eq!( recipients @@ -906,17 +779,18 @@ mod tests { .map(|peer| peer.1) .collect::>(), new_bootstrap_addrs - .into_iter() - .map(|(_, addr)| addr) + .iter() + .map(|(_, addr)| *addr) .collect::>() ); - assert_matches!( - message, - MessageType::SectionInfo { - msg: SectionInfoMsg::GetSectionQuery { .. }, - .. - } - ); + + let (message, dest_info) = assert_matches!(message, MessageType::Routing { msg, dest_info } => + (msg, dest_info)); + + assert_eq!(dest_info.dest_section_pk, pk_set.public_key()); + assert_matches!(message.variant, Variant::JoinRequest(req) => { + assert_eq!(req.section_key, pk_set.public_key()); + }); Ok(()) }; @@ -931,81 +805,70 @@ mod tests { } #[tokio::test] - async fn invalid_get_section_response_redirect() -> Result<()> { + async fn join_invalid_redirect_response() -> Result<()> { let (send_tx, mut send_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); let recv_rx = MessageReceiver::Deserialized(recv_rx); - let bootstrap_node = Node::new( - crypto::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE), - gen_addr(), - ); + let (section_auth, mut nodes, sk_set) = + gen_section_authority_provider(Prefix::default(), ELDER_SIZE); + let bootstrap_node = nodes.remove(0); + let pk_set = sk_set.public_keys(); let node = Node::new( crypto::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE), gen_addr(), ); let node_name = node.name(); - let mut state = State::new(node, send_tx, recv_rx); + let state = State::new(node, send_tx, recv_rx); - let bootstrap_task = state.bootstrap(vec![bootstrap_node.addr], None); + let bootstrap_task = state.run(vec![bootstrap_node.addr], None, None); let test_task = async { let (message, _) = send_rx .recv() .await - .ok_or_else(|| anyhow!("GetSectionQuery was not received"))?; - - assert_matches!( - message, - MessageType::SectionInfo { - msg: SectionInfoMsg::GetSectionQuery { .. }, - .. - } - ); + .ok_or_else(|| anyhow!("JoinRequest was not received"))?; - let message = SectionInfoMsg::GetSectionResponse(GetSectionResponse::Redirect(vec![])); + assert_matches!(message, MessageType::Routing { msg, .. } => + assert_matches!(msg.variant, Variant::JoinRequest{..})); - recv_tx.try_send(( - MessageType::SectionInfo { - msg: message, - dest_info: DestInfo { - dest: node_name, - dest_section_pk: bls::SecretKey::random().public_key(), - }, - }, - bootstrap_node.addr, - ))?; + send_response( + &recv_tx, + Variant::JoinResponse(Box::new(JoinResponse::Redirect(SectionAuthorityProvider { + prefix: Prefix::default(), + public_key_set: pk_set.clone(), + elders: BTreeMap::new(), + }))), + &bootstrap_node, + section_auth.section_key(), + node_name, + )?; task::yield_now().await; let addrs = (0..ELDER_SIZE) .map(|_| (XorName::random(), gen_addr())) .collect(); - let message = SectionInfoMsg::GetSectionResponse(GetSectionResponse::Redirect(addrs)); - recv_tx.try_send(( - MessageType::SectionInfo { - msg: message, - dest_info: DestInfo { - dest: node_name, - dest_section_pk: bls::SecretKey::random().public_key(), - }, - }, - bootstrap_node.addr, - ))?; + send_response( + &recv_tx, + Variant::JoinResponse(Box::new(JoinResponse::Redirect(SectionAuthorityProvider { + prefix: Prefix::default(), + public_key_set: pk_set.clone(), + elders: addrs, + }))), + &bootstrap_node, + section_auth.section_key(), + node_name, + )?; task::yield_now().await; let (message, _) = send_rx .recv() .await - .ok_or_else(|| anyhow!("GetSectionQuery was not received"))?; + .ok_or_else(|| anyhow!("JoinRequest was not received"))?; - assert_matches!( - message, - MessageType::SectionInfo { - msg: SectionInfoMsg::GetSectionQuery { .. }, - .. - } - ); + assert_matches!(message, MessageType::Routing { msg, .. } => + assert_matches!(msg.variant, Variant::JoinRequest{..})); Ok(()) }; @@ -1020,7 +883,7 @@ mod tests { } #[tokio::test] - async fn joins_disallowed_get_section_response_success() -> Result<()> { + async fn join_disallowed_response() -> Result<()> { let (send_tx, mut send_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); let recv_rx = MessageReceiver::Deserialized(recv_rx); @@ -1028,11 +891,6 @@ mod tests { let (section_auth, mut nodes, _) = gen_section_authority_provider(Prefix::default(), ELDER_SIZE); let bootstrap_node = nodes.remove(0); - let bootstrap_addr = bootstrap_node.addr; - - let sk_set = SecretKeySet::random(); - let pk_set = sk_set.public_keys(); - let pk = pk_set.public_key(); let node = Node::new( crypto::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE), @@ -1040,57 +898,34 @@ mod tests { ); let node_name = node.name(); + let state = State::new(node, send_tx, recv_rx); - let mut state = State::new(node, send_tx, recv_rx); - - let bootstrap_task = state.bootstrap(vec![bootstrap_addr], None); - - // Send an valid `BootstrapResponse::Join` followed by a valid one. The invalid one is - // ignored and the valid one processed normally. + let bootstrap_task = state.run(vec![bootstrap_node.addr], None, None); let test_task = async { let (message, _) = send_rx .recv() .await - .ok_or_else(|| anyhow!("GetSectionQuery was not received"))?; + .ok_or_else(|| anyhow!("JoinRequest was not received"))?; - assert_matches!( - message, - MessageType::SectionInfo { - msg: SectionInfoMsg::GetSectionQuery { .. }, - .. - } - ); + assert_matches!(message, MessageType::Routing { msg, .. } => + assert_matches!(msg.variant, Variant::JoinRequest{..})); - let infrastructure_info = SectionInfo { - prefix: section_auth.prefix, - pk_set, - elders: section_auth - .peers() - .map(|peer| (*peer.name(), *peer.addr())) - .collect(), - joins_allowed: false, - }; - // Send GetSectionResponse::Success with the flag of joins_allowed set to false. - let message = SectionInfoMsg::GetSectionResponse(GetSectionResponse::Success( - infrastructure_info, - )); - recv_tx.try_send(( - MessageType::SectionInfo { - msg: message, - dest_info: DestInfo { - dest: node_name, - dest_section_pk: pk, - }, - }, - bootstrap_addr, - ))?; + send_response( + &recv_tx, + Variant::JoinResponse(Box::new(JoinResponse::Rejected( + JoinRejectionReason::JoinsDisallowed, + ))), + &bootstrap_node, + section_auth.section_key(), + node_name, + )?; Ok(()) }; - let (bootstrap_result, test_result) = future::join(bootstrap_task, test_task).await; + let (join_result, test_result) = future::join(bootstrap_task, test_task).await; - if let Err(RoutingError::TryJoinLater) = bootstrap_result { + if let Err(RoutingError::TryJoinLater) = join_result { } else { return Err(anyhow!("Not getting an execpted network rejection.")); } @@ -1099,7 +934,7 @@ mod tests { } #[tokio::test] - async fn invalid_get_section_response_success() -> Result<()> { + async fn join_invalid_retry_prefix_response() -> Result<()> { let (send_tx, mut send_rx) = mpsc::channel(1); let (recv_tx, recv_rx) = mpsc::channel(1); let recv_rx = MessageReceiver::Deserialized(recv_rx); @@ -1108,110 +943,7 @@ mod tests { crypto::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE), gen_addr(), ); - let node = Node::new( - crypto::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE), - gen_addr(), - ); - let node_name = node.name(); - - let (good_prefix, bad_prefix) = { - let p0 = Prefix::default().pushed(false); - let p1 = Prefix::default().pushed(true); - - if node_name.bit(0) { - (p1, p0) - } else { - (p0, p1) - } - }; - - let mut state = State::new(node, send_tx, recv_rx); - - let bootstrap_task = state.bootstrap(vec![bootstrap_node.addr], None); - - // Send an invalid `BootstrapResponse::Join` followed by a valid one. The invalid one is - // ignored and the valid one processed normally. - let test_task = async { - let (message, _) = send_rx - .recv() - .await - .ok_or_else(|| anyhow!("GetSectionQuery was not received"))?; - - assert_matches!( - message, - MessageType::SectionInfo { - msg: SectionInfoMsg::GetSectionQuery { .. }, - .. - } - ); - - let infrastructure_info = SectionInfo { - prefix: bad_prefix, - pk_set: bls::SecretKeySet::random(0, &mut rand::thread_rng()).public_keys(), - elders: (0..ELDER_SIZE) - .map(|_| (bad_prefix.substituted_in(rand::random()), gen_addr())) - .collect(), - joins_allowed: true, - }; - - let message = SectionInfoMsg::GetSectionResponse(GetSectionResponse::Success( - infrastructure_info, - )); - - recv_tx.try_send(( - MessageType::SectionInfo { - msg: message, - dest_info: DestInfo { - dest: node_name, - dest_section_pk: bls::SecretKey::random().public_key(), - }, - }, - bootstrap_node.addr, - ))?; - task::yield_now().await; - - let infrastructure_info = SectionInfo { - prefix: good_prefix, - pk_set: bls::SecretKeySet::random(0, &mut rand::thread_rng()).public_keys(), - elders: (0..ELDER_SIZE) - .map(|_| (good_prefix.substituted_in(rand::random()), gen_addr())) - .collect(), - joins_allowed: true, - }; - - let message = SectionInfoMsg::GetSectionResponse(GetSectionResponse::Success( - infrastructure_info, - )); - - recv_tx.try_send(( - MessageType::SectionInfo { - msg: message, - dest_info: DestInfo { - dest: node_name, - dest_section_pk: bls::SecretKey::random().public_key(), - }, - }, - bootstrap_node.addr, - ))?; - Ok(()) - }; - - let (bootstrap_result, test_result) = future::join(bootstrap_task, test_task).await; - let _ = bootstrap_result?; - test_result - } - - #[tokio::test] - async fn invalid_join_response_rejoin() -> Result<()> { - let (send_tx, mut send_rx) = mpsc::channel(1); - let (recv_tx, recv_rx) = mpsc::channel(1); - let recv_rx = MessageReceiver::Deserialized(recv_rx); - - let bootstrap_node = Node::new( - crypto::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE), - gen_addr(), - ); let node = Node::new( crypto::gen_keypair(&Prefix::default().range_inclusive(), MIN_ADULT_AGE), gen_addr(), @@ -1244,60 +976,38 @@ mod tests { .ok_or_else(|| anyhow!("RoutingMsg was not received"))?; let message = assert_matches!(message, MessageType::Routing{ msg, .. } => msg); - assert_matches!(message.variant(), Variant::JoinRequest(_)); - - // Send `Rejoin` with bad prefix - let message = RoutingMsg::single_src( + assert_matches!(message.variant, Variant::JoinRequest(_)); + + // Send `Retry` with bad prefix + send_response( + &recv_tx, + Variant::JoinResponse(Box::new(JoinResponse::Retry( + gen_section_authority_provider(bad_prefix, ELDER_SIZE).0, + ))), &bootstrap_node, - DstLocation::DirectAndUnrouted, - Variant::JoinRetry { - section_auth: gen_section_authority_provider(bad_prefix, ELDER_SIZE).0, - section_key: bls::SecretKey::random().public_key(), - }, section_key, + node_name, )?; - - recv_tx.try_send(( - MessageType::Routing { - msg: message, - dest_info: DestInfo { - dest: node_name, - dest_section_pk: section_key, - }, - }, - bootstrap_node.addr, - ))?; task::yield_now().await; - // Send `Rejoin` with good prefix - let message = RoutingMsg::single_src( + // Send `Retry` with good prefix + send_response( + &recv_tx, + Variant::JoinResponse(Box::new(JoinResponse::Retry( + gen_section_authority_provider(good_prefix, ELDER_SIZE).0, + ))), &bootstrap_node, - DstLocation::DirectAndUnrouted, - Variant::JoinRetry { - section_auth: gen_section_authority_provider(good_prefix, ELDER_SIZE).0, - section_key: bls::SecretKey::random().public_key(), - }, section_key, + node_name, )?; - recv_tx.try_send(( - MessageType::Routing { - msg: message, - dest_info: DestInfo { - dest: node_name, - dest_section_pk: section_key, - }, - }, - bootstrap_node.addr, - ))?; - let (message, _) = send_rx .recv() .await .ok_or_else(|| anyhow!("RoutingMsg was not received"))?; let message = assert_matches!(message, MessageType::Routing{ msg, .. } => msg); - assert_matches!(message.variant(), Variant::JoinRequest(_)); + assert_matches!(message.variant, Variant::JoinRequest(_)); Ok(()) }; @@ -1310,4 +1020,33 @@ mod tests { Either::Right((output, _)) => output, } } + + // test helper + fn send_response( + recv_tx: &mpsc::Sender<(MessageType, SocketAddr)>, + variant: Variant, + bootstrap_node: &Node, + section_key: bls::PublicKey, + node_name: XorName, + ) -> Result<()> { + let message = RoutingMsg::single_src( + bootstrap_node, + DstLocation::DirectAndUnrouted, + variant, + section_key, + )?; + + recv_tx.try_send(( + MessageType::Routing { + msg: message, + dest_info: DestInfo { + dest: node_name, + dest_section_pk: section_key, + }, + }, + bootstrap_node.addr, + ))?; + + Ok(()) + } } diff --git a/src/routing/comm.rs b/src/routing/comm.rs index 2345777bf4..b4a7c96dff 100644 --- a/src/routing/comm.rs +++ b/src/routing/comm.rs @@ -13,7 +13,6 @@ use futures::stream::{FuturesUnordered, StreamExt}; use hex_fmt::HexFmt; use qp2p::{Endpoint, QuicP2p}; use sn_messaging::MessageType; -use std::net::IpAddr; use std::{ fmt::{self, Debug, Formatter}, net::SocketAddr, @@ -115,10 +114,6 @@ impl Comm { self.endpoint.socket_addr() } - pub(crate) fn local_addr(&self) -> SocketAddr { - self.endpoint.local_addr() - } - /// Sends a message on an existing connection. If no such connection exists, returns an error. pub async fn send_on_existing_connection( &self, @@ -141,7 +136,30 @@ impl Comm { /// Tests whether the peer is reachable. pub async fn is_reachable(&self, peer: &SocketAddr) -> Result<(), Error> { - is_reachable(self.endpoint.local_addr().ip(), peer).await + let qp2p_config = qp2p::Config { + local_ip: Some(self.endpoint.local_addr().ip()), + local_port: Some(0), + forward_port: false, + ..Default::default() + }; + + let qp2p = QuicP2p::with_config(Some(qp2p_config), &[], false) + .map_err(|err| Error::InvalidConfig { err })?; + let (connectivity_endpoint, _, _, _) = qp2p + .new_endpoint() + .await + .map_err(|err| Error::CannotConnectEndpoint { err })?; + + connectivity_endpoint + .is_reachable(peer) + .await + .map_err(|err| { + info!("Peer {} is NOT externally reachable: {}", peer, err); + Error::AddressNotReachable { err } + }) + .map(|()| { + info!("Peer {} is externally reachable.", peer); + }) } /// Sends a message to multiple recipients. Attempts to send to `delivery_group_size` @@ -278,33 +296,6 @@ impl Comm { } } -pub(crate) async fn is_reachable(our_local_ip: IpAddr, peer: &SocketAddr) -> Result<()> { - let qp2p_config = qp2p::Config { - local_ip: Some(our_local_ip), - local_port: Some(0), - forward_port: false, - ..Default::default() - }; - - let qp2p = QuicP2p::with_config(Some(qp2p_config), &[], false) - .map_err(|err| Error::InvalidConfig { err })?; - let (connectivity_endpoint, _, _, _) = qp2p - .new_endpoint() - .await - .map_err(|err| Error::CannotConnectEndpoint { err })?; - - connectivity_endpoint - .is_reachable(peer) - .await - .map_err(|err| { - info!("Peer {} is NOT externally reachable: {}", peer, err); - Error::AddressNotReachable { err } - }) - .map(|()| { - info!("Peer {} is externally reachable.", peer); - }) -} - impl Drop for Comm { fn drop(&mut self) { self.endpoint.close() @@ -543,10 +534,7 @@ mod tests { // Send the first message. let key0 = bls::SecretKey::random().public_key(); let msg0 = MessageType::SectionInfo { - msg: Message::GetSectionQuery { - public_key: PublicKey::Bls(key0), - is_node: true, - }, + msg: Message::GetSectionQuery(PublicKey::Bls(key0)), dest_info: DestInfo { dest: name, dest_section_pk: key0, @@ -571,10 +559,7 @@ mod tests { // Send the second message. let key1 = bls::SecretKey::random().public_key(); let msg1 = MessageType::SectionInfo { - msg: Message::GetSectionQuery { - public_key: PublicKey::Bls(key1), - is_node: true, - }, + msg: Message::GetSectionQuery(PublicKey::Bls(key1)), dest_info: DestInfo { dest: name, dest_section_pk: key1, @@ -637,10 +622,7 @@ mod tests { fn new_section_info_message() -> MessageType { let random_bls_pk = bls::SecretKey::random().public_key(); MessageType::SectionInfo { - msg: Message::GetSectionQuery { - public_key: PublicKey::Bls(random_bls_pk), - is_node: true, - }, + msg: Message::GetSectionQuery(PublicKey::Bls(random_bls_pk)), dest_info: DestInfo { dest: XorName::random(), dest_section_pk: bls::SecretKey::random().public_key(), diff --git a/src/routing/command.rs b/src/routing/command.rs index 2b1d4fdfc6..2330fd02c6 100644 --- a/src/routing/command.rs +++ b/src/routing/command.rs @@ -9,10 +9,10 @@ use crate::{routing::Peer, section::SectionKeyShare, XorName}; use bytes::Bytes; use hex_fmt::HexFmt; -use sn_messaging::Signed; use sn_messaging::{ node::{ - DkgFailureSignedSet, Proposal, RoutingMsg, SectionAuthorityProvider, SignedRelocateDetails, + DkgFailureSignedSet, Proposal, RoutingMsg, SectionAuthorityProvider, Signed, + SignedRelocateDetails, }, section_info::Message as SectionInfoMsg, DestInfo, Itinerary, MessageType, @@ -85,12 +85,12 @@ pub(crate) enum Command { /// Attempt to set JoinsAllowed flag. SetJoinsAllowed(bool), /// Test peer's connectivity - TestConnectivity { + ProposeOnline { peer: Peer, // Previous name if relocated. previous_name: Option, // The key of the destination section that the joining node knows, if any. - their_knowledge: Option, + destination_key: Option, }, /// Proposes a peer as offline ProposeOffline(XorName), @@ -203,12 +203,12 @@ impl Debug for Command { .debug_tuple("SetJoinsAllowed") .field(joins_allowed) .finish(), - Self::TestConnectivity { + Self::ProposeOnline { peer, previous_name, .. } => f - .debug_struct("TestConnectivity") + .debug_struct("ProposeOnline") .field("peer", peer) .field("previous_name", previous_name) .finish(), diff --git a/src/routing/core/anti_entropy.rs b/src/routing/core/anti_entropy.rs index b949598ce9..78d878e017 100644 --- a/src/routing/core/anti_entropy.rs +++ b/src/routing/core/anti_entropy.rs @@ -144,7 +144,7 @@ mod tests { let (mut actions, _) = process(&env.node, &env.section, &msg, dest_info)?; assert_matches!(&actions.send.pop(), Some(message) => { - assert_matches!(message.variant(), Variant::SectionKnowledge { src_info, .. } => { + assert_matches!(message.variant, Variant::SectionKnowledge { ref src_info, .. } => { assert_eq!(src_info.0.value, *env.section.authority_provider()); assert_eq!(src_info.1, *env.section.chain()); }); diff --git a/src/routing/core/public_api.rs b/src/routing/core/api.rs similarity index 91% rename from src/routing/core/public_api.rs rename to src/routing/core/api.rs index 0ce9dca008..b7cb166956 100644 --- a/src/routing/core/public_api.rs +++ b/src/routing/core/api.rs @@ -21,7 +21,8 @@ use bytes::Bytes; use secured_linked_list::SecuredLinkedList; use sn_messaging::{ node::{ - MemberInfo, Network, Peer, Proposal, RoutingMsg, Section, SectionAuthorityProvider, Variant, + JoinRejectionReason, JoinResponse, MemberInfo, Network, Peer, Proposal, RoutingMsg, + Section, SectionAuthorityProvider, Variant, }, section_info::{Error as TargetSectionError, SectionInfo}, DestInfo, EndUser, Itinerary, SrcLocation, @@ -136,13 +137,13 @@ impl Core { // Send message over the network. pub async fn relay_message(&self, msg: &RoutingMsg) -> Result> { let (presumed_targets, dg_size) = delivery_group::delivery_targets( - msg.dst(), + &msg.dst, &self.node.name(), &self.section, &self.network, )?; - let target_name = msg.dst().name().ok_or(Error::CannotRoute)?; + let target_name = msg.dst.name().ok_or(Error::CannotRoute)?; let dest_pk = self.section_key_by_name(&target_name); let mut targets = vec![]; @@ -167,7 +168,7 @@ impl Core { msg, dg_size, targets, - msg.section_pk(), + msg.section_pk, ); let command = Command::send_message_to_nodes( @@ -300,12 +301,25 @@ impl Core { &self, peer: Peer, previous_name: Option, - their_knowledge: Option, + destination_key: Option, ) -> Result> { - self.propose(Proposal::Online { - member_info: MemberInfo::joined(peer), - previous_name, - their_knowledge, - }) + if peer.is_reachable() { + self.propose(Proposal::Online { + member_info: MemberInfo::joined(peer), + previous_name, + destination_key, + }) + } else { + let variant = Variant::JoinResponse(Box::new(JoinResponse::Rejected( + JoinRejectionReason::NodeNotReachable(*peer.addr()), + ))); + + trace!("Sending {:?} to {}", variant, peer); + Ok(vec![self.send_direct_message( + (*peer.name(), *peer.addr()), + variant, + *self.section.chain().last_key(), + )?]) + } } } diff --git a/src/routing/core/messaging/handling/agreement.rs b/src/routing/core/messaging/handling/agreement.rs index 4e27586e0b..da50b536ab 100644 --- a/src/routing/core/messaging/handling/agreement.rs +++ b/src/routing/core/messaging/handling/agreement.rs @@ -21,11 +21,10 @@ use crate::{ Error, Event, MIN_AGE, }; use secured_linked_list::SecuredLinkedList; -use sn_messaging::Signed; use sn_messaging::{ node::{ MemberInfo, PeerState, PlainMessage, Proposal, Proven, RoutingMsg, - SectionAuthorityProvider, Variant, + SectionAuthorityProvider, Signed, Variant, }, DestInfo, DstLocation, }; diff --git a/src/routing/core/messaging/handling/decisions.rs b/src/routing/core/messaging/handling/decisions.rs index d78389a8f1..6155bd772a 100644 --- a/src/routing/core/messaging/handling/decisions.rs +++ b/src/routing/core/messaging/handling/decisions.rs @@ -8,13 +8,12 @@ use super::Core; use crate::{ - messages::{MessageStatus, RoutingMsgUtils, SrcAuthorityUtils}, + messages::{MessageStatus, SrcAuthorityUtils}, section::{SectionAuthorityProviderUtils, SectionUtils}, Result, }; -use sn_messaging::SignedShare; use sn_messaging::{ - node::{Proposal, RelocatePromise, RoutingMsg, Variant}, + node::{JoinResponse, Proposal, RelocatePromise, RoutingMsg, SignedShare, Variant}, DstLocation, }; use xor_name::XorName; @@ -22,7 +21,7 @@ use xor_name::XorName; // Decisions impl Core { pub(crate) fn decide_message_status(&self, msg: &RoutingMsg) -> Result { - match msg.variant() { + match &msg.variant { Variant::SectionKnowledge { .. } => { if !self.is_elder() { return Ok(MessageStatus::Useless); @@ -31,7 +30,7 @@ impl Core { Variant::UserMessage(_) => { // If elder, always handle UserMessage, otherwise // handle it only if addressed directly to us as a node. - if !self.is_elder() && *msg.dst() != DstLocation::Node(self.node.name()) { + if !self.is_elder() && msg.dst != DstLocation::Node(self.node.name()) { return Ok(MessageStatus::Useless); } } @@ -55,10 +54,16 @@ impl Core { return Ok(MessageStatus::Useless); } } - Variant::NodeApproval { .. } | Variant::JoinRetry { .. } => { - // Skip validation of these. We will validate them inside the bootstrap task. - return Ok(MessageStatus::Useful); - } + Variant::JoinResponse(resp) => match **resp { + JoinResponse::Approval { .. } + | JoinResponse::Retry(_) + | JoinResponse::Redirect(_) + | JoinResponse::Rejected(_) => { + // Skip validation of these. We will validate them inside the bootstrap task. + return Ok(MessageStatus::Useful); + } + JoinResponse::ResourceChallenge { .. } => {} + }, Variant::Sync { section, .. } => { // Ignore `Sync` not for our section. if !section.prefix().matches(&self.node.name()) { @@ -86,8 +91,7 @@ impl Core { | Variant::DkgMessage { .. } | Variant::DkgFailureObservation { .. } | Variant::DkgFailureAgreement { .. } - | Variant::SectionKnowledgeQuery { .. } - | Variant::ResourceChallenge { .. } => {} + | Variant::SectionKnowledgeQuery { .. } => {} } if self.verify_message(msg)? { diff --git a/src/routing/core/messaging/handling/mod.rs b/src/routing/core/messaging/handling/mod.rs index 86d0151854..8845c163f7 100644 --- a/src/routing/core/messaging/handling/mod.rs +++ b/src/routing/core/messaging/handling/mod.rs @@ -21,7 +21,7 @@ use crate::{ network::NetworkUtils, peer::PeerUtils, relocation::{RelocatePayloadUtils, RelocateState, SignedRelocateDetailsUtils}, - routing::{comm, command::Command}, + routing::command::Command, section::{ SectionAuthorityProviderUtils, SectionKeyShare, SectionPeersUtils, SectionUtils, FIRST_SECTION_MAX_AGE, FIRST_SECTION_MIN_AGE, MIN_ADULT_AGE, @@ -32,17 +32,14 @@ use sn_messaging::node::Error as AggregatorError; use sn_messaging::{ client::ClientMsg, node::{ - DkgFailureSignedSet, JoinRequest, Network, Peer, Proposal, RoutingMsg, Section, - SectionAuthorityProvider, SignedRelocateDetails, SrcAuthority, Variant, + DkgFailureSignedSet, JoinRejectionReason, JoinRequest, JoinResponse, Network, Peer, + Proposal, RoutingMsg, Section, SectionAuthorityProvider, SignedRelocateDetails, + SrcAuthority, Variant, }, section_info::{GetSectionResponse, Message as SectionInfoMsg, SectionInfo}, DestInfo, DstLocation, EndUser, MessageType, }; -use std::{ - collections::BTreeSet, - iter, - net::{IpAddr, SocketAddr}, -}; +use std::{collections::BTreeSet, iter, net::SocketAddr}; use xor_name::XorName; // Message handling @@ -56,7 +53,7 @@ impl Core { let mut commands = vec![]; // Check if the message is for us. - let in_dst_location = msg.dst().contains(&self.node.name(), self.section.prefix()); + let in_dst_location = msg.dst.contains(&self.node.name(), self.section.prefix()); // TODO: Broadcast message to our section when src is a Node as nodes might not know // all the elders in our section and the msg needs to be propagated. if !in_dst_location { @@ -98,54 +95,48 @@ impl Core { sender: SocketAddr, message: SectionInfoMsg, dest_info: DestInfo, // The DestInfo contains the XorName of the sender and a random PK during the initial SectionQuery, - our_local_ip: IpAddr, ) -> Vec { - // Provide our PK as the dest PK, only redundant as the message itself contains details regarding relocation/registration. + // Provide our PK as the dest PK, only redundant as the message + // itself contains details regarding relocation/registration. let dest_info = DestInfo { dest: dest_info.dest, dest_section_pk: *self.section().chain().last_key(), }; + match message { - SectionInfoMsg::GetSectionQuery { - public_key, - is_node, - } => { + SectionInfoMsg::GetSectionQuery(public_key) => { let name = XorName::from(public_key); - let response = - if is_node && comm::is_reachable(our_local_ip, &sender).await.is_err() { - GetSectionResponse::NodeNotReachable - } else { - debug!("Received GetSectionQuery({}) from {}", name, sender); - - if let (true, Ok(pk_set)) = - (self.section.prefix().matches(&name), self.public_key_set()) - { - GetSectionResponse::Success(SectionInfo { - prefix: self.section.authority_provider().prefix(), - pk_set, - elders: self - .section - .authority_provider() - .peers() - .map(|peer| (*peer.name(), *peer.addr())) - .collect(), - joins_allowed: self.joins_allowed, - }) - } else { - // If we are elder, we should know a section that is closer to `name` that us. - // Otherwise redirect to our elders. - let section_auth = self - .network - .closest(&name) - .unwrap_or_else(|| self.section.authority_provider()); - let targets = section_auth - .elders() - .iter() - .map(|(name, addr)| (*name, *addr)) - .collect(); - GetSectionResponse::Redirect(targets) - } - }; + + debug!("Received GetSectionQuery({}) from {}", name, sender); + + let response = if let (true, Ok(pk_set)) = + (self.section.prefix().matches(&name), self.public_key_set()) + { + GetSectionResponse::Success(SectionInfo { + prefix: self.section.authority_provider().prefix(), + pk_set, + elders: self + .section + .authority_provider() + .peers() + .map(|peer| (*peer.name(), *peer.addr())) + .collect(), + joins_allowed: self.joins_allowed, + }) + } else { + // If we are elder, we should know a section that is closer to `name` that us. + // Otherwise redirect to our elders. + let section_auth = self + .network + .closest(&name) + .unwrap_or_else(|| self.section.authority_provider()); + let targets = section_auth + .elders() + .iter() + .map(|(name, addr)| (*name, *addr)) + .collect(); + GetSectionResponse::Redirect(targets) + }; let response = SectionInfoMsg::GetSectionResponse(response); debug!("Sending {:?} to {}", response, sender); @@ -237,7 +228,7 @@ impl Core { } pub(crate) fn aggregate_message(&mut self, msg: RoutingMsg) -> Result> { - let signed_share = if let SrcAuthority::BlsShare { signed_share, .. } = msg.src() { + let signed_share = if let SrcAuthority::BlsShare { signed_share, .. } = &msg.src { signed_share } else { // Not an aggregating message, return unchanged. @@ -368,9 +359,7 @@ impl Core { commands.extend(result?); Ok(commands) } - Variant::NodeApproval { .. } - | Variant::JoinRetry { .. } - | Variant::ResourceChallenge { .. } => { + Variant::JoinResponse(_) => { if let Some(RelocateState::InProgress(message_tx)) = &mut self.relocate_state { if let Some(sender) = sender { trace!("Forwarding {:?} to the bootstrap task", msg); @@ -463,17 +452,17 @@ impl Core { if let DstLocation::EndUser(EndUser { xorname: xor_name, socket_id, - }) = msg.dst() + }) = msg.dst { - if let Some(socket_addr) = self.get_socket_addr(*socket_id).copied() { + if let Some(socket_addr) = self.get_socket_addr(socket_id).copied() { trace!("sending user message {:?} to client {:?}", msg, socket_addr); return Ok(vec![Command::SendMessage { - recipients: vec![(*xor_name, socket_addr)], + recipients: vec![(xor_name, socket_addr)], delivery_group_size: 1, message: MessageType::Client { msg: ClientMsg::from(content)?, dest_info: DestInfo { - dest: *xor_name, + dest: xor_name, dest_section_pk: *self.section.chain().last_key(), }, }, @@ -490,9 +479,9 @@ impl Core { self.send_event(Event::MessageReceived { content, src: msg.src.src_location(), - dst: *msg.dst(), + dst: msg.dst, signed: msg.signed(), - section_pk: msg.section_pk(), + section_pk: msg.section_pk, }) .await; @@ -555,20 +544,18 @@ impl Core { ) -> Result> { debug!("Received {:?} from {}", join_request, peer); - if !self.section.prefix().matches(peer.name()) { + if !self.section.prefix().matches(peer.name()) + || join_request.section_key != *self.section.chain().last_key() + { debug!( - "Ignoring JoinRequest from {} - name doesn't match our prefix {:?}.", + "JoinRequest from {} - name doesn't match our prefix {:?}.", peer, self.section.prefix() ); - return Ok(vec![]); - } - if join_request.section_key != *self.section.chain().last_key() { - let variant = Variant::JoinRetry { - section_auth: self.section.authority_provider().clone(), - section_key: *self.section.chain().last_key(), - }; + let variant = Variant::JoinResponse(Box::new(JoinResponse::Retry( + self.section.authority_provider().clone(), + ))); trace!("Sending {:?} to {}", variant, peer); return Ok(vec![self.send_direct_message( (*peer.name(), *peer.addr()), @@ -586,7 +573,7 @@ impl Core { } // This joining node is being relocated to us. - let (mut age, previous_name, their_knowledge) = + let (mut age, previous_name, destination_key) = if let Some(ref payload) = join_request.relocate_payload { if !payload.verify_identity(peer.name()) { debug!( @@ -624,10 +611,18 @@ impl Core { ) } else if !self.joins_allowed { debug!( - "Ignoring JoinRequest from {} - new node not acceptable.", + "Rejecting JoinRequest from {} - joins currently not allowed.", peer, ); - return Ok(vec![]); + let variant = Variant::JoinResponse(Box::new(JoinResponse::Rejected( + JoinRejectionReason::JoinsDisallowed, + ))); + trace!("Sending {:?} to {}", variant, peer); + return Ok(vec![self.send_direct_message( + (*peer.name(), *peer.addr()), + variant, + *self.section.chain().last_key(), + )?]); } else { // Start as Adult as long as passed resource signeding. (MIN_ADULT_AGE, None, None) @@ -684,10 +679,10 @@ impl Core { } } - Ok(vec![Command::TestConnectivity { + Ok(vec![Command::ProposeOnline { peer, previous_name, - their_knowledge, + destination_key, }]) } @@ -702,24 +697,4 @@ impl Core { Ok(commands) } - - /* FIXME: bring back unresponsiveness detection - // Detect non-responsive peers and vote them out. - pub(crate) fn vote_for_remove_unresponsive_peers(&mut self, core: &mut Core) -> Result<()> { - let unresponsive_nodes: Vec<_> = self - .consensus_engine - .detect_unresponsive(self.shared_state.our_info()) - .into_iter() - .filter_map(|id| self.shared_state.our_members.get(id.name())) - .map(|info| info.clone().leave()) - .collect(); - - for info in unresponsive_nodes { - info!("Voting for unresponsive node {}", info.peer); - self.cast_unordered_vote(core, Vote::Offline(info))?; - } - - Ok(()) - } - */ } diff --git a/src/routing/core/messaging/handling/resource_proof.rs b/src/routing/core/messaging/handling/resource_proof.rs index 6cb85c947f..122f85520d 100644 --- a/src/routing/core/messaging/handling/resource_proof.rs +++ b/src/routing/core/messaging/handling/resource_proof.rs @@ -18,7 +18,7 @@ use crate::{ Error, Result, }; use ed25519_dalek::Verifier; -use sn_messaging::node::{Peer, ResourceProofResponse, Variant}; +use sn_messaging::node::{JoinResponse, Peer, ResourceProofResponse, Variant}; use xor_name::XorName; // Resource signed @@ -52,12 +52,12 @@ impl Core { let nonce: [u8; 32] = rand::random(); let serialized = bincode::serialize(&(peer.name(), &nonce)).map_err(|_| Error::InvalidMessage)?; - let response = Variant::ResourceChallenge { + let response = Variant::JoinResponse(Box::new(JoinResponse::ResourceChallenge { data_size: RESOURCE_PROOF_DATA_SIZE, difficulty: RESOURCE_PROOF_DIFFICULTY, nonce, nonce_signature: crypto::sign(&serialized, &self.node.keypair), - }; + })); self.send_direct_message( (*peer.name(), *peer.addr()), diff --git a/src/routing/core/messaging/sending.rs b/src/routing/core/messaging/sending.rs index 5637a811c9..1bd021fb24 100644 --- a/src/routing/core/messaging/sending.rs +++ b/src/routing/core/messaging/sending.rs @@ -20,8 +20,8 @@ use crate::{ use secured_linked_list::SecuredLinkedList; use sn_messaging::{ node::{ - DkgKey, ElderCandidates, MemberInfo, Network, Peer, PlainMessage, Proposal, Proven, - RelocateDetails, RelocatePromise, RoutingMsg, Section, Variant, + DkgKey, ElderCandidates, JoinResponse, MemberInfo, Network, Peer, PlainMessage, Proposal, + Proven, RelocateDetails, RelocatePromise, RoutingMsg, Section, Variant, }, DestInfo, DstLocation, }; @@ -41,12 +41,12 @@ impl Core { let addr = *member_info.value.peer.addr(); let name = *member_info.value.peer.name(); - let variant = Variant::NodeApproval { + let variant = Variant::JoinResponse(Box::new(JoinResponse::Approval { genesis_key: *self.section.genesis_key(), section_auth: self.section.proven_authority_provider().clone(), member_info, section_chain: self.section.chain().clone(), - }; + })); let message = RoutingMsg::single_src( &self.node, diff --git a/src/routing/core/mod.rs b/src/routing/core/mod.rs index f60c155dd8..99f74a696a 100644 --- a/src/routing/core/mod.rs +++ b/src/routing/core/mod.rs @@ -7,10 +7,10 @@ // permissions and limitations relating to use of the SAFE Network Software. mod anti_entropy; +mod api; mod connectivity; mod delivery_group; mod messaging; -mod public_api; use super::{command::Command, enduser_registry::EndUserRegistry, split_barrier::SplitBarrier}; use crate::{ diff --git a/src/routing/dispatcher.rs b/src/routing/dispatcher.rs index 7f04c0feaa..592d23e317 100644 --- a/src/routing/dispatcher.rs +++ b/src/routing/dispatcher.rs @@ -110,15 +110,12 @@ impl Dispatcher { sender, message, dest_info, - } => { - let our_local_ip = self.comm.local_addr().ip(); - Ok(self - .core - .write() - .await - .handle_section_info_msg(sender, message, dest_info, our_local_ip) - .await) - } + } => Ok(self + .core + .write() + .await + .handle_section_info_msg(sender, message, dest_info) + .await), Command::HandleTimeout(token) => self.core.write().await.handle_timeout(token), Command::HandleAgreement { proposal, signed } => { self.core @@ -180,16 +177,16 @@ impl Dispatcher { Command::SetJoinsAllowed(joins_allowed) => { self.core.read().await.set_joins_allowed(joins_allowed) } - Command::TestConnectivity { + Command::ProposeOnline { mut peer, previous_name, - their_knowledge, + destination_key, } => { peer.set_reachable(self.comm.is_reachable(peer.addr()).await.is_ok()); self.core .read() .await - .make_online_proposal(peer, previous_name, their_knowledge) + .make_online_proposal(peer, previous_name, destination_key) .await } Command::ProposeOffline(name) => self.core.read().await.propose_offline(name), diff --git a/src/routing/tests/mod.rs b/src/routing/tests/mod.rs index ed478060ec..4fb1434e65 100644 --- a/src/routing/tests/mod.rs +++ b/src/routing/tests/mod.rs @@ -33,13 +33,12 @@ use bytes::Bytes; use resource_proof::ResourceProof; use secured_linked_list::SecuredLinkedList; use sn_data_types::{Keypair, PublicKey}; -use sn_messaging::Signed; use sn_messaging::{ location::{Aggregation, Itinerary}, node::{ - JoinRequest, MemberInfo, Network, Peer, PeerState, PlainMessage, Proposal, Proven, - RelocateDetails, RelocatePayload, ResourceProofResponse, RoutingMsg, Section, - SectionAuthorityProvider, SignedRelocateDetails, Variant, + JoinRequest, JoinResponse, MemberInfo, Network, Peer, PeerState, PlainMessage, Proposal, + Proven, RelocateDetails, RelocatePayload, ResourceProofResponse, RoutingMsg, Section, + SectionAuthorityProvider, Signed, SignedRelocateDetails, Variant, }, section_info::{GetSectionResponse, Message as SectionInfoMsg}, DestInfo, DstLocation, MessageType, SrcLocation, @@ -71,10 +70,7 @@ async fn receive_matching_get_section_request_as_elder() -> Result<()> { ); let new_node_name = new_node.name(); - let message = SectionInfoMsg::GetSectionQuery { - public_key: PublicKey::from(new_node.keypair.public), - is_node: true, - }; + let message = SectionInfoMsg::GetSectionQuery(PublicKey::from(new_node.keypair.public)); let mut commands = dispatcher .handle_command(Command::HandleSectionInfoMsg { @@ -142,10 +138,7 @@ async fn receive_mismatching_get_section_request_as_adult() -> Result<()> { let new_node_comm = create_comm().await?; let new_node_addr = new_node_comm.our_connection_info(); - let message = SectionInfoMsg::GetSectionQuery { - public_key: random_pk, - is_node: true, - }; + let message = SectionInfoMsg::GetSectionQuery(random_pk); let mut commands = dispatcher .handle_command(Command::HandleSectionInfoMsg { @@ -217,14 +210,20 @@ async fn receive_join_request_without_resource_proof_response() -> Result<()> { .await? .into_iter(); - let response_message = assert_matches!( + let response_message_variant = assert_matches!( commands.next(), - Some(Command::SendMessage { message: MessageType::Routing { msg, .. }, .. }) => msg + Some(Command::SendMessage { + message: MessageType::Routing { + msg: RoutingMsg { variant: Variant::JoinResponse(variant), .. }, + .. + }, + .. + }) => variant ); assert_matches!( - response_message.variant(), - Variant::ResourceChallenge { .. } + *response_message_variant, + JoinResponse::ResourceChallenge { .. } ); Ok(()) @@ -282,17 +281,17 @@ async fn receive_join_request_with_resource_proof_response() -> Result<()> { let mut test_connectivity = false; for command in commands { - if let Command::TestConnectivity { + if let Command::ProposeOnline { peer, previous_name, - their_knowledge, + destination_key, } = command { assert_eq!(*peer.name(), new_node.name()); assert_eq!(*peer.addr(), new_node.addr); assert_eq!(peer.age(), FIRST_SECTION_MIN_AGE); assert_eq!(previous_name, None); - assert_eq!(their_knowledge, None); + assert_eq!(destination_key, None); test_connectivity = true; } @@ -387,15 +386,15 @@ async fn receive_join_request_from_relocated_node() -> Result<()> { let mut test_connectivity = false; for command in commands { - if let Command::TestConnectivity { + if let Command::ProposeOnline { peer, previous_name, - their_knowledge, + destination_key, } = command { assert_eq!(peer, relocated_node.peer()); assert_eq!(previous_name, Some(relocated_node_old_name)); - assert_eq!(their_knowledge, Some(section_key)); + assert_eq!(destination_key, Some(section_key)); test_connectivity = true; } @@ -425,7 +424,7 @@ async fn aggregate_proposals() -> Result<()> { let proposal = Proposal::Online { member_info, previous_name: None, - their_knowledge: None, + destination_key: None, }; for index in 0..THRESHOLD { @@ -566,7 +565,7 @@ async fn handle_agreement_on_online_of_elder_candidate() -> Result<()> { let proposal = Proposal::Online { member_info, previous_name: Some(XorName::random()), - their_knowledge: Some(sk_set.secret_key().public_key()), + destination_key: Some(sk_set.secret_key().public_key()), }; let signed = prove(sk_set.secret_key(), &proposal.as_signable())?; @@ -588,7 +587,7 @@ async fn handle_agreement_on_online_of_elder_candidate() -> Result<()> { _ => continue, }; - let actual_elder_candidates = match message.variant() { + let actual_elder_candidates = match message.variant { Variant::DkgStart { elder_candidates, .. } => elder_candidates, @@ -622,7 +621,7 @@ async fn handle_online_command( let proposal = Proposal::Online { member_info, previous_name: None, - their_knowledge: None, + destination_key: None, }; let signed = prove(sk_set.secret_key(), &proposal.as_signable())?; @@ -645,14 +644,17 @@ async fn handle_online_command( _ => continue, }; - match message.variant() { - Variant::NodeApproval { - section_auth: proven_section_auth, - .. - } => { - assert_eq!(proven_section_auth.value, *section_auth); - assert_eq!(recipients, [(*peer.name(), *peer.addr())]); - status.node_approval_sent = true; + match message.variant { + Variant::JoinResponse(response) => { + if let JoinResponse::Approval { + section_auth: proven_section_auth, + .. + } = *response + { + assert_eq!(proven_section_auth.value, *section_auth); + assert_eq!(recipients, [(*peer.name(), *peer.addr())]); + status.node_approval_sent = true; + } } Variant::Relocate(details) => { if details.pub_id != *peer.name() { @@ -828,7 +830,7 @@ async fn handle_agreement_on_offline_of_elder() -> Result<()> { _ => continue, }; - let actual_elder_candidates = match message.variant() { + let actual_elder_candidates = match message.variant { Variant::DkgStart { elder_candidates, .. } => elder_candidates, @@ -967,7 +969,7 @@ async fn handle_untrusted_message(source: UntrustedMessageSource) -> Result<()> continue; }; - if let Variant::BouncedUntrustedMessage { msg, dest_info } = message.variant() { + if let Variant::BouncedUntrustedMessage { msg, dest_info } = message.variant { assert_eq!( recipients .into_iter() @@ -975,7 +977,7 @@ async fn handle_untrusted_message(source: UntrustedMessageSource) -> Result<()> .collect::>(), expected_recipients ); - assert_eq!(**msg, original_message); + assert_eq!(*msg, original_message); assert_eq!(dest_info.dest_section_pk, pk0); bounce_sent = true; @@ -1079,11 +1081,11 @@ async fn handle_bounced_untrusted_message() -> Result<()> { _ => continue, }; - match message.variant() { + match message.variant { Variant::UserMessage(content) => { assert_eq!(recipients, [(other_node.name(), other_node.addr)]); assert_eq!(*content, original_message_content); - assert_eq!(message.section_pk(), pk0); + assert_eq!(message.section_pk, pk0); message_sent = true; } @@ -1245,9 +1247,9 @@ async fn handle_untrusted_sync() -> Result<()> { _ => continue, }; - match message.variant() { + match message.variant { Variant::BouncedUntrustedMessage { msg, .. } => { - assert_eq!(**msg, orig_message); + assert_eq!(*msg, orig_message); assert_eq!(recipients, [(sender.name(), sender.addr)]); bounce_sent = true; } @@ -1343,7 +1345,7 @@ async fn handle_bounced_untrusted_sync() -> Result<()> { _ => continue, }; - match message.variant() { + match message.variant { Variant::Sync { section, .. } => { assert_eq!(recipients, [(sender.name(), sender.addr)]); assert!(section.chain().has_key(&pk0)); @@ -1424,7 +1426,7 @@ async fn relocation(relocated_peer_role: RelocatedPeerRole) -> Result<()> { } match relocated_peer_role { RelocatedPeerRole::NonElder => { - let details = match message.variant() { + let details = match message.variant { Variant::Relocate(details) => details, _ => continue, }; @@ -1433,7 +1435,7 @@ async fn relocation(relocated_peer_role: RelocatedPeerRole) -> Result<()> { assert_eq!(details.age, relocated_peer.age() + 1); } RelocatedPeerRole::Elder => { - let promise = match message.variant() { + let promise = match message.variant { Variant::RelocatePromise(promise) => promise, _ => continue, }; @@ -1493,12 +1495,12 @@ async fn message_to_self(dst: MessageDst) -> Result<()> { assert_matches!(&commands[..], [Command::HandleMessage { sender, message, dest_info }] => { assert_eq!(sender.as_ref(), Some(peer.addr())); - assert_eq!(message.src().src_location(), src); - assert_eq!(message.dst(), &dst); + assert_eq!(message.src.src_location(), src); + assert_eq!(&message.dst, &dst); assert_eq!(dest_info.dest, dst_name); assert_matches!( - message.variant(), - Variant::UserMessage(actual_content) if actual_content == &content + &message.variant, + Variant::UserMessage(actual_content) if Bytes::from(actual_content.clone()) == content ); }); @@ -1578,8 +1580,8 @@ async fn handle_elders_update() -> Result<()> { _ => continue, }; - let section = match message.variant() { - Variant::Sync { section, .. } => section, + let section = match message.variant { + Variant::Sync { ref section, .. } => section, _ => continue, }; @@ -1703,7 +1705,7 @@ async fn handle_demote_during_split() -> Result<()> { _ => continue, }; - if matches!(message.variant(), Variant::Sync { .. }) { + if matches!(message.variant, Variant::Sync { .. }) { sync_recipients.extend(recipients); } } @@ -1820,7 +1822,7 @@ fn create_relocation_trigger(sk: &bls::SecretKey, age: u8) -> Result<(Proposal, let proposal = Proposal::Online { member_info: MemberInfo::joined(create_peer(MIN_ADULT_AGE)), previous_name: Some(rand::random()), - their_knowledge: None, + destination_key: None, }; let signature = sk.sign(&bincode::serialize(&proposal.as_signable())?); diff --git a/src/section/mod.rs b/src/section/mod.rs index c0edafae9d..c1a2132edf 100644 --- a/src/section/mod.rs +++ b/src/section/mod.rs @@ -33,8 +33,8 @@ use secured_linked_list::{error::Error as SecuredLinkedListError, SecuredLinkedL use serde::Serialize; use sn_messaging::node::{ ElderCandidates, MemberInfo, Peer, Proven, Section, SectionAuthorityProvider, SectionPeers, + Signed, }; -use sn_messaging::Signed; use std::{collections::BTreeSet, convert::TryInto, iter, marker::Sized, net::SocketAddr}; use xor_name::{Prefix, XorName};