Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
discovery: Retry failed pings with exponential backoff.
Browse files Browse the repository at this point in the history
UDP packets may get dropped, so instead of immediately booting nodes that fail
to respond to a ping, retry 4 times with exponential backoff.
  • Loading branch information
jimpo committed Jun 7, 2018
1 parent bb81e63 commit ff11cfc
Showing 1 changed file with 64 additions and 16 deletions.
80 changes: 64 additions & 16 deletions util/network-devp2p/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const PING_TIMEOUT: Duration = Duration::from_millis(300);
const FIND_NODE_TIMEOUT: Duration = Duration::from_secs(2);
const EXPIRY_TIME: Duration = Duration::from_secs(60);
const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once
const REQUEST_BACKOFF: [u64; 4] = [1, 4, 16, 64];

#[derive(Clone, Debug)]
pub struct NodeEntry {
Expand All @@ -62,6 +63,21 @@ pub struct BucketEntry {
pub address: NodeEntry,
pub id_hash: H256,
pub last_seen: Instant,
backoff_until: Instant,
fail_count: usize,
}

impl BucketEntry {
fn new(address: NodeEntry) -> Self {
let now = Instant::now();
BucketEntry {
id_hash: keccak(address.id),
address: address,
last_seen: now,
backoff_until: now,
fail_count: 0,
}
}
}

pub struct NodeBucket {
Expand Down Expand Up @@ -112,6 +128,7 @@ pub struct Discovery {
check_timestamps: bool,
adding_nodes: Vec<NodeEntry>,
ip_filter: IpFilter,
request_backoff: Vec<Duration>,
}

pub struct TableUpdates {
Expand Down Expand Up @@ -140,6 +157,7 @@ impl Discovery {
check_timestamps: true,
adding_nodes: Vec::new(),
ip_filter: ip_filter,
request_backoff: REQUEST_BACKOFF.iter().map(|s| Duration::from_secs(*s)).collect(),
}
}

Expand Down Expand Up @@ -188,21 +206,17 @@ impl Discovery {
let updated = if let Some(node) = bucket.nodes.iter_mut().find(|n| n.address.id == e.id) {
node.address = e.clone();
node.last_seen = Instant::now();
node.backoff_until = Instant::now();
node.fail_count = 0;
true
} else { false };

if !updated {
added_map.insert(e.id, e.clone());
bucket.nodes.push_front(BucketEntry {
id_hash: id_hash,
address: e,
last_seen: Instant::now(),
});
bucket.nodes.push_front(BucketEntry::new(e));

if bucket.nodes.len() > BUCKET_SIZE {
//ping least active node
let last = bucket.nodes.back_mut().expect("Last item is always present when len() > 0");
Some(last.address.clone())
select_bucket_ping(bucket.nodes.iter())
} else { None }
} else { None }
};
Expand Down Expand Up @@ -657,9 +671,20 @@ impl Discovery {
.expect("distance is None only if id hashes are equal; will never send request to self; qed");
let bucket = &mut self.node_buckets[dist];
if let Some(index) = bucket.nodes.iter().position(|n| n.id_hash == id_hash) {
removed.insert(node_id);
let node = bucket.nodes.remove(index).expect("index was located in if condition");
trace!(target: "discovery", "Removed expired node {:?}", &node.address);
if bucket.nodes[index].fail_count < self.request_backoff.len() {
let node = &mut bucket.nodes[index];
node.backoff_until = Instant::now() + self.request_backoff[node.fail_count];
node.fail_count += 1;
trace!(
target: "discovery",
"Requests to node {:?} timed out {} consecutive time(s)",
&node.address, node.fail_count
);
} else {
removed.insert(node_id);
let node = bucket.nodes.remove(index).expect("index was located in if condition");
debug!(target: "discovery", "Removed expired node {:?}", &node.address);
}
}
}
}
Expand Down Expand Up @@ -718,6 +743,18 @@ fn assemble_packet(packet_id: u8, bytes: &[u8], secret: &Secret) -> Result<Bytes
Ok(packet)
}

// Selects the next node in a bucket to ping. Chooses the eligible node least recently seen.
fn select_bucket_ping<'a, I>(nodes: I) -> Option<NodeEntry>
where
I: Iterator<Item=&'a BucketEntry>
{
let now = Instant::now();
nodes
.filter(|n| n.backoff_until < now)
.min_by_key(|n| n.last_seen)
.map(|n| n.address.clone())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -821,6 +858,7 @@ mod tests {
let key = Random.generate().unwrap();
let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 };
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());
discovery.request_backoff = vec![];

let total_bucket_nodes = |node_buckets: &Vec<NodeBucket>| -> usize {
node_buckets.iter().map(|bucket| bucket.nodes.len()).sum()
Expand Down Expand Up @@ -874,6 +912,19 @@ mod tests {

let removed = discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT).len();
assert_eq!(removed, 0);

// Test bucket evictions with retries.
discovery.request_backoff = vec![Duration::new(0, 0); 2];

for _ in 0..2 {
discovery.ping(&node_entries[101]).unwrap();
let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len();
assert_eq!(removed, 0);
}

discovery.ping(&node_entries[101]).unwrap();
let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len();
assert_eq!(removed, 1);
}

#[test]
Expand All @@ -885,11 +936,8 @@ mod tests {
let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default());

for _ in 0..(16 + 10) {
discovery.node_buckets[0].nodes.push_back(BucketEntry {
address: NodeEntry { id: NodeId::new(), endpoint: ep.clone() },
last_seen: Instant::now(),
id_hash: keccak(NodeId::new()),
});
let entry = BucketEntry::new(NodeEntry { id: NodeId::new(), endpoint: ep.clone() });
discovery.node_buckets[0].nodes.push_back(entry);
}
let nearest = discovery.nearest_node_entries(&NodeId::new());
assert_eq!(nearest.len(), 16)
Expand Down

0 comments on commit ff11cfc

Please sign in to comment.