Skip to content

Commit

Permalink
Refactor ABD_95 communicate method and add seeded RNG to turmoil tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kaymanb committed Oct 4, 2023
1 parent a281f78 commit 8ff3e50
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 78 deletions.
79 changes: 36 additions & 43 deletions todc-net/src/abd_95.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,68 +63,61 @@ impl<T: Clone + Debug + Default + DeserializeOwned + Ord + Send + Serialize + 's
}

/// TODO: Document
async fn communicate(
&self,
message: MessageType,
) -> Result<Vec<Option<LocalValue<T>>>, GenericError> {
async fn communicate(&self, message: MessageType) -> Result<Vec<LocalValue<T>>, GenericError> {
let local = self.local.lock().unwrap().clone();
let mut results: Vec<Option<LocalValue<T>>> = vec![Some(local.clone())];
results.resize_with(self.neighbors.len() + 1, Default::default);

// Communicate the message with all neighbors.
let mut handles = JoinSet::new();
let info = Arc::new(Mutex::new(results));

let num_neighbors = self.neighbors.iter().len();
println!("Communicating with {num_neighbors:?} other replicas...");

for (i, url) in self.neighbor_urls().into_iter().enumerate() {
println!("Sending request to {url:?}");
let info = info.clone();
for url in self.neighbor_urls().into_iter() {
let local = local.clone();
handles.spawn(async move {
let res = match message {
let result = match message {
MessageType::Announce => {
let body = serde_json::to_value(local)?;
post(url, body).await?
post(url, body).await
}
MessageType::Ask => get(url).await?,
MessageType::Ask => get(url).await,
};

let status = res.status();
println!("Got: {status:?}");
match result {
Err(error) => Err(error),
Ok(response) => {
if response.status().is_server_error() {
return Err(GenericError::from("Unexpected server error"));
}

if res.status().is_server_error() {
return Err(GenericError::from("Unexpected server error"));
let body = response.collect().await?.aggregate();
let value: LocalValue<T> = serde_json::from_reader(body.reader())?;
Ok(value)
}
}

let body = res.collect().await?.aggregate();
let value: LocalValue<T> = serde_json::from_reader(body.reader())?;
println!("Recieved {value:?} from {i}");
let mut info = info.lock().unwrap();
(*info)[i + 1] = Some(value);
Ok::<(), GenericError>(())
});
}

let mut acks = 1;
let mut failures = 0;
// Wait until a majority of neighbors have replied succesfully, and
// return their values.
let mut info: Vec<LocalValue<T>> = vec![local.clone()];

let mut acks: f32 = 1.0;
let mut failures: f32 = 0.0;
let minority = (self.neighbors.len() as f32 + 1_f32) / 2_f32;
while acks as f32 <= minority && (failures as f32) <= minority {
if let Some(response) = handles.join_next().await {
match response? {
Ok(_) => acks += 1,
Err(_) => failures += 1,
while acks <= minority && failures <= minority {
if let Some(result) = handles.join_next().await {
match result? {
Err(_) => failures += 1.0,
Ok(value) => {
info.push(value);
acks += 1.0;
}
}
}
}

if (failures as f32) > minority {
return Err(GenericError::from("A majority of neighbors are offline"));
if acks > minority {
Ok(info)
} else {
Err(GenericError::from("A majority of neighbors are offline"))
}

let results = info.lock().unwrap().clone();
println!("After communication: {results:?}");
Ok(results)
}

/// TODO: Document
Expand All @@ -143,7 +136,7 @@ impl<T: Clone + Debug + Default + DeserializeOwned + Ord + Send + Serialize + 's
/// TODO: Document
pub async fn read(&self) -> Result<T, GenericError> {
let info = self.communicate(MessageType::Ask).await?;
let max = info.into_iter().max().unwrap().unwrap();
let max = info.into_iter().max().unwrap();
let local = self.update(&max);
self.communicate(MessageType::Announce).await?;
Ok(local.value)
Expand Down Expand Up @@ -235,7 +228,7 @@ mod tests {
let info = register.communicate(MessageType::Ask).await.unwrap();

let local = register.local.lock().unwrap();
assert_eq!(info, vec![Some(local.clone())])
assert_eq!(info, vec![local.clone()])
}
}

Expand Down
1 change: 1 addition & 0 deletions todc-net/tests/abd_95.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(dead_code, unused_imports)]
mod abd_95 {
#[cfg(feature = "turmoil")]
mod common;
#[cfg(feature = "turmoil")]
mod linearizability;
Expand Down
61 changes: 38 additions & 23 deletions todc-net/tests/abd_95/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use hyper::http::StatusCode;
use hyper::server::conn::http1;
use hyper::{Request, Response, Uri};
use hyper_util::rt::TokioIo;
use rand::rngs::StdRng;
use rand::{thread_rng, Rng, SeedableRng};
use serde_json::Value as JSON;
use turmoil::net::{TcpListener, TcpStream};
use turmoil::{Builder, Sim};
Expand All @@ -19,30 +21,19 @@ pub const PORT: u32 = 9999;

type FetchResult<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

/// Simulate n replicas of a register.
/// Simulate n replicates of a register.
pub fn simulate_servers<'a>(n: usize) -> (Sim<'a>, Vec<AtomicRegister<u32>>) {
let mut sim = Builder::new().build();

let mut registers = Vec::new();

let neighbors: Vec<Uri> = (0..n)
.map(|i| {
format!("http://{SERVER_PREFIX}-{i}:{PORT}")
.parse()
.unwrap()
})
.collect();
let sim = Builder::new().build();
simulate_registers(n, sim)
}

for i in 0..n {
let mut neighbors = neighbors.clone();
neighbors.remove(i);
let register: AtomicRegister<u32> = AtomicRegister::new(neighbors);
let name = format!("{SERVER_PREFIX}-{i}");
let register_clone = register.clone();
sim.host(name, move || serve(register_clone.clone()));
registers.push(register);
}
(sim, registers)
/// Simulate n replicas of a register with a fixed RNG seed.
pub fn simulate_servers_with_seed<'a>(n: usize) -> (Sim<'a>, Vec<AtomicRegister<u32>>, u64) {
let seed: u64 = thread_rng().gen();
let rng = StdRng::seed_from_u64(seed);
let sim = Builder::new().build_with_rng(Box::new(rng));
let (sim, registers) = simulate_registers(n, sim);
(sim, registers, seed)
}

/// Submits a GET request to the URL.
Expand Down Expand Up @@ -96,6 +87,30 @@ pub async fn post(url: Uri, body: JSON) -> FetchResult<Response<Incoming>> {
Ok(res)
}

/// Adds n register instances to the simulation.
fn simulate_registers(n: usize, mut sim: Sim) -> (Sim, Vec<AtomicRegister<u32>>) {
let mut registers = Vec::new();

let neighbors: Vec<Uri> = (0..n)
.map(|i| {
format!("http://{SERVER_PREFIX}-{i}:{PORT}")
.parse()
.unwrap()
})
.collect();

for i in 0..n {
let mut neighbors = neighbors.clone();
neighbors.remove(i);
let register: AtomicRegister<u32> = AtomicRegister::new(neighbors);
let name = format!("{SERVER_PREFIX}-{i}");
let register_clone = register.clone();
sim.host(name, move || serve(register_clone.clone()));
registers.push(register);
}
(sim, registers)
}

/// Serve a register as a service.
async fn serve(register: AtomicRegister<u32>) -> Result<(), Box<dyn std::error::Error + 'static>> {
let addr = (IpAddr::from(Ipv4Addr::UNSPECIFIED), 9999);
Expand All @@ -106,7 +121,7 @@ async fn serve(register: AtomicRegister<u32>) -> Result<(), Box<dyn std::error::
let register = register.clone();
tokio::task::spawn(async move {
if let Err(err) = http1::Builder::new().serve_connection(io, register).await {
println!("Internal Server Error: {:?}", err);
println!("Error Serving Connection: {:?}", err);
}
});
}
Expand Down
41 changes: 29 additions & 12 deletions todc-net/tests/abd_95/linearizability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ use std::time::{Duration, Instant};

use rand::distributions::Standard;
use rand::prelude::Distribution;
use rand::rngs::ThreadRng;
use rand::rngs::StdRng;
use rand::seq::IteratorRandom;
use rand::{thread_rng, Rng};
use rand::{thread_rng, Rng, SeedableRng};
use serde::de::DeserializeOwned;
use serde::Serialize;

use todc_net::abd_95::AtomicRegister;
use todc_utils::specifications::register::{RegisterOperation, RegisterSpecification};
use todc_utils::{Action, History, WGLChecker};

use crate::abd_95::common::{simulate_servers, SERVER_PREFIX};
use crate::abd_95::common::{simulate_servers_with_seed, SERVER_PREFIX};

use RegisterOperation::{Read, Write};

Expand Down Expand Up @@ -71,7 +71,7 @@ struct RecordingRegisterClient<T: Clone + Debug + Default + DeserializeOwned + O
actions: Arc<Mutex<Vec<RecordedAction<T>>>>,
process: ProcessID,
register: AtomicRegister<T>,
rng: ThreadRng,
rng: StdRng,
value_type: PhantomData<T>,
}

Expand All @@ -83,13 +83,14 @@ where
fn new(
process: ProcessID,
register: AtomicRegister<T>,
rng: StdRng,
actions: Arc<Mutex<Vec<RecordedAction<T>>>>,
) -> Self {
Self {
actions,
process,
register,
rng: thread_rng(),
rng,
value_type: PhantomData,
}
}
Expand Down Expand Up @@ -139,20 +140,32 @@ where
/// linearizable history.
#[test]
fn random_reads_and_writes_with_random_failures() {
const NUM_CLIENTS: usize = 5;
// HACK: Run fewer iterations when calculating code coverage.
#[cfg(coverage)]
const NUM_CLIENTS: usize = 3;
#[cfg(coverage)]
const NUM_OPERATIONS: usize = 10;
#[cfg(coverage)]
const NUM_SERVERS: usize = 6;

#[cfg(not(coverage))]
const NUM_CLIENTS: usize = 10;
#[cfg(not(coverage))]
const NUM_OPERATIONS: usize = 50;
const NUM_SERVERS: usize = 10;
#[cfg(not(coverage))]
const NUM_SERVERS: usize = 20;

const WRITE_PROBABILITY: f64 = 1.0 / 2.0;
const FAILURE_RATE: f64 = 0.9;
const FAILURE_RATE: f64 = 0.8;

// Simulate a network where a random minority of servers
// fail with non-zero probability.
let (mut sim, registers) = simulate_servers(NUM_SERVERS);
let (mut sim, registers, seed) = simulate_servers_with_seed(NUM_SERVERS);
let servers: Vec<String> = (0..NUM_SERVERS)
.map(|i| format!("{SERVER_PREFIX}-{i}"))
.collect();

let mut rng = thread_rng();
let mut rng = StdRng::seed_from_u64(seed);
let minority = ((NUM_SERVERS as f32 / 2.0).ceil() - 1.0) as usize;

let faulty_servers: Vec<String> = servers
Expand Down Expand Up @@ -180,13 +193,14 @@ fn random_reads_and_writes_with_random_failures() {
let actions: Arc<Mutex<Vec<TimedAction<RegisterOperation<u32>>>>> =
Arc::new(Mutex::new(vec![]));

// Simulate clients that submit requests to correct servers.
// Simulate clients that submit requests.
assert!(NUM_CLIENTS <= correct_servers.len());
for (i, register) in registers.into_iter().enumerate().take(NUM_CLIENTS) {
let actions = actions.clone();
let rng = rng.clone();
let client_name = format!("client-{i}");
sim.client(client_name, async move {
let mut client = RecordingRegisterClient::<u32>::new(i, register.clone(), actions);
let mut client = RecordingRegisterClient::<u32>::new(i, register.clone(), rng, actions);
for _ in 0..NUM_OPERATIONS {
client.perform_random_operation(WRITE_PROBABILITY).await?;
}
Expand All @@ -196,6 +210,9 @@ fn random_reads_and_writes_with_random_failures() {

sim.run().unwrap();

// Print the seed to enable re-trying a failed test.
println!("This test used the random seed: {seed}");

// Collect log of call/response actions that occured during the simulation
// and assert that the resulting history is linearizable
let actions = Arc::try_unwrap(actions).unwrap().into_inner().unwrap();
Expand Down

0 comments on commit 8ff3e50

Please sign in to comment.