From 2cab1d2066a021152370b3c688352a7dcf6acc95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Mon, 20 May 2024 14:12:42 +0200 Subject: [PATCH 01/15] Add sync progress reporter to the swarm benchmark --- lib/benches/bench_swarm.rs | 43 +++++++++++---- lib/src/progress.rs | 2 +- lib/tests/common/mod.rs | 1 + lib/tests/common/sync_reporter.rs | 88 +++++++++++++++++++++++++++++++ 4 files changed, 123 insertions(+), 11 deletions(-) create mode 100644 lib/tests/common/sync_reporter.rs diff --git a/lib/benches/bench_swarm.rs b/lib/benches/bench_swarm.rs index 0adee96f3..021fd8d7d 100644 --- a/lib/benches/bench_swarm.rs +++ b/lib/benches/bench_swarm.rs @@ -6,11 +6,11 @@ mod common; use clap::Parser; -use common::{actor, sync_watch, Env, Proto, DEFAULT_REPO}; +use common::{actor, sync_reporter::SyncReporter, sync_watch, Env, Proto, DEFAULT_REPO}; use ouisync::{AccessMode, File}; use rand::{distributions::Standard, rngs::StdRng, Rng, SeedableRng}; use std::{fmt, process::ExitCode, sync::Arc, time::Instant}; -use tokio::sync::Barrier; +use tokio::{select, sync::Barrier}; fn main() -> ExitCode { let options = Options::parse(); @@ -36,6 +36,8 @@ fn main() -> ExitCode { // remain online for other actors to sync from. let barrier = Arc::new(Barrier::new(actors.len())); + let reporter = SyncReporter::new(); + let file_name = "file.dat"; let file_seed = 0; @@ -53,6 +55,7 @@ fn main() -> ExitCode { .flatten(); let watch_rx = watch_rx.clone(); let barrier = barrier.clone(); + let reporter = reporter.clone(); env.actor(&actor.to_string(), async move { let network = actor::create_network(proto).await; @@ -67,16 +70,28 @@ fn main() -> ExitCode { let repo = actor::create_repo_with_mode(DEFAULT_REPO, access_mode).await; let _reg = network.register(repo.handle()).await; - if let Some(watch_tx) = watch_tx { - drop(watch_rx); - + // One writer creates the file initially. + if watch_tx.is_some() { let mut file = repo.create_file(file_name).await.unwrap(); write_random_file(&mut file, file_seed, file_size).await; + } - watch_tx.run(&repo).await; - } else { - watch_rx.run(&repo).await; + let run = async { + if let Some(watch_tx) = watch_tx { + drop(watch_rx); + watch_tx.run(&repo).await; + } else { + watch_rx.run(&repo).await; + } + }; + + select! { + _ = run => (), + _ = reporter.run(&repo) => (), + } + // Check the file content matches the original file. + { let mut file = repo.open_file(file_name).await.unwrap(); check_random_file(&mut file, file_seed, file_size).await; } @@ -119,8 +134,8 @@ struct Options { #[arg(short, long, value_parser, default_value_t = Proto::Quic)] pub protocol: Proto, - // `cargo bench` passes the `--bench` flag down to the bench binary so we need to accept it even - // if we don't use it. + // The following arguments may be passed down from `cargo bench` so we need to accept them even + // if we don't use them. #[arg( long = "bench", hide = true, @@ -128,6 +143,14 @@ struct Options { hide_long_help = true )] _bench: bool, + + #[arg( + long = "profile-time", + hide = true, + hide_short_help = true, + hide_long_help = true + )] + _profile_time: Option, } #[derive(Clone, Copy, Eq, PartialEq)] diff --git a/lib/src/progress.rs b/lib/src/progress.rs index ab8b1079a..b8d40c2e7 100644 --- a/lib/src/progress.rs +++ b/lib/src/progress.rs @@ -5,7 +5,7 @@ use std::{ }; /// Progress of a task. -#[derive(Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)] +#[derive(Default, Clone, Copy, Eq, PartialEq, Debug, Serialize, Deserialize)] pub struct Progress { pub value: u64, pub total: u64, diff --git a/lib/tests/common/mod.rs b/lib/tests/common/mod.rs index 4d61d1d4d..51db3436a 100644 --- a/lib/tests/common/mod.rs +++ b/lib/tests/common/mod.rs @@ -4,6 +4,7 @@ mod macros; pub(crate) mod dump; +pub(crate) mod sync_reporter; pub(crate) mod sync_watch; pub(crate) mod traffic_monitor; mod wait_map; diff --git a/lib/tests/common/sync_reporter.rs b/lib/tests/common/sync_reporter.rs new file mode 100644 index 000000000..9c5cb3415 --- /dev/null +++ b/lib/tests/common/sync_reporter.rs @@ -0,0 +1,88 @@ +use ouisync::{Progress, Repository}; +use std::{ + sync::{Arc, Mutex as BlockingMutex}, + time::{Duration, Instant}, +}; +use tokio::sync::broadcast::error::RecvError; + +const REPORT_INTERVAL: Duration = Duration::from_secs(1); + +/// Reports total sync progress of a group of actors. +#[derive(Clone)] +pub struct SyncReporter { + shared: Arc>, + progress: Progress, + prefix: String, +} + +struct Shared { + progress: Progress, + report_timestamp: Instant, +} + +impl SyncReporter { + pub fn new() -> Self { + Self { + shared: Arc::new(BlockingMutex::new(Shared { + progress: Progress::default(), + report_timestamp: Instant::now(), + })), + progress: Progress::default(), + prefix: String::new(), + } + } + + pub fn with_prefix(mut self, prefix: impl Into) -> Self { + self.prefix = prefix.into(); + self + } + + pub async fn run(mut self, repo: &Repository) { + let mut rx = repo.subscribe(); + + loop { + let progress = repo.sync_progress().await.unwrap(); + self.report(progress); + + match rx.recv().await { + Ok(_) | Err(RecvError::Lagged(_)) => continue, + Err(RecvError::Closed) => break, + } + } + } + + fn report(&mut self, progress: Progress) { + let mut shared = self.shared.lock().unwrap(); + + shared.progress = sub(shared.progress, self.progress); + shared.progress = add(shared.progress, progress); + self.progress = progress; + + let now = Instant::now(); + if (now.duration_since(shared.report_timestamp) < REPORT_INTERVAL) { + return; + } + + println!("{}{}", self.prefix, shared.progress.percent()); + + shared.report_timestamp = now; + } +} + +fn add(a: Progress, b: Progress) -> Progress { + Progress { + value: a.value + b.value, + total: a.total + b.total, + } +} + +fn sub(a: Progress, b: Progress) -> Progress { + Progress { + value: a.value - b.value, + total: a.total - b.total, + } +} + +fn is_complete(progress: &Progress) -> bool { + progress.total > 0 && progress.value >= progress.total +} From 0e98348e807aa1c30ae9c434ea803720c366276d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Tue, 21 May 2024 10:41:33 +0200 Subject: [PATCH 02/15] Report sent and received bytes stats in the swarm benchmark --- lib/Cargo.toml | 2 +- lib/benches/bench_swarm.rs | 66 ++++++++++++++++++--- lib/tests/common/mod.rs | 89 +++++++++++++++-------------- lib/tests/common/traffic_monitor.rs | 4 +- lib/tests/gc.rs | 3 +- lib/tests/sync.rs | 29 ++++++++-- 6 files changed, 133 insertions(+), 60 deletions(-) diff --git a/lib/Cargo.toml b/lib/Cargo.toml index ecee9fea6..1e6e1a404 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -93,9 +93,9 @@ network-interface = "0.1.3" assert_matches = { workspace = true } clap = { workspace = true } criterion = { version = "0.4", features = ["html_reports"] } +hdrhistogram = { version = "7.5.4", default-features = false, features = ["sync"] } metrics_ext = { path = "../metrics_ext" } ouisync-tracing-fmt = { path = "../tracing_fmt" } -parse-size = "1.0" proptest = "1.0" rmp-serde = { workspace = true } serde_json = { workspace = true } diff --git a/lib/benches/bench_swarm.rs b/lib/benches/bench_swarm.rs index 021fd8d7d..3a871f647 100644 --- a/lib/benches/bench_swarm.rs +++ b/lib/benches/bench_swarm.rs @@ -7,9 +7,15 @@ mod common; use clap::Parser; use common::{actor, sync_reporter::SyncReporter, sync_watch, Env, Proto, DEFAULT_REPO}; -use ouisync::{AccessMode, File}; +use hdrhistogram::{Histogram, SyncHistogram}; +use ouisync::{network::TrafficStats, AccessMode, File, StorageSize}; use rand::{distributions::Standard, rngs::StdRng, Rng, SeedableRng}; -use std::{fmt, process::ExitCode, sync::Arc, time::Instant}; +use std::{ + fmt, + process::ExitCode, + sync::Arc, + time::{Duration, Instant}, +}; use tokio::{select, sync::Barrier}; fn main() -> ExitCode { @@ -36,7 +42,9 @@ fn main() -> ExitCode { // remain online for other actors to sync from. let barrier = Arc::new(Barrier::new(actors.len())); - let reporter = SyncReporter::new(); + let progress_reporter = SyncReporter::new(); + let mut send_histogram = SyncHistogram::from(Histogram::::new(3).unwrap()); + let mut recv_histogram = SyncHistogram::from(Histogram::::new(3).unwrap()); let file_name = "file.dat"; let file_seed = 0; @@ -55,7 +63,9 @@ fn main() -> ExitCode { .flatten(); let watch_rx = watch_rx.clone(); let barrier = barrier.clone(); - let reporter = reporter.clone(); + let progress_reporter = progress_reporter.clone(); + let mut send_histogram_recorder = send_histogram.recorder(); + let mut recv_histogram_recorder = recv_histogram.recorder(); env.actor(&actor.to_string(), async move { let network = actor::create_network(proto).await; @@ -76,6 +86,7 @@ fn main() -> ExitCode { write_random_file(&mut file, file_seed, file_size).await; } + // Wait until fully synced + report progress let run = async { if let Some(watch_tx) = watch_tx { drop(watch_rx); @@ -87,7 +98,7 @@ fn main() -> ExitCode { select! { _ = run => (), - _ = reporter.run(&repo) => (), + _ = progress_reporter.run(&repo) => (), } // Check the file content matches the original file. @@ -99,17 +110,31 @@ fn main() -> ExitCode { info!("done"); barrier.wait().await; + + let TrafficStats { send, recv } = network.traffic_stats(); + + info!(send, recv); + + send_histogram_recorder.record(send).unwrap(); + recv_histogram_recorder.record(recv).unwrap(); }); } drop(watch_rx); let start = Instant::now(); - info!("simulation started"); + drop(env); - info!( - "simulation completed in {:.3}s", - start.elapsed().as_secs_f64() + + send_histogram.refresh(); + recv_histogram.refresh(); + + println!(); + println!( + "duration: {}, send: {{ {} }}, recv: {{ {} }}", + Seconds(start.elapsed()), + DisplayHistogram(&send_histogram), + DisplayHistogram(&recv_histogram) ); ExitCode::SUCCESS @@ -244,3 +269,26 @@ impl RandomChunks { true } } + +struct DisplayHistogram<'a>(&'a Histogram); + +impl fmt::Display for DisplayHistogram<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "min: {}, max: {}, mean: {}, stdev: {}", + StorageSize::from_bytes(self.0.min()), + StorageSize::from_bytes(self.0.max()), + StorageSize::from_bytes(self.0.mean().round() as u64), + StorageSize::from_bytes(self.0.stdev().round() as u64), + ) + } +} + +struct Seconds(Duration); + +impl fmt::Display for Seconds { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:.2} s", self.0.as_secs_f64()) + } +} diff --git a/lib/tests/common/mod.rs b/lib/tests/common/mod.rs index 51db3436a..5cce1beaf 100644 --- a/lib/tests/common/mod.rs +++ b/lib/tests/common/mod.rs @@ -170,6 +170,7 @@ pub(crate) mod env { pub(crate) mod actor { use super::*; use metrics::Key; + use metrics_ext::Pair; use ouisync::{AccessMode, RepositoryParams}; use state_monitor::StateMonitor; use tokio::sync::watch; @@ -227,29 +228,43 @@ pub(crate) mod actor { } } - pub(crate) fn get_repo_params_and_secrets( + pub(crate) fn get_repo_params(name: &str) -> RepositoryParams { + get_repo_params_without_recorder(name).with_recorder(get_default_repo_recorder()) + } + + pub(crate) fn get_repo_params_with_recorder( name: &str, - ) -> ( - RepositoryParams>, - AccessSecrets, - ) { + recorder: R, + ) -> RepositoryParams> { + get_repo_params_without_recorder(name) + .with_recorder(Pair(get_default_repo_recorder(), recorder)) + } + + pub(crate) fn get_repo_params_without_recorder(name: &str) -> RepositoryParams { ACTOR.with(|actor| { - let recorder = metrics_ext::AddLabels::new( + RepositoryParams::new(actor.repo_path(name)) + .with_device_id(actor.device_id) + .with_parent_monitor(actor.monitor.clone()) + }) + } + + pub(crate) fn get_default_repo_recorder() -> DefaultRecorder { + ACTOR.with(|actor| { + metrics_ext::AddLabels::new( vec![Label::new("actor", actor.name.clone())], actor.context.recorder.clone(), - ); + ) + }) + } - let params = RepositoryParams::new(actor.repo_path(name)) - .with_device_id(actor.device_id) - .with_recorder(recorder) - .with_parent_monitor(actor.monitor.clone()); + type DefaultRecorder = metrics_ext::AddLabels; - let secrets = actor + pub(crate) fn get_repo_secrets(name: &str) -> AccessSecrets { + ACTOR.with(|actor| { + actor .context .repo_map - .get_or_insert_with(name.to_owned(), AccessSecrets::random_write); - - (params, secrets) + .get_or_insert_with(name.to_owned(), AccessSecrets::random_write) }) } @@ -258,7 +273,8 @@ pub(crate) mod actor { } pub(crate) async fn create_repo_with_mode(name: &str, mode: AccessMode) -> Repository { - let (params, secrets) = get_repo_params_and_secrets(name); + let params = get_repo_params(name); + let secrets = get_repo_secrets(name); Repository::create(¶ms, Access::new(None, None, secrets.with_mode(mode))) .await @@ -285,10 +301,6 @@ pub(crate) mod actor { let (repo, reg) = create_linked_repo(DEFAULT_REPO, &network).await; (network, repo, reg) } - - pub(crate) fn recorder_subscriber() -> WatchRecorderSubscriber { - ACTOR.with(|actor| actor.context.recorder_subscriber.clone()) - } } task_local! { @@ -300,7 +312,6 @@ struct Context { addr_map: WaitMap, repo_map: WaitMap, recorder: metrics_ext::Shared, - recorder_subscriber: WatchRecorderSubscriber, monitor: StateMonitor, } @@ -308,16 +319,13 @@ impl Context { fn new(runtime: &Handle) -> Self { init_log(); - let watch_recorder = WatchRecorder::new(); - let recorder_subscriber = watch_recorder.subscriber(); - let recorder = init_recorder(runtime, watch_recorder); + let recorder = init_recorder(runtime); Self { base_dir: TempDir::new(), addr_map: WaitMap::new(), repo_map: WaitMap::new(), recorder, - recorder_subscriber, monitor: StateMonitor::make_root(), } } @@ -329,12 +337,14 @@ struct Actor { base_dir: PathBuf, device_id: DeviceId, monitor: StateMonitor, + recorder: WatchRecorder, } impl Actor { fn new(name: String, context: Arc) -> Self { let base_dir = context.base_dir.path().join(&name); let monitor = context.monitor.make_child(&name); + let recorder = WatchRecorder::new(); Actor { name, @@ -342,6 +352,7 @@ impl Actor { base_dir, device_id: rand::random(), monitor, + recorder, } } @@ -715,17 +726,15 @@ pub(crate) fn init_log() { } #[cfg(feature = "prometheus")] -fn init_recorder(runtime: &Handle, watch_recorder: WatchRecorder) -> metrics_ext::Shared { - use metrics_ext::{Pair, Shared}; - - Shared::new(Pair(watch_recorder, init_prometheus_recorder(runtime))) +fn init_recorder(runtime: &Handle) -> metrics_ext::Shared { + use metrics_ext::Shared; + Shared::new(init_prometheus_recorder(runtime)) } #[cfg(feature = "influxdb")] -fn init_recorder(runtime: &Handle, watch_recorder: WatchRecorder) -> metrics_ext::Shared { - use metrics_ext::{Pair, Shared}; - - Shared::new(Pair(watch_recorder, init_influxdb_recorder(runtime))) +fn init_recorder(runtime: &Handle) -> metrics_ext::Shared { + use metrics_ext::Shared; + Shared::new(init_influxdb_recorder(runtime)) } #[cfg(all(feature = "prometheus", feature = "influxdb"))] @@ -733,19 +742,15 @@ fn init_recorder(runtime: &Handle, watch_recorder: WatchRecorder) -> metrics_ext use metrics_ext::{Pair, Shared}; Shared::new(Pair( - watch_recorder, - Pair( - init_prometheus_recorder(runtime), - init_influxdb_recorder(runtime), - ), + init_prometheus_recorder(runtime), + init_influxdb_recorder(runtime), )) } #[cfg(not(any(feature = "prometheus", feature = "influxdb")))] -fn init_recorder(_runtime: &Handle, watch_recorder: WatchRecorder) -> metrics_ext::Shared { - use metrics_ext::{Pair, Shared}; - - Shared::new(watch_recorder) +fn init_recorder(_runtime: &Handle) -> metrics_ext::Shared { + use metrics_ext::Shared; + Shared::new(NoopRecorder) } #[cfg(feature = "prometheus")] diff --git a/lib/tests/common/traffic_monitor.rs b/lib/tests/common/traffic_monitor.rs index 9bf278118..b430a69a6 100644 --- a/lib/tests/common/traffic_monitor.rs +++ b/lib/tests/common/traffic_monitor.rs @@ -1,3 +1,4 @@ +use metrics_ext::WatchRecorderSubscriber; use ouisync::Repository; use state_monitor::StateMonitor; use tokio::{select, sync::watch}; @@ -14,8 +15,7 @@ pub(crate) struct TrafficMonitor { } impl TrafficMonitor { - pub fn new() -> Self { - let subscriber = actor::recorder_subscriber(); + pub fn new(subscriber: WatchRecorderSubscriber) -> Self { let requests_pending_rx = subscriber.gauge("requests pending".into()); Self { diff --git a/lib/tests/gc.rs b/lib/tests/gc.rs index c0489d52b..9ea455c21 100644 --- a/lib/tests/gc.rs +++ b/lib/tests/gc.rs @@ -232,7 +232,8 @@ fn change_access_mode() { let content2 = common::random_bytes(3 * BLOCK_SIZE - BLOB_HEADER_SIZE); async fn create_repo_and_reopen_in_blind_mode() -> Repository { - let (params, secrets) = actor::get_repo_params_and_secrets(DEFAULT_REPO); + let params = actor::get_repo_params(DEFAULT_REPO); + let secrets = actor::get_repo_secrets(DEFAULT_REPO); let repo = Repository::create( ¶ms, ouisync::Access::WriteUnlocked { diff --git a/lib/tests/sync.rs b/lib/tests/sync.rs index 1336babd0..0b5ec9d4c 100644 --- a/lib/tests/sync.rs +++ b/lib/tests/sync.rs @@ -7,6 +7,7 @@ use self::common::{ actor, dump, sync_watch, traffic_monitor::TrafficMonitor, Env, Proto, DEFAULT_REPO, }; use assert_matches::assert_matches; +use metrics_ext::WatchRecorder; use ouisync::{ Access, AccessMode, EntryType, Error, Repository, StorageSize, StoreError, VersionVector, BLOB_HEADER_SIZE, BLOCK_SIZE, @@ -528,7 +529,8 @@ fn recreate_local_branch() { let network = actor::create_network(proto).await; // 1. Create the repo but don't link it yet. - let (params, secrets) = actor::get_repo_params_and_secrets(DEFAULT_REPO); + let params = actor::get_repo_params(DEFAULT_REPO); + let secrets = actor::get_repo_secrets(DEFAULT_REPO); let repo = Repository::create(¶ms, Access::new(None, None, secrets)) .await .unwrap(); @@ -1110,11 +1112,20 @@ fn quota_exceed() { env.actor("reader", { async move { - let mut traffic = TrafficMonitor::new(); + let watch_recorder = WatchRecorder::new(); + let mut traffic = TrafficMonitor::new(watch_recorder.subscriber()); let network = actor::create_network(Proto::Tcp).await; - let repo = actor::create_repo_with_mode(DEFAULT_REPO, AccessMode::Read).await; + let params = actor::get_repo_params_with_recorder(DEFAULT_REPO, watch_recorder); + let secrets = actor::get_repo_secrets(DEFAULT_REPO); + let repo = Repository::create( + ¶ms, + Access::new(None, None, secrets.with_mode(AccessMode::Read)), + ) + .await + .unwrap(); + repo.set_quota(Some(quota)).await.unwrap(); let _reg = network.register(repo.handle()).await; @@ -1176,13 +1187,21 @@ fn quota_concurrent_writes() { } env.actor("reader", async move { - let mut traffic = TrafficMonitor::new(); + let watch_recorder = WatchRecorder::new(); + let mut traffic = TrafficMonitor::new(watch_recorder.subscriber()); let network = actor::create_network(Proto::Tcp).await; network.add_user_provided_peer(&actor::lookup_addr("writer-0").await); network.add_user_provided_peer(&actor::lookup_addr("writer-1").await); - let repo = actor::create_repo_with_mode(DEFAULT_REPO, AccessMode::Read).await; + let params = actor::get_repo_params_with_recorder(DEFAULT_REPO, watch_recorder); + let secrets = actor::get_repo_secrets(DEFAULT_REPO); + let repo = Repository::create( + ¶ms, + Access::new(None, None, secrets.with_mode(AccessMode::Read)), + ) + .await + .unwrap(); repo.set_quota(Some(quota)).await.unwrap(); let _reg = network.register(repo.handle()).await; From 8004ea911edb3a584b606d261a962143516b9643 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Tue, 21 May 2024 16:44:07 +0200 Subject: [PATCH 03/15] Add optional progress bar to the swarm benchmark --- lib/Cargo.toml | 1 + lib/benches/bench_swarm.rs | 83 +++++++++++++------- lib/tests/common/mod.rs | 2 +- lib/tests/common/progress.rs | 124 ++++++++++++++++++++++++++++++ lib/tests/common/sync_reporter.rs | 88 --------------------- 5 files changed, 182 insertions(+), 116 deletions(-) create mode 100644 lib/tests/common/progress.rs delete mode 100644 lib/tests/common/sync_reporter.rs diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 1e6e1a404..2b60379c4 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -94,6 +94,7 @@ assert_matches = { workspace = true } clap = { workspace = true } criterion = { version = "0.4", features = ["html_reports"] } hdrhistogram = { version = "7.5.4", default-features = false, features = ["sync"] } +indicatif = "0.17.8" metrics_ext = { path = "../metrics_ext" } ouisync-tracing-fmt = { path = "../tracing_fmt" } proptest = "1.0" diff --git a/lib/benches/bench_swarm.rs b/lib/benches/bench_swarm.rs index 3a871f647..83098fd91 100644 --- a/lib/benches/bench_swarm.rs +++ b/lib/benches/bench_swarm.rs @@ -6,12 +6,12 @@ mod common; use clap::Parser; -use common::{actor, sync_reporter::SyncReporter, sync_watch, Env, Proto, DEFAULT_REPO}; +use common::{actor, progress::ProgressReporter, sync_watch, Env, Proto, DEFAULT_REPO}; use hdrhistogram::{Histogram, SyncHistogram}; use ouisync::{network::TrafficStats, AccessMode, File, StorageSize}; use rand::{distributions::Standard, rngs::StdRng, Rng, SeedableRng}; use std::{ - fmt, + fmt, io, process::ExitCode, sync::Arc, time::{Duration, Instant}, @@ -42,7 +42,7 @@ fn main() -> ExitCode { // remain online for other actors to sync from. let barrier = Arc::new(Barrier::new(actors.len())); - let progress_reporter = SyncReporter::new(); + let progress_reporter = options.progress.then(ProgressReporter::new); let mut send_histogram = SyncHistogram::from(Histogram::::new(3).unwrap()); let mut recv_histogram = SyncHistogram::from(Histogram::::new(3).unwrap()); @@ -96,9 +96,13 @@ fn main() -> ExitCode { } }; - select! { - _ = run => (), - _ = progress_reporter.run(&repo) => (), + if let Some(progress_reporter) = progress_reporter { + select! { + _ = run => (), + _ = progress_reporter.run(&repo) => (), + } + } else { + run.await; } // Check the file content matches the original file. @@ -125,17 +129,19 @@ fn main() -> ExitCode { let start = Instant::now(); drop(env); + drop(progress_reporter); send_histogram.refresh(); recv_histogram.refresh(); println!(); - println!( - "duration: {}, send: {{ {} }}, recv: {{ {} }}", - Seconds(start.elapsed()), - DisplayHistogram(&send_histogram), - DisplayHistogram(&recv_histogram) - ); + print_summary( + &mut io::stdout().lock(), + start.elapsed(), + &send_histogram, + &recv_histogram, + ) + .unwrap(); ExitCode::SUCCESS } @@ -159,6 +165,10 @@ struct Options { #[arg(short, long, value_parser, default_value_t = Proto::Quic)] pub protocol: Proto, + /// Whether to show progress bars. + #[arg(long)] + pub progress: bool, + // The following arguments may be passed down from `cargo bench` so we need to accept them even // if we don't use them. #[arg( @@ -270,21 +280,6 @@ impl RandomChunks { } } -struct DisplayHistogram<'a>(&'a Histogram); - -impl fmt::Display for DisplayHistogram<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "min: {}, max: {}, mean: {}, stdev: {}", - StorageSize::from_bytes(self.0.min()), - StorageSize::from_bytes(self.0.max()), - StorageSize::from_bytes(self.0.mean().round() as u64), - StorageSize::from_bytes(self.0.stdev().round() as u64), - ) - } -} - struct Seconds(Duration); impl fmt::Display for Seconds { @@ -292,3 +287,37 @@ impl fmt::Display for Seconds { write!(f, "{:.2} s", self.0.as_secs_f64()) } } + +fn print_summary( + writer: &mut W, + duration: Duration, + send_histogram: &Histogram, + recv_histogram: &Histogram, +) -> io::Result<()> { + writeln!(writer, "duration: {}", Seconds(duration))?; + + for (histogram, label) in [(send_histogram, "send"), (recv_histogram, "recv")] { + writeln!( + writer, + "{label}.min: {}", + StorageSize::from_bytes(histogram.min()) + )?; + writeln!( + writer, + "{label}.max: {}", + StorageSize::from_bytes(histogram.max()) + )?; + writeln!( + writer, + "{label}.mean: {}", + StorageSize::from_bytes(histogram.mean().round() as u64) + )?; + writeln!( + writer, + "{label}.stdev: {}", + StorageSize::from_bytes(histogram.stdev().round() as u64) + )?; + } + + Ok(()) +} diff --git a/lib/tests/common/mod.rs b/lib/tests/common/mod.rs index 5cce1beaf..d3e41eef1 100644 --- a/lib/tests/common/mod.rs +++ b/lib/tests/common/mod.rs @@ -4,7 +4,7 @@ mod macros; pub(crate) mod dump; -pub(crate) mod sync_reporter; +pub(crate) mod progress; pub(crate) mod sync_watch; pub(crate) mod traffic_monitor; mod wait_map; diff --git a/lib/tests/common/progress.rs b/lib/tests/common/progress.rs new file mode 100644 index 000000000..24e859a11 --- /dev/null +++ b/lib/tests/common/progress.rs @@ -0,0 +1,124 @@ +use super::actor; +use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; +use ouisync::{Progress, Repository, BLOCK_SIZE}; +use std::{ + fmt::Write, + sync::{Arc, Mutex as BlockingMutex}, + time::{Duration, Instant}, +}; +use tokio::sync::broadcast::error::RecvError; + +const REPORT_INTERVAL: Duration = Duration::from_secs(1); + +/// Reports total sync progress of a group of actors. +#[derive(Clone)] +pub struct ProgressReporter { + all_progress: Arc>, + one_progress: Progress, + bars: MultiProgress, + all_bar: ProgressBar, +} + +impl ProgressReporter { + pub fn new() -> Self { + let all_progress = Arc::new(BlockingMutex::new(Progress::default())); + let one_progress = Progress::default(); + let bars = MultiProgress::new(); + + let all_bar = bars.add(ProgressBar::new(1).with_style(all_progress_style())); + all_bar.set_prefix("total"); + + Self { + all_progress, + one_progress, + bars, + all_bar, + } + } + + pub async fn run(mut self, repo: &Repository) { + let mut rx = repo.subscribe(); + let one_bar = self + .bars + .add(ProgressBar::new(1).with_style(one_progress_style())); + one_bar.set_prefix(actor::name()); + + let _finisher = ProgressBarFinisher(&one_bar); + + loop { + let new_one_progress = repo.sync_progress().await.unwrap(); + + let all_progress = { + let mut all_progress = self.all_progress.lock().unwrap(); + *all_progress = sub(*all_progress, self.one_progress); + *all_progress = add(*all_progress, new_one_progress); + *all_progress + }; + + self.one_progress = new_one_progress; + + one_bar.set_length((self.one_progress.total * BLOCK_SIZE as u64).max(1)); + one_bar.set_position(self.one_progress.value * BLOCK_SIZE as u64); + + self.all_bar + .set_length((all_progress.total * BLOCK_SIZE as u64).max(1)); + self.all_bar + .set_position((all_progress.value * BLOCK_SIZE as u64).max(1)); + + match rx.recv().await { + Ok(_) | Err(RecvError::Lagged(_)) => continue, + Err(RecvError::Closed) => break, + } + } + } +} + +impl Drop for ProgressReporter { + fn drop(&mut self) { + if Arc::strong_count(&self.all_progress) <= 1 { + self.all_bar.finish_and_clear(); + } + } +} + +fn add(a: Progress, b: Progress) -> Progress { + Progress { + value: a.value + b.value, + total: a.total + b.total, + } +} + +fn sub(a: Progress, b: Progress) -> Progress { + Progress { + value: a.value - b.value, + total: a.total - b.total, + } +} + +fn is_complete(progress: &Progress) -> bool { + progress.total > 0 && progress.value >= progress.total +} + +fn all_progress_style() -> ProgressStyle { + ProgressStyle::with_template( + "{prefix:5} {spinner:.green} [{elapsed_precise}] [{wide_bar:.green.bold/blue}] {percent_precise}%", + ) + .unwrap() + .progress_chars("#>-") +} + +fn one_progress_style() -> ProgressStyle { + ProgressStyle::with_template( + "{prefix:5} {spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}", + ) + .unwrap() + .progress_chars("#>-") +} + +struct ProgressBarFinisher<'a>(&'a ProgressBar); + +impl Drop for ProgressBarFinisher<'_> { + fn drop(&mut self) { + self.0.finish_and_clear(); + } +} diff --git a/lib/tests/common/sync_reporter.rs b/lib/tests/common/sync_reporter.rs deleted file mode 100644 index 9c5cb3415..000000000 --- a/lib/tests/common/sync_reporter.rs +++ /dev/null @@ -1,88 +0,0 @@ -use ouisync::{Progress, Repository}; -use std::{ - sync::{Arc, Mutex as BlockingMutex}, - time::{Duration, Instant}, -}; -use tokio::sync::broadcast::error::RecvError; - -const REPORT_INTERVAL: Duration = Duration::from_secs(1); - -/// Reports total sync progress of a group of actors. -#[derive(Clone)] -pub struct SyncReporter { - shared: Arc>, - progress: Progress, - prefix: String, -} - -struct Shared { - progress: Progress, - report_timestamp: Instant, -} - -impl SyncReporter { - pub fn new() -> Self { - Self { - shared: Arc::new(BlockingMutex::new(Shared { - progress: Progress::default(), - report_timestamp: Instant::now(), - })), - progress: Progress::default(), - prefix: String::new(), - } - } - - pub fn with_prefix(mut self, prefix: impl Into) -> Self { - self.prefix = prefix.into(); - self - } - - pub async fn run(mut self, repo: &Repository) { - let mut rx = repo.subscribe(); - - loop { - let progress = repo.sync_progress().await.unwrap(); - self.report(progress); - - match rx.recv().await { - Ok(_) | Err(RecvError::Lagged(_)) => continue, - Err(RecvError::Closed) => break, - } - } - } - - fn report(&mut self, progress: Progress) { - let mut shared = self.shared.lock().unwrap(); - - shared.progress = sub(shared.progress, self.progress); - shared.progress = add(shared.progress, progress); - self.progress = progress; - - let now = Instant::now(); - if (now.duration_since(shared.report_timestamp) < REPORT_INTERVAL) { - return; - } - - println!("{}{}", self.prefix, shared.progress.percent()); - - shared.report_timestamp = now; - } -} - -fn add(a: Progress, b: Progress) -> Progress { - Progress { - value: a.value + b.value, - total: a.total + b.total, - } -} - -fn sub(a: Progress, b: Progress) -> Progress { - Progress { - value: a.value - b.value, - total: a.total - b.total, - } -} - -fn is_complete(progress: &Progress) -> bool { - progress.total > 0 && progress.value >= progress.total -} From 32fecf5279a22decd51349aab1932fd223f68541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Wed, 22 May 2024 12:53:36 +0200 Subject: [PATCH 04/15] Add option to output swarm benchmark summary to a file --- lib/benches/bench_swarm.rs | 101 ++++++++++++----------------------- lib/benches/utils/summary.rs | 89 ++++++++++++++++++++++++++++++ lib/tests/common/progress.rs | 10 ++-- 3 files changed, 126 insertions(+), 74 deletions(-) create mode 100644 lib/benches/utils/summary.rs diff --git a/lib/benches/bench_swarm.rs b/lib/benches/bench_swarm.rs index 83098fd91..74ab892b6 100644 --- a/lib/benches/bench_swarm.rs +++ b/lib/benches/bench_swarm.rs @@ -5,17 +5,22 @@ #[macro_use] mod common; +#[path = "utils/summary.rs"] +mod summary; + use clap::Parser; use common::{actor, progress::ProgressReporter, sync_watch, Env, Proto, DEFAULT_REPO}; -use hdrhistogram::{Histogram, SyncHistogram}; -use ouisync::{network::TrafficStats, AccessMode, File, StorageSize}; +use ouisync::{AccessMode, File}; use rand::{distributions::Standard, rngs::StdRng, Rng, SeedableRng}; use std::{ - fmt, io, + fmt, + fs::OpenOptions, + io::{self, Write}, + path::PathBuf, process::ExitCode, sync::Arc, - time::{Duration, Instant}, }; +use summary::SummaryRecorder; use tokio::{select, sync::Barrier}; fn main() -> ExitCode { @@ -43,8 +48,7 @@ fn main() -> ExitCode { let barrier = Arc::new(Barrier::new(actors.len())); let progress_reporter = options.progress.then(ProgressReporter::new); - let mut send_histogram = SyncHistogram::from(Histogram::::new(3).unwrap()); - let mut recv_histogram = SyncHistogram::from(Histogram::::new(3).unwrap()); + let summary_recorder = SummaryRecorder::new(); let file_name = "file.dat"; let file_seed = 0; @@ -64,8 +68,7 @@ fn main() -> ExitCode { let watch_rx = watch_rx.clone(); let barrier = barrier.clone(); let progress_reporter = progress_reporter.clone(); - let mut send_histogram_recorder = send_histogram.recorder(); - let mut recv_histogram_recorder = recv_histogram.recorder(); + let summary_recorder = summary_recorder.actor(); env.actor(&actor.to_string(), async move { let network = actor::create_network(proto).await; @@ -115,33 +118,32 @@ fn main() -> ExitCode { barrier.wait().await; - let TrafficStats { send, recv } = network.traffic_stats(); - - info!(send, recv); - - send_histogram_recorder.record(send).unwrap(); - recv_histogram_recorder.record(recv).unwrap(); + summary_recorder.record(&network); }); } drop(watch_rx); - - let start = Instant::now(); - drop(env); drop(progress_reporter); - send_histogram.refresh(); - recv_histogram.refresh(); + let summary = summary_recorder.finalize(); println!(); - print_summary( - &mut io::stdout().lock(), - start.elapsed(), - &send_histogram, - &recv_histogram, - ) - .unwrap(); + serde_json::to_writer_pretty(io::stdout().lock(), &summary).unwrap(); + println!(); + + if let Some(path) = options.output { + let mut file = match OpenOptions::new().create(true).append(true).open(&path) { + Ok(file) => file, + Err(error) => { + eprintln!("error: failed to open/create {}: {}", path.display(), error); + return ExitCode::FAILURE; + } + }; + + serde_json::to_writer(&mut file, &summary).unwrap(); + file.write_all(b"\n").unwrap(); + } ExitCode::SUCCESS } @@ -169,6 +171,11 @@ struct Options { #[arg(long)] pub progress: bool, + /// File to append the summary to. Will be created if not exists. If ommited prints the summary + /// to stdout. + #[arg(short, long, value_name = "PATH")] + pub output: Option, + // The following arguments may be passed down from `cargo bench` so we need to accept them even // if we don't use them. #[arg( @@ -279,45 +286,3 @@ impl RandomChunks { true } } - -struct Seconds(Duration); - -impl fmt::Display for Seconds { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:.2} s", self.0.as_secs_f64()) - } -} - -fn print_summary( - writer: &mut W, - duration: Duration, - send_histogram: &Histogram, - recv_histogram: &Histogram, -) -> io::Result<()> { - writeln!(writer, "duration: {}", Seconds(duration))?; - - for (histogram, label) in [(send_histogram, "send"), (recv_histogram, "recv")] { - writeln!( - writer, - "{label}.min: {}", - StorageSize::from_bytes(histogram.min()) - )?; - writeln!( - writer, - "{label}.max: {}", - StorageSize::from_bytes(histogram.max()) - )?; - writeln!( - writer, - "{label}.mean: {}", - StorageSize::from_bytes(histogram.mean().round() as u64) - )?; - writeln!( - writer, - "{label}.stdev: {}", - StorageSize::from_bytes(histogram.stdev().round() as u64) - )?; - } - - Ok(()) -} diff --git a/lib/benches/utils/summary.rs b/lib/benches/utils/summary.rs new file mode 100644 index 000000000..0a1bf175c --- /dev/null +++ b/lib/benches/utils/summary.rs @@ -0,0 +1,89 @@ +use hdrhistogram::{sync::Recorder, Histogram, SyncHistogram}; +use ouisync::network::{Network, TrafficStats}; +use serde::{ser::SerializeMap, Serialize}; +use std::{ + mem, + time::{Duration, Instant}, +}; + +pub(crate) struct SummaryRecorder { + send: SyncHistogram, + recv: SyncHistogram, + start: Instant, +} + +impl SummaryRecorder { + pub fn new() -> Self { + let send = SyncHistogram::from(Histogram::::new(3).unwrap()); + let recv = SyncHistogram::from(Histogram::::new(3).unwrap()); + + Self { + send, + recv, + start: Instant::now(), + } + } + + pub fn actor(&self) -> ActorSummaryRecorder { + ActorSummaryRecorder { + send: self.send.recorder(), + recv: self.recv.recorder(), + } + } + + pub fn finalize(mut self) -> Summary { + self.send.refresh(); + self.recv.refresh(); + + Summary { + duration: self.start.elapsed(), + send: mem::replace(&mut self.send, Histogram::new(3).unwrap()), + recv: mem::replace(&mut self.recv, Histogram::new(3).unwrap()), + } + } +} + +pub(crate) struct ActorSummaryRecorder { + send: Recorder, + recv: Recorder, +} + +impl ActorSummaryRecorder { + pub fn record(mut self, network: &Network) { + let TrafficStats { send, recv } = network.traffic_stats(); + + info!(send, recv); + + self.send.record(send).unwrap(); + self.recv.record(recv).unwrap(); + } +} + +#[derive(Serialize)] +pub(crate) struct Summary { + #[serde(serialize_with = "serialize_duration")] + pub duration: Duration, + #[serde(serialize_with = "serialize_histogram")] + pub send: Histogram, + #[serde(serialize_with = "serialize_histogram")] + pub recv: Histogram, +} + +fn serialize_duration(value: &Duration, s: S) -> Result +where + S: serde::Serializer, +{ + value.as_secs_f64().serialize(s) +} + +fn serialize_histogram(value: &Histogram, s: S) -> Result +where + S: serde::Serializer, +{ + let mut s = s.serialize_map(Some(4))?; + s.serialize_entry("min", &value.min())?; + s.serialize_entry("max", &value.max())?; + s.serialize_entry("mean", &(value.mean().round() as u64))?; + s.serialize_entry("stdev", &(value.stdev().round() as u64))?; + s.end() +} diff --git a/lib/tests/common/progress.rs b/lib/tests/common/progress.rs index 24e859a11..dccaa59d5 100644 --- a/lib/tests/common/progress.rs +++ b/lib/tests/common/progress.rs @@ -101,18 +101,16 @@ fn is_complete(progress: &Progress) -> bool { fn all_progress_style() -> ProgressStyle { ProgressStyle::with_template( - "{prefix:5} {spinner:.green} [{elapsed_precise}] [{wide_bar:.green.bold/blue}] {percent_precise}%", + "{prefix:5} [{elapsed_precise}] [{wide_bar:.green.bold/blue}] {percent_precise}% {bytes_per_sec:.dim}", ) .unwrap() .progress_chars("#>-") } fn one_progress_style() -> ProgressStyle { - ProgressStyle::with_template( - "{prefix:5} {spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}", - ) - .unwrap() - .progress_chars("#>-") + ProgressStyle::with_template("{prefix:5} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}") + .unwrap() + .progress_chars("#>-") } struct ProgressBarFinisher<'a>(&'a ProgressBar); From 54c0a12ab9bd830492aa9bb95f112ccf60dea264 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Wed, 22 May 2024 14:21:42 +0200 Subject: [PATCH 05/15] Support log together with progress bar in swarm benchmark --- lib/benches/bench_swarm.rs | 18 ++++-------- lib/tests/common/mod.rs | 33 ++++++++++++++------- lib/tests/common/progress.rs | 56 ++++++++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 23 deletions(-) diff --git a/lib/benches/bench_swarm.rs b/lib/benches/bench_swarm.rs index 74ab892b6..e1ec80507 100644 --- a/lib/benches/bench_swarm.rs +++ b/lib/benches/bench_swarm.rs @@ -37,6 +37,9 @@ fn main() -> ExitCode { .collect(); let proto = options.protocol; + let progress_reporter = ProgressReporter::new(); + common::init_log_with_writer(progress_reporter.stdout_writer()); + let mut env = Env::new(); // Wait until everyone is fully synced. @@ -47,7 +50,6 @@ fn main() -> ExitCode { // remain online for other actors to sync from. let barrier = Arc::new(Barrier::new(actors.len())); - let progress_reporter = options.progress.then(ProgressReporter::new); let summary_recorder = SummaryRecorder::new(); let file_name = "file.dat"; @@ -99,13 +101,9 @@ fn main() -> ExitCode { } }; - if let Some(progress_reporter) = progress_reporter { - select! { - _ = run => (), - _ = progress_reporter.run(&repo) => (), - } - } else { - run.await; + select! { + _ = run => (), + _ = progress_reporter.run(&repo) => (), } // Check the file content matches the original file. @@ -167,10 +165,6 @@ struct Options { #[arg(short, long, value_parser, default_value_t = Proto::Quic)] pub protocol: Proto, - /// Whether to show progress bars. - #[arg(long)] - pub progress: bool, - /// File to append the summary to. Will be created if not exists. If ommited prints the summary /// to stdout. #[arg(short, long, value_name = "PATH")] diff --git a/lib/tests/common/mod.rs b/lib/tests/common/mod.rs index d3e41eef1..ed9edd27d 100644 --- a/lib/tests/common/mod.rs +++ b/lib/tests/common/mod.rs @@ -28,6 +28,7 @@ use state_monitor::StateMonitor; use std::{ fmt, future::Future, + io, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, path::{Path, PathBuf}, str::FromStr, @@ -43,7 +44,10 @@ use tokio::{ use tracing::metadata::LevelFilter; use tracing::{instrument, Instrument, Span}; use tracing_subscriber::{ - fmt::time::SystemTime, layer::SubscriberExt, util::SubscriberInitExt, Layer, + fmt::{time::SystemTime, MakeWriter, TestWriter}, + layer::SubscriberExt, + util::SubscriberInitExt, + EnvFilter, Layer, }; pub(crate) const DEFAULT_REPO: &str = "default"; @@ -94,6 +98,10 @@ pub(crate) mod env { } } + // pub fn with_log_writer(writer: W) -> Self { + + // } + pub fn actor(&mut self, name: &str, f: Fut) where Fut: Future + Send + 'static, @@ -705,21 +713,24 @@ fn to_megabytes(bytes: usize) -> usize { } pub(crate) fn init_log() { - // Log to stdout - let stdout_layer = tracing_subscriber::fmt::layer() + // log output is captured by default and only shown on failure. Run tests with `--nocapture` to + // override. + init_log_with_writer(TestWriter::default()) +} + +pub(crate) fn init_log_with_writer(writer: W) +where + W: for<'w> MakeWriter<'w> + Send + Sync + 'static, +{ + tracing_subscriber::fmt() .event_format(Formatter::::default()) - // log output is captured by default and only shown on failure. Run tests with - // `--nocapture` to override. - .with_test_writer() - .with_filter( + .with_writer(writer) + .with_env_filter( tracing_subscriber::EnvFilter::builder() // Only show the logs if explicitly enabled with the `RUST_LOG` env variable. .with_default_directive(LevelFilter::OFF.into()) .from_env_lossy(), - ); - - tracing_subscriber::registry() - .with(stdout_layer) + ) .try_init() // `Err` here just means the logger is already initialized, it's OK to ignore it. .unwrap_or(()); diff --git a/lib/tests/common/progress.rs b/lib/tests/common/progress.rs index dccaa59d5..b3fc4a1b6 100644 --- a/lib/tests/common/progress.rs +++ b/lib/tests/common/progress.rs @@ -3,6 +3,7 @@ use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; use ouisync::{Progress, Repository, BLOCK_SIZE}; use std::{ fmt::Write, + io::{self, Stderr, Stdout}, sync::{Arc, Mutex as BlockingMutex}, time::{Duration, Instant}, }; @@ -36,6 +37,20 @@ impl ProgressReporter { } } + pub fn stdout_writer(&self) -> MakeWriter Stdout> { + MakeWriter { + bars: self.bars.clone(), + inner: io::stdout, + } + } + + pub fn stderr_writer(&self) -> MakeWriter Stderr> { + MakeWriter { + bars: self.bars.clone(), + inner: io::stderr, + } + } + pub async fn run(mut self, repo: &Repository) { let mut rx = repo.subscribe(); let one_bar = self @@ -120,3 +135,44 @@ impl Drop for ProgressBarFinisher<'_> { self.0.finish_and_clear(); } } + +pub struct Writer { + bars: MultiProgress, + inner: W, +} + +impl io::Write for Writer +where + W: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + self.bars.suspend(|| self.inner.write(buf)) + } + + fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { + self.bars.suspend(|| self.inner.write_vectored(bufs)) + } + + fn flush(&mut self) -> io::Result<()> { + self.bars.suspend(|| self.inner.flush()) + } +} + +pub struct MakeWriter { + bars: MultiProgress, + inner: T, +} + +impl<'w, T> tracing_subscriber::fmt::MakeWriter<'w> for MakeWriter +where + T: tracing_subscriber::fmt::MakeWriter<'w>, +{ + type Writer = Writer; + + fn make_writer(&'w self) -> Self::Writer { + Writer { + bars: self.bars.clone(), + inner: self.inner.make_writer(), + } + } +} From 4fea11170f3cd24ac32c554ed6ff84b6128ff5e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Wed, 22 May 2024 17:04:33 +0200 Subject: [PATCH 06/15] Implement throughtup and progress monitoring in swarm benchmark --- lib/benches/bench_swarm.rs | 108 ++++++++++++++++++++++++----- lib/src/network/traffic_tracker.rs | 14 ++-- lib/tests/common/mod.rs | 23 ++---- lib/tests/common/progress.rs | 43 ++++++------ lib/tests/sync.rs | 4 +- 5 files changed, 129 insertions(+), 63 deletions(-) diff --git a/lib/benches/bench_swarm.rs b/lib/benches/bench_swarm.rs index e1ec80507..7f271ca71 100644 --- a/lib/benches/bench_swarm.rs +++ b/lib/benches/bench_swarm.rs @@ -10,7 +10,11 @@ mod summary; use clap::Parser; use common::{actor, progress::ProgressReporter, sync_watch, Env, Proto, DEFAULT_REPO}; -use ouisync::{AccessMode, File}; +use metrics::{Counter, Gauge, Recorder}; +use ouisync::{ + network::{Network, TrafficStats}, + Access, AccessMode, File, Repository, +}; use rand::{distributions::Standard, rngs::StdRng, Rng, SeedableRng}; use std::{ fmt, @@ -19,9 +23,10 @@ use std::{ path::PathBuf, process::ExitCode, sync::Arc, + time::Duration, }; use summary::SummaryRecorder; -use tokio::{select, sync::Barrier}; +use tokio::{select, sync::Barrier, time}; fn main() -> ExitCode { let options = Options::parse(); @@ -75,47 +80,61 @@ fn main() -> ExitCode { env.actor(&actor.to_string(), async move { let network = actor::create_network(proto).await; - // Connect to the other peers - for other_actor in other_actors { - let addr = actor::lookup_addr(&other_actor.to_string()).await; - network.add_user_provided_peer(&addr); - } + let recorder = actor::get_default_recorder(); + let progress_monitor = ProgressMonitor::new(&recorder); + let throughput_monitor = ThroughputMonitor::new(&recorder); // Create the repo - let repo = actor::create_repo_with_mode(DEFAULT_REPO, access_mode).await; + let params = actor::get_repo_params(DEFAULT_REPO).with_recorder(recorder); + let secrets = actor::get_repo_secrets(DEFAULT_REPO); + let repo = Repository::create( + ¶ms, + Access::new(None, None, secrets.with_mode(access_mode)), + ) + .await + .unwrap(); let _reg = network.register(repo.handle()).await; - // One writer creates the file initially. + // Create the file by one of the writers if watch_tx.is_some() { let mut file = repo.create_file(file_name).await.unwrap(); write_random_file(&mut file, file_seed, file_size).await; } - // Wait until fully synced + report progress + // Connect to the other peers + for other_actor in other_actors { + let addr = actor::lookup_addr(&other_actor.to_string()).await; + network.add_user_provided_peer(&addr); + } + let run = async { + // Wait until fully synced if let Some(watch_tx) = watch_tx { drop(watch_rx); watch_tx.run(&repo).await; } else { watch_rx.run(&repo).await; } + + // Check the file content matches the original file. + { + let mut file = repo.open_file(file_name).await.unwrap(); + check_random_file(&mut file, file_seed, file_size).await; + } + + // Wait until everyone finished + barrier.wait().await; }; select! { _ = run => (), _ = progress_reporter.run(&repo) => (), - } - - // Check the file content matches the original file. - { - let mut file = repo.open_file(file_name).await.unwrap(); - check_random_file(&mut file, file_seed, file_size).await; + _ = progress_monitor.run(&repo) => (), + _ = throughput_monitor.run(&network) => (), } info!("done"); - barrier.wait().await; - summary_recorder.record(&network); }); } @@ -280,3 +299,56 @@ impl RandomChunks { true } } + +struct ProgressMonitor { + progress: Gauge, +} + +impl ProgressMonitor { + fn new(recorder: &impl Recorder) -> Self { + metrics::with_local_recorder(recorder, || Self { + progress: metrics::gauge!("progress"), + }) + } + + async fn run(&self, repo: &Repository) { + use tokio::sync::broadcast::error::RecvError; + + let mut rx = repo.subscribe(); + + loop { + let progress = repo.sync_progress().await.unwrap(); + self.progress.set(progress.ratio()); + + match rx.recv().await { + Ok(_) | Err(RecvError::Lagged(_)) => (), + Err(RecvError::Closed) => break, + } + } + } +} + +struct ThroughputMonitor { + bytes_sent: Counter, + bytes_received: Counter, +} + +impl ThroughputMonitor { + fn new(recorder: &impl Recorder) -> Self { + metrics::with_local_recorder(recorder, || Self { + bytes_sent: metrics::counter!("bytes_sent"), + bytes_received: metrics::counter!("bytes_received"), + }) + } + + async fn run(&self, network: &Network) { + loop { + let TrafficStats { send, recv } = network.traffic_stats(); + + self.bytes_sent.absolute(send); + self.bytes_received.absolute(recv); + + time::sleep(Duration::from_millis(250)).await; + } + } +} diff --git a/lib/src/network/traffic_tracker.rs b/lib/src/network/traffic_tracker.rs index 2ac6f43b8..fc736c4bf 100644 --- a/lib/src/network/traffic_tracker.rs +++ b/lib/src/network/traffic_tracker.rs @@ -24,10 +24,7 @@ impl TrafficTracker { } pub fn get(&self) -> TrafficStats { - TrafficStats { - send: self.counters.send.load(Ordering::Acquire), - recv: self.counters.recv.load(Ordering::Acquire), - } + self.counters.get() } } @@ -45,3 +42,12 @@ struct Counters { send: AtomicU64, recv: AtomicU64, } + +impl Counters { + fn get(&self) -> TrafficStats { + TrafficStats { + send: self.send.load(Ordering::Acquire), + recv: self.recv.load(Ordering::Acquire), + } + } +} diff --git a/lib/tests/common/mod.rs b/lib/tests/common/mod.rs index ed9edd27d..0cd81dfed 100644 --- a/lib/tests/common/mod.rs +++ b/lib/tests/common/mod.rs @@ -98,10 +98,6 @@ pub(crate) mod env { } } - // pub fn with_log_writer(writer: W) -> Self { - - // } - pub fn actor(&mut self, name: &str, f: Fut) where Fut: Future + Send + 'static, @@ -236,27 +232,16 @@ pub(crate) mod actor { } } - pub(crate) fn get_repo_params(name: &str) -> RepositoryParams { - get_repo_params_without_recorder(name).with_recorder(get_default_repo_recorder()) - } - - pub(crate) fn get_repo_params_with_recorder( - name: &str, - recorder: R, - ) -> RepositoryParams> { - get_repo_params_without_recorder(name) - .with_recorder(Pair(get_default_repo_recorder(), recorder)) - } - - pub(crate) fn get_repo_params_without_recorder(name: &str) -> RepositoryParams { + pub(crate) fn get_repo_params(name: &str) -> RepositoryParams { ACTOR.with(|actor| { RepositoryParams::new(actor.repo_path(name)) .with_device_id(actor.device_id) .with_parent_monitor(actor.monitor.clone()) + .with_recorder(NoopRecorder) }) } - pub(crate) fn get_default_repo_recorder() -> DefaultRecorder { + pub(crate) fn get_default_recorder() -> DefaultRecorder { ACTOR.with(|actor| { metrics_ext::AddLabels::new( vec![Label::new("actor", actor.name.clone())], @@ -265,7 +250,7 @@ pub(crate) mod actor { }) } - type DefaultRecorder = metrics_ext::AddLabels; + pub(crate) type DefaultRecorder = metrics_ext::AddLabels; pub(crate) fn get_repo_secrets(name: &str) -> AccessSecrets { ACTOR.with(|actor| { diff --git a/lib/tests/common/progress.rs b/lib/tests/common/progress.rs index b3fc4a1b6..193e9d1d8 100644 --- a/lib/tests/common/progress.rs +++ b/lib/tests/common/progress.rs @@ -1,13 +1,14 @@ use super::actor; -use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; -use ouisync::{Progress, Repository, BLOCK_SIZE}; +use futures_util::StreamExt; +use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle}; +use ouisync::{network::Network, Progress, Repository, BLOCK_SIZE}; use std::{ fmt::Write, io::{self, Stderr, Stdout}, sync::{Arc, Mutex as BlockingMutex}, time::{Duration, Instant}, }; -use tokio::sync::broadcast::error::RecvError; +use tokio::{select, sync::broadcast::error::RecvError, time}; const REPORT_INTERVAL: Duration = Duration::from_secs(1); @@ -15,7 +16,6 @@ const REPORT_INTERVAL: Duration = Duration::from_secs(1); #[derive(Clone)] pub struct ProgressReporter { all_progress: Arc>, - one_progress: Progress, bars: MultiProgress, all_bar: ProgressBar, } @@ -23,7 +23,6 @@ pub struct ProgressReporter { impl ProgressReporter { pub fn new() -> Self { let all_progress = Arc::new(BlockingMutex::new(Progress::default())); - let one_progress = Progress::default(); let bars = MultiProgress::new(); let all_bar = bars.add(ProgressBar::new(1).with_style(all_progress_style())); @@ -31,7 +30,6 @@ impl ProgressReporter { Self { all_progress, - one_progress, bars, all_bar, } @@ -53,27 +51,30 @@ impl ProgressReporter { pub async fn run(mut self, repo: &Repository) { let mut rx = repo.subscribe(); - let one_bar = self - .bars - .add(ProgressBar::new(1).with_style(one_progress_style())); - one_bar.set_prefix(actor::name()); + let one_bar = self.bars.add( + ProgressBar::new(1) + .with_style(one_progress_style()) + .with_prefix(actor::name()), + ); let _finisher = ProgressBarFinisher(&one_bar); + let mut old_one_progress = Progress::default(); + loop { let new_one_progress = repo.sync_progress().await.unwrap(); let all_progress = { let mut all_progress = self.all_progress.lock().unwrap(); - *all_progress = sub(*all_progress, self.one_progress); + *all_progress = sub(*all_progress, old_one_progress); *all_progress = add(*all_progress, new_one_progress); *all_progress }; - self.one_progress = new_one_progress; + old_one_progress = new_one_progress; - one_bar.set_length((self.one_progress.total * BLOCK_SIZE as u64).max(1)); - one_bar.set_position(self.one_progress.value * BLOCK_SIZE as u64); + one_bar.set_length((old_one_progress.total * BLOCK_SIZE as u64).max(1)); + one_bar.set_position(old_one_progress.value * BLOCK_SIZE as u64); self.all_bar .set_length((all_progress.total * BLOCK_SIZE as u64).max(1)); @@ -91,7 +92,7 @@ impl ProgressReporter { impl Drop for ProgressReporter { fn drop(&mut self) { if Arc::strong_count(&self.all_progress) <= 1 { - self.all_bar.finish_and_clear(); + let _ = ProgressBarFinisher(&self.all_bar); } } } @@ -116,23 +117,25 @@ fn is_complete(progress: &Progress) -> bool { fn all_progress_style() -> ProgressStyle { ProgressStyle::with_template( - "{prefix:5} [{elapsed_precise}] [{wide_bar:.green.bold/blue}] {percent_precise}% {bytes_per_sec:.dim}", + "{prefix:5} [{elapsed_precise}] [{wide_bar:.green.bold/blue}] {percent_precise}%", ) .unwrap() .progress_chars("#>-") } fn one_progress_style() -> ProgressStyle { - ProgressStyle::with_template("{prefix:5} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}") - .unwrap() - .progress_chars("#>-") + ProgressStyle::with_template( + "{prefix:5} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes}", + ) + .unwrap() + .progress_chars("#>-") } struct ProgressBarFinisher<'a>(&'a ProgressBar); impl Drop for ProgressBarFinisher<'_> { fn drop(&mut self) { - self.0.finish_and_clear(); + self.0.finish(); } } diff --git a/lib/tests/sync.rs b/lib/tests/sync.rs index 0b5ec9d4c..f656ded2e 100644 --- a/lib/tests/sync.rs +++ b/lib/tests/sync.rs @@ -1117,7 +1117,7 @@ fn quota_exceed() { let network = actor::create_network(Proto::Tcp).await; - let params = actor::get_repo_params_with_recorder(DEFAULT_REPO, watch_recorder); + let params = actor::get_repo_params(DEFAULT_REPO).with_recorder(watch_recorder); let secrets = actor::get_repo_secrets(DEFAULT_REPO); let repo = Repository::create( ¶ms, @@ -1194,7 +1194,7 @@ fn quota_concurrent_writes() { network.add_user_provided_peer(&actor::lookup_addr("writer-0").await); network.add_user_provided_peer(&actor::lookup_addr("writer-1").await); - let params = actor::get_repo_params_with_recorder(DEFAULT_REPO, watch_recorder); + let params = actor::get_repo_params(DEFAULT_REPO).with_recorder(watch_recorder); let secrets = actor::get_repo_secrets(DEFAULT_REPO); let repo = Repository::create( ¶ms, From 808df7f28c4c80ece7164ea68f789ed4c215bd99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Mon, 27 May 2024 13:47:08 +0200 Subject: [PATCH 07/15] Add benchtool --- Cargo.toml | 4 +- bridge/Cargo.toml | 2 +- cli/Cargo.toml | 2 +- lib/Cargo.toml | 2 +- lib/benches/bench_swarm.rs | 15 +- lib/benches/utils/summary.rs | 5 +- lib/tests/common/progress.rs | 8 +- lib/tests/common/sync_watch.rs | 1 + utils/benchtool/Cargo.toml | 20 ++ utils/benchtool/src/main.rs | 501 +++++++++++++++++++++++++++++ utils/protocol-analyzer/Cargo.toml | 2 +- utils/stress-test/Cargo.toml | 2 +- utils/swarm/Cargo.toml | 2 +- vfs/Cargo.toml | 2 +- 14 files changed, 549 insertions(+), 19 deletions(-) create mode 100644 utils/benchtool/Cargo.toml create mode 100644 utils/benchtool/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 72c60f9fd..679072931 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ members = [ "utils/protocol-analyzer", "vfs", "utils/repogen" -] +, "utils/benchtool"] resolver = "2" [workspace.package] @@ -32,6 +32,7 @@ edition = "2021" rust-version = "1.77.1" [workspace.dependencies] +anyhow = "1.0.86" assert_matches = "1.5" async-trait = "0.1.73" btdht = { git = "https://github.com/equalitie/btdht.git", rev = "e7ddf5607b20f0b82cbc3ea6259425c00bd8d16b" } @@ -51,6 +52,7 @@ rustls = { version = "0.21.0", default-features = false } serde = { version = "1.0", features = ["derive", "rc"] } serde_bytes = "0.11.8" serde_json = "1.0.94" +tempfile = "3.2" thiserror = "1.0.49" tokio = { version = "1.34.0", default-features = false } tokio-rustls = "0.24.1" diff --git a/bridge/Cargo.toml b/bridge/Cargo.toml index e2e297c96..b110e7db8 100644 --- a/bridge/Cargo.toml +++ b/bridge/Cargo.toml @@ -45,4 +45,4 @@ paranoid-android = "0.2.1" [dev-dependencies] assert_matches = { workspace = true } rcgen = { workspace = true } -tempfile = "3.2" +tempfile = { workspace = true } diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 0cc07f20c..141c5b47c 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -49,7 +49,7 @@ backoff = "0.4.0" hex = "0.4.3" once_cell = { workspace = true } rcgen = { workspace = true, features = ["pem"] } -tempfile = "3.2" +tempfile = { workspace = true } [target.'cfg(any(target_os = "linux", target_os = "osx"))'.dev-dependencies] libc = "0.2.126" diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 2b60379c4..e95261a53 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -102,7 +102,7 @@ rmp-serde = { workspace = true } serde_json = { workspace = true } serde_test = "1.0.176" similar-asserts = "1.5.0" -tempfile = "3.2" +tempfile = { workspace = true } test-strategy = "0.2.1" tokio = { workspace = true, features = ["process", "test-util"] } diff --git a/lib/benches/bench_swarm.rs b/lib/benches/bench_swarm.rs index 7f271ca71..d2919f94d 100644 --- a/lib/benches/bench_swarm.rs +++ b/lib/benches/bench_swarm.rs @@ -143,11 +143,7 @@ fn main() -> ExitCode { drop(env); drop(progress_reporter); - let summary = summary_recorder.finalize(); - - println!(); - serde_json::to_writer_pretty(io::stdout().lock(), &summary).unwrap(); - println!(); + let summary = summary_recorder.finalize(options.label); if let Some(path) = options.output { let mut file = match OpenOptions::new().create(true).append(true).open(&path) { @@ -160,6 +156,10 @@ fn main() -> ExitCode { serde_json::to_writer(&mut file, &summary).unwrap(); file.write_all(b"\n").unwrap(); + } else { + println!(); + serde_json::to_writer_pretty(io::stdout().lock(), &summary).unwrap(); + println!(); } ExitCode::SUCCESS @@ -189,6 +189,11 @@ struct Options { #[arg(short, long, value_name = "PATH")] pub output: Option, + /// Human-readable label for this execution of the benchmark. Useful to distinguish outputs of + /// mutliple versions of this benchmark. + #[arg(short, long, default_value_t)] + pub label: String, + // The following arguments may be passed down from `cargo bench` so we need to accept them even // if we don't use them. #[arg( diff --git a/lib/benches/utils/summary.rs b/lib/benches/utils/summary.rs index 0a1bf175c..fc0a6ab80 100644 --- a/lib/benches/utils/summary.rs +++ b/lib/benches/utils/summary.rs @@ -31,11 +31,12 @@ impl SummaryRecorder { } } - pub fn finalize(mut self) -> Summary { + pub fn finalize(mut self, label: String) -> Summary { self.send.refresh(); self.recv.refresh(); Summary { + label, duration: self.start.elapsed(), send: mem::replace(&mut self.send, Histogram::new(3).unwrap()), recv: mem::replace(&mut self.recv, Histogram::new(3).unwrap()), @@ -61,6 +62,8 @@ impl ActorSummaryRecorder { #[derive(Serialize)] pub(crate) struct Summary { + #[serde(skip_serializing_if = "String::is_empty")] + pub label: String, #[serde(serialize_with = "serialize_duration")] pub duration: Duration, #[serde(serialize_with = "serialize_histogram")] diff --git a/lib/tests/common/progress.rs b/lib/tests/common/progress.rs index 193e9d1d8..cd0b8dd07 100644 --- a/lib/tests/common/progress.rs +++ b/lib/tests/common/progress.rs @@ -1,12 +1,10 @@ use super::actor; -use futures_util::StreamExt; -use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle}; -use ouisync::{network::Network, Progress, Repository, BLOCK_SIZE}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use ouisync::{Progress, Repository, BLOCK_SIZE}; use std::{ - fmt::Write, io::{self, Stderr, Stdout}, sync::{Arc, Mutex as BlockingMutex}, - time::{Duration, Instant}, + time::Duration, }; use tokio::{select, sync::broadcast::error::RecvError, time}; diff --git a/lib/tests/common/sync_watch.rs b/lib/tests/common/sync_watch.rs index 0c99b7e00..a045f252a 100644 --- a/lib/tests/common/sync_watch.rs +++ b/lib/tests/common/sync_watch.rs @@ -57,6 +57,7 @@ impl Receiver { // debug!(progress = %progress.percent()); if progress.total > 0 && progress.value == progress.total { + // FIXME: This doesn't work for read/blind replicas let this_vv = branch.version_vector().await.unwrap(); let that_vv = self.0.borrow(); diff --git a/utils/benchtool/Cargo.toml b/utils/benchtool/Cargo.toml new file mode 100644 index 000000000..f195d27d2 --- /dev/null +++ b/utils/benchtool/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "benchtool" +description = "Utility to run and compare multiple versions of the same benchmarks" +publish = false +version.workspace = true +authors.workspace = true +repository.workspace = true +license.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow = { workspace = true } +clap = { workspace = true } +comfy-table = "7.1.1" +indicatif = "0.17.8" +rand = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tempfile = { workspace = true } \ No newline at end of file diff --git a/utils/benchtool/src/main.rs b/utils/benchtool/src/main.rs new file mode 100644 index 000000000..5a01b0d02 --- /dev/null +++ b/utils/benchtool/src/main.rs @@ -0,0 +1,501 @@ +use anyhow::{format_err, Result}; +use clap::{Args, Parser, Subcommand}; +use comfy_table::{Attribute, Cell, CellAlignment, Table}; +use indicatif::HumanBytes; +use rand::seq::SliceRandom; +use serde::{de::Error as _, Deserialize, Deserializer}; +use std::{ + env, + ffi::OsString, + fs::{self, File}, + io::{BufRead, BufReader}, + ops::{Add, Div}, + path::{Path, PathBuf}, + process::{self, Stdio}, + str, + time::Duration, +}; +use tempfile::NamedTempFile; + +const BENCH_DIR: &str = "benches"; + +fn main() -> Result<()> { + let options = Options::parse(); + + match options.command { + Command::Build(args) => build(args), + Command::List(args) => list(args), + Command::Run(args) => run(args), + Command::Clean(args) => clean(args), + } +} + +/// Build, run and compare different versions of the same benchmark. +#[derive(Parser, Debug)] +struct Options { + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand, Debug)] +enum Command { + /// Build a bench version + Build(BuildArgs), + /// List all bench binaries + List(ListArgs), + /// Run and compare benches + Run(RunArgs), + /// Remove all bench binaries + Clean(CleanArgs), +} + +#[derive(Args, Debug)] +struct BuildArgs { + #[command(flatten)] + common: CommonArgs, + + /// Package to build + #[arg(short, long, value_name = "SPEC")] + package: Option, + + /// Space or comma separated list of features to activate + #[arg(short = 'F', long)] + features: Vec, + + /// Label to append to the bench binary name. Default is the current git commit hash. + #[arg(short, long)] + label: Option, + + /// Bench target(s) to build + #[arg(value_name = "NAME")] + benches: Vec, +} + +#[derive(Args, Debug)] +struct ListArgs { + #[command(flatten)] + common: CommonArgs, + + /// Bench(es) to list the versions of. + #[arg(value_name = "NAME")] + benches: Vec, +} + +#[derive(Args, Debug)] +struct RunArgs { + #[command(flatten)] + common: CommonArgs, + + /// Run each bench version this many times and average the results. + #[arg(short, long, default_value_t = 1)] + samples: usize, + + /// Bench to run and compare. + #[arg(value_name = "NAME")] + bench: String, + + /// Args to the bench. + #[arg(trailing_var_arg = true)] + args: Vec, +} + +#[derive(Args, Debug)] +struct CleanArgs { + #[command(flatten)] + common: CommonArgs, +} + +#[derive(Args, Debug)] +struct CommonArgs { + /// Coloring: auto, always, never + #[arg(long, default_value = "auto")] + color: String, + + /// Directory for all generated artifacts + #[arg(long, value_name = "PATH")] + target_dir: Option, +} + +impl CommonArgs { + fn bench_dir(&self) -> PathBuf { + self.target_dir + .as_ref() + .cloned() + .unwrap_or_else(default_target_dir) + .join(BENCH_DIR) + } +} + +#[derive(Deserialize)] +struct BuildMessage<'a> { + #[serde(borrow)] + executable: Option<&'a Path>, +} + +#[derive(Default, Debug, Deserialize)] +struct Summary { + #[serde(default)] + label: String, + #[serde(deserialize_with = "deserialize_duration")] + duration: Duration, + send: BytesSummary, + recv: BytesSummary, +} + +impl Summary { + fn avg<'a>(iter: impl IntoIterator) -> Self { + let (sum, count) = + iter.into_iter() + .fold((Summary::default(), 0u32), |(sum, count), item| { + ( + Summary { + label: if sum.label.is_empty() { + item.label.clone() + } else { + sum.label + }, + duration: sum.duration + item.duration, + send: sum.send + item.send, + recv: sum.recv + item.recv, + }, + count + 1, + ) + }); + + sum / count + } +} + +impl Div for Summary { + type Output = Self; + + fn div(self, rhs: u32) -> Self::Output { + Self { + label: self.label, + duration: self.duration / rhs, + send: self.send / rhs, + recv: self.recv / rhs, + } + } +} + +#[derive(Default, Copy, Clone, Debug, Deserialize)] +struct BytesSummary { + min: u64, + max: u64, + mean: u64, + stdev: u64, +} + +impl Add for BytesSummary { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + Self { + min: self.min + rhs.min, + max: self.max + rhs.max, + mean: self.mean + rhs.mean, + stdev: self.stdev + rhs.stdev, + } + } +} + +impl Div for BytesSummary { + type Output = Self; + + fn div(self, rhs: u32) -> Self::Output { + Self { + min: self.min / rhs as u64, + max: self.max / rhs as u64, + mean: self.mean / rhs as u64, + stdev: self.stdev / rhs as u64, + } + } +} + +fn build(args: BuildArgs) -> Result<()> { + let mut command = process::Command::new("cargo"); + command + .arg("build") + .arg("--message-format") + .arg("json") + .arg("--color") + .arg(&args.common.color); + + if let Some(value) = &args.common.target_dir { + command.arg("--target-dir").arg(value); + } + + if let Some(value) = args.package { + command.arg("--package").arg(value); + } + + for value in args.features { + command.arg("--features").arg(value); + } + + for value in args.benches { + command.arg("--bench").arg(value); + } + + println!( + "Running `{} {}`", + format_command(&command), + format_command_args(&command) + ); + + command.stderr(Stdio::inherit()); + + let output = command.output()?; + + if !output.status.success() { + return Err(format_err!( + "{} returned {}", + format_command(&command), + output.status + )); + } + + let stdout = BufReader::new(&output.stdout[..]); + let dst_dir = args.common.bench_dir(); + + let label = match args.label { + Some(label) => label, + None => get_git_commit()?, + }; + + for line in stdout.lines() { + let line = line?; + let message: BuildMessage = serde_json::from_str(&line)?; + + let Some(src) = message.executable else { + continue; + }; + + let dst = make_bench_file_name(src, &label); + let dst = dst_dir.join(dst); + + println!("Built bench: {}", dst.display()); + + fs::create_dir_all(dst.parent().unwrap())?; + fs::copy(src, dst)?; + } + + Ok(()) +} + +fn list(args: ListArgs) -> Result<()> { + let dir = args.common.bench_dir(); + + for entry in fs::read_dir(dir)? { + println!("{}", entry?.path().display()); + } + + Ok(()) +} + +fn run(args: RunArgs) -> Result<()> { + let dir = args.common.bench_dir(); + let mut bench_versions = list_bench_versions(&dir, &args.bench)?; + let mut output = NamedTempFile::new()?; + let mut rng = rand::thread_rng(); + + for i in 0..args.samples { + println!("Running sample {}/{}", i + 1, args.samples); + println!(); + + bench_versions.shuffle(&mut rng); + + for bench_version in &bench_versions { + let label = extract_bench_label(bench_version); + + let mut command = process::Command::new(bench_version); + command + .arg("--label") + .arg(label) + .arg("--output") + .arg(output.path()); + + for arg in &args.args { + command.arg(arg); + } + + println!( + "πŸš€πŸš€πŸš€ Running `{} {}` πŸš€πŸš€πŸš€", + format_command(&command), + format_command_args(&command) + ); + + let status = command.status()?; + if !status.success() { + return Err(format_err!( + "{} returned {}", + format_command(&command), + status + )); + } + } + + println!(); + } + + let summaries = read_summaries(output.as_file_mut())?; + let summaries = aggregate_summaries(summaries); + + let table = build_comparison_table(summaries); + + println!("{table}"); + + Ok(()) +} + +fn list_bench_versions(dir: &Path, bench: &str) -> Result> { + fs::read_dir(dir)? + .map(|entry| { + let path = entry?.path(); + + if path + .file_name() + .unwrap() + .to_str() + .unwrap() + .starts_with(bench) + { + Ok(Some(path)) + } else { + Ok(None) + } + }) + .filter_map(Result::transpose) + .collect() +} + +fn clean(args: CleanArgs) -> Result<()> { + fs::remove_dir_all(args.common.bench_dir())?; + Ok(()) +} + +fn default_target_dir() -> PathBuf { + env::var_os("CARGO_TARGET_DIR") + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from("target")) +} + +fn make_bench_file_name(src: &Path, label: &str) -> PathBuf { + let name = src.file_stem().unwrap(); + let ext = src.extension(); + + let name = name.to_str().unwrap(); + let name = name + .rsplit_once('-') + .map(|(prefix, _)| prefix) + .unwrap_or(name); + + let mut name = PathBuf::from(format!("{name}-{label}")); + + if let Some(ext) = ext { + name.set_extension(ext); + } + + name +} + +fn extract_bench_label(path: &Path) -> &str { + path.file_stem() + .and_then(|stem| stem.to_str()) + .and_then(|stem| stem.rsplit_once('-')) + .map(|(_, suffix)| suffix) + .unwrap_or_default() +} + +fn get_git_commit() -> Result { + Ok(str::from_utf8( + process::Command::new("git") + .arg("rev-parse") + .arg("--short") + .arg("HEAD") + .output()? + .stdout + .as_ref(), + )? + .trim() + .to_owned()) +} + +fn format_command(command: &process::Command) -> String { + command.get_program().to_str().unwrap().to_owned() +} + +fn format_command_args(command: &process::Command) -> String { + command + .get_args() + .map(|arg| arg.to_str().unwrap()) + .collect::>() + .join(" ") +} + +fn deserialize_duration<'de, D: Deserializer<'de>>(d: D) -> Result { + let secs = f64::deserialize(d)?; + Duration::try_from_secs_f64(secs).map_err(D::Error::custom) +} + +fn read_summaries(file: &mut File) -> Result> { + BufReader::new(file) + .lines() + .map(|line| Ok(serde_json::from_slice(line?.as_bytes())?)) + .collect() +} + +fn aggregate_summaries(mut summaries: Vec) -> Vec { + summaries.sort_by(|a, b| a.label.cmp(&b.label)); + summaries + .chunk_by(|a, b| a.label == b.label) + .map(Summary::avg) + .collect() +} + +fn build_comparison_table(summaries: Vec) -> Table { + let mut table = Table::new(); + + table.set_header(vec![ + Cell::new("label"), + Cell::new("duration"), + Cell::new("send min").add_attribute(Attribute::Dim), + Cell::new("send max").add_attribute(Attribute::Dim), + Cell::new("send mean").add_attribute(Attribute::Dim), + Cell::new("send stdev"), + Cell::new("recv min").add_attribute(Attribute::Dim), + Cell::new("recv max").add_attribute(Attribute::Dim), + Cell::new("recv mean").add_attribute(Attribute::Dim), + Cell::new("recv stdev"), + ]); + + for summary in summaries { + table.add_row(vec![ + Cell::new(summary.label), + Cell::new(format!("{:.2} s", summary.duration.as_secs_f64())), + Cell::new(HumanBytes(summary.send.min)) + .set_alignment(CellAlignment::Right) + .add_attribute(Attribute::Dim), + Cell::new(HumanBytes(summary.send.max)) + .set_alignment(CellAlignment::Right) + .add_attribute(Attribute::Dim), + Cell::new(HumanBytes(summary.send.mean)) + .set_alignment(CellAlignment::Right) + .add_attribute(Attribute::Dim), + Cell::new(HumanBytes(summary.send.stdev)).set_alignment(CellAlignment::Right), + Cell::new(HumanBytes(summary.recv.min)) + .set_alignment(CellAlignment::Right) + .add_attribute(Attribute::Dim), + Cell::new(HumanBytes(summary.recv.max)) + .set_alignment(CellAlignment::Right) + .add_attribute(Attribute::Dim), + Cell::new(HumanBytes(summary.recv.mean)) + .set_alignment(CellAlignment::Right) + .add_attribute(Attribute::Dim), + Cell::new(HumanBytes(summary.recv.stdev)).set_alignment(CellAlignment::Right), + ]); + } + + table +} diff --git a/utils/protocol-analyzer/Cargo.toml b/utils/protocol-analyzer/Cargo.toml index 66dbc3e26..817f6952e 100644 --- a/utils/protocol-analyzer/Cargo.toml +++ b/utils/protocol-analyzer/Cargo.toml @@ -14,7 +14,7 @@ name = "protocol-analyzer" path = "src/main.rs" [dependencies] -anyhow = "1.0.75" +anyhow = { workspace = true } clap = { workspace = true } tokio = { workspace = true, features = ["signal", "io-std", "fs", "macros", "rt-multi-thread", "io-util"] } tokio-stream = { workspace = true, features = ["sync"] } diff --git a/utils/stress-test/Cargo.toml b/utils/stress-test/Cargo.toml index 7e2c40c6a..3d5a311a4 100644 --- a/utils/stress-test/Cargo.toml +++ b/utils/stress-test/Cargo.toml @@ -17,4 +17,4 @@ path = "src/main.rs" clap = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -tempfile = "3.2" +tempfile = { workspace = true } diff --git a/utils/swarm/Cargo.toml b/utils/swarm/Cargo.toml index 35f363bae..2ca0fd226 100644 --- a/utils/swarm/Cargo.toml +++ b/utils/swarm/Cargo.toml @@ -14,7 +14,7 @@ name = "swarm" path = "src/main.rs" [dependencies] -anyhow = "1.0.75" +anyhow = { workspace = true } clap = { workspace = true } ctrlc = { version = "3.4.1", features = ["termination"] } os_pipe = "1.1.4" diff --git a/vfs/Cargo.toml b/vfs/Cargo.toml index 82dd4b4a8..2cdc5674a 100644 --- a/vfs/Cargo.toml +++ b/vfs/Cargo.toml @@ -37,7 +37,7 @@ winapi = { version = "0.3.9", features = ["ntstatus", "winnt"] } criterion = { version = "0.4", features = ["html_reports"] } proptest = "1.0" rand = "0.8.5" -tempfile = "3.2" +tempfile = { workspace = true } test-strategy = "0.2.1" tracing-subscriber = { workspace = true, features = [ "env-filter" ] } From 99a10931af59880410c4bb977a2af9a2acab8241 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Mon, 27 May 2024 15:47:06 +0200 Subject: [PATCH 08/15] Make progress bar opt-in in the swarm benchmark --- lib/benches/bench_swarm.rs | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/lib/benches/bench_swarm.rs b/lib/benches/bench_swarm.rs index d2919f94d..c69db9355 100644 --- a/lib/benches/bench_swarm.rs +++ b/lib/benches/bench_swarm.rs @@ -19,6 +19,7 @@ use rand::{distributions::Standard, rngs::StdRng, Rng, SeedableRng}; use std::{ fmt, fs::OpenOptions, + future, io::{self, Write}, path::PathBuf, process::ExitCode, @@ -42,8 +43,10 @@ fn main() -> ExitCode { .collect(); let proto = options.protocol; - let progress_reporter = ProgressReporter::new(); - common::init_log_with_writer(progress_reporter.stdout_writer()); + let progress_reporter = options.progress.then(ProgressReporter::new); + if let Some(progress_reporter) = &progress_reporter { + common::init_log_with_writer(progress_reporter.stdout_writer()); + } let mut env = Env::new(); @@ -107,7 +110,7 @@ fn main() -> ExitCode { network.add_user_provided_peer(&addr); } - let run = async { + let run_sync = async { // Wait until fully synced if let Some(watch_tx) = watch_tx { drop(watch_rx); @@ -126,9 +129,17 @@ fn main() -> ExitCode { barrier.wait().await; }; + let run_progress_reporter = async { + if let Some(progress_reporter) = progress_reporter { + progress_reporter.run(&repo).await + } else { + future::pending().await + } + }; + select! { - _ = run => (), - _ = progress_reporter.run(&repo) => (), + _ = run_sync => (), + _ = run_progress_reporter => (), _ = progress_monitor.run(&repo) => (), _ = throughput_monitor.run(&network) => (), } @@ -194,6 +205,10 @@ struct Options { #[arg(short, long, default_value_t)] pub label: String, + /// Show progress bar. + #[arg(long)] + pub progress: bool, + // The following arguments may be passed down from `cargo bench` so we need to accept them even // if we don't use them. #[arg( From ef1415db718114403b5d97cab586288341da4d66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Mon, 27 May 2024 16:07:36 +0200 Subject: [PATCH 09/15] Simplify benchtool (combine build and run) --- utils/benchtool/src/main.rs | 317 +++++++++++++++--------------------- 1 file changed, 128 insertions(+), 189 deletions(-) diff --git a/utils/benchtool/src/main.rs b/utils/benchtool/src/main.rs index 5a01b0d02..de4a52e74 100644 --- a/utils/benchtool/src/main.rs +++ b/utils/benchtool/src/main.rs @@ -1,5 +1,5 @@ use anyhow::{format_err, Result}; -use clap::{Args, Parser, Subcommand}; +use clap::Parser; use comfy_table::{Attribute, Cell, CellAlignment, Table}; use indicatif::HumanBytes; use rand::seq::SliceRandom; @@ -22,101 +22,57 @@ const BENCH_DIR: &str = "benches"; fn main() -> Result<()> { let options = Options::parse(); - match options.command { - Command::Build(args) => build(args), - Command::List(args) => list(args), - Command::Run(args) => run(args), - Command::Clean(args) => clean(args), + build(&options)?; + + if !options.no_run { + run(&options)?; } + + Ok(()) } /// Build, run and compare different versions of the same benchmark. #[derive(Parser, Debug)] struct Options { - #[command(subcommand)] - command: Command, -} - -#[derive(Subcommand, Debug)] -enum Command { - /// Build a bench version - Build(BuildArgs), - /// List all bench binaries - List(ListArgs), - /// Run and compare benches - Run(RunArgs), - /// Remove all bench binaries - Clean(CleanArgs), -} - -#[derive(Args, Debug)] -struct BuildArgs { - #[command(flatten)] - common: CommonArgs, - /// Package to build #[arg(short, long, value_name = "SPEC")] package: Option, + /// Directory for all generated artifacts + #[arg(long, value_name = "PATH")] + target_dir: Option, + /// Space or comma separated list of features to activate #[arg(short = 'F', long)] features: Vec, - /// Label to append to the bench binary name. Default is the current git commit hash. + /// Coloring: auto, always, never + #[arg(long, default_value = "auto")] + color: String, + + /// Label to append to the bench binary name. Used to identify bench versions when comparing + /// results. Default is the current git commit hash. #[arg(short, long)] label: Option, - /// Bench target(s) to build - #[arg(value_name = "NAME")] - benches: Vec, -} - -#[derive(Args, Debug)] -struct ListArgs { - #[command(flatten)] - common: CommonArgs, - - /// Bench(es) to list the versions of. - #[arg(value_name = "NAME")] - benches: Vec, -} - -#[derive(Args, Debug)] -struct RunArgs { - #[command(flatten)] - common: CommonArgs, - /// Run each bench version this many times and average the results. - #[arg(short, long, default_value_t = 1)] + #[arg(short, long, default_value_t = 1, conflicts_with = "no_run")] samples: usize, - /// Bench to run and compare. + /// Build but don't run the bench. + #[arg(long)] + no_run: bool, + + /// Bench target to build and run. #[arg(value_name = "NAME")] bench: String, - /// Args to the bench. - #[arg(trailing_var_arg = true)] + /// Args to the bench target. + #[arg(trailing_var_arg = true, conflicts_with = "no_run")] args: Vec, } -#[derive(Args, Debug)] -struct CleanArgs { - #[command(flatten)] - common: CommonArgs, -} - -#[derive(Args, Debug)] -struct CommonArgs { - /// Coloring: auto, always, never - #[arg(long, default_value = "auto")] - color: String, - - /// Directory for all generated artifacts - #[arg(long, value_name = "PATH")] - target_dir: Option, -} - -impl CommonArgs { +impl Options { fn bench_dir(&self) -> PathBuf { self.target_dir .as_ref() @@ -126,117 +82,28 @@ impl CommonArgs { } } -#[derive(Deserialize)] -struct BuildMessage<'a> { - #[serde(borrow)] - executable: Option<&'a Path>, -} - -#[derive(Default, Debug, Deserialize)] -struct Summary { - #[serde(default)] - label: String, - #[serde(deserialize_with = "deserialize_duration")] - duration: Duration, - send: BytesSummary, - recv: BytesSummary, -} - -impl Summary { - fn avg<'a>(iter: impl IntoIterator) -> Self { - let (sum, count) = - iter.into_iter() - .fold((Summary::default(), 0u32), |(sum, count), item| { - ( - Summary { - label: if sum.label.is_empty() { - item.label.clone() - } else { - sum.label - }, - duration: sum.duration + item.duration, - send: sum.send + item.send, - recv: sum.recv + item.recv, - }, - count + 1, - ) - }); - - sum / count - } -} - -impl Div for Summary { - type Output = Self; - - fn div(self, rhs: u32) -> Self::Output { - Self { - label: self.label, - duration: self.duration / rhs, - send: self.send / rhs, - recv: self.recv / rhs, - } - } -} - -#[derive(Default, Copy, Clone, Debug, Deserialize)] -struct BytesSummary { - min: u64, - max: u64, - mean: u64, - stdev: u64, -} - -impl Add for BytesSummary { - type Output = Self; - - fn add(self, rhs: Self) -> Self::Output { - Self { - min: self.min + rhs.min, - max: self.max + rhs.max, - mean: self.mean + rhs.mean, - stdev: self.stdev + rhs.stdev, - } - } -} - -impl Div for BytesSummary { - type Output = Self; - - fn div(self, rhs: u32) -> Self::Output { - Self { - min: self.min / rhs as u64, - max: self.max / rhs as u64, - mean: self.mean / rhs as u64, - stdev: self.stdev / rhs as u64, - } - } -} - -fn build(args: BuildArgs) -> Result<()> { +fn build(options: &Options) -> Result<()> { let mut command = process::Command::new("cargo"); command .arg("build") .arg("--message-format") .arg("json") .arg("--color") - .arg(&args.common.color); + .arg(&options.color); - if let Some(value) = &args.common.target_dir { + if let Some(value) = &options.target_dir { command.arg("--target-dir").arg(value); } - if let Some(value) = args.package { + if let Some(value) = &options.package { command.arg("--package").arg(value); } - for value in args.features { + for value in &options.features { command.arg("--features").arg(value); } - for value in args.benches { - command.arg("--bench").arg(value); - } + command.arg("--bench").arg(&options.bench); println!( "Running `{} {}`", @@ -257,10 +124,10 @@ fn build(args: BuildArgs) -> Result<()> { } let stdout = BufReader::new(&output.stdout[..]); - let dst_dir = args.common.bench_dir(); + let dst_dir = options.bench_dir(); - let label = match args.label { - Some(label) => label, + let label = match &options.label { + Some(label) => label.to_owned(), None => get_git_commit()?, }; @@ -284,24 +151,14 @@ fn build(args: BuildArgs) -> Result<()> { Ok(()) } -fn list(args: ListArgs) -> Result<()> { - let dir = args.common.bench_dir(); - - for entry in fs::read_dir(dir)? { - println!("{}", entry?.path().display()); - } - - Ok(()) -} - -fn run(args: RunArgs) -> Result<()> { - let dir = args.common.bench_dir(); - let mut bench_versions = list_bench_versions(&dir, &args.bench)?; +fn run(options: &Options) -> Result<()> { + let dir = options.bench_dir(); + let mut bench_versions = list_bench_versions(&dir, &options.bench)?; let mut output = NamedTempFile::new()?; let mut rng = rand::thread_rng(); - for i in 0..args.samples { - println!("Running sample {}/{}", i + 1, args.samples); + for i in 0..options.samples { + println!("Running sample {}/{}", i + 1, options.samples); println!(); bench_versions.shuffle(&mut rng); @@ -316,7 +173,7 @@ fn run(args: RunArgs) -> Result<()> { .arg("--output") .arg(output.path()); - for arg in &args.args { + for arg in &options.args { command.arg(arg); } @@ -370,11 +227,6 @@ fn list_bench_versions(dir: &Path, bench: &str) -> Result> { .collect() } -fn clean(args: CleanArgs) -> Result<()> { - fs::remove_dir_all(args.common.bench_dir())?; - Ok(()) -} - fn default_target_dir() -> PathBuf { env::var_os("CARGO_TARGET_DIR") .map(PathBuf::from) @@ -499,3 +351,90 @@ fn build_comparison_table(summaries: Vec) -> Table { table } + +#[derive(Deserialize)] +struct BuildMessage<'a> { + #[serde(borrow)] + executable: Option<&'a Path>, +} + +#[derive(Default, Debug, Deserialize)] +struct Summary { + #[serde(default)] + label: String, + #[serde(deserialize_with = "deserialize_duration")] + duration: Duration, + send: BytesSummary, + recv: BytesSummary, +} + +impl Summary { + fn avg<'a>(iter: impl IntoIterator) -> Self { + let (sum, count) = + iter.into_iter() + .fold((Summary::default(), 0u32), |(sum, count), item| { + ( + Summary { + label: if sum.label.is_empty() { + item.label.clone() + } else { + sum.label + }, + duration: sum.duration + item.duration, + send: sum.send + item.send, + recv: sum.recv + item.recv, + }, + count + 1, + ) + }); + + sum / count + } +} + +impl Div for Summary { + type Output = Self; + + fn div(self, rhs: u32) -> Self::Output { + Self { + label: self.label, + duration: self.duration / rhs, + send: self.send / rhs, + recv: self.recv / rhs, + } + } +} + +#[derive(Default, Copy, Clone, Debug, Deserialize)] +struct BytesSummary { + min: u64, + max: u64, + mean: u64, + stdev: u64, +} + +impl Add for BytesSummary { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + Self { + min: self.min + rhs.min, + max: self.max + rhs.max, + mean: self.mean + rhs.mean, + stdev: self.stdev + rhs.stdev, + } + } +} + +impl Div for BytesSummary { + type Output = Self; + + fn div(self, rhs: u32) -> Self::Output { + Self { + min: self.min / rhs as u64, + max: self.max / rhs as u64, + mean: self.mean / rhs as u64, + stdev: self.stdev / rhs as u64, + } + } +} From 0a2801a99bf19d0b1ffdb3a9aaf8770d28f15e10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Tue, 28 May 2024 08:18:13 +0200 Subject: [PATCH 10/15] Fix premature completion in sync_watch --- lib/benches/bench_swarm.rs | 6 +++++- lib/tests/common/sync_watch.rs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/benches/bench_swarm.rs b/lib/benches/bench_swarm.rs index c69db9355..a8fbd3df8 100644 --- a/lib/benches/bench_swarm.rs +++ b/lib/benches/bench_swarm.rs @@ -121,7 +121,11 @@ fn main() -> ExitCode { // Check the file content matches the original file. { - let mut file = repo.open_file(file_name).await.unwrap(); + let mut file = repo + .open_file(file_name) + .await + .inspect_err(|error| error!(?error)) + .unwrap(); check_random_file(&mut file, file_seed, file_size).await; } diff --git a/lib/tests/common/sync_watch.rs b/lib/tests/common/sync_watch.rs index a045f252a..1db4a731d 100644 --- a/lib/tests/common/sync_watch.rs +++ b/lib/tests/common/sync_watch.rs @@ -63,7 +63,7 @@ impl Receiver { debug!(?this_vv, that_vv = ?*that_vv); - if this_vv == *that_vv { + if !that_vv.is_empty() && this_vv == *that_vv { break; } } From 8b20e04379716776b7b5f1c7655e3b347c9c739b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Tue, 28 May 2024 08:52:40 +0200 Subject: [PATCH 11/15] benchtool: Use branch name as default label --- utils/benchtool/src/main.rs | 42 +++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/utils/benchtool/src/main.rs b/utils/benchtool/src/main.rs index de4a52e74..20e2c4b30 100644 --- a/utils/benchtool/src/main.rs +++ b/utils/benchtool/src/main.rs @@ -32,7 +32,17 @@ fn main() -> Result<()> { } /// Build, run and compare different versions of the same benchmark. +/// +/// Typical usage is to build a bench in the master branch, then switch to a branch that contains +/// potential perf improvements, build the same bench there, run both versions and compare the +/// results: +/// +/// > git checkout master +/// > cargo run -p benchtool -- --no-run +/// > git checkout perf-improvements +/// > cargo run -p benchtool -- --samples 10 #[derive(Parser, Debug)] +#[command(verbatim_doc_comment)] struct Options { /// Package to build #[arg(short, long, value_name = "SPEC")] @@ -51,7 +61,7 @@ struct Options { color: String, /// Label to append to the bench binary name. Used to identify bench versions when comparing - /// results. Default is the current git commit hash. + /// results. Default is the current git branch name. #[arg(short, long)] label: Option, @@ -128,7 +138,7 @@ fn build(options: &Options) -> Result<()> { let label = match &options.label { Some(label) => label.to_owned(), - None => get_git_commit()?, + None => get_default_label()?, }; for line in stdout.lines() { @@ -243,7 +253,7 @@ fn make_bench_file_name(src: &Path, label: &str) -> PathBuf { .map(|(prefix, _)| prefix) .unwrap_or(name); - let mut name = PathBuf::from(format!("{name}-{label}")); + let mut name = PathBuf::from(format!("{name}@{label}")); if let Some(ext) = ext { name.set_extension(ext); @@ -255,7 +265,7 @@ fn make_bench_file_name(src: &Path, label: &str) -> PathBuf { fn extract_bench_label(path: &Path) -> &str { path.file_stem() .and_then(|stem| stem.to_str()) - .and_then(|stem| stem.rsplit_once('-')) + .and_then(|stem| stem.rsplit_once('@')) .map(|(_, suffix)| suffix) .unwrap_or_default() } @@ -274,6 +284,30 @@ fn get_git_commit() -> Result { .to_owned()) } +fn get_git_branch() -> Result { + Ok(str::from_utf8( + process::Command::new("git") + .arg("rev-parse") + .arg("--abbrev-ref") + .arg("HEAD") + .output()? + .stdout + .as_ref(), + )? + .trim() + .to_owned()) +} + +fn get_default_label() -> Result { + let branch = get_git_branch()?; + + if branch != "HEAD" { + Ok(branch) + } else { + get_git_commit() + } +} + fn format_command(command: &process::Command) -> String { command.get_program().to_str().unwrap().to_owned() } From fc55e0fd246110643132465212bc9d7605defb5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Tue, 28 May 2024 09:20:36 +0200 Subject: [PATCH 12/15] Remove the bench_ prefix from benches --- lib/Cargo.toml | 6 +++--- lib/benches/{bench_lib.rs => basic.rs} | 6 +++--- lib/benches/{bench_large_file.rs => large_file.rs} | 0 lib/benches/{bench_swarm.rs => swarm.rs} | 0 4 files changed, 6 insertions(+), 6 deletions(-) rename lib/benches/{bench_lib.rs => basic.rs} (96%) rename lib/benches/{bench_large_file.rs => large_file.rs} (100%) rename lib/benches/{bench_swarm.rs => swarm.rs} (100%) diff --git a/lib/Cargo.toml b/lib/Cargo.toml index e95261a53..0f49ba8d2 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -10,15 +10,15 @@ rust-version.workspace = true version.workspace = true [[bench]] -name = "bench_lib" +name = "basic" harness = false [[bench]] -name = "bench_large_file" +name = "large_file" harness = false [[bench]] -name = "bench_swarm" +name = "swarm" harness = false [dependencies] diff --git a/lib/benches/bench_lib.rs b/lib/benches/basic.rs similarity index 96% rename from lib/benches/bench_lib.rs rename to lib/benches/basic.rs index 89bedddaf..1998c0d7c 100644 --- a/lib/benches/bench_lib.rs +++ b/lib/benches/basic.rs @@ -14,7 +14,7 @@ criterion_main!(default); fn write_file(c: &mut Criterion) { let runtime = Runtime::new().unwrap(); - let mut group = c.benchmark_group("lib/write_file"); + let mut group = c.benchmark_group("write_file"); group.sample_size(10); let buffer_size = 4096; @@ -56,7 +56,7 @@ fn write_file(c: &mut Criterion) { fn read_file(c: &mut Criterion) { let runtime = Runtime::new().unwrap(); - let mut group = c.benchmark_group("lib/read_file"); + let mut group = c.benchmark_group("read_file"); group.sample_size(10); let buffer_size = 4096; @@ -109,7 +109,7 @@ fn read_file(c: &mut Criterion) { fn sync(c: &mut Criterion) { let runtime = Runtime::new().unwrap(); - let mut group = c.benchmark_group("lib/sync"); + let mut group = c.benchmark_group("sync"); group.sample_size(10); for m in [1, 8] { diff --git a/lib/benches/bench_large_file.rs b/lib/benches/large_file.rs similarity index 100% rename from lib/benches/bench_large_file.rs rename to lib/benches/large_file.rs diff --git a/lib/benches/bench_swarm.rs b/lib/benches/swarm.rs similarity index 100% rename from lib/benches/bench_swarm.rs rename to lib/benches/swarm.rs From 462e2843e6ed99c942f714154e745550fd112ad4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Tue, 28 May 2024 09:30:47 +0200 Subject: [PATCH 13/15] Bump to v0.8.3 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 679072931..c2cff18f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ members = [ resolver = "2" [workspace.package] -version = "0.8.2" +version = "0.8.3" authors = ["Adam CigΓ‘nek ", "Peter Jankuliak "] repository = "https://github.com/equalitie/ouisync" license = "MPL-2.0" From 0cb77380943f8492e9e19851db41b20c8f5cab7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Tue, 28 May 2024 12:03:41 +0200 Subject: [PATCH 14/15] Remove indicatif progress bars from the swarm bench They don't work when the output is piped, over ssh, and so on. Replaced with simply printing the progress to the stdout every second. --- lib/Cargo.toml | 1 - lib/benches/swarm.rs | 9 +- lib/tests/common/macros.rs | 6 ++ lib/tests/common/mod.rs | 8 ++ lib/tests/common/progress.rs | 203 +++++++++++++---------------------- net/Cargo.toml | 4 +- 6 files changed, 96 insertions(+), 135 deletions(-) diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 0f49ba8d2..203990581 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -94,7 +94,6 @@ assert_matches = { workspace = true } clap = { workspace = true } criterion = { version = "0.4", features = ["html_reports"] } hdrhistogram = { version = "7.5.4", default-features = false, features = ["sync"] } -indicatif = "0.17.8" metrics_ext = { path = "../metrics_ext" } ouisync-tracing-fmt = { path = "../tracing_fmt" } proptest = "1.0" diff --git a/lib/benches/swarm.rs b/lib/benches/swarm.rs index a8fbd3df8..8bd2d84f5 100644 --- a/lib/benches/swarm.rs +++ b/lib/benches/swarm.rs @@ -43,11 +43,6 @@ fn main() -> ExitCode { .collect(); let proto = options.protocol; - let progress_reporter = options.progress.then(ProgressReporter::new); - if let Some(progress_reporter) = &progress_reporter { - common::init_log_with_writer(progress_reporter.stdout_writer()); - } - let mut env = Env::new(); // Wait until everyone is fully synced. @@ -59,6 +54,10 @@ fn main() -> ExitCode { let barrier = Arc::new(Barrier::new(actors.len())); let summary_recorder = SummaryRecorder::new(); + let progress_reporter = { + let _enter = env.runtime().enter(); + options.progress.then(ProgressReporter::new) + }; let file_name = "file.dat"; let file_seed = 0; diff --git a/lib/tests/common/macros.rs b/lib/tests/common/macros.rs index 5611e3332..cf0bdaca0 100644 --- a/lib/tests/common/macros.rs +++ b/lib/tests/common/macros.rs @@ -47,3 +47,9 @@ macro_rules! trace { event!(tracing::Level::TRACE, $($tokens)*) } } + +macro_rules! event_enabled { + ($($tokens:tt)*) => { + tracing::event_enabled!(target: "ouisync-test", $($tokens)*) + } +} diff --git a/lib/tests/common/mod.rs b/lib/tests/common/mod.rs index 0cd81dfed..a32d9e4ff 100644 --- a/lib/tests/common/mod.rs +++ b/lib/tests/common/mod.rs @@ -110,6 +110,10 @@ pub(crate) mod env { self.tasks.push(self.runtime.spawn(f)); } + + pub fn runtime(&self) -> Handle { + self.runtime.handle().clone() + } } impl Drop for Env { @@ -160,6 +164,10 @@ pub(crate) mod env { self.runner.client(name, f); } + + pub fn runtime(&self) -> Handle { + unimplemented!("not supported in simulation") + } } impl Drop for Env<'_> { diff --git a/lib/tests/common/progress.rs b/lib/tests/common/progress.rs index cd0b8dd07..1c4701262 100644 --- a/lib/tests/common/progress.rs +++ b/lib/tests/common/progress.rs @@ -1,83 +1,37 @@ -use super::actor; -use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use ouisync::{Progress, Repository, BLOCK_SIZE}; +use ouisync::{Progress, Repository}; use std::{ - io::{self, Stderr, Stdout}, sync::{Arc, Mutex as BlockingMutex}, - time::Duration, + time::{Duration, Instant}, +}; +use tokio::{ + select, + sync::{broadcast::error::RecvError, mpsc}, + task, time, }; -use tokio::{select, sync::broadcast::error::RecvError, time}; - -const REPORT_INTERVAL: Duration = Duration::from_secs(1); /// Reports total sync progress of a group of actors. #[derive(Clone)] pub struct ProgressReporter { - all_progress: Arc>, - bars: MultiProgress, - all_bar: ProgressBar, + tx: mpsc::Sender, } impl ProgressReporter { pub fn new() -> Self { - let all_progress = Arc::new(BlockingMutex::new(Progress::default())); - let bars = MultiProgress::new(); - - let all_bar = bars.add(ProgressBar::new(1).with_style(all_progress_style())); - all_bar.set_prefix("total"); - - Self { - all_progress, - bars, - all_bar, - } + let (tx, rx) = mpsc::channel(1024); + task::spawn(handle(rx)); + Self { tx } } - pub fn stdout_writer(&self) -> MakeWriter Stdout> { - MakeWriter { - bars: self.bars.clone(), - inner: io::stdout, - } - } + pub async fn run(self, repo: &Repository) { + self.tx.send(Command::Join).await.unwrap(); - pub fn stderr_writer(&self) -> MakeWriter Stderr> { - MakeWriter { - bars: self.bars.clone(), - inner: io::stderr, - } - } - - pub async fn run(mut self, repo: &Repository) { let mut rx = repo.subscribe(); - - let one_bar = self.bars.add( - ProgressBar::new(1) - .with_style(one_progress_style()) - .with_prefix(actor::name()), - ); - let _finisher = ProgressBarFinisher(&one_bar); - - let mut old_one_progress = Progress::default(); + let mut old = Progress::default(); loop { - let new_one_progress = repo.sync_progress().await.unwrap(); - - let all_progress = { - let mut all_progress = self.all_progress.lock().unwrap(); - *all_progress = sub(*all_progress, old_one_progress); - *all_progress = add(*all_progress, new_one_progress); - *all_progress - }; - - old_one_progress = new_one_progress; - - one_bar.set_length((old_one_progress.total * BLOCK_SIZE as u64).max(1)); - one_bar.set_position(old_one_progress.value * BLOCK_SIZE as u64); - - self.all_bar - .set_length((all_progress.total * BLOCK_SIZE as u64).max(1)); - self.all_bar - .set_position((all_progress.value * BLOCK_SIZE as u64).max(1)); + let new = repo.sync_progress().await.unwrap(); + self.tx.send(Command::Record { new, old }).await.unwrap(); + old = new; match rx.recv().await { Ok(_) | Err(RecvError::Lagged(_)) => continue, @@ -87,10 +41,46 @@ impl ProgressReporter { } } -impl Drop for ProgressReporter { - fn drop(&mut self) { - if Arc::strong_count(&self.all_progress) <= 1 { - let _ = ProgressBarFinisher(&self.all_bar); +async fn handle(mut rx: mpsc::Receiver) { + let mut progress = Progress::default(); + let mut num_actors = 0; + let mut num_synced = 0; + let mut change = false; + let mut wakeup = Instant::now(); + + loop { + let command = select! { + Some(command) = rx.recv() => command, + _ = time::sleep_until(wakeup.into()) => Command::Report, + else => break, + }; + + match command { + Command::Join => { + num_actors += 1; + } + Command::Record { old, new } => { + progress = sub(progress, old); + progress = add(progress, new); + + if is_complete(&old) { + num_synced -= 1; + } + + if is_complete(&new) { + num_synced += 1; + } + + change = true; + } + Command::Report => { + if change { + report(progress, num_synced, num_actors) + } + + change = false; + wakeup = Instant::now() + Duration::from_secs(1); + } } } } @@ -113,67 +103,26 @@ fn is_complete(progress: &Progress) -> bool { progress.total > 0 && progress.value >= progress.total } -fn all_progress_style() -> ProgressStyle { - ProgressStyle::with_template( - "{prefix:5} [{elapsed_precise}] [{wide_bar:.green.bold/blue}] {percent_precise}%", - ) - .unwrap() - .progress_chars("#>-") -} - -fn one_progress_style() -> ProgressStyle { - ProgressStyle::with_template( - "{prefix:5} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes}", - ) - .unwrap() - .progress_chars("#>-") -} - -struct ProgressBarFinisher<'a>(&'a ProgressBar); - -impl Drop for ProgressBarFinisher<'_> { - fn drop(&mut self) { - self.0.finish(); +fn report(progress: Progress, num_synced: usize, num_actors: usize) { + if event_enabled!(tracing::Level::INFO) { + info!( + "progress: {:.2} ({}/{})", + progress.percent(), + num_synced, + num_actors + ) + } else { + println!( + "progress: {:.2} ({}/{})", + progress.percent(), + num_synced, + num_actors + ) } } -pub struct Writer { - bars: MultiProgress, - inner: W, -} - -impl io::Write for Writer -where - W: io::Write, -{ - fn write(&mut self, buf: &[u8]) -> io::Result { - self.bars.suspend(|| self.inner.write(buf)) - } - - fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { - self.bars.suspend(|| self.inner.write_vectored(bufs)) - } - - fn flush(&mut self) -> io::Result<()> { - self.bars.suspend(|| self.inner.flush()) - } -} - -pub struct MakeWriter { - bars: MultiProgress, - inner: T, -} - -impl<'w, T> tracing_subscriber::fmt::MakeWriter<'w> for MakeWriter -where - T: tracing_subscriber::fmt::MakeWriter<'w>, -{ - type Writer = Writer; - - fn make_writer(&'w self) -> Self::Writer { - Writer { - bars: self.bars.clone(), - inner: self.inner.make_writer(), - } - } +enum Command { + Join, + Record { old: Progress, new: Progress }, + Report, } diff --git a/net/Cargo.toml b/net/Cargo.toml index b85674373..bba3122ee 100644 --- a/net/Cargo.toml +++ b/net/Cargo.toml @@ -19,12 +19,12 @@ rustls = { workspace = true, features = ["quic", "dangerous_configuration"] } socket2 = "0.4.4" # To be able to setsockopts before a socket is bound stun_codec = "0.3.4" thiserror = "1.0.31" -tokio = { workspace = true, features = ["io-util", "net", "rt-multi-thread", "sync"] } +tokio = { workspace = true, features = ["io-util", "macros", "net", "rt-multi-thread", "sync"] } turmoil = { workspace = true, optional = true } [dev-dependencies] clap = { workspace = true } -tokio = { workspace = true, features = ["macros"] } +tokio = { workspace = true } [features] simulation = ["turmoil"] From c5b405337ab723ce296414b5ea892d5e3942998f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Tue, 28 May 2024 14:22:18 +0200 Subject: [PATCH 15/15] benchtool: Add --no-build --- utils/benchtool/src/main.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/utils/benchtool/src/main.rs b/utils/benchtool/src/main.rs index 20e2c4b30..02e4eb61e 100644 --- a/utils/benchtool/src/main.rs +++ b/utils/benchtool/src/main.rs @@ -22,7 +22,9 @@ const BENCH_DIR: &str = "benches"; fn main() -> Result<()> { let options = Options::parse(); - build(&options)?; + if !options.no_build { + build(&options)?; + } if !options.no_run { run(&options)?; @@ -66,19 +68,23 @@ struct Options { label: Option, /// Run each bench version this many times and average the results. - #[arg(short, long, default_value_t = 1, conflicts_with = "no_run")] + #[arg(short, long, default_value_t = 1)] samples: usize, - /// Build but don't run the bench. + /// Build but don't run the benches. #[arg(long)] no_run: bool, + /// Only run the existing benches, don't build. + #[arg(long)] + no_build: bool, + /// Bench target to build and run. #[arg(value_name = "NAME")] bench: String, /// Args to the bench target. - #[arg(trailing_var_arg = true, conflicts_with = "no_run")] + #[arg(trailing_var_arg = true)] args: Vec, }