Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: relocate all joining infants during startup phase
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Nov 9, 2020
1 parent 777fac8 commit 492f4d7
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 158 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ log = "~0.4.8"
lru_time_cache = "~0.11.0"
qp2p = { version = "~0.8.5", features = ["upnp"] }
rand = "~0.7.3"
rand_xorshift = "~0.2.0"
rand_chacha = "~0.2.2"
serde = { version = "1.0.117", features = ["derive"] }
tiny-keccak = { version = "2.0.2", features = ["sha3"] }
tokio = { version = "~0.2.22", features = ["sync", "time", "rt-util"] }
Expand Down
4 changes: 0 additions & 4 deletions src/consensus/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ pub(crate) enum Vote {
key_index: u64,
},

// Voted to change the age of the given node.
ChangeAge(MemberInfo),

// Voted to send an user message whose source is our section.
SendMessage {
message: Box<PlainMessage>,
Expand Down Expand Up @@ -97,7 +94,6 @@ impl<'a> Serialize for SignableView<'a> {
Vote::OurKey { key, .. } => key.serialize(serializer),
Vote::TheirKey { prefix, key } => (prefix, key).serialize(serializer),
Vote::TheirKnowledge { prefix, key_index } => (prefix, key_index).serialize(serializer),
Vote::ChangeAge(member_info) => member_info.serialize(serializer),
Vote::SendMessage { message, .. } => message.as_signable().serialize(serializer),
}
}
Expand Down
70 changes: 46 additions & 24 deletions src/relocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,25 @@
//! Relocation related types and utilities.
use crate::{
consensus::Proven,
crypto::{self, Keypair, Signature, Verifier},
error::Error,
messages::{Message, Variant},
network::Network,
peer::Peer,
section::{MemberInfo, Section},
MIN_AGE,
};
use bytes::Bytes;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use serde::{de::Error as SerdeDeError, Deserialize, Deserializer, Serialize, Serializer};
use std::net::SocketAddr;
use std::{convert::TryInto, net::SocketAddr};
use tokio::sync::mpsc;
use xor_name::XorName;

pub(crate) const MIN_STARTUP_PHASE_AGE: u8 = MIN_AGE + 1;
pub(crate) const MAX_STARTUP_PHASE_AGE: u8 = 32;

/// Find all nodes to relocate after a churn event and create the relocate actions for them.
pub(crate) fn actions(
section: &Section,
Expand All @@ -31,12 +37,12 @@ pub(crate) fn actions(
) -> Vec<(MemberInfo, RelocateAction)> {
section
.members()
.proven_joined()
.filter(|info| check(info.value.peer.age(), churn_signature))
.joined()
.filter(|info| check(info.peer.age(), churn_signature))
.map(|info| {
(
info.value,
RelocateAction::new(section, network, info, churn_name),
*info,
RelocateAction::new(section, network, &info.peer, churn_name),
)
})
.collect()
Expand All @@ -61,18 +67,34 @@ impl RelocateDetails {
pub(crate) fn new(
section: &Section,
network: &Network,
info: &MemberInfo,
peer: &Peer,
destination: XorName,
) -> Self {
Self::with_age(
section,
network,
peer,
destination,
peer.age().saturating_add(1),
)
}

pub(crate) fn with_age(
section: &Section,
network: &Network,
peer: &Peer,
destination: XorName,
age: u8,
) -> Self {
let destination_key = *network
.key_by_name(&destination)
.unwrap_or_else(|| section.chain().first_key());

Self {
pub_id: *info.peer.name(),
pub_id: *peer.name(),
destination,
destination_key,
age: info.peer.age().saturating_add(1),
age,
}
}
}
Expand Down Expand Up @@ -195,26 +217,16 @@ pub(crate) enum RelocateAction {
}

impl RelocateAction {
pub fn new(
section: &Section,
network: &Network,
info: &Proven<MemberInfo>,
churn_name: &XorName,
) -> Self {
let destination = destination(info.value.peer.name(), churn_name);
pub fn new(section: &Section, network: &Network, peer: &Peer, churn_name: &XorName) -> Self {
let destination = destination(peer.name(), churn_name);

if section.is_elder(info.value.peer.name()) {
if section.is_elder(peer.name()) {
RelocateAction::Delayed(RelocatePromise {
name: *info.value.peer.name(),
name: *peer.name(),
destination,
})
} else {
RelocateAction::Instant(RelocateDetails::new(
section,
network,
&info.value,
destination,
))
RelocateAction::Instant(RelocateDetails::new(section, network, peer, destination))
}
}

Expand All @@ -234,6 +246,16 @@ pub(crate) fn check(age: u8, churn_signature: &bls::Signature) -> bool {
trailing_zeros(&churn_signature.to_bytes()[..]) >= age as u32
}

// Generate age for relocated peer during the starup phase, using the `Online` vote signature as
// the random seed.
pub(crate) fn startup_phase_age(signature: &bls::Signature) -> u8 {
let seed = signature.to_bytes()[..32]
.try_into()
.expect("invalid signature length");
let mut rng = ChaCha8Rng::from_seed(seed);
rng.gen_range(MIN_STARTUP_PHASE_AGE, MAX_STARTUP_PHASE_AGE)
}

// Compute the destination for the node with `relocating_name` to be relocated to. `churn_name` is
// the name of the joined/left node that triggered the relocation.
fn destination(relocating_name: &XorName, churn_name: &XorName) -> XorName {
Expand Down
137 changes: 71 additions & 66 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,6 @@ impl Approved {
self.handle_their_knowledge_event(prefix, key_index, proof);
Ok(vec![])
}
Vote::ChangeAge(member_info) => {
self.handle_change_age_event(member_info, proof);
Ok(vec![])
}
Vote::SendMessage {
message,
proof_chain,
Expand Down Expand Up @@ -855,10 +851,13 @@ impl Approved {
}

if let Some(info) = self.section.members().get(&promise.name) {
let details =
RelocateDetails::new(&self.section, &self.network, info, promise.destination);
let peer = info.peer;
commands.extend(self.send_relocate(&peer, details)?);
let details = RelocateDetails::new(
&self.section,
&self.network,
&info.peer,
promise.destination,
);
commands.extend(self.send_relocate(&info.peer, details)?);
} else {
error!(
"ignore returned RelocatePromise from {} - unknown node",
Expand Down Expand Up @@ -944,7 +943,7 @@ impl Approved {
if !self.section.prefix().matches(&details.destination) {
debug!(
"Ignoring relocation JoinRequest from {} - destination {} doesn't match \
our prefix {:?}.",
our prefix {:?}.",
pub_id,
details.destination,
self.section.prefix()
Expand Down Expand Up @@ -1015,10 +1014,6 @@ impl Approved {
.into_commands(&self.node)
}

////////////////////////////////////////////////////////////////////////////
// Accumulated events handling
////////////////////////////////////////////////////////////////////////////

// Generate a new section info based on the current set of members and vote for it if it
// changed.
fn promote_and_demote_elders(&mut self) -> Result<Vec<Command>> {
Expand All @@ -1031,31 +1026,13 @@ impl Approved {
Ok(commands)
}

fn increment_ages(
fn relocate_peers(
&self,
churn_name: &XorName,
churn_signature: &bls::Signature,
) -> Result<Vec<Command>> {
let mut commands = vec![];

if self.is_in_startup_phase() {
// We are in the startup phase - don't relocate, just increment everyones ages
// (excluding the new node).
let votes: Vec<_> = self
.section
.members()
.joined()
.filter(|info| info.peer.name() != churn_name)
.map(|info| Vote::ChangeAge(info.clone().increment_age()))
.collect();

for vote in votes {
commands.extend(self.vote(vote)?);
}

return Ok(commands);
}

// As a measure against sybil attacks, don't relocate on infant churn.
if !self.section.is_adult_or_elder(churn_name) {
trace!("Skip relocation on infant churn");
Expand Down Expand Up @@ -1090,6 +1067,25 @@ impl Approved {
Ok(commands)
}

fn relocate_peer_in_startup_phase(
&self,
peer: Peer,
signature: &bls::Signature,
) -> Result<Vec<Command>> {
let age = relocation::startup_phase_age(signature);
let details =
RelocateDetails::with_age(&self.section, &self.network, &peer, *peer.name(), age);

trace!(
"Relocating {:?} to {} with age {} in startup phase",
peer,
details.destination,
details.age
);

self.send_relocate(&peer, details)
}

// Are we in the startup phase? Startup phase is when the network consists of only one section
// and it has no more than `recommended_section_size` members.
fn is_in_startup_phase(&self) -> bool {
Expand All @@ -1104,40 +1100,56 @@ impl Approved {
their_knowledge: Option<bls::PublicKey>,
proof: Proof,
) -> Result<Vec<Command>> {
let mut commands = Vec::new();

let peer = member_info.peer;
let age = peer.age();
let signature = proof.signature.clone();

if !self.section.update_member(Proven {
value: member_info,
proof,
}) {
info!("ignore Online: {:?}", peer);
return Ok(commands);
}
let mut commands = vec![];

info!("handle Online: {:?} (age: {})", peer, age);
if self.is_in_startup_phase() && peer.age() <= MIN_AGE {
// In startup phase, instantly relocate the joining peer in order to promote it to
// adult.

commands.extend(self.increment_ages(peer.name(), &signature)?);
commands.extend(self.promote_and_demote_elders()?);
commands.push(self.send_node_approval(&peer, their_knowledge)?);
if self.section.members().is_known(peer.name()) {
info!("ignore Online: {:?}", peer);
return Ok(vec![]);
}

if let Some(previous_name) = previous_name {
self.send_event(Event::MemberJoined {
name: *peer.name(),
previous_name,
age,
});
// TODO: consider handling the relocation inside the bootstrap phase, to avoid having
// to send this `NodeApproval`.
commands.push(self.send_node_approval(&peer, their_knowledge)?);
commands.extend(self.relocate_peer_in_startup_phase(peer, &signature)?);
} else {
self.send_event(Event::InfantJoined {
name: *peer.name(),
age,
});
}
// Post startup phase, add the new peer normally.

if !self.section.update_member(Proven {
value: member_info,
proof,
}) {
info!("ignore Online: {:?}", peer);
return Ok(vec![]);
}

self.print_network_stats();
info!("handle Online: {:?}", peer);

commands.push(self.send_node_approval(&peer, their_knowledge)?);
commands.extend(self.relocate_peers(peer.name(), &signature)?);
commands.extend(self.promote_and_demote_elders()?);

if let Some(previous_name) = previous_name {
self.send_event(Event::MemberJoined {
name: *peer.name(),
previous_name,
age: peer.age(),
});
} else {
self.send_event(Event::InfantJoined {
name: *peer.name(),
age: peer.age(),
});
}

self.print_network_stats();
}

Ok(commands)
}
Expand All @@ -1163,7 +1175,7 @@ impl Approved {

info!("handle Offline: {:?}", peer);

commands.extend(self.increment_ages(peer.name(), &signature)?);
commands.extend(self.relocate_peers(peer.name(), &signature)?);
commands.extend(self.promote_and_demote_elders()?);

self.send_event(Event::MemberLeft {
Expand Down Expand Up @@ -1251,13 +1263,6 @@ impl Approved {
self.network.update_knowledge(knowledge)
}

fn handle_change_age_event(&mut self, member_info: MemberInfo, proof: Proof) {
let _ = self.section.update_member(Proven {
value: member_info,
proof,
});
}

fn handle_send_message_event(
&self,
message: PlainMessage,
Expand Down
6 changes: 5 additions & 1 deletion src/routing/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,11 @@ async fn accumulate_votes() -> Result<()> {
#[tokio::test]
async fn handle_consensus_on_online_of_infant() -> Result<()> {
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
let (elders_info, mut nodes) = create_elders_info();

// Use non-default prefix to skip the startup phase.
let prefix: Prefix = "0".parse().unwrap();

let (elders_info, mut nodes) = gen_elders_info(prefix, ELDER_SIZE);
let sk_set = SecretKeySet::random();
let (section, section_key_share) = create_section(&sk_set, &elders_info)?;
let node = nodes.remove(0);
Expand Down
Loading

0 comments on commit 492f4d7

Please sign in to comment.