diff --git a/Cargo.lock b/Cargo.lock index ca359685..df6563c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -794,7 +794,6 @@ version = "0.2.0" dependencies = [ "b-x", "buffet", - "codspeed-criterion-compat", "color-eyre", "eyre", "httpwg-harness", diff --git a/Justfile b/Justfile index f2ee4384..9649766f 100644 --- a/Justfile +++ b/Justfile @@ -63,7 +63,7 @@ httpwg-over-tcp *args='': cargo build --release \ --package httpwg-loona \ --package httpwg-cli - export PROTO=h2 + export PROTO=h2c export PORT=8001 export RUST_LOG=${RUST_LOG:-info} ./target/release/httpwg --frame-timeout 2000 --connect-timeout 2000 --address localhost:8001 "$@" -- ./target/release/httpwg-loona @@ -84,7 +84,7 @@ samply: --package httpwg-loona \ --profile profiling \ --features tracing/release_max_level_info - export PROTO=h2 + export PROTO=h2c export PORT=8002 target/profiling/httpwg-loona diff --git a/crates/httpwg-harness/src/lib.rs b/crates/httpwg-harness/src/lib.rs index ace82888..5ab79c9d 100644 --- a/crates/httpwg-harness/src/lib.rs +++ b/crates/httpwg-harness/src/lib.rs @@ -72,6 +72,18 @@ impl Settings { server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; Ok(server_config) } + + pub fn message_for_404() -> &'static str { + r#"404 Not Found + +This server serves the following routes: + +/echo-body — Echoes back the request body. +/status/{code} — Returns a response with the specified status code. +/repeat-4k-blocks/{repeat} — Streams the specified number of 4KB blocks (from memory) +/stream-file/{name} — Streams the contents of a file from `/tmp/stream-file/{name}` — see `scripts/mkfiles.sh` +/"# + } } /// A sample block of 4KiB of data. diff --git a/crates/httpwg-hyper/src/service.rs b/crates/httpwg-hyper/src/service.rs index 0091bc56..33ae8fc2 100644 --- a/crates/httpwg-hyper/src/service.rs +++ b/crates/httpwg-hyper/src/service.rs @@ -9,7 +9,7 @@ //! - Any other path: Returns a 404 Not Found response. use http_body_util::{BodyExt, StreamBody}; -use httpwg_harness::SAMPLE_4K_BLOCK; +use httpwg_harness::{Settings, SAMPLE_4K_BLOCK}; use tokio::io::AsyncReadExt; use std::{convert::Infallible, fmt::Debug, pin::Pin}; @@ -46,23 +46,24 @@ where let path = parts.uri.path(); let parts = path.trim_start_matches('/').split('/').collect::>(); - if let ["echo-body"] = parts.as_slice() { - let body: BoxBody = Box::pin(req_body); - let res = Response::builder().body(body).unwrap(); - Ok(res) - } else { - let body: BoxBody = - Box::pin(http_body_util::Empty::new().map_err(|_| unreachable!())); - - if let ["status", code] = parts.as_slice() { + match parts.as_slice() { + ["echo-body"] => { + let body: BoxBody = Box::pin(req_body); + let res = Response::builder().body(body).unwrap(); + Ok(res) + } + ["status", code] => { // drain body while let Some(_frame) = req_body.frame().await {} let code = code.parse::().unwrap(); + let body: BoxBody = + Box::pin(http_body_util::Empty::new().map_err(|_| unreachable!())); let res = Response::builder().status(code).body(body).unwrap(); debug!("Replying with {:?} {:?}", res.status(), res.headers()); Ok(res) - } else if let ["repeat-4k-blocks", repeat] = parts.as_slice() { + } + ["repeat-4k-blocks", repeat] => { // drain body while let Some(_frame) = req_body.frame().await {} @@ -83,7 +84,8 @@ where let body: BoxBody = Box::pin(StreamBody::new(rx)); let res = Response::builder().body(body).unwrap(); Ok(res) - } else if let ["stream-file", name] = parts.as_slice() { + } + ["stream-file", name] => { // drain body while let Some(_frame) = req_body.frame().await {} @@ -110,20 +112,22 @@ where let body: BoxBody = Box::pin(StreamBody::new(rx)); let res = Response::builder().body(body).unwrap(); Ok(res) - } else if parts.as_slice().is_empty() { + } + [""] => { // drain body while let Some(_frame) = req_body.frame().await {} - let body = "it's less dire to lose, than to lose oneself".to_string(); + let body = "See /help for a list of routes".to_string(); let body: BoxBody = Box::pin(body.map_err(|_| unreachable!())); let res = Response::builder().status(200).body(body).unwrap(); Ok(res) - } else { + } + _ => { // drain body while let Some(_frame) = req_body.frame().await {} // return a 404 - let body = "404 Not Found".to_string(); + let body = Settings::message_for_404().to_string(); let body: BoxBody = Box::pin(body.map_err(|_| unreachable!())); let res = Response::builder().status(404).body(body).unwrap(); Ok(res) diff --git a/crates/httpwg-loona/Cargo.toml b/crates/httpwg-loona/Cargo.toml index 44d386e2..57eebcb2 100644 --- a/crates/httpwg-loona/Cargo.toml +++ b/crates/httpwg-loona/Cargo.toml @@ -17,11 +17,12 @@ tracing-subscriber = "0.3.18" tokio = { version = "1.39.2", features = ["macros", "sync", "process"] } eyre = { version = "0.6.12", default-features = false } b-x = { version = "1.0.0", path = "../b-x" } -rcgen = { version = "0.13.1", default-features = false, features = ["aws_lc_rs"] } -tokio-rustls = "0.26.0" -ktls = "6.0.0" +rcgen = { version = "0.13.1", default-features = false, features = [ + "aws_lc_rs", +] } httpwg-harness = { version = "0.1.0", path = "../httpwg-harness" } socket2 = "0.5.7" -[dev-dependencies] -codspeed-criterion-compat = "2.6.0" +[target.'cfg(target_os = "linux")'.dependencies] +ktls = "6.0.0" +tokio-rustls = "0.26.0" diff --git a/crates/httpwg-loona/src/driver.rs b/crates/httpwg-loona/src/driver.rs index b0a69550..c41d5ff1 100644 --- a/crates/httpwg-loona/src/driver.rs +++ b/crates/httpwg-loona/src/driver.rs @@ -1,5 +1,5 @@ use b_x::{BxForResults, BX}; -use httpwg_harness::SAMPLE_4K_BLOCK; +use httpwg_harness::{Settings, SAMPLE_4K_BLOCK}; use buffet::Piece; use loona::{ @@ -93,6 +93,21 @@ where .await .bx()? } + // apparently `/` gives us that + [""] => { + drain_body(req_body).await?; + + let body = "See /help for a list of routes"; + res.write_final_response_with_body( + Response { + status: StatusCode::OK, + ..Default::default() + }, + &mut SinglePieceBody::from(body), + ) + .await + .bx()? + } _ => { drain_body(req_body).await?; @@ -102,7 +117,7 @@ where status: StatusCode::NOT_FOUND, ..Default::default() }, - &mut SinglePieceBody::from("404 Not Found"), + &mut SinglePieceBody::from(Settings::message_for_404()), ) .await .bx()? diff --git a/crates/httpwg-loona/src/main.rs b/crates/httpwg-loona/src/main.rs index b8301aab..23bf8096 100644 --- a/crates/httpwg-loona/src/main.rs +++ b/crates/httpwg-loona/src/main.rs @@ -1,28 +1,23 @@ use driver::TestDriver; -use httpwg_harness::{Proto, Settings}; -use ktls::CorkStream; -use std::{ - mem::ManuallyDrop, - os::fd::{AsRawFd, FromRawFd, IntoRawFd}, - rc::Rc, - sync::Arc, -}; -use tokio_rustls::TlsAcceptor; - -use buffet::{ - net::{TcpListener, TcpStream}, - IntoHalves, RollMut, -}; -use loona::{ - error::ServeError, - h1, - h2::{self, types::H2ConnectionError}, -}; +use httpwg_harness::Proto; +use httpwg_harness::Settings; +use std::rc::Rc; + +use buffet::net::TcpListener; +use buffet::IntoHalves; +use buffet::RollMut; +use loona::error::ServeError; +use loona::h1; +use loona::h2; +use loona::h2::types::H2ConnectionError; use tracing::Level; use tracing_subscriber::{filter::Targets, layer::SubscriberExt, util::SubscriberInitExt}; mod driver; +#[cfg(target_os = "linux")] +mod tls; + fn main() { setup_tracing_and_error_reporting(); buffet::start(real_main()); @@ -87,48 +82,9 @@ async fn real_main() { #[cfg(target_os = "linux")] Proto::TLS => { - let mut server_config = Settings::gen_rustls_server_config().unwrap(); - server_config.enable_secret_extraction = true; - let driver = TestDriver; - let h1_conf = Rc::new(h1::ServerConf::default()); - let h2_conf = Rc::new(h2::ServerConf::default()); - - // until we come up with `loona-rustls`, we need to temporarily go through a - // tokio TcpStream - let acceptor = TlsAcceptor::from(Arc::new(server_config)); - let stream = unsafe { std::net::TcpStream::from_raw_fd(stream.into_raw_fd()) }; - stream.set_nonblocking(true).unwrap(); - let stream = tokio::net::TcpStream::from_std(stream)?; - let stream = CorkStream::new(stream); - let stream = acceptor.accept(stream).await?; - - let is_h2 = matches!(stream.get_ref().1.alpn_protocol(), Some(b"h2")); - tracing::debug!(%is_h2, "Performed TLS handshake"); - - let stream = ktls::config_ktls_server(stream).await?; - - tracing::debug!("Set up kTLS"); - let (drained, stream) = stream.into_raw(); - let drained = drained.unwrap_or_default(); - tracing::debug!("{} bytes already decoded by rustls", drained.len()); - - // and back to a buffet TcpStream - let stream = stream.to_uring_tcp_stream()?; - - let mut client_buf = RollMut::alloc()?; - client_buf.put(&drained[..])?; - - if is_h2 { - tracing::info!("Using HTTP/2"); - h2::serve(stream.into_halves(), h2_conf, client_buf, Rc::new(driver)) - .await - .map_err(|e| eyre::eyre!("h2 server error: {e:?}"))?; - } else { - tracing::info!("Using HTTP/1.1"); - h1::serve(stream.into_halves(), h1_conf, client_buf, driver) - .await - .map_err(|e| eyre::eyre!("h1 server error: {e:?}"))?; - } + tls::handle_tls_conn(stream) + .await + .map_err(|e| eyre::eyre!("tls error: {e:?}"))?; } } Ok::<_, eyre::Report>(()) @@ -164,21 +120,3 @@ fn setup_tracing_and_error_reporting() { .with(fmt_layer) .init(); } - -pub trait ToUringTcpStream { - fn to_uring_tcp_stream(self) -> std::io::Result; -} - -impl ToUringTcpStream for tokio::net::TcpStream { - fn to_uring_tcp_stream(self) -> std::io::Result { - { - let sock = ManuallyDrop::new(unsafe { socket2::Socket::from_raw_fd(self.as_raw_fd()) }); - // tokio needs the socket to be "non-blocking" (as in: return EAGAIN) - // buffet needs it to be "blocking" (as in: let io_uring do the op async) - sock.set_nonblocking(false)?; - } - let stream = unsafe { TcpStream::from_raw_fd(self.as_raw_fd()) }; - std::mem::forget(self); - Ok(stream) - } -} diff --git a/crates/httpwg-loona/src/tls.rs b/crates/httpwg-loona/src/tls.rs new file mode 100644 index 00000000..480ce2a8 --- /dev/null +++ b/crates/httpwg-loona/src/tls.rs @@ -0,0 +1,78 @@ +use b_x::BxForResults; +use buffet::net::TcpStream; +use buffet::IntoHalves; +use buffet::RollMut; +use httpwg_harness::Settings; +use ktls::CorkStream; +use loona::h1; +use loona::h2; +use std::mem::ManuallyDrop; +use std::os::fd::AsRawFd; +use std::os::fd::FromRawFd; +use std::os::fd::IntoRawFd; +use std::rc::Rc; +use std::sync::Arc; +use tokio_rustls::TlsAcceptor; + +use crate::driver::TestDriver; + +pub(super) async fn handle_tls_conn(stream: TcpStream) -> b_x::Result<()> { + let mut server_config = Settings::gen_rustls_server_config().unwrap(); + server_config.enable_secret_extraction = true; + let driver = TestDriver; + let h1_conf = Rc::new(h1::ServerConf::default()); + let h2_conf = Rc::new(h2::ServerConf::default()); + + // until we come up with `loona-rustls`, we need to temporarily go through a + // tokio TcpStream + let acceptor = TlsAcceptor::from(Arc::new(server_config)); + let stream = unsafe { std::net::TcpStream::from_raw_fd(stream.into_raw_fd()) }; + stream.set_nonblocking(true).unwrap(); + let stream = tokio::net::TcpStream::from_std(stream)?; + let stream = CorkStream::new(stream); + let stream = acceptor.accept(stream).await?; + + let is_h2 = matches!(stream.get_ref().1.alpn_protocol(), Some(b"h2")); + tracing::debug!(%is_h2, "Performed TLS handshake"); + + let stream = ktls::config_ktls_server(stream).await.bx()?; + + tracing::debug!("Set up kTLS"); + let (drained, stream) = stream.into_raw(); + let drained = drained.unwrap_or_default(); + tracing::debug!("{} bytes already decoded by rustls", drained.len()); + + // and back to a buffet TcpStream + let stream = stream.to_uring_tcp_stream()?; + + let mut client_buf = RollMut::alloc()?; + client_buf.put(&drained[..])?; + + if is_h2 { + tracing::info!("Using HTTP/2"); + h2::serve(stream.into_halves(), h2_conf, client_buf, Rc::new(driver)).await?; + } else { + tracing::info!("Using HTTP/1.1"); + h1::serve(stream.into_halves(), h1_conf, client_buf, driver).await?; + } + Ok(()) +} + +pub trait ToUringTcpStream { + fn to_uring_tcp_stream(self) -> std::io::Result; +} + +impl ToUringTcpStream for tokio::net::TcpStream { + fn to_uring_tcp_stream(self) -> std::io::Result { + { + let sock = ManuallyDrop::new(unsafe { socket2::Socket::from_raw_fd(self.as_raw_fd()) }); + // tokio needs the socket to be "non-blocking" (as in: return + // EAGAIN) buffet needs it to be + // "blocking" (as in: let io_uring do the op async) + sock.set_nonblocking(false)?; + } + let stream = unsafe { TcpStream::from_raw_fd(self.as_raw_fd()) }; + std::mem::forget(self); + Ok(stream) + } +} diff --git a/crates/loona/tests/testbed.rs b/crates/loona/tests/testbed.rs index 1583eaf9..8bbef48e 100644 --- a/crates/loona/tests/testbed.rs +++ b/crates/loona/tests/testbed.rs @@ -28,6 +28,7 @@ pub async fn start() -> b_x::Result<(SocketAddr, impl Any)> { eprintln!("Using testbed binary: {}", binary_path.display()); let mut cmd = Command::new(binary_path); cmd.stdout(Stdio::piped()); + cmd.env("PROTO", "h1"); // Only Linux gets the nice "I'm taking you with me" feature for now. #[cfg(target_os = "linux")] @@ -48,13 +49,8 @@ pub async fn start() -> b_x::Result<(SocketAddr, impl Any)> { let stdout = BufReader::new(stdout); let mut lines = stdout.lines(); while let Some(line) = lines.next_line().await.unwrap() { - if let Some(rest) = line.strip_prefix("🌎🦊👉 ") { - let addr = rest - .split_whitespace() - .next() - .unwrap() - .parse::() - .unwrap(); + let settings = httpwg_harness::Settings::from_env().unwrap(); + if let Ok(Some(addr)) = settings.decode_listen_line(&line) { if let Some(addr_tx) = addr_tx.take() { addr_tx.send(addr).unwrap(); } diff --git a/scripts/perfstat.sh b/scripts/perfstat.sh index 1448303d..e5845839 100755 --- a/scripts/perfstat.sh +++ b/scripts/perfstat.sh @@ -22,23 +22,38 @@ trap 'kill -TERM -$$' EXIT pkill -9 -f httpwg-hyper pkill -9 -f httpwg-loona +# Set protocol, default to h2c +PROTO=${PROTO:-h2c} +export PROTO + # Launch hyper server -export TEST_PROTO=h2 ADDR=0.0.0.0 PORT=8001 -"$LOONA_DIR/target/release/httpwg-hyper" & +ADDR=0.0.0.0 PORT=8001 "$LOONA_DIR/target/release/httpwg-hyper" & HYPER_PID=$! -echo "Hyper PID: $HYPER_PID" +echo "hyper PID: $HYPER_PID" +HYPER_ADDR="http://localhost:8001" # Launch loona server -export TEST_PROTO=h2 ADDR=0.0.0.0 PORT=8002 -"$LOONA_DIR/target/release/httpwg-loona" & +ADDR=0.0.0.0 PORT=8002 "$LOONA_DIR/target/release/httpwg-loona" & LOONA_PID=$! -echo "Loona PID: $LOONA_PID" - -HYPER_ADDR="http://localhost:8001" +echo "loona PID: $LOONA_PID" LOONA_ADDR="http://localhost:8002" ENDPOINT="${ENDPOINT:-/repeat-4k-blocks/128}" +# Declare h2load args based on PROTO +declare -a H2LOAD_ARGS +if [[ "$PROTO" == "h1" ]]; then + H2LOAD_ARGS=() +elif [[ "$PROTO" == "h2c" ]]; then + H2LOAD_ARGS=(--h1) +elif [[ "$PROTO" == "tls" ]]; then + ALPN_LIST=${ALPN_LIST:-"h2,http/1.1"} + H2LOAD_ARGS=(--alpn-list="$ALPN_LIST") +else + echo "Error: Unknown PROTO '$PROTO'" + exit 1 +fi + declare -A servers=( [hyper]="$HYPER_PID $HYPER_ADDR" [loona]="$LOONA_PID $LOONA_ADDR" @@ -59,7 +74,7 @@ for server in "${!servers[@]}"; do echo -e "\033[1;36mLoona Git SHA: $(cd ~/bearcove/loona && git rev-parse --short HEAD)\033[0m" echo -e "\033[1;33m🚀 Benchmarking \033[1;32m$(cat /proc/$PID/cmdline | tr '\0' ' ')\033[0m" echo -e "\033[1;34m📊 Benchmark parameters: RPS=${RPS:-2}, CONNS=${CONNS:-40}, STREAMS=${STREAMS:-8}, NUM_REQUESTS=${NUM_REQUESTS:-100}, ENDPOINT=${ENDPOINT:-/stream-big-body}\033[0m" - perf stat -e "$PERF_EVENTS" -p "$PID" -- h2load --rps "${RPS:-2}" -c "${CONNS:-40}" -m "${STREAMS:-8}" -n "${NUM_REQUESTS:-100}" "${ADDR}${ENDPOINT}" + perf stat -e "$PERF_EVENTS" -p "$PID" -- h2load "${H2LOAD_ARGS[@]}" --rps "${RPS:-2}" -c "${CONNS:-40}" -m "${STREAMS:-8}" -n "${NUM_REQUESTS:-100}" "${ADDR}${ENDPOINT}" done # Kill the servers