diff --git a/Cargo.toml b/Cargo.toml index 262f9b3e95..d8d7007f6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ log = "~0.4.8" lru_time_cache = "~0.11.0" qp2p = { version = "~0.8.5", features = ["upnp"] } rand = "~0.7.3" -rand_xorshift = "~0.2.0" +rand_chacha = "~0.2.2" serde = { version = "1.0.117", features = ["derive"] } tiny-keccak = { version = "2.0.2", features = ["sha3"] } tokio = { version = "~0.2.22", features = ["sync", "time", "rt-util"] } diff --git a/src/consensus/vote.rs b/src/consensus/vote.rs index 2191b5a337..45e23084db 100644 --- a/src/consensus/vote.rs +++ b/src/consensus/vote.rs @@ -53,9 +53,6 @@ pub(crate) enum Vote { key_index: u64, }, - // Voted to change the age of the given node. - ChangeAge(MemberInfo), - // Voted to send an user message whose source is our section. SendMessage { message: Box, @@ -97,7 +94,6 @@ impl<'a> Serialize for SignableView<'a> { Vote::OurKey { key, .. } => key.serialize(serializer), Vote::TheirKey { prefix, key } => (prefix, key).serialize(serializer), Vote::TheirKnowledge { prefix, key_index } => (prefix, key_index).serialize(serializer), - Vote::ChangeAge(member_info) => member_info.serialize(serializer), Vote::SendMessage { message, .. } => message.as_signable().serialize(serializer), } } diff --git a/src/relocation.rs b/src/relocation.rs index 431120b4fe..e176b40706 100644 --- a/src/relocation.rs +++ b/src/relocation.rs @@ -9,19 +9,25 @@ //! Relocation related types and utilities. use crate::{ - consensus::Proven, crypto::{self, Keypair, Signature, Verifier}, error::Error, messages::{Message, Variant}, network::Network, + peer::Peer, section::{MemberInfo, Section}, + MIN_AGE, }; use bytes::Bytes; +use rand::{Rng, SeedableRng}; +use rand_chacha::ChaCha8Rng; use serde::{de::Error as SerdeDeError, Deserialize, Deserializer, Serialize, Serializer}; -use std::net::SocketAddr; +use std::{convert::TryInto, net::SocketAddr}; use tokio::sync::mpsc; use xor_name::XorName; +pub(crate) const MIN_STARTUP_PHASE_AGE: u8 = MIN_AGE + 1; +pub(crate) const MAX_STARTUP_PHASE_AGE: u8 = 32; + /// Find all nodes to relocate after a churn event and create the relocate actions for them. pub(crate) fn actions( section: &Section, @@ -31,12 +37,12 @@ pub(crate) fn actions( ) -> Vec<(MemberInfo, RelocateAction)> { section .members() - .proven_joined() - .filter(|info| check(info.value.peer.age(), churn_signature)) + .joined() + .filter(|info| check(info.peer.age(), churn_signature)) .map(|info| { ( - info.value, - RelocateAction::new(section, network, info, churn_name), + *info, + RelocateAction::new(section, network, &info.peer, churn_name), ) }) .collect() @@ -61,18 +67,34 @@ impl RelocateDetails { pub(crate) fn new( section: &Section, network: &Network, - info: &MemberInfo, + peer: &Peer, + destination: XorName, + ) -> Self { + Self::with_age( + section, + network, + peer, + destination, + peer.age().saturating_add(1), + ) + } + + pub(crate) fn with_age( + section: &Section, + network: &Network, + peer: &Peer, destination: XorName, + age: u8, ) -> Self { let destination_key = *network .key_by_name(&destination) .unwrap_or_else(|| section.chain().first_key()); Self { - pub_id: *info.peer.name(), + pub_id: *peer.name(), destination, destination_key, - age: info.peer.age().saturating_add(1), + age, } } } @@ -195,26 +217,16 @@ pub(crate) enum RelocateAction { } impl RelocateAction { - pub fn new( - section: &Section, - network: &Network, - info: &Proven, - churn_name: &XorName, - ) -> Self { - let destination = destination(info.value.peer.name(), churn_name); + pub fn new(section: &Section, network: &Network, peer: &Peer, churn_name: &XorName) -> Self { + let destination = destination(peer.name(), churn_name); - if section.is_elder(info.value.peer.name()) { + if section.is_elder(peer.name()) { RelocateAction::Delayed(RelocatePromise { - name: *info.value.peer.name(), + name: *peer.name(), destination, }) } else { - RelocateAction::Instant(RelocateDetails::new( - section, - network, - &info.value, - destination, - )) + RelocateAction::Instant(RelocateDetails::new(section, network, peer, destination)) } } @@ -234,6 +246,16 @@ pub(crate) fn check(age: u8, churn_signature: &bls::Signature) -> bool { trailing_zeros(&churn_signature.to_bytes()[..]) >= age as u32 } +// Generate age for relocated peer during the starup phase, using the `Online` vote signature as +// the random seed. +pub(crate) fn startup_phase_age(signature: &bls::Signature) -> u8 { + let seed = signature.to_bytes()[..32] + .try_into() + .expect("invalid signature length"); + let mut rng = ChaCha8Rng::from_seed(seed); + rng.gen_range(MIN_STARTUP_PHASE_AGE, MAX_STARTUP_PHASE_AGE) +} + // Compute the destination for the node with `relocating_name` to be relocated to. `churn_name` is // the name of the joined/left node that triggered the relocation. fn destination(relocating_name: &XorName, churn_name: &XorName) -> XorName { diff --git a/src/routing/approved.rs b/src/routing/approved.rs index e40df2701f..857d2b4e7e 100644 --- a/src/routing/approved.rs +++ b/src/routing/approved.rs @@ -188,10 +188,6 @@ impl Approved { self.handle_their_knowledge_event(prefix, key_index, proof); Ok(vec![]) } - Vote::ChangeAge(member_info) => { - self.handle_change_age_event(member_info, proof); - Ok(vec![]) - } Vote::SendMessage { message, proof_chain, @@ -855,10 +851,13 @@ impl Approved { } if let Some(info) = self.section.members().get(&promise.name) { - let details = - RelocateDetails::new(&self.section, &self.network, info, promise.destination); - let peer = info.peer; - commands.extend(self.send_relocate(&peer, details)?); + let details = RelocateDetails::new( + &self.section, + &self.network, + &info.peer, + promise.destination, + ); + commands.extend(self.send_relocate(&info.peer, details)?); } else { error!( "ignore returned RelocatePromise from {} - unknown node", @@ -944,7 +943,7 @@ impl Approved { if !self.section.prefix().matches(&details.destination) { debug!( "Ignoring relocation JoinRequest from {} - destination {} doesn't match \ - our prefix {:?}.", + our prefix {:?}.", pub_id, details.destination, self.section.prefix() @@ -1015,10 +1014,6 @@ impl Approved { .into_commands(&self.node) } - //////////////////////////////////////////////////////////////////////////// - // Accumulated events handling - //////////////////////////////////////////////////////////////////////////// - // Generate a new section info based on the current set of members and vote for it if it // changed. fn promote_and_demote_elders(&mut self) -> Result> { @@ -1031,31 +1026,13 @@ impl Approved { Ok(commands) } - fn increment_ages( + fn relocate_peers( &self, churn_name: &XorName, churn_signature: &bls::Signature, ) -> Result> { let mut commands = vec![]; - if self.is_in_startup_phase() { - // We are in the startup phase - don't relocate, just increment everyones ages - // (excluding the new node). - let votes: Vec<_> = self - .section - .members() - .joined() - .filter(|info| info.peer.name() != churn_name) - .map(|info| Vote::ChangeAge(info.clone().increment_age())) - .collect(); - - for vote in votes { - commands.extend(self.vote(vote)?); - } - - return Ok(commands); - } - // As a measure against sybil attacks, don't relocate on infant churn. if !self.section.is_adult_or_elder(churn_name) { trace!("Skip relocation on infant churn"); @@ -1090,6 +1067,25 @@ impl Approved { Ok(commands) } + fn relocate_peer_in_startup_phase( + &self, + peer: Peer, + signature: &bls::Signature, + ) -> Result> { + let age = relocation::startup_phase_age(signature); + let details = + RelocateDetails::with_age(&self.section, &self.network, &peer, *peer.name(), age); + + trace!( + "Relocating {:?} to {} with age {} in startup phase", + peer, + details.destination, + details.age + ); + + self.send_relocate(&peer, details) + } + // Are we in the startup phase? Startup phase is when the network consists of only one section // and it has no more than `recommended_section_size` members. fn is_in_startup_phase(&self) -> bool { @@ -1104,40 +1100,56 @@ impl Approved { their_knowledge: Option, proof: Proof, ) -> Result> { - let mut commands = Vec::new(); - let peer = member_info.peer; - let age = peer.age(); let signature = proof.signature.clone(); - if !self.section.update_member(Proven { - value: member_info, - proof, - }) { - info!("ignore Online: {:?}", peer); - return Ok(commands); - } + let mut commands = vec![]; - info!("handle Online: {:?} (age: {})", peer, age); + if self.is_in_startup_phase() && peer.age() <= MIN_AGE { + // In startup phase, instantly relocate the joining peer in order to promote it to + // adult. - commands.extend(self.increment_ages(peer.name(), &signature)?); - commands.extend(self.promote_and_demote_elders()?); - commands.push(self.send_node_approval(&peer, their_knowledge)?); + if self.section.members().is_known(peer.name()) { + info!("ignore Online: {:?}", peer); + return Ok(vec![]); + } - if let Some(previous_name) = previous_name { - self.send_event(Event::MemberJoined { - name: *peer.name(), - previous_name, - age, - }); + // TODO: consider handling the relocation inside the bootstrap phase, to avoid having + // to send this `NodeApproval`. + commands.push(self.send_node_approval(&peer, their_knowledge)?); + commands.extend(self.relocate_peer_in_startup_phase(peer, &signature)?); } else { - self.send_event(Event::InfantJoined { - name: *peer.name(), - age, - }); - } + // Post startup phase, add the new peer normally. + + if !self.section.update_member(Proven { + value: member_info, + proof, + }) { + info!("ignore Online: {:?}", peer); + return Ok(vec![]); + } - self.print_network_stats(); + info!("handle Online: {:?}", peer); + + commands.push(self.send_node_approval(&peer, their_knowledge)?); + commands.extend(self.relocate_peers(peer.name(), &signature)?); + commands.extend(self.promote_and_demote_elders()?); + + if let Some(previous_name) = previous_name { + self.send_event(Event::MemberJoined { + name: *peer.name(), + previous_name, + age: peer.age(), + }); + } else { + self.send_event(Event::InfantJoined { + name: *peer.name(), + age: peer.age(), + }); + } + + self.print_network_stats(); + } Ok(commands) } @@ -1163,7 +1175,7 @@ impl Approved { info!("handle Offline: {:?}", peer); - commands.extend(self.increment_ages(peer.name(), &signature)?); + commands.extend(self.relocate_peers(peer.name(), &signature)?); commands.extend(self.promote_and_demote_elders()?); self.send_event(Event::MemberLeft { @@ -1251,13 +1263,6 @@ impl Approved { self.network.update_knowledge(knowledge) } - fn handle_change_age_event(&mut self, member_info: MemberInfo, proof: Proof) { - let _ = self.section.update_member(Proven { - value: member_info, - proof, - }); - } - fn handle_send_message_event( &self, message: PlainMessage, diff --git a/src/routing/tests/mod.rs b/src/routing/tests/mod.rs index c1d48477e3..09d80d2130 100644 --- a/src/routing/tests/mod.rs +++ b/src/routing/tests/mod.rs @@ -181,7 +181,11 @@ async fn accumulate_votes() -> Result<()> { #[tokio::test] async fn handle_consensus_on_online_of_infant() -> Result<()> { let (event_tx, mut event_rx) = mpsc::unbounded_channel(); - let (elders_info, mut nodes) = create_elders_info(); + + // Use non-default prefix to skip the startup phase. + let prefix: Prefix = "0".parse().unwrap(); + + let (elders_info, mut nodes) = gen_elders_info(prefix, ELDER_SIZE); let sk_set = SecretKeySet::random(); let (section, section_key_share) = create_section(&sk_set, &elders_info)?; let node = nodes.remove(0); diff --git a/src/section/member_info.rs b/src/section/member_info.rs index 857bcba654..f3c18c4151 100644 --- a/src/section/member_info.rs +++ b/src/section/member_info.rs @@ -49,14 +49,6 @@ impl MemberInfo { ..self } } - - // Converts this info into one with the age increased by one. - pub fn increment_age(self) -> Self { - Self { - peer: self.peer.increment_age(), - ..self - } - } } #[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize, Debug)] diff --git a/src/section/section_peers.rs b/src/section/section_peers.rs index ead77ffcc8..bec59e2c42 100644 --- a/src/section/section_peers.rs +++ b/src/section/section_peers.rs @@ -45,13 +45,6 @@ impl SectionPeers { .filter(|member| member.state == PeerState::Joined) } - /// Returns an iterator over the members that have state == `Joined` together with their proofs. - pub fn proven_joined(&self) -> impl Iterator> { - self.members - .values() - .filter(|member| member.value.state == PeerState::Joined) - } - /// Returns joined nodes from our section with age greater than `MIN_AGE` pub fn mature(&self) -> impl Iterator { self.joined() @@ -107,6 +100,11 @@ impl SectionPeers { .unwrap_or(false) } + /// Returns whether the given peer is known to us (joined or left) + pub fn is_known(&self, name: &XorName) -> bool { + self.members.contains_key(name) + } + /// Update a member of our section. /// Returns whether anything actually changed. pub fn update(&mut self, new_info: Proven) -> bool { diff --git a/tests/bootstrap.rs b/tests/bootstrap.rs index 614211b736..65f6f1f07a 100644 --- a/tests/bootstrap.rs +++ b/tests/bootstrap.rs @@ -13,11 +13,10 @@ use ed25519_dalek::Keypair; use futures::future; use sn_routing::{ event::{Connected, Event}, - EventStream, Routing, ELDER_SIZE, + EventStream, ELDER_SIZE, }; use tokio::time; use utils::*; -use xor_name::XorName; #[tokio::test] async fn test_genesis_node() -> Result<()> { @@ -47,10 +46,9 @@ async fn test_node_bootstrapping() -> Result<()> { let genesis_handler = tokio::spawn(async move { assert_next_event!(event_stream, Event::Connected(Connected::First)); assert_next_event!(event_stream, Event::PromotedToElder); - assert_next_event!(event_stream, Event::InfantJoined { age: 4, name: _ }); - // TODO: Should we expect EldersChanged event too ?? - // assert_next_event!(event_stream, Event::EldersChanged { .. })?; - Ok::<(), Error>(()) + assert_next_event!(event_stream, Event::MemberJoined { .. }); + // TODO: we should expect `EldersChanged` too. + // assert_next_event!(event_stream, Event::EldersChanged { .. }); }); // bootstrap a second node with genesis @@ -63,7 +61,7 @@ async fn test_node_bootstrapping() -> Result<()> { assert_next_event!(event_stream, Event::Connected(Connected::First)); // just await for genesis node to finish receiving all events - genesis_handler.await??; + genesis_handler.await?; let elder_size = 2; verify_invariants_for_node(&genesis_node, elder_size).await?; @@ -73,58 +71,55 @@ async fn test_node_bootstrapping() -> Result<()> { } #[tokio::test] -async fn test_section_bootstrapping() -> Result<()> { +async fn test_startup_section_bootstrapping() -> Result<()> { let (genesis_node, mut event_stream) = RoutingBuilder::new(None).first().create().await?; + let other_node_count = ELDER_SIZE - 1; // spawn genesis node events listener let genesis_handler = tokio::spawn(async move { + let mut joined_nodes = vec![]; // expect events for all nodes - let mut joined_nodes = Vec::default(); while let Some(event) = event_stream.next().await { - match event { - Event::InfantJoined { age, name } => { - assert_eq!(age, 4); - joined_nodes.push(name); - } - _other => {} + if let Event::MemberJoined { name, .. } = event { + joined_nodes.push(name) } - if joined_nodes.len() == ELDER_SIZE { + if joined_nodes.len() == other_node_count { break; } } - Ok::, Error>(joined_nodes) + joined_nodes }); // bootstrap several nodes with genesis to form a section let genesis_contact = genesis_node.our_connection_info()?; - let mut nodes_joining_tasks = Vec::with_capacity(ELDER_SIZE); - for _ in 0..ELDER_SIZE { - nodes_joining_tasks.push(async { + let nodes_joining_tasks: Vec<_> = (0..other_node_count) + .map(|_| async { let (node, mut event_stream) = RoutingBuilder::new(None) .with_contact(genesis_contact) .create() .await?; + // During the startup phase, joining nodes are instantly relocated. assert_next_event!(event_stream, Event::Connected(Connected::First)); + assert_next_event!(event_stream, Event::RelocationStarted { .. }); + assert_next_event!(event_stream, Event::Connected(Connected::Relocate { .. })); - Ok::(node) - }); - } + Ok::<_, Error>(node) + }) + .collect(); - let nodes = future::join_all(nodes_joining_tasks).await; + let nodes = future::try_join_all(nodes_joining_tasks).await?; // just await for genesis node to finish receiving all events - let joined_nodes = genesis_handler.await??; + let joined_nodes = genesis_handler.await?; - for result in nodes { - let node = result?; + for node in nodes { let name = node.name().await; // assert names of nodes joined match - let found = joined_nodes.iter().find(|n| **n == name); - assert!(found.is_some()); + assert!(joined_nodes.contains(&name)); verify_invariants_for_node(&node, ELDER_SIZE).await?; } diff --git a/tests/drop.rs b/tests/drop.rs index 7ba1cf11b4..3d11538ce1 100644 --- a/tests/drop.rs +++ b/tests/drop.rs @@ -11,13 +11,21 @@ mod utils; use self::utils::*; use anyhow::{format_err, Result}; use bytes::Bytes; -use sn_routing::{event::Event, DstLocation, SrcLocation}; +use sn_routing::{ + event::{Connected, Event}, + DstLocation, SrcLocation, +}; use tokio::time; #[tokio::test] async fn test_node_drop() -> Result<()> { let mut nodes = create_connected_nodes(2).await?; + // We are in the startup phase, so the second node is instantly relocated. Let's wait until it + // re-joins. + assert_next_event!(nodes[1].1, Event::RelocationStarted { .. }); + assert_next_event!(nodes[1].1, Event::Connected(Connected::Relocate { .. })); + // Drop one node let dropped_name = nodes.remove(1).0.name().await; diff --git a/tests/messages.rs b/tests/messages.rs index bd354f45ee..88d6954b4e 100644 --- a/tests/messages.rs +++ b/tests/messages.rs @@ -88,9 +88,14 @@ async fn test_messages_between_nodes() -> Result<()> { .with_contact(node1_contact) .create() .await?; - let node2_name = node2.name().await; + // We are in the startup phase, so node2 is instantly relocated. Let's wait until it re-joins. assert_next_event!(event_stream, Event::Connected(Connected::First)); + assert_next_event!(event_stream, Event::RelocationStarted { .. }); + assert_next_event!(event_stream, Event::Connected(Connected::Relocate { .. })); + + let node2_name = node2.name().await; + node2 .send_message( SrcLocation::Node(node2_name), diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs index 4af8ff0314..f20895c122 100644 --- a/tests/utils/mod.rs +++ b/tests/utils/mod.rs @@ -132,32 +132,28 @@ pub async fn create_connected_nodes(count: usize) -> Result((node, event_stream)) }); - for result in future::join_all(other_nodes).await { - nodes.push(result?); + for node in future::try_join_all(other_nodes).await? { + nodes.push(node); } - // Wait until the first node receives `InfantJoined` event for all the other nodes. - let mut not_joined = HashSet::new(); - for (node, _) in &nodes[1..] { - let _ = not_joined.insert(node.name().await); - } + // Wait until the first node receives `InfantJoined`/`MemberJoined` event for all the other + // nodes. + let mut joined_count = 1; while let Some(event) = nodes[0].1.next().await { - if let Event::InfantJoined { name, age } = event { - assert_eq!(age, MIN_AGE); - let _ = not_joined.remove(&name); + match event { + Event::InfantJoined { name, .. } | Event::MemberJoined { name, .. } => { + joined_count += 1 + } + _ => {} } - if not_joined.is_empty() { + if joined_count == nodes.len() { break; } } - assert!( - not_joined.is_empty(), - "Event::InfantJoined not received for: {:?}", - not_joined - ); + assert_eq!(joined_count, nodes.len()); Ok(nodes) }