Skip to content

Commit

Permalink
Merge pull request #226 from aeyakovenko/converge_test
Browse files Browse the repository at this point in the history
check convergence
  • Loading branch information
aeyakovenko authored May 17, 2018
2 parents 82aef7e + 63a0ba6 commit 4badeac
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 23 deletions.
28 changes: 15 additions & 13 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use rayon::prelude::*;
use result::{Error, Result};
use ring::rand::{SecureRandom, SystemRandom};
use signature::{PublicKey, Signature};
use std;
use std::collections::HashMap;
use std::io::Cursor;
use std::net::{SocketAddr, UdpSocket};
Expand Down Expand Up @@ -194,7 +195,6 @@ impl Crdt {
if nodes.len() < 1 {
return Err(Error::CrdtTooSmall);
}

info!("nodes table {}", nodes.len());
info!("blobs table {}", blobs.len());
// enumerate all the blobs, those are the indices
Expand Down Expand Up @@ -295,6 +295,12 @@ impl Crdt {
Ok(())
}

// max number of nodes that we could be converged to
pub fn convergence(&self) -> u64 {
let max = self.remote.values().len() as u64 + 1;
self.remote.values().fold(max, |a, b| std::cmp::min(a, *b))
}

fn random() -> u64 {
let rnd = SystemRandom::new();
let mut buf = [0u8; 8];
Expand Down Expand Up @@ -552,21 +558,16 @@ mod test {
.map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone()))
.collect();
let mut done = true;
for _ in 0..(num * 32) {
done = true;
for i in 0..(num * 32) {
done = false;
trace!("round {}", i);
for &(ref c, _) in listen.iter() {
trace!(
"done updates {} {}",
c.read().unwrap().table.len(),
c.read().unwrap().update_index
);
//make sure the number of updates doesn't grow unbounded
assert!(c.read().unwrap().update_index <= num as u64);
//make sure we got all the updates
if c.read().unwrap().table.len() != num {
done = false;
if num == c.read().unwrap().convergence() as usize {
done = true;
break;
}
}
//at least 1 node converged
if done == true {
break;
}
Expand All @@ -590,6 +591,7 @@ mod test {
#[test]
#[ignore]
fn gossip_ring_test() {
logger::setup();
run_gossip_topo(|listen| {
let num = listen.len();
for n in 0..num {
Expand Down
15 changes: 5 additions & 10 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,21 +365,16 @@ mod tests {
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
//wait for the network to converge
let mut converged = false;
for _ in 0..30 {
let len = spy_ref.read().unwrap().table.values().len();
let mut min = num_nodes as u64;
for u in spy_ref.read().unwrap().remote.values() {
if min > *u {
min = *u;
}
}
info!("length {} {}", len, min);
if num_nodes == len && min >= (num_nodes as u64) {
warn!("converged! {} {}", len, min);
let num = spy_ref.read().unwrap().convergence();
if num == num_nodes as u64 {
converged = true;
break;
}
sleep(Duration::new(1, 0));
}
assert!(converged);
threads.push(t_spy_listen);
threads.push(t_spy_gossip);
let v: Vec<SocketAddr> = spy_ref
Expand Down

0 comments on commit 4badeac

Please sign in to comment.