Skip to content

Commit

Permalink
feat: iroh-perf (#2186)
Browse files Browse the repository at this point in the history
## Description

In progress thing for what should later be `iroh-perf`. For now you can
run with `cargo run --release -- iroh` and `cargo run --release --
quinn` to do a simple comparison benchmark.

More knobs are available for tuning with `--help`.
There's a much more stark difference on macOS compared to running on
Linux.

Next steps bumping to test against `quinn 0.11`

Sample output:
```
iroh (macOS)

Client 0 stats:
Connect time: 13.788583ms
Overall download stats:

Transferred 1073741824 bytes on 1 streams in 5.47s (187.27 MiB/s)

Time to first byte (TTFB): 1ms

Total chunks: 130319

Average chunk time: 41.936ms

Average chunk size: 8.04KiB

Stream download metrics:

      │  Throughput   │ Duration 
──────┼───────────────┼──────────
 AVG  │  187.31 MiB/s │     5.47s
 P0   │  187.25 MiB/s │     5.46s
 P10  │  187.37 MiB/s │     5.47s
 P50  │  187.37 MiB/s │     5.47s
 P90  │  187.37 MiB/s │     5.47s
 P100 │  187.37 MiB/s │     5.47s
 ```
 
```
quinn (macOS)

Client 0 stats:
Connect time: 1.81875ms
Overall download stats:

Transferred 1073741824 bytes on 1 streams in 2.85s (358.92 MiB/s)

Time to first byte (TTFB): 61.168ms

Total chunks: 47412

Average chunk time: 60.176ms

Average chunk size: 22.12KiB

Stream download metrics:

      │  Throughput   │ Duration 
──────┼───────────────┼──────────
 AVG  │  358.88 MiB/s │     2.85s
 P0   │  358.75 MiB/s │     2.85s
 P10  │  359.00 MiB/s │     2.85s
 P50  │  359.00 MiB/s │     2.85s
 P90  │  359.00 MiB/s │     2.85s
 P100 │  359.00 MiB/s │     2.85s
 ```

```
iroh (linux)

Client 0 stats:
Connect time: 3.83554ms
Overall download stats:

Transferred 1073741824 bytes on 1 streams in 1.16s (884.52 MiB/s)

Time to first byte (TTFB): 2.014ms

Total chunks: 38046

Average chunk time: 30.408ms

Average chunk size: 27.55KiB

Stream download metrics:

      │  Throughput   │ Duration 
──────┼───────────────┼──────────
 AVG  │  884.75 MiB/s │     1.16s
 P0   │  884.50 MiB/s │     1.16s
 P10  │  885.00 MiB/s │     1.16s
 P50  │  885.00 MiB/s │     1.16s
 P90  │  885.00 MiB/s │     1.16s
 P100 │  885.00 MiB/s │     1.16s
 ```
 
```
quinn (linux)

Client 0 stats:
Connect time: 1.22085ms
Overall download stats:

Transferred 1073741824 bytes on 1 streams in 994.57ms (1029.59 MiB/s)

Time to first byte (TTFB): 1.483ms

Total chunks: 32809

Average chunk time: 30.296ms

Average chunk size: 31.96KiB

Stream download metrics:

      │  Throughput   │ Duration 
──────┼───────────────┼──────────
 AVG  │ 1030.50 MiB/s │  993.00ms
 P0   │ 1030.00 MiB/s │  993.00ms
 P10  │ 1031.00 MiB/s │  993.00ms
 P50  │ 1031.00 MiB/s │  993.00ms
 P90  │ 1031.00 MiB/s │  993.00ms
 P100 │ 1031.00 MiB/s │  993.00ms
 ```

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
  • Loading branch information
Arqu authored May 22, 2024
1 parent e41d1d9 commit 98d45f3
Show file tree
Hide file tree
Showing 10 changed files with 861 additions and 292 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ jobs:
# uses: obi1kenobi/cargo-semver-checks-action@v2
uses: n0-computer/cargo-semver-checks-action@feat-baseline
with:
package: iroh, iroh-base, iroh-blobs, iroh-cli, iroh-dns-server, iroh-gossip, iroh-metrics, iroh-net, iroh-docs
package: iroh, iroh-base, iroh-blobs, iroh-cli, iroh-dns-server, iroh-gossip, iroh-metrics, iroh-net, iroh-net-bench, iroh-docs
baseline-rev: ${{ env.HEAD_COMMIT_SHA }}
use-cache: false

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ env:
RUSTFLAGS: -Dwarnings
RUSTDOCFLAGS: -Dwarnings
SCCACHE_CACHE_SIZE: "50G"
CRATES_LIST: "iroh,iroh-blobs,iroh-gossip,iroh-metrics,iroh-net,iroh-docs,iroh-test,iroh-cli,iroh-dns-server"
CRATES_LIST: "iroh,iroh-blobs,iroh-gossip,iroh-metrics,iroh-net,iroh-net-bench,iroh-docs,iroh-test,iroh-cli,iroh-dns-server"

jobs:
build_and_test_nix:
Expand Down
52 changes: 52 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions iroh-net/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ anyhow = "1.0.22"
bytes = "1"
hdrhistogram = { version = "7.2", default-features = false }
iroh-net = { path = ".." }
quinn = "0.10"
rcgen = "0.11.1"
rustls = { version = "0.21.0", default-features = false, features = ["quic"] }
clap = { version = "4", features = ["derive"] }
tokio = { version = "1.0.1", features = ["rt", "sync"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["env-filter", "fmt", "ansi", "time", "local-time"] }
socket2 = "0.5"
218 changes: 61 additions & 157 deletions iroh-net/bench/src/bin/bulk.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,42 @@
use std::{
sync::{Arc, Mutex},
time::Instant,
};

use anyhow::{Context, Result};
use anyhow::Result;
use clap::Parser;
use iroh_net::{
endpoint::{self, Connection},
Endpoint, NodeAddr,
};
use tokio::sync::Semaphore;
use tracing::{info, trace};

use iroh_net_bench::{
configure_tracing_subscriber, connect_client, drain_stream, rt, send_data_on_stream,
server_endpoint,
stats::{Stats, TransferResult},
Opt,
};

use iroh_net_bench::{configure_tracing_subscriber, iroh, quinn, rt, s2n, Commands, Opt};

fn main() {
let opt = Opt::parse();
let cmd = Commands::parse();
configure_tracing_subscriber();

match cmd {
Commands::Iroh(opt) => {
if let Err(e) = run_iroh(opt) {
eprintln!("failed: {e:#}");
}
}
Commands::Quinn(opt) => {
if let Err(e) = run_quinn(opt) {
eprintln!("failed: {e:#}");
}
}
Commands::S2n(opt) => {
if let Err(e) = run_s2n(opt) {
eprintln!("failed: {e:#}");
}
}
}
}

pub fn run_iroh(opt: Opt) -> Result<()> {
let server_span = tracing::error_span!("server");
let runtime = rt();
let (server_addr, endpoint) = {
let _guard = server_span.enter();
server_endpoint(&runtime, &opt)
iroh::server_endpoint(&runtime, &opt)
};

let server_thread = std::thread::spawn(move || {
let _guard = server_span.entered();
if let Err(e) = runtime.block_on(server(endpoint, opt)) {
if let Err(e) = runtime.block_on(iroh::server(endpoint, opt)) {
eprintln!("server failed: {e:#}");
}
});
Expand All @@ -43,7 +47,7 @@ fn main() {
handles.push(std::thread::spawn(move || {
let _guard = tracing::error_span!("client", id).entered();
let runtime = rt();
match runtime.block_on(client(server_addr, opt)) {
match runtime.block_on(iroh::client(server_addr, opt)) {
Ok(stats) => Ok(stats),
Err(e) => {
eprintln!("client failed: {e:#}");
Expand All @@ -62,153 +66,53 @@ fn main() {
}

server_thread.join().expect("server thread");
}

async fn server(endpoint: Endpoint, opt: Opt) -> Result<()> {
let mut server_tasks = Vec::new();

// Handle only the expected amount of clients
for _ in 0..opt.clients {
let handshake = endpoint.accept().await.unwrap();
let connection = handshake.await.context("handshake failed")?;

server_tasks.push(tokio::spawn(async move {
loop {
let (mut send_stream, mut recv_stream) = match connection.accept_bi().await {
Err(endpoint::ConnectionError::ApplicationClosed(_)) => break,
Err(e) => {
eprintln!("accepting stream failed: {e:?}");
break;
}
Ok(stream) => stream,
};
trace!("stream established");

tokio::spawn(async move {
drain_stream(&mut recv_stream, opt.read_unordered).await?;
send_data_on_stream(&mut send_stream, opt.download_size).await?;
Ok::<_, anyhow::Error>(())
});
}

if opt.stats {
println!("\nServer connection stats:\n{:#?}", connection.stats());
}
}));
}

// Await all the tasks. We have to do this to prevent the runtime getting dropped
// and all server tasks to be cancelled
for handle in server_tasks {
if let Err(e) = handle.await {
eprintln!("Server task error: {e:?}");
};
}

Ok(())
}

async fn client(server_addr: NodeAddr, opt: Opt) -> Result<ClientStats> {
let (endpoint, connection) = connect_client(server_addr, opt).await?;

let start = Instant::now();

let connection = Arc::new(connection);

let mut stats = ClientStats::default();
let mut first_error = None;

let sem = Arc::new(Semaphore::new(opt.max_streams));
let results = Arc::new(Mutex::new(Vec::new()));
for _ in 0..opt.streams {
let permit = sem.clone().acquire_owned().await.unwrap();
let results = results.clone();
let connection = connection.clone();
tokio::spawn(async move {
let result =
handle_client_stream(connection, opt.upload_size, opt.read_unordered).await;
info!("stream finished: {:?}", result);
results.lock().unwrap().push(result);
drop(permit);
});
}
pub fn run_quinn(opt: Opt) -> Result<()> {
let server_span = tracing::error_span!("server");
let runtime = rt();
let (server_addr, endpoint) = {
let _guard = server_span.enter();
quinn::server_endpoint(&runtime, &opt)
};

// Wait for remaining streams to finish
let _ = sem.acquire_many(opt.max_streams as u32).await.unwrap();
let server_thread = std::thread::spawn(move || {
let _guard = server_span.entered();
if let Err(e) = runtime.block_on(quinn::server(endpoint, opt)) {
eprintln!("server failed: {e:#}");
}
});

for result in results.lock().unwrap().drain(..) {
match result {
Ok((upload_result, download_result)) => {
stats.upload_stats.stream_finished(upload_result);
stats.download_stats.stream_finished(download_result);
}
Err(e) => {
if first_error.is_none() {
first_error = Some(e);
let mut handles = Vec::new();
for id in 0..opt.clients {
handles.push(std::thread::spawn(move || {
let _guard = tracing::error_span!("client", id).entered();
let runtime = rt();
match runtime.block_on(quinn::client(server_addr, opt)) {
Ok(stats) => Ok(stats),
Err(e) => {
eprintln!("client failed: {e:#}");
Err(e)
}
}
}
}

stats.upload_stats.total_duration = start.elapsed();
stats.download_stats.total_duration = start.elapsed();

// Explicit close of the connection, since handles can still be around due
// to `Arc`ing them
connection.close(0u32.into(), b"Benchmark done");

endpoint.close(0u32.into(), b"").await?;

if opt.stats {
println!("\nClient connection stats:\n{:#?}", connection.stats());
}));
}

match first_error {
None => Ok(stats),
Some(e) => Err(e),
for (id, handle) in handles.into_iter().enumerate() {
// We print all stats at the end of the test sequentially to avoid
// them being garbled due to being printed concurrently
if let Ok(stats) = handle.join().expect("client thread") {
stats.print(id);
}
}
}

async fn handle_client_stream(
connection: Arc<Connection>,
upload_size: u64,
read_unordered: bool,
) -> Result<(TransferResult, TransferResult)> {
let start = Instant::now();

let (mut send_stream, mut recv_stream) = connection
.open_bi()
.await
.context("failed to open stream")?;

send_data_on_stream(&mut send_stream, upload_size).await?;

let upload_result = TransferResult::new(start.elapsed(), upload_size);

let start = Instant::now();
let size = drain_stream(&mut recv_stream, read_unordered).await?;
let download_result = TransferResult::new(start.elapsed(), size as u64);

Ok((upload_result, download_result))
}
server_thread.join().expect("server thread");

#[derive(Default)]
struct ClientStats {
upload_stats: Stats,
download_stats: Stats,
Ok(())
}

impl ClientStats {
pub fn print(&self, client_id: usize) {
println!();
println!("Client {client_id} stats:");

if self.upload_stats.total_size != 0 {
self.upload_stats.print("upload");
}

if self.download_stats.total_size != 0 {
self.download_stats.print("download");
}
}
pub fn run_s2n(_opt: s2n::Opt) -> Result<()> {
unimplemented!()
}
Loading

0 comments on commit 98d45f3

Please sign in to comment.