diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 80c51c236d..5df8bcfd91 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -70,11 +70,15 @@ jobs: - name: Prepare machine run: sudo /root/bin/prep.sh - # Pin the benchmark run to core 0 and run all benchmarks at elevated priority. - name: Run cargo bench run: | - taskset -c 0 nice -n -20 \ - cargo "+$TOOLCHAIN" bench --features bench -- --noplot | tee results.txt + # Pin all but neqo-bin benchmarks to CPU 0. neqo-bin benchmarks run + # both a client and a server, thus benefiting from multiple CPU cores. + # + # Run all benchmarks at elevated priority. + taskset -c 0 nice -n -20 cargo "+$TOOLCHAIN" bench --workspace --exclude neqo-bin --features bench -- --noplot | tee results.txt + nice -n -20 cargo "+$TOOLCHAIN" bench --package neqo-bin --features bench -- --noplot | tee -a results.txt + # Compare various configurations of neqo against msquic, and gather perf data # during the hyperfine runs. diff --git a/neqo-bin/Cargo.toml b/neqo-bin/Cargo.toml index 04210e00db..a165a4ac32 100644 --- a/neqo-bin/Cargo.toml +++ b/neqo-bin/Cargo.toml @@ -11,12 +11,12 @@ license.workspace = true [[bin]] name = "neqo-client" -path = "src/bin/client/main.rs" +path = "src/bin/client.rs" bench = false [[bin]] name = "neqo-server" -path = "src/bin/server/main.rs" +path = "src/bin/server.rs" bench = false [lints] @@ -40,6 +40,18 @@ regex = { version = "1.9", default-features = false, features = ["unicode-perl"] tokio = { version = "1", default-features = false, features = ["net", "time", "macros", "rt", "rt-multi-thread"] } url = { version = "2.5", default-features = false } +[dev-dependencies] +criterion = { version = "0.5", default-features = false, features = ["html_reports", "async_tokio"] } +tokio = { version = "1", default-features = false, features = ["sync"] } + +[features] +bench = [] + [lib] # See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options bench = false + +[[bench]] +name = "main" +harness = false +required-features = ["bench"] diff --git a/neqo-bin/benches/main.rs b/neqo-bin/benches/main.rs new file mode 100644 index 0000000000..fe3aba2714 --- /dev/null +++ b/neqo-bin/benches/main.rs @@ -0,0 +1,92 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{path::PathBuf, str::FromStr}; + +use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput}; +use neqo_bin::{client, server}; +use tokio::runtime::Runtime; + +struct Benchmark { + name: String, + requests: Vec, + /// Download resources in series using separate connections. + download_in_series: bool, + sample_size: Option, +} + +fn transfer(c: &mut Criterion) { + neqo_common::log::init(Some(log::LevelFilter::Off)); + neqo_crypto::init_db(PathBuf::from_str("../test-fixture/db").unwrap()); + + let done_sender = spawn_server(); + + for Benchmark { + name, + requests, + download_in_series, + sample_size, + } in [ + Benchmark { + name: "1-conn/1-100mb-resp (aka. Download)".to_string(), + requests: vec![100 * 1024 * 1024], + download_in_series: false, + sample_size: Some(10), + }, + Benchmark { + name: "1-conn/10_000-1b-seq-resp (aka. RPS)".to_string(), + requests: vec![1; 10_000], + download_in_series: false, + sample_size: None, + }, + Benchmark { + name: "100-seq-conn/1-1b-resp (aka. HPS)".to_string(), + requests: vec![1; 100], + download_in_series: true, + sample_size: None, + }, + ] { + let mut group = c.benchmark_group(name); + group.throughput(if requests[0] > 1 { + assert_eq!(requests.len(), 1); + Throughput::Bytes(requests[0]) + } else { + Throughput::Elements(requests.len() as u64) + }); + if let Some(size) = sample_size { + group.sample_size(size); + } + group.bench_function("client", |b| { + b.to_async(Runtime::new().unwrap()).iter_batched( + || client::client(client::Args::new(&requests, download_in_series)), + |client| async move { + client.await.unwrap(); + }, + BatchSize::PerIteration, + ); + }); + group.finish(); + } + + done_sender.send(()).unwrap(); +} + +fn spawn_server() -> tokio::sync::oneshot::Sender<()> { + let (done_sender, mut done_receiver) = tokio::sync::oneshot::channel(); + std::thread::spawn(move || { + Runtime::new().unwrap().block_on(async { + let mut server = Box::pin(server::server(server::Args::default())); + tokio::select! { + _ = &mut done_receiver => {} + _ = &mut server => {} + } + }); + }); + done_sender +} + +criterion_group!(benches, transfer); +criterion_main!(benches); diff --git a/neqo-bin/src/bin/client.rs b/neqo-bin/src/bin/client.rs new file mode 100644 index 0000000000..25c0e8753f --- /dev/null +++ b/neqo-bin/src/bin/client.rs @@ -0,0 +1,14 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use clap::Parser; + +#[tokio::main] +async fn main() -> Result<(), neqo_bin::client::Error> { + let args = neqo_bin::client::Args::parse(); + + neqo_bin::client::client(args).await +} diff --git a/neqo-bin/src/bin/server.rs b/neqo-bin/src/bin/server.rs new file mode 100644 index 0000000000..8d166c7487 --- /dev/null +++ b/neqo-bin/src/bin/server.rs @@ -0,0 +1,14 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use clap::Parser; + +#[tokio::main] +async fn main() -> Result<(), std::io::Error> { + let args = neqo_bin::server::Args::parse(); + + neqo_bin::server::server(args).await +} diff --git a/neqo-bin/src/bin/client/http09.rs b/neqo-bin/src/client/http09.rs similarity index 99% rename from neqo-bin/src/bin/client/http09.rs rename to neqo-bin/src/client/http09.rs index 372a112853..9bdb6dca85 100644 --- a/neqo-bin/src/bin/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -25,8 +25,7 @@ use neqo_transport::{ }; use url::Url; -use super::{get_output_file, Args, KeyUpdateState, Res}; -use crate::qlog_new; +use super::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; pub struct Handler<'a> { streams: HashMap>>, diff --git a/neqo-bin/src/bin/client/http3.rs b/neqo-bin/src/client/http3.rs similarity index 99% rename from neqo-bin/src/bin/client/http3.rs rename to neqo-bin/src/client/http3.rs index e9f5e406a5..c88a8448f6 100644 --- a/neqo-bin/src/bin/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -26,7 +26,7 @@ use neqo_transport::{ }; use url::Url; -use crate::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; +use super::{get_output_file, qlog_new, Args, KeyUpdateState, Res}; pub(crate) struct Handler<'a> { #[allow( diff --git a/neqo-bin/src/bin/client/main.rs b/neqo-bin/src/client/mod.rs similarity index 91% rename from neqo-bin/src/bin/client/main.rs rename to neqo-bin/src/client/mod.rs index 63aa12db13..e0169e3f24 100644 --- a/neqo-bin/src/bin/client/main.rs +++ b/neqo-bin/src/client/mod.rs @@ -21,25 +21,26 @@ use futures::{ future::{select, Either}, FutureExt, TryFutureExt, }; -use neqo_bin::udp; use neqo_common::{self as common, qdebug, qerror, qinfo, qlog::NeqoQlog, qwarn, Datagram, Role}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, init, Cipher, ResumptionToken, }; -use neqo_http3::{Error, Output}; +use neqo_http3::Output; use neqo_transport::{AppError, ConnectionId, Error as TransportError, Version}; use qlog::{events::EventImportance, streamer::QlogStreamer}; use tokio::time::Sleep; use url::{Origin, Url}; +use crate::{udp, SharedArgs}; + mod http09; mod http3; const BUFWRITER_BUFFER_SIZE: usize = 64 * 1024; #[derive(Debug)] -pub enum ClientError { +pub enum Error { ArgumentError(&'static str), Http3Error(neqo_http3::Error), IoError(io::Error), @@ -47,40 +48,40 @@ pub enum ClientError { TransportError(neqo_transport::Error), } -impl From for ClientError { +impl From for Error { fn from(err: io::Error) -> Self { Self::IoError(err) } } -impl From for ClientError { +impl From for Error { fn from(err: neqo_http3::Error) -> Self { Self::Http3Error(err) } } -impl From for ClientError { +impl From for Error { fn from(_err: qlog::Error) -> Self { Self::QlogError } } -impl From for ClientError { +impl From for Error { fn from(err: neqo_transport::Error) -> Self { Self::TransportError(err) } } -impl Display for ClientError { +impl Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Error: {self:?}")?; Ok(()) } } -impl std::error::Error for ClientError {} +impl std::error::Error for Error {} -type Res = Result; +type Res = Result; /// Track whether a key update is needed. #[derive(Debug, PartialEq, Eq)] @@ -90,14 +91,14 @@ impl KeyUpdateState { pub fn maybe_update(&mut self, update_fn: F) -> Res<()> where F: FnOnce() -> Result<(), E>, - E: Into, + E: Into, { if self.0 { if let Err(e) = update_fn() { let e = e.into(); match e { - ClientError::TransportError(TransportError::KeyUpdateBlocked) - | ClientError::Http3Error(Error::TransportError( + Error::TransportError(TransportError::KeyUpdateBlocked) + | Error::Http3Error(neqo_http3::Error::TransportError( TransportError::KeyUpdateBlocked, )) => (), _ => return Err(e), @@ -123,7 +124,7 @@ pub struct Args { verbose: clap_verbosity_flag::Verbosity, #[command(flatten)] - shared: neqo_bin::SharedArgs, + shared: SharedArgs, urls: Vec, @@ -189,6 +190,36 @@ pub struct Args { } impl Args { + #[must_use] + #[cfg(feature = "bench")] + #[allow(clippy::missing_panics_doc)] + pub fn new(requests: &[u64], download_in_series: bool) -> Self { + use std::str::FromStr; + Self { + verbose: clap_verbosity_flag::Verbosity::::default(), + shared: crate::SharedArgs::default(), + urls: requests + .iter() + .map(|r| Url::from_str(&format!("http://[::1]:12345/{r}")).unwrap()) + .collect(), + method: "GET".into(), + header: vec![], + max_concurrent_push_streams: 10, + download_in_series, + concurrency: 100, + output_read_data: false, + output_dir: Some("/dev/null".into()), + resume: false, + key_update: false, + ech: None, + ipv4_only: false, + ipv6_only: false, + test: None, + upload_size: 100, + stats: false, + } + } + fn get_ciphers(&self) -> Vec { self.shared .ciphers @@ -445,10 +476,10 @@ fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res { } } -#[tokio::main] -async fn main() -> Res<()> { - let mut args = Args::parse(); +pub async fn client(mut args: Args) -> Res<()> { neqo_common::log::init(Some(args.verbose.log_level_filter())); + init(); + args.update_for_tests(); init(); diff --git a/neqo-bin/src/lib.rs b/neqo-bin/src/lib.rs index b7bc158245..380c56ddce 100644 --- a/neqo-bin/src/lib.rs +++ b/neqo-bin/src/lib.rs @@ -4,6 +4,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +#![allow(clippy::missing_panics_doc)] +#![allow(clippy::missing_errors_doc)] + use std::{ fmt::{self, Display}, net::{SocketAddr, ToSocketAddrs}, @@ -17,7 +20,9 @@ use neqo_transport::{ Version, }; -pub mod udp; +pub mod client; +pub mod server; +mod udp; #[derive(Debug, Parser)] pub struct SharedArgs { @@ -57,6 +62,23 @@ pub struct SharedArgs { pub quic_parameters: QuicParameters, } +#[cfg(feature = "bench")] +impl Default for SharedArgs { + fn default() -> Self { + Self { + alpn: "h3".into(), + qlog_dir: None, + max_table_size_encoder: 16384, + max_table_size_decoder: 16384, + max_blocked_streams: 10, + ciphers: vec![], + qns_test: None, + use_old_http: false, + quic_parameters: QuicParameters::default(), + } + } +} + #[derive(Debug, Parser)] pub struct QuicParameters { #[arg( @@ -102,6 +124,22 @@ pub struct QuicParameters { pub preferred_address_v6: Option, } +#[cfg(feature = "bench")] +impl Default for QuicParameters { + fn default() -> Self { + Self { + quic_version: vec![], + max_streams_bidi: 16, + max_streams_uni: 16, + idle_timeout: 30, + congestion_control: CongestionControlAlgorithm::NewReno, + no_pacing: false, + preferred_address_v4: None, + preferred_address_v6: None, + } + } +} + impl QuicParameters { fn get_sock_addr(opt: &Option, v: &str, f: F) -> Option where diff --git a/neqo-bin/src/bin/server/main.rs b/neqo-bin/src/server/mod.rs similarity index 94% rename from neqo-bin/src/bin/server/main.rs rename to neqo-bin/src/server/mod.rs index 62eb19e78c..f89d6620de 100644 --- a/neqo-bin/src/bin/server/main.rs +++ b/neqo-bin/src/server/mod.rs @@ -25,28 +25,28 @@ use futures::{ future::{select, select_all, Either}, FutureExt, }; -use neqo_bin::udp; use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram, Header}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, generate_ech_keys, init_db, random, AntiReplay, Cipher, }; use neqo_http3::{ - Error, Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, + Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, }; use neqo_transport::{ server::ValidateAddress, ConnectionIdGenerator, Output, RandomConnectionIdGenerator, Version, }; +use old_https::Http09Server; use tokio::time::Sleep; -use crate::old_https::Http09Server; +use crate::{udp, SharedArgs}; const ANTI_REPLAY_WINDOW: Duration = Duration::from_secs(10); mod old_https; #[derive(Debug)] -pub enum ServerError { +pub enum Error { ArgumentError(&'static str), Http3Error(neqo_http3::Error), IoError(io::Error), @@ -54,47 +54,47 @@ pub enum ServerError { TransportError(neqo_transport::Error), } -impl From for ServerError { +impl From for Error { fn from(err: io::Error) -> Self { Self::IoError(err) } } -impl From for ServerError { +impl From for Error { fn from(err: neqo_http3::Error) -> Self { Self::Http3Error(err) } } -impl From for ServerError { +impl From for Error { fn from(_err: qlog::Error) -> Self { Self::QlogError } } -impl From for ServerError { +impl From for Error { fn from(err: neqo_transport::Error) -> Self { Self::TransportError(err) } } -impl Display for ServerError { +impl Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Error: {self:?}")?; Ok(()) } } -impl std::error::Error for ServerError {} +impl std::error::Error for Error {} #[derive(Debug, Parser)] #[command(author, version, about, long_about = None)] -struct Args { +pub struct Args { #[command(flatten)] verbose: clap_verbosity_flag::Verbosity, #[command(flatten)] - shared: neqo_bin::SharedArgs, + shared: SharedArgs, /// List of IP:port to listen on #[arg(default_value = "[::]:4433")] @@ -119,6 +119,22 @@ struct Args { ech: bool, } +#[cfg(feature = "bench")] +impl Default for Args { + fn default() -> Self { + use std::str::FromStr; + Self { + verbose: clap_verbosity_flag::Verbosity::::default(), + shared: crate::SharedArgs::default(), + hosts: vec!["[::]:12345".to_string()], + db: PathBuf::from_str("../test-fixture/db").unwrap(), + key: "key".to_string(), + retry: false, + ech: false, + } + } +} + impl Args { fn get_ciphers(&self) -> Vec { self.shared @@ -339,7 +355,7 @@ impl HttpServer for SimpleServer { } } else { stream - .cancel_fetch(Error::HttpRequestIncomplete.code()) + .cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()) .unwrap(); continue; }; @@ -565,11 +581,9 @@ enum Ready { Timeout, } -#[tokio::main] -async fn main() -> Result<(), io::Error> { +pub async fn server(mut args: Args) -> Result<(), io::Error> { const HQ_INTEROP: &str = "hq-interop"; - let mut args = Args::parse(); neqo_common::log::init(Some(args.verbose.log_level_filter())); assert!(!args.key.is_empty(), "Need at least one key"); diff --git a/neqo-bin/src/bin/server/old_https.rs b/neqo-bin/src/server/old_https.rs similarity index 100% rename from neqo-bin/src/bin/server/old_https.rs rename to neqo-bin/src/server/old_https.rs diff --git a/neqo-bin/src/udp.rs b/neqo-bin/src/udp.rs index f4ede0b5c2..7ccfa1f36f 100644 --- a/neqo-bin/src/udp.rs +++ b/neqo-bin/src/udp.rs @@ -23,6 +23,8 @@ use tokio::io::Interest; const RECV_BUF_SIZE: usize = u16::MAX as usize; pub struct Socket { + #[allow(unknown_lints)] // available with Rust v1.75 + #[allow(clippy::struct_field_names)] socket: tokio::net::UdpSocket, state: UdpSocketState, recv_buf: Vec,