Skip to content

Commit

Permalink
Merge branch 'sync-benchmarks'
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed May 20, 2024
2 parents c8cc2dc + 4a5c1c8 commit 4a8e73c
Show file tree
Hide file tree
Showing 10 changed files with 358 additions and 44 deletions.
6 changes: 6 additions & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ harness = false
name = "bench_large_file"
harness = false

[[bench]]
name = "bench_swarm"
harness = false

[dependencies]
# NOTE: There is a newer version of argon2, but that one is not backward
# compatible with 0.4.1. Thus before we can bump the argon2 version, we need to
Expand Down Expand Up @@ -87,9 +91,11 @@ network-interface = "0.1.3"

[dev-dependencies]
assert_matches = { workspace = true }
clap = { workspace = true }
criterion = { version = "0.4", features = ["html_reports"] }
metrics_ext = { path = "../metrics_ext" }
ouisync-tracing-fmt = { path = "../tracing_fmt" }
parse-size = "1.0"
proptest = "1.0"
rmp-serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
223 changes: 223 additions & 0 deletions lib/benches/bench_swarm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
//! Simulation of a swarm of Ouisync instances sharing a repository. Useful for benchmarking sync
//! performance.
#[path = "../tests/common/mod.rs"]
#[macro_use]
mod common;

use clap::Parser;
use common::{actor, sync_watch, Env, Proto, DEFAULT_REPO};
use ouisync::{AccessMode, File};
use rand::{distributions::Standard, rngs::StdRng, Rng, SeedableRng};
use std::{fmt, process::ExitCode, sync::Arc, time::Instant};
use tokio::sync::Barrier;

fn main() -> ExitCode {
let options = Options::parse();
if options.num_writers == 0 {
eprintln!("error: at least one write replica required");
return ExitCode::FAILURE;
}

let file_size = options.file_size;
let actors: Vec<_> = (0..options.num_writers)
.map(|i| ActorId(AccessMode::Write, i))
.chain((0..options.num_readers).map(|i| ActorId(AccessMode::Read, i)))
.collect();
let proto = options.protocol;

let mut env = Env::new();

// Wait until everyone is fully synced.
let (watch_tx, watch_rx) = sync_watch::channel();
let mut watch_tx = Some(watch_tx);

// Then wait until everyone is done. This is so that even actors that finished syncing still
// remain online for other actors to sync from.
let barrier = Arc::new(Barrier::new(actors.len()));

let file_name = "file.dat";
let file_seed = 0;

for actor in &actors {
let other_actors: Vec<_> = actors
.iter()
.filter(|other_actor| *other_actor != actor)
.copied()
.collect();

let access_mode = actor.0;

let watch_tx = (actor.0 == AccessMode::Write)
.then(|| watch_tx.take())
.flatten();
let watch_rx = watch_rx.clone();
let barrier = barrier.clone();

env.actor(&actor.to_string(), async move {
let network = actor::create_network(proto).await;

// Connect to the other peers
for other_actor in other_actors {
let addr = actor::lookup_addr(&other_actor.to_string()).await;
network.add_user_provided_peer(&addr);
}

// Create the repo
let repo = actor::create_repo_with_mode(DEFAULT_REPO, access_mode).await;
let _reg = network.register(repo.handle()).await;

if let Some(watch_tx) = watch_tx {
drop(watch_rx);

let mut file = repo.create_file(file_name).await.unwrap();
write_random_file(&mut file, file_seed, file_size).await;

watch_tx.run(&repo).await;
} else {
watch_rx.run(&repo).await;

let mut file = repo.open_file(file_name).await.unwrap();
check_random_file(&mut file, file_seed, file_size).await;
}

info!("done");

barrier.wait().await;
});
}

drop(watch_rx);

let start = Instant::now();
info!("simulation started");
drop(env);
info!(
"simulation completed in {:.3}s",
start.elapsed().as_secs_f64()
);

ExitCode::SUCCESS
}

#[derive(Parser, Debug)]
struct Options {
/// Size of the file to share in bytes. Can use metric (kB, MB, ...) or binary (kiB, MiB, ...)
/// suffixes.
#[arg(short = 's', long, value_parser = parse_size, default_value_t = 1024 * 1024)]
pub file_size: u64,

/// Number of replicas with write access. Must be at least 1.
#[arg(short = 'w', long, default_value_t = 2)]
pub num_writers: usize,

/// Number of replicas with read access.
#[arg(short = 'r', long, default_value_t = 0)]
pub num_readers: usize,

/// Network protocol to use (QUIC or TCP).
#[arg(short, long, value_parser, default_value_t = Proto::Quic)]
pub protocol: Proto,

// `cargo bench` passes the `--bench` flag down to the bench binary so we need to accept it even
// if we don't use it.
#[arg(
long = "bench",
hide = true,
hide_short_help = true,
hide_long_help = true
)]
_bench: bool,
}

#[derive(Clone, Copy, Eq, PartialEq)]
struct ActorId(AccessMode, usize);

impl fmt::Display for ActorId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}{}",
match self.0 {
AccessMode::Write => "w",
AccessMode::Read => "r",
AccessMode::Blind => "b",
},
self.1
)
}
}

fn parse_size(input: &str) -> Result<u64, parse_size::Error> {
parse_size::parse_size(input)
}

async fn write_random_file(file: &mut File, seed: u64, size: u64) {
let mut chunk = Vec::new();
let mut gen = RandomChunks::new(seed, size);

while gen.next(&mut chunk) {
file.write_all(&chunk).await.unwrap();
}

file.flush().await.unwrap();
}

async fn check_random_file(file: &mut File, seed: u64, size: u64) {
let mut expected = Vec::new();
let mut actual = Vec::new();
let mut gen = RandomChunks::new(seed, size);
let mut offset = 0;

while gen.next(&mut expected) {
actual.clear();
actual.resize(expected.len(), 0);

file.read_all(&mut actual).await.unwrap();

similar_asserts::assert_eq!(
actual,
expected,
"actor: {}, offset: {}",
actor::name(),
offset
);

offset += expected.len();
}
}

const CHUNK_SIZE: usize = 4096;

// Generate random byte chunks of `CHUNK_SIZE` up to the given total size.
struct RandomChunks {
rng: StdRng,
remaining: usize,
}

impl RandomChunks {
fn new(seed: u64, size: u64) -> Self {
Self {
rng: StdRng::seed_from_u64(seed),
remaining: size as usize,
}
}

fn next(&mut self, chunk: &mut Vec<u8>) -> bool {
if self.remaining == 0 {
return false;
}

let chunk_size = CHUNK_SIZE.min(self.remaining);
self.remaining -= chunk_size;

chunk.clear();
chunk.extend(
(&mut self.rng)
.sample_iter::<u8, _>(Standard)
.take(chunk_size),
);

true
}
}
2 changes: 1 addition & 1 deletion lib/src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl Inner {
self.vault.monitor.responses_received.increment(1);

// TODO: The `BlockOffer` response doesn't require write access to the store and so
// can be processed faster than the other response types and. Furthermode, it can be
// can be processed faster than the other response types and furthermore, it can be
// processed concurrently. Consider using a separate queue and a separate `select`
// branch for it to speed things up.

Expand Down
9 changes: 1 addition & 8 deletions lib/src/network/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,21 +300,14 @@ impl Stacks {

fn start_punching_holes(&self, addr: PeerAddr) -> Option<scoped_task::ScopedJoinHandle<()>> {
if !addr.is_quic() {
tracing::debug!("Hole punching not started - not QUIC address");
return None;
}

if !ip::is_global(&addr.ip()) {
tracing::debug!("Hole punching not started - not global address");
return None;
}

let stack = if let Some(stack) = self.quic_stack_for(&addr.ip()) {
stack
} else {
tracing::debug!("Hole punching not started - no QUIC stack");
return None;
};
let stack = self.quic_stack_for(&addr.ip())?;

let sender = stack.hole_puncher.clone();
let task = scoped_task::spawn(
Expand Down
14 changes: 8 additions & 6 deletions lib/src/network/message_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,13 @@ impl MessageBroker {
tracker: self.tracker.clone(),
};

tracing::info!(?role, "Link created");

drop(span_enter);

let task = async move {
select! {
_ = link.maintain() => (),
_ = abort_rx => (),
}

tracing::info!("Link destroyed")
};
let task = task.instrument(span);

Expand Down Expand Up @@ -312,14 +308,20 @@ async fn run_link(
let (response_tx, response_rx) = mpsc::channel(1);
let (content_tx, content_rx) = mpsc::channel(1);

tracing::info!("Link opened");

// Run everything in parallel:
select! {
let flow = select! {
flow = run_client(repo.clone(), content_tx.clone(), response_rx, request_limiter) => flow,
flow = run_server(repo.clone(), content_tx.clone(), request_rx, choker) => flow,
flow = recv_messages(stream, request_tx, response_tx, pex_rx) => flow,
flow = send_messages(content_rx, sink) => flow,
_ = pex_tx.run(content_tx) => ControlFlow::Continue,
}
};

tracing::info!("Link closed");

flow
}

// Handle incoming messages
Expand Down
4 changes: 4 additions & 0 deletions lib/src/network/message_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,10 @@ async fn multi_stream_runner(
// has, then they'll send some small number of messages from their Barrier code. That's
// fine because that number does not exceed MAX_QUEUED_MESSAGES and so the above `tx.send`
// won't block for long.

// FIXME: When this timeout is triggered it closes this stream which closes all its channels
// and the links attached to them. This can cause sync to stop. We should find a better way
// to handle this or at least restart the channels when this happens.
match time::timeout(KEEP_ALIVE_RECV_INTERVAL, tx.send((permit_id, message))).await {
Ok(Ok(())) => (),
Err(_) | Ok(Err(_)) => break,
Expand Down
39 changes: 38 additions & 1 deletion lib/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{
future::Future,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
thread,
};
Expand Down Expand Up @@ -172,6 +173,10 @@ pub(crate) mod actor {
use state_monitor::StateMonitor;
use tokio::sync::watch;

pub(crate) fn name() -> String {
ACTOR.with(|actor| actor.name.clone())
}

pub(crate) fn create_unbound_network() -> Network {
Network::new(None, StateMonitor::make_root())
}
Expand Down Expand Up @@ -368,7 +373,7 @@ impl Drop for TempDir {
}
}

#[derive(Clone, Copy, Eq, PartialEq)]
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub(crate) enum Proto {
Tcp,
Quic,
Expand All @@ -390,6 +395,38 @@ impl Proto {
}
}

impl FromStr for Proto {
type Err = ProtoParseError;

fn from_str(input: &str) -> Result<Self, Self::Err> {
match input.trim().to_lowercase().as_str() {
"tcp" => Ok(Self::Tcp),
"quic" => Ok(Self::Quic),
_ => Err(ProtoParseError),
}
}
}

impl fmt::Display for Proto {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Tcp => write!(f, "TCP"),
Self::Quic => write!(f, "QUIC"),
}
}
}

#[derive(Debug)]
pub struct ProtoParseError;

impl fmt::Display for ProtoParseError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "failed to parse protocol")
}
}

impl std::error::Error for ProtoParseError {}

// Keep calling `f` until it returns `true`. Wait for repo notification between calls.

pub(crate) async fn eventually<F, Fut>(repo: &Repository, mut f: F)
Expand Down
Loading

0 comments on commit 4a8e73c

Please sign in to comment.