diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..d7e1d04 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +/target +/assets +/.github \ No newline at end of file diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 22c490a..6f263bf 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -4,19 +4,34 @@ on: push: branches: [ master ] pull_request: - types: [ opened, synchronize, reopened ] + branches: [ master ] env: CARGO_TERM_COLOR: always jobs: - build: + check: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, macOS-latest, windows-latest] + rust: [stable] - runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: Swatinem/rust-cache@v1 + - name: Check all targets + run: cargo check --all --all-targets --all-features + + build: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, macOS-latest, windows-latest] + rust: [stable] steps: - - uses: actions/checkout@v2 - - name: Build - run: cargo build --verbose - - name: Run tests - run: cargo test --verbose + - uses: actions/checkout@v2 + - uses: Swatinem/rust-cache@v2 + - name: Run doc tests + run: cargo test --all \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index e4a3abb..1834581 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,8 @@ serde_json = "1" tokio = { version = "1", features = ["rt", "rt-multi-thread", "net", "macros"] } tokio-native-tls = "0.3" tower = { version = "0.4", features = ["util"] } + +[workspace] +members = [ + "rewrk-core" +] \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index e814597..f92ee42 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,18 +1,25 @@ -FROM chillfish8/rust-builder:latest as builder +FROM rust:slim-buster as build -WORKDIR /home/rust/ +WORKDIR /code -# Avoid having to install/build all dependencies by copying -# the Cargo files and making a dummy src/main.rs -COPY . . -RUN cargo build --release --target x86_64-unknown-linux-musl +COPY . /code -# Size optimization -RUN strip target/x86_64-unknown-linux-musl/release/rewrk +RUN apt-get update \ + && apt-get install -y libssl-dev pkg-config \ + && rm -rf /var/lib/apt/lists/* + +RUN cargo build --release + +# Copy the binary into a new container for a smaller docker image +FROM debian:buster-slim -# Start building the final image -FROM scratch WORKDIR /etc/rewrk -COPY --from=builder /home/rust/target/x86_64-unknown-linux-musl/release/rewrk . -ENTRYPOINT ["./rewrk"] +USER root +RUN apt-get update \ + && apt-get install -y ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=build /code/target/release/rewrk . + +ENTRYPOINT ["./rewrk"] \ No newline at end of file diff --git a/assets/rewrk-core.png b/assets/rewrk-core.png new file mode 100644 index 0000000..03a66c3 Binary files /dev/null and b/assets/rewrk-core.png differ diff --git a/benchmarks/bench_pyre.py b/benchmarks/bench_pyre.py deleted file mode 100644 index ea673c3..0000000 --- a/benchmarks/bench_pyre.py +++ /dev/null @@ -1,77 +0,0 @@ -""" -This is an example benchmarking test using python to benchmark Pyre one of -my pet projects. - -This measures the latency and throughput and displays them with matplotlib. -""" - -import matplotlib.pyplot as plt -import sys -import os - -from subprocess import Popen, PIPE -from json import loads - - -def start_benchmark( - host: str, - connections: int, - time: str = "10s", - rounds: int = 3, -) -> list: - command = f"cargo run --release -- " \ - f"-h {host} " \ - f"-c {connections} " \ - f"-d {time} " \ - f"-t 12 " \ - f"--rounds {rounds} " \ - f"--json" - process = Popen(command, shell=True, stdout=PIPE, stderr=PIPE) - - out, err = process.communicate() - print(err) - out = out.decode(sys.stdin.encoding) - return [loads(o) for o in out.splitlines()] - - -def get_avg_without_enom(inputs: list) -> float: - _, *good, _ = sorted(inputs) - return sum(good) / len(good) - - -def make_runs(): - host = "http://127.0.0.1:5000" - connections_start = 60 - connections_end = 100 - connections_step = 5 - - x_index = [] - latencies = [] - req_secs = [] - for conns in range(connections_start, connections_end, connections_step): - results = start_benchmark(host, conns, time="10s", rounds=5) - avg_latency = get_avg_without_enom([o['latency_avg'] for o in results]) - avg_req_sec = get_avg_without_enom([o['requests_avg'] for o in results]) - - x_index.append(conns) - latencies.append(avg_latency) - req_secs.append(avg_req_sec) - - plt.figure() - plt.xlabel("Connection Concurrency") - plt.ylabel("Latency / ms") - plt.title("Benchmark Results") - plt.plot(x_index, latencies) - plt.savefig("./latencies.png") - plt.close() - - plt.figure() - plt.xlabel("Connection Concurrency") - plt.ylabel("Request Per Second") - plt.title("Benchmark Results") - plt.plot(x_index, req_secs) - plt.savefig("./requests.png") - - -if __name__ == '__main__': - make_runs() \ No newline at end of file diff --git a/rewrk-core/Cargo.toml b/rewrk-core/Cargo.toml new file mode 100644 index 0000000..a9859a4 --- /dev/null +++ b/rewrk-core/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "rewrk-core" +version = "0.1.0" +edition = "2021" +description = "HTTP benchmarking as a library made simple." +license = "MIT" +readme = "README.md" +repository = "https://github.com/lnx-search/rewrk" +keywords = ["tokio", "async"] +categories = ["concurrency", "asynchronous"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1" +futures-util = "0.3" +http = "0.2" +pin-project-lite = "0.2" +flume = "0.10.14" +hdrhistogram = "7" +thiserror = "1" +async-trait = "0.1.64" +tracing = "0.1.37" +num_cpus = "1.15.0" + +hyper = { version = "0.14", features = ["runtime", "client", "http1", "http2"] } +native-tls = { version = "0.2", features = ["alpn"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "net"] } +tokio-native-tls = "0.3" +tower = { version = "0.4", features = ["util"] } + +[dev-dependencies] +axum = "0.6.5" +tracing-subscriber = "0.3.16" + +tokio = { version = "1", features = ["full"] } diff --git a/rewrk-core/README.md b/rewrk-core/README.md new file mode 100644 index 0000000..fcfd815 --- /dev/null +++ b/rewrk-core/README.md @@ -0,0 +1,113 @@ +# ReWrk Core + +HTTP benchmarking as a library made simple. + +ReWrk Core is a easily configurable and extendable framework for benchmarking +HTTP servers providing things like response validation, custom result collectors and +custom request producers. + +It measures some of the key metrics like latency, write IO and read IO and provides you +with a way of grouping results together with the concept of `tags`. + +```rust +use axum::routing::get; +use axum::Router; +use anyhow::Result; +use http::{Method, Request, Uri}; +use hyper::Body; +use rewrk_core::{ + Batch, + HttpProtocol, + Producer, + ReWrkBenchmark, + RequestBatch, + Sample, + SampleCollector, +}; + +static ADDR: &str = "127.0.0.1:8080"; + +#[tokio::test] +async fn test_basic_benchmark() -> Result<()> { + tracing_subscriber::fmt::try_init()?; + + tokio::spawn(run_server()); + + let uri = Uri::builder() + .scheme("http") + .authority(ADDR) + .path_and_query("/") + .build()?; + + let mut benchmarker = ReWrkBenchmark::create( + uri, + 1, + HttpProtocol::HTTP1, + BasicProducer::default(), + BasicCollector::default(), + ) + .await?; + benchmarker.set_num_workers(1); + benchmarker.run().await; + + let mut collector = benchmarker.consume_collector().await; + let sample = collector.samples.remove(0); + assert_eq!(sample.tag(), 0); + assert_eq!(sample.latency().len(), 1); + assert_eq!(sample.read_transfer().len(), 1); + assert_eq!(sample.write_transfer().len(), 1); +} + +async fn run_server() { + // build our application with a single route + let app = Router::new().route("/", get(|| async { "Hello, World!" })); + + axum::Server::bind(&ADDR.parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); +} + +#[derive(Default, Clone)] +pub struct BasicProducer { + count: usize, +} + +#[rewrk_core::async_trait] +impl Producer for BasicProducer { + fn ready(&mut self) { + self.count = 1; + } + + async fn create_batch(&mut self) -> Result { + if self.count > 0 { + self.count -= 1; + + let uri = Uri::builder().path_and_query("/").build()?; + let request = Request::builder() + .method(Method::GET) + .uri(uri) + .body(Body::empty())?; + Ok(RequestBatch::Batch(Batch { + tag: 0, + requests: vec![request], + })) + } else { + Ok(RequestBatch::End) + } + } +} + +#[derive(Default)] +pub struct BasicCollector { + samples: Vec, +} + +#[rewrk_core::async_trait] +impl SampleCollector for BasicCollector { + async fn process_sample(&mut self, sample: Sample) -> Result<()> { + self.samples.push(sample); + Ok(()) + } +} +``` \ No newline at end of file diff --git a/rewrk-core/src/connection/conn.rs b/rewrk-core/src/connection/conn.rs new file mode 100644 index 0000000..f4a17ae --- /dev/null +++ b/rewrk-core/src/connection/conn.rs @@ -0,0 +1,223 @@ +use std::future::Future; +use std::net::SocketAddr; + +use http::response::Parts; +use http::{header, HeaderValue, Request, Response, Uri}; +use hyper::body::Bytes; +use hyper::client::conn; +use hyper::client::conn::SendRequest; +use hyper::Body; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpStream; +use tokio::task::JoinHandle; +use tokio::time::{timeout_at, Duration, Instant}; + +use crate::connection::{HttpProtocol, Scheme}; +use crate::utils::IoUsageTracker; + +/// The maximum number of attempts to try connect before aborting. +const RETRY_MAX_DEFAULT: usize = 3; + +#[derive(Clone)] +/// The initial HTTP connector for benchmarking. +pub struct ReWrkConnector { + uri: Uri, + host_header: HeaderValue, + addr: SocketAddr, + protocol: HttpProtocol, + scheme: Scheme, + host: String, + retry_max: usize, +} + +impl ReWrkConnector { + /// Create a new connector. + pub fn new( + uri: Uri, + host_header: HeaderValue, + addr: SocketAddr, + protocol: HttpProtocol, + scheme: Scheme, + host: impl Into, + ) -> Self { + Self { + uri, + host_header, + addr, + protocol, + scheme, + host: host.into(), + retry_max: RETRY_MAX_DEFAULT, + } + } + + /// Set a new max retry attempt. + pub fn set_retry_max(&mut self, max: usize) { + self.retry_max = max; + } + + /// Establish a new connection using the given connector. + /// + /// This will attempt to connect to the URI within the given duration. + /// If the timeout elapses, `None` is returned. + pub async fn connect_timeout( + &self, + dur: Duration, + ) -> anyhow::Result> { + let deadline = Instant::now() + dur; + let mut last_error: Option = None; + let mut attempts_left = self.retry_max; + + loop { + let result = timeout_at(deadline, self.connect()).await; + + match result { + Err(_) => { + return if let Some(error) = last_error { + Err(error) + } else { + Ok(None) + } + }, + Ok(Err(e)) => { + if attempts_left == 0 { + return Err(e); + } + + attempts_left -= 1; + last_error = Some(e); + tokio::time::sleep(Duration::from_millis(500)).await; + }, + Ok(Ok(connection)) => return Ok(Some(connection)), + } + } + } + + /// Establish a new connection using the given connector. + /// + /// This method has no timeout and will block until the connection + /// is established. + pub async fn connect(&self) -> anyhow::Result { + let mut conn_builder = conn::Builder::new(); + + if self.protocol.is_http2() { + conn_builder.http2_only(true); + } + + let stream = TcpStream::connect(self.addr).await?; + + let usage_tracker = IoUsageTracker::new(); + let stream = usage_tracker.wrap_stream(stream); + + let stream = match self.scheme { + Scheme::Http => handshake(conn_builder, stream).await?, + Scheme::Https(ref tls_connector) => { + let stream = tls_connector.connect(&self.host, stream).await?; + handshake(conn_builder, stream).await? + }, + }; + + Ok(ReWrkConnection::new( + self.uri.clone(), + self.host_header.clone(), + stream, + usage_tracker, + )) + } +} + +/// An established HTTP connection for benchmarking. +pub struct ReWrkConnection { + uri: Uri, + host_header: HeaderValue, + stream: HttpStream, + io_tracker: IoUsageTracker, +} + +impl ReWrkConnection { + #[inline] + /// Creates a new live connection from an existing stream + fn new( + uri: Uri, + host_header: HeaderValue, + stream: HttpStream, + io_tracker: IoUsageTracker, + ) -> Self { + Self { + uri, + host_header, + stream, + io_tracker, + } + } + + #[inline] + pub(crate) fn usage(&self) -> &IoUsageTracker { + &self.io_tracker + } + + #[inline] + /// Executes a request. + /// + /// This will override the request host, scheme, port and host headers. + pub(crate) async fn execute_req( + &mut self, + mut request: Request, + ) -> Result<(Parts, Bytes), hyper::Error> { + let request_uri = request.uri(); + let mut builder = Uri::builder() + .scheme(self.uri.scheme().unwrap().clone()) + .authority(self.uri.authority().unwrap().clone()); + if let Some(path) = request_uri.path_and_query() { + builder = builder.path_and_query(path.clone()); + } + (*request.uri_mut()) = builder.build().unwrap(); + request + .headers_mut() + .insert(header::HOST, self.host_header.clone()); + + let resp = self.stream.send(request).await?; + let (head, body) = resp.into_parts(); + let body = hyper::body::to_bytes(body).await?; + Ok((head, body)) + } +} + +/// Performs the HTTP handshake +async fn handshake( + conn_builder: conn::Builder, + stream: S, +) -> Result +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let (send_request, connection) = conn_builder.handshake(stream).await?; + let connection_task = tokio::spawn(connection); + Ok(HttpStream { + conn: send_request, + waiter: connection_task, + }) +} + +/// The established HTTP stream. +pub struct HttpStream { + /// The live connection to send requests. + conn: SendRequest, + /// The hyper connection task handle. + waiter: JoinHandle>, +} + +impl HttpStream { + pub fn send( + &mut self, + request: Request, + ) -> impl Future, hyper::Error>> { + self.conn.send_request(request) + } +} + +impl Drop for HttpStream { + fn drop(&mut self) { + self.waiter.abort(); + } +} diff --git a/rewrk-core/src/connection/mod.rs b/rewrk-core/src/connection/mod.rs new file mode 100644 index 0000000..865d7c3 --- /dev/null +++ b/rewrk-core/src/connection/mod.rs @@ -0,0 +1,41 @@ +use tokio_native_tls::TlsConnector; + +mod conn; + +pub use self::conn::{HttpStream, ReWrkConnection, ReWrkConnector}; + +/// The type of bench that is being ran. +#[derive(Clone, Copy, Debug)] +pub enum HttpProtocol { + /// Sets the http protocol to be used as h1 + HTTP1, + + /// Sets the http protocol to be used as h2 + HTTP2, +} + +impl HttpProtocol { + pub fn is_http1(&self) -> bool { + matches!(self, Self::HTTP1) + } + + pub fn is_http2(&self) -> bool { + matches!(self, Self::HTTP2) + } +} + +#[derive(Clone)] +/// The HTTP scheme used for the connection. +pub enum Scheme { + Http, + Https(TlsConnector), +} + +impl Scheme { + pub fn default_port(&self) -> u16 { + match self { + Self::Http => 80, + Self::Https(_) => 443, + } + } +} diff --git a/rewrk-core/src/lib.rs b/rewrk-core/src/lib.rs new file mode 100644 index 0000000..7b20ff0 --- /dev/null +++ b/rewrk-core/src/lib.rs @@ -0,0 +1,138 @@ +//! # ReWrk Core +//! +//! HTTP benchmarking as a library made simple. +//! +//! ReWrk Core is a easily configurable and extendable framework for benchmarking +//! HTTP servers providing things like response validation, custom result collectors and +//! custom request producers. +//! +//! It measures some of the key metrics like latency, write IO and read IO and provides you +//! with a way of grouping results together with the concept of `tags`. +//! +//! ``` +//! use axum::routing::get; +//! use axum::Router; +//! use anyhow::Result; +//! use http::{Method, Request, Uri}; +//! use hyper::Body; +//! use rewrk_core::{ +//! Batch, +//! HttpProtocol, +//! Producer, +//! ReWrkBenchmark, +//! RequestBatch, +//! Sample, +//! SampleCollector, +//! }; +//! +//! static ADDR: &str = "127.0.0.1:8080"; +//! +//! #[tokio::test] +//! async fn test_basic_benchmark() -> Result<()> { +//! tracing_subscriber::fmt::try_init()?; +//! +//! tokio::spawn(run_server()); +//! +//! let uri = Uri::builder() +//! .scheme("http") +//! .authority(ADDR) +//! .path_and_query("/") +//! .build()?; +//! +//! let mut benchmarker = ReWrkBenchmark::create( +//! uri, +//! 1, +//! HttpProtocol::HTTP1, +//! BasicProducer::default(), +//! BasicCollector::default(), +//! ) +//! .await?; +//! benchmarker.set_num_workers(1); +//! benchmarker.run().await; +//! +//! let mut collector = benchmarker.consume_collector().await; +//! let sample = collector.samples.remove(0); +//! assert_eq!(sample.tag(), 0); +//! assert_eq!(sample.latency().len(), 1); +//! assert_eq!(sample.read_transfer().len(), 1); +//! assert_eq!(sample.write_transfer().len(), 1); +//! } +//! +//! async fn run_server() { +//! // build our application with a single route +//! let app = Router::new().route("/", get(|| async { "Hello, World!" })); +//! +//! axum::Server::bind(&ADDR.parse().unwrap()) +//! .serve(app.into_make_service()) +//! .await +//! .unwrap(); +//! } +//! +//! #[derive(Default, Clone)] +//! pub struct BasicProducer { +//! count: usize, +//! } +//! +//! #[rewrk_core::async_trait] +//! impl Producer for BasicProducer { +//! fn ready(&mut self) { +//! self.count = 1; +//! } +//! +//! async fn create_batch(&mut self) -> Result { +//! if self.count > 0 { +//! self.count -= 1; +//! +//! let uri = Uri::builder().path_and_query("/").build()?; +//! let request = Request::builder() +//! .method(Method::GET) +//! .uri(uri) +//! .body(Body::empty())?; +//! Ok(RequestBatch::Batch(Batch { +//! tag: 0, +//! requests: vec![request], +//! })) +//! } else { +//! Ok(RequestBatch::End) +//! } +//! } +//! } +//! +//! #[derive(Default)] +//! pub struct BasicCollector { +//! samples: Vec, +//! } +//! +//! #[rewrk_core::async_trait] +//! impl SampleCollector for BasicCollector { +//! async fn process_sample(&mut self, sample: Sample) -> Result<()> { +//! self.samples.push(sample); +//! Ok(()) +//! } +//! } +//! ``` +//! + +#[macro_use] +extern crate tracing; + +mod connection; +mod producer; +mod recording; +mod runtime; +mod utils; +mod validator; + +pub use async_trait::async_trait; +pub use http; + +pub use self::connection::{HttpProtocol, Scheme}; +pub use self::producer::{Batch, Producer, ProducerBatches, RequestBatch}; +pub use self::recording::{Sample, SampleCollector}; +pub use self::runtime::{ + Error, + ReWrkBenchmark, + DEFAULT_WAIT_WARNING_THRESHOLD, + DEFAULT_WINDOW_DURATION, +}; +pub use self::validator::{DefaultValidator, ResponseValidator, ValidationError}; diff --git a/rewrk-core/src/producer.rs b/rewrk-core/src/producer.rs new file mode 100644 index 0000000..db17272 --- /dev/null +++ b/rewrk-core/src/producer.rs @@ -0,0 +1,144 @@ +use async_trait::async_trait; +use flume::Receiver; +use http::Request; +use hyper::Body; +use tokio::sync::oneshot; + +/// A batch of requests or single to the workers. +pub enum RequestBatch { + /// All requests have been produced and no more will be returned + /// + /// This will cause the workers to start shutting down. + End, + /// A new batch to process. + Batch(Batch), +} + +pub struct Batch { + /// A optional tag ID for grouping results together. + /// + /// This is a `usize` for the sake of efficiency, this can + /// be used as a mapping key for example. + /// + /// Samples are produced on a per-tag basis, so if a tag changes + /// from the current sample vs the tag provided by the batch + /// a new sample will be created. + pub tag: usize, + /// The batch requests. + pub requests: Vec>, +} + +#[async_trait] +/// A producer creates requests used in benchmarking +/// +/// It's important to note that one producer supplies all of a worker thread's +/// concurrent connections at once. +/// +/// When a producer returns [RequestBatch::End] workers will finish +/// the remaining requests then shutdown. +/// +/// # Example +/// +/// Here we have a basic implementation that produces `10` batches +/// of requests for the benchmarker to execute before completing the +/// benchmark. +/// ``` +/// use http::{Method, Request, Uri}; +/// use hyper::Body; +/// use rewrk_core::{Batch, Producer, RequestBatch}; +/// +/// #[derive(Default, Clone)] +/// pub struct BasicProducer { +/// count: usize, +/// } +/// +/// #[rewrk_core::async_trait] +/// impl Producer for BasicProducer { +/// fn ready(&mut self) { +/// self.count = 10; +/// } +/// +/// async fn create_batch(&mut self) -> anyhow::Result { +/// if self.count > 0 { +/// self.count -= 1; +/// +/// let uri = Uri::builder().path_and_query("/").build()?; +/// let request = Request::builder() +/// .method(Method::GET) +/// .uri(uri) +/// .body(Body::empty())?; +/// Ok(RequestBatch::Batch(Batch { +/// tag: 0, +/// requests: vec![request], +/// })) +/// } else { +/// Ok(RequestBatch::End) +/// } +/// } +/// } +/// ``` +pub trait Producer: Send + 'static { + /// Signals to the producer that the system is ready and about to + /// start benchmarking. + fn ready(&mut self); + + /// Creates a new match of documents to be sent to workers. + /// + /// It's important to note that in order to accurately measure throughput + /// the producer must be able to produce more requests than the target server + /// can consume, otherwise the statistics may not be as accurate. + async fn create_batch(&mut self) -> anyhow::Result; +} + +pub type ProducerBatches = Receiver; + +/// A sample collector which waits for and calls the +/// specific collector handler. +pub struct ProducerActor; + +impl ProducerActor { + /// Spawn a new collector actor for processing incoming samples. + pub async fn spawn( + buffer_size: usize, + worker_id: usize, + mut producer: impl Producer, + ready: oneshot::Receiver<()>, + ) -> ProducerBatches { + let (tx, rx) = flume::bounded(buffer_size); + + tokio::spawn(async move { + info!(worker_id = worker_id, "Starting producer actor."); + + let _ = ready.await; + producer.ready(); + + loop { + let batch = match producer.create_batch().await { + Ok(RequestBatch::End) => break, + Ok(RequestBatch::Batch(batch)) => batch, + Err(e) => { + error!( + worker_id = worker_id, + error = ?e, + "Failed to produce batch due to error, aborting...", + ); + break; + }, + }; + + debug!( + worker_id = worker_id, + batch_tag = batch.tag, + "Submitting request batch." + ); + if tx.send_async(batch).await.is_err() { + break; + } + } + + info!(worker_id = worker_id, "Producer actor has shutdown."); + }); + + rx + } +} diff --git a/rewrk-core/src/recording/collector.rs b/rewrk-core/src/recording/collector.rs new file mode 100644 index 0000000..662663b --- /dev/null +++ b/rewrk-core/src/recording/collector.rs @@ -0,0 +1,65 @@ +use async_trait::async_trait; +use flume::Sender; +use tokio::task::JoinHandle; + +use super::sample::Sample; + +#[async_trait] +/// A collector for processing submitted samples. +pub trait SampleCollector: Send + 'static { + async fn process_sample(&mut self, sample: Sample) -> anyhow::Result<()>; +} + +pub type CollectorMailbox = Sender; + +/// A sample collector which waits for and calls the +/// specific collector handler. +/// +/// # Example +/// +/// This basic collector simply appends each sample into a vector which +/// can then be consumed after the benchmark using `benchmarker.consume_collector().await` +/// +/// ``` +/// use rewrk_core::{Sample, SampleCollector}; +/// +/// #[derive(Default)] +/// pub struct BasicCollector { +/// samples: Vec, +/// } +/// +/// #[rewrk_core::async_trait] +/// impl SampleCollector for BasicCollector { +/// async fn process_sample(&mut self, sample: Sample) -> anyhow::Result<()> { +/// self.samples.push(sample); +/// Ok(()) +/// } +/// } +/// ``` +pub struct CollectorActor(pub(crate) JoinHandle); + +impl CollectorActor +where + C: SampleCollector, +{ + /// Spawn a new collector actor for processing incoming samples. + pub async fn spawn(mut collector: C) -> (Self, CollectorMailbox) { + let (tx, rx) = flume::unbounded(); + + let handle = tokio::spawn(async move { + info!("Starting collector actor"); + + while let Ok(sample) = rx.recv_async().await { + trace!(sample = ?sample, "Collector actor received processing sample."); + if let Err(e) = collector.process_sample(sample).await { + warn!(error = ?e, "Collector failed to process sample due to error."); + } + } + + info!("Collector actor has shutdown."); + collector + }); + + (Self(handle), tx) + } +} diff --git a/rewrk-core/src/recording/mod.rs b/rewrk-core/src/recording/mod.rs new file mode 100644 index 0000000..afdd1a5 --- /dev/null +++ b/rewrk-core/src/recording/mod.rs @@ -0,0 +1,6 @@ +mod collector; +mod sample; + +pub use collector::SampleCollector; +pub(crate) use collector::{CollectorActor, CollectorMailbox}; +pub use sample::{Sample, SampleFactory, SampleMetadata}; diff --git a/rewrk-core/src/recording/sample.rs b/rewrk-core/src/recording/sample.rs new file mode 100644 index 0000000..4a20a57 --- /dev/null +++ b/rewrk-core/src/recording/sample.rs @@ -0,0 +1,182 @@ +use std::fmt::{Debug, Formatter}; +use std::time::{Duration, Instant}; + +use flume::TrySendError; +use hdrhistogram::Histogram; + +use crate::recording::collector::CollectorMailbox; +use crate::validator::ValidationError; + +#[derive(Debug, Clone, Copy)] +pub struct SampleMetadata { + /// The unique ID of the worker thread. + pub worker_id: usize, +} + +#[derive(Debug, thiserror::Error)] +#[error("The service should shutdown.")] +/// The service worker has shutdown and should no longer process requests. +pub struct Shutdown; + +#[derive(Clone)] +/// A sample factory produces and submits samples. +pub struct SampleFactory { + /// The duration which should elapse before a sample + /// is submitted to be processed. + window_timeout: Duration, + + /// Metadata associated with the specific sample factory thread. + metadata: SampleMetadata, + submitter: CollectorMailbox, +} + +impl SampleFactory { + /// Create a new sample factory. + pub fn new( + window_timeout: Duration, + metadata: SampleMetadata, + submitter: CollectorMailbox, + ) -> Self { + Self { + window_timeout, + metadata, + submitter, + } + } + + #[inline] + /// Check if the handler should submit the current sample. + pub fn should_submit(&self, instant: Instant) -> bool { + self.window_timeout <= instant.elapsed() + } + + #[inline] + /// Create a new sample to record metrics. + pub fn new_sample(&self, tag: usize) -> Sample { + Sample { + tag, + latency_hist: Histogram::new(2).unwrap(), + write_transfer_hist: Histogram::new(2).unwrap(), + read_transfer_hist: Histogram::new(2).unwrap(), + errors: Vec::with_capacity(4), + metadata: self.metadata, + } + } + + #[inline] + /// Attempts to submit a sample to the processor. + pub fn submit_sample(&self, sample: Sample) -> Result<(), Shutdown> { + debug!(sample = ?sample, "Submitting sample to processor"); + // This should never block as it's an unbounded channel. + let result = self.submitter.try_send(sample); + + match result { + Ok(()) => Ok(()), + Err(TrySendError::Full(_)) => { + panic!("Sample submitter should never be full.") + }, + Err(TrySendError::Disconnected(_)) => Err(Shutdown), + } + } +} + +#[derive(Clone)] +/// A collection of metrics taken from the benchmark for a given time window. +/// +/// The sample contains the standard metrics (latency, IO, etc...) along with +/// any errors, the worker ID and sample tag which can be used to group results. +/// +/// Internally this uses HDR Histograms which can generate the min, max, stdev and +/// varying percentile statistics of the benchmark. +pub struct Sample { + tag: usize, + latency_hist: Histogram, + write_transfer_hist: Histogram, + read_transfer_hist: Histogram, + + errors: Vec, + metadata: SampleMetadata, +} + +impl Debug for Sample { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Sample") + .field("num_records", &self.latency().len()) + .field("metadata", &self.metadata) + .finish() + } +} + +impl Sample { + /// The sample metadata. + pub fn metadata(&self) -> SampleMetadata { + self.metadata + } + + /// The sample latency histogram + pub fn latency(&self) -> &Histogram { + &self.latency_hist + } + + /// The sample write transfer rate histogram + pub fn write_transfer(&self) -> &Histogram { + &self.write_transfer_hist + } + + /// The sample read transfer rate histogram + pub fn read_transfer(&self) -> &Histogram { + &self.read_transfer_hist + } + + #[inline] + /// The current sample batch tag. + pub fn tag(&self) -> usize { + self.tag + } + + #[inline] + /// Record a request validation error. + pub(crate) fn record_error(&mut self, e: ValidationError) { + self.errors.push(e); + } + + #[inline] + /// Record a latency duration. + /// + /// This value is converted to micro seconds. + pub(crate) fn record_latency(&mut self, dur: Duration) { + let micros = dur.as_micros() as u64; + self.latency_hist.record(micros).expect("Record value"); + } + + #[inline] + /// Record a write transfer rate. + pub(crate) fn record_write_transfer( + &mut self, + start_count: u64, + end_count: u64, + dur: Duration, + ) { + self.write_transfer_hist + .record(calculate_rate(start_count, end_count, dur)) + .expect("Record value"); + } + + #[inline] + /// Record a read transfer rate. + pub(crate) fn record_read_transfer( + &mut self, + start_count: u64, + end_count: u64, + dur: Duration, + ) { + self.read_transfer_hist + .record(calculate_rate(start_count, end_count, dur)) + .expect("Record value"); + } +} + +#[inline] +fn calculate_rate(start: u64, stop: u64, dur: Duration) -> u64 { + ((stop - start) as f64 / dur.as_secs_f64()).round() as u64 +} diff --git a/rewrk-core/src/runtime/mod.rs b/rewrk-core/src/runtime/mod.rs new file mode 100644 index 0000000..0f009b5 --- /dev/null +++ b/rewrk-core/src/runtime/mod.rs @@ -0,0 +1,237 @@ +mod worker; + +use std::future::Future; +use std::io::ErrorKind; +use std::net::ToSocketAddrs; +use std::sync::Arc; +use std::time::Duration; +use std::{cmp, io}; + +use http::{HeaderValue, Uri}; +use tokio_native_tls::TlsConnector; + +pub(crate) use self::worker::{spawn_workers, ShutdownHandle, WorkerConfig}; +use crate::connection::ReWrkConnector; +use crate::producer::Producer; +use crate::recording::CollectorActor; +use crate::{ + DefaultValidator, + HttpProtocol, + ResponseValidator, + SampleCollector, + Scheme, +}; + +/// The default percentage workers must be waiting on +/// producers for in order to raise a warning. +/// +/// The default of `5%` here is fairly arbitrary but it's a safe +/// default without being too annoying. +pub const DEFAULT_WAIT_WARNING_THRESHOLD: f32 = 5.0; +/// The default period of time that should elapse before +/// a [Sample](crate::Sample) is sent to a collector. +pub const DEFAULT_WINDOW_DURATION: Duration = Duration::from_secs(10); + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("The provided base URI is missing the required scheme (http, https)")] + /// The base URI is missing the HTTP scheme. + MissingScheme, + #[error("The provided base URI with an invalid scheme expected 'http' or 'https' got {0:?}")] + /// The base URI has a scheme which is not supported. + InvalidScheme(String), + #[error("The provided base URI is missing the required host")] + /// The base URI is missing the server host. + MissingHost, + #[error("An error occurred while building the TLS config: {0}")] + /// An error occurred while building the TLS config. + TlsError(native_tls::Error), + #[error("Failed to resolve the host socket address: {0}")] + /// The system failed to resolve the socket address. + AddressLookup(io::Error), +} + +/// The core benchmarker runtime. +/// +/// Once a benchmarker is created you can run the benchmark +/// several times using the `run` method which returns a future +/// that will complete once the benchmark is over. +/// +/// By default this system will use `n - 1` worker threads where `n` +/// is the number of logical CPU cores available, this can be +/// overriden using the [ReWrkBenchmark::set_num_workers] method. +pub struct ReWrkBenchmark +where + P: Producer + Clone, + C: SampleCollector, +{ + shutdown: ShutdownHandle, + collector_handle: CollectorActor, + num_workers: usize, + concurrency: usize, + worker_config: WorkerConfig

, +} + +impl ReWrkBenchmark +where + P: Producer + Clone, + C: SampleCollector, +{ + /// Creates a new [ReWrkBenchmark]. + /// + /// This sets up the connector and collector actor. + /// + /// Once created benchmarks can be started by calling the `run` method. + pub async fn create( + base_uri: Uri, + concurrency: usize, + protocol: HttpProtocol, + producer: P, + collector: C, + ) -> Result { + let connector = create_connector(base_uri, protocol)?; + let (collector_handle, collector) = CollectorActor::spawn(collector).await; + let shutdown = ShutdownHandle::default(); + let worker_config = WorkerConfig { + connector, + validator: Arc::new(DefaultValidator), + collector, + producer, + sample_window: DEFAULT_WINDOW_DURATION, + producer_wait_warning_threshold: DEFAULT_WAIT_WARNING_THRESHOLD, + }; + + let num_workers = cmp::max(num_cpus::get() - 1, 1); + + Ok(Self { + shutdown, + collector_handle, + num_workers, + concurrency, + worker_config, + }) + } + + /// Run a benchmark. + /// + /// This returns a future which will complete once all + /// workers for the benchmark have completed. + pub fn run(&self) -> impl Future { + info!( + num_workers = self.num_workers, + concurrency = self.concurrency, + "Starting benchmark." + ); + + let waiter = spawn_workers( + self.shutdown.clone(), + self.num_workers, + self.concurrency, + self.worker_config.clone(), + ); + + async move { + let _ = waiter.recv_async().await; + } + } + + /// Shuts the benchmarker down and returns the + /// collector once complete. + pub async fn consume_collector(self) -> C { + self.shutdown(); + drop(self.worker_config); + + let handle = self.collector_handle; + handle.0.await.expect("Join task") + } + + /// Sets the shutdown flag for the running benchmark. + pub fn shutdown(&self) { + self.shutdown.set_abort(); + } + + /// Sets the maximum number of times the connector will attempt + /// to connect to the server before error. + pub fn set_connection_retry_max(&mut self, max: usize) { + self.worker_config.connector.set_retry_max(max) + } + + /// Sets the benchmark validator. + pub fn set_validator(&mut self, validator: impl ResponseValidator) { + self.worker_config.validator = Arc::new(validator); + } + + /// Set the number of workers to spawn. + pub fn set_num_workers(&mut self, n: usize) { + self.num_workers = n; + } + + /// Set the duration which should elapse before a sample + /// is submitted to be processed in the collector. + pub fn set_sample_window(&mut self, dur: Duration) { + self.worker_config.sample_window = dur; + } + + /// Set the percentage threshold that the system must be + /// waiting on the producer in order for a warning to be raised. + /// + /// This is useful in situations where you know the producer will + /// take more time than normal and want to silence the warning. + pub fn set_producer_wait_warning_threshold(&mut self, pct: f32) { + self.worker_config.producer_wait_warning_threshold = pct; + } +} + +/// Creates a new [ReWrkConnector] using a provided protocol and URI. +fn create_connector(uri: Uri, protocol: HttpProtocol) -> Result { + let scheme = uri.scheme_str().ok_or(Error::MissingScheme)?; + let scheme = match scheme { + "http" => Scheme::Http, + "https" => { + let mut builder = native_tls::TlsConnector::builder(); + + builder + .danger_accept_invalid_certs(true) + .danger_accept_invalid_hostnames(true); + + match protocol { + HttpProtocol::HTTP1 => builder.request_alpns(&["http/1.1"]), + HttpProtocol::HTTP2 => builder.request_alpns(&["h2"]), + }; + + let cfg = builder.build().map_err(Error::TlsError)?; + Scheme::Https(TlsConnector::from(cfg)) + }, + _ => return Err(Error::InvalidScheme(scheme.to_string())), + }; + + let authority = uri.authority().ok_or(Error::MissingHost)?; + let host = authority.host(); + let port = authority + .port_u16() + .unwrap_or_else(|| scheme.default_port()); + + // Prefer ipv4. + let addr_iter = (host, port) + .to_socket_addrs() + .map_err(Error::AddressLookup)?; + let mut last_addr = None; + for addr in addr_iter { + last_addr = Some(addr); + if addr.is_ipv4() { + break; + } + } + let addr = last_addr.ok_or_else(|| { + Error::AddressLookup(io::Error::new( + ErrorKind::Other, + "Failed to lookup hostname", + )) + })?; + let host_header = HeaderValue::from_str(host).map_err(|_| Error::MissingHost)?; + let host = host.to_string(); + + let connector = ReWrkConnector::new(uri, host_header, addr, protocol, scheme, host); + + Ok(connector) +} diff --git a/rewrk-core/src/runtime/worker.rs b/rewrk-core/src/runtime/worker.rs new file mode 100644 index 0000000..5410098 --- /dev/null +++ b/rewrk-core/src/runtime/worker.rs @@ -0,0 +1,443 @@ +use std::borrow::Cow; +use std::mem; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use futures_util::future::join_all; +use http::Request; +use hyper::Body; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; + +use crate::connection::{ReWrkConnection, ReWrkConnector}; +use crate::producer::{Batch, Producer, ProducerActor, ProducerBatches}; +use crate::recording::{CollectorMailbox, SampleFactory, SampleMetadata}; +use crate::utils::RuntimeTimings; +use crate::validator::ValidationError; +use crate::{ResponseValidator, Sample}; + +const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); +type ConnectionTask = JoinHandle; +type WorkerGuard = flume::Receiver<()>; + +#[derive(Clone)] +pub(crate) struct WorkerConfig

+where + P: Producer + Clone, +{ + /// The benchmarking connector. + pub connector: ReWrkConnector, + /// The selected validator for the benchmark. + pub validator: Arc, + /// The sample results collector. + pub collector: CollectorMailbox, + /// The request batch producer. + pub producer: P, + /// The duration which should elapse before a sample + /// is submitted to be processed. + pub sample_window: Duration, + /// The percentage threshold that the system must be + /// waiting on the producer in order for a warning to be raised. + /// + /// This is useful in situations where you know the producer will + /// take more time than normal and want to silence the warning. + pub producer_wait_warning_threshold: f32, +} + +/// Spawns N worker runtimes for executing search requests. +pub(crate) fn spawn_workers

( + shutdown: ShutdownHandle, + num_workers: usize, + concurrency: usize, + config: WorkerConfig

, +) -> WorkerGuard +where + P: Producer + Clone, +{ + // We use a channel here as a guard in order to wait for all workers to shutdown. + let (guard, waiter) = flume::bounded(1); + + let per_worker_concurrency = concurrency / num_workers; + let mut remaining_concurrency = concurrency - (per_worker_concurrency * num_workers); + + for worker_id in 0..num_workers { + let concurrency_modifier = if remaining_concurrency != 0 { + remaining_concurrency -= 1; + 1 + } else { + 0 + }; + let concurrency = per_worker_concurrency + concurrency_modifier; + + spawn_worker( + worker_id, + concurrency, + guard.clone(), + shutdown.clone(), + config.clone(), + ); + } + + waiter +} + +/// Spawns a new runtime worker thread. +fn spawn_worker

( + worker_id: usize, + concurrency: usize, + guard: flume::Sender<()>, + handle: ShutdownHandle, + config: WorkerConfig

, +) where + P: Producer + Clone, +{ + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Create runtime"); + + std::thread::Builder::new() + .name(format!("rewrk-worker-{worker_id}")) + .spawn(move || { + debug!(worker_id = worker_id, "Spawning worker"); + rt.block_on(run_worker(worker_id, concurrency, handle, config)); + + // Drop the guard explicitly to make sure it's not dropped + // until after the runtime has completed. + drop(guard); + + debug!(worker_id = worker_id, "Worker successfully shutdown"); + }) + .expect("Spawn thread"); +} + +/// Runs a worker task. +/// +/// This acts as the main runtime entrypoint. +async fn run_worker

( + worker_id: usize, + concurrency: usize, + shutdown: ShutdownHandle, + config: WorkerConfig

, +) where + P: Producer + Clone, +{ + let (ready_tx, ready_rx) = oneshot::channel(); + let producer = + ProducerActor::spawn(concurrency * 4, worker_id, config.producer, ready_rx) + .await; + let metadata = SampleMetadata { worker_id }; + let sample_factory = + SampleFactory::new(config.sample_window, metadata, config.collector); + + let mut pending_futures = Vec::::with_capacity(concurrency); + for _ in 0..concurrency { + let task_opt = create_worker_connection( + worker_id, + &config.connector, + shutdown.clone(), + sample_factory.clone(), + config.validator.clone(), + producer.clone(), + ) + .await; + + match task_opt { + None => { + info!(worker_id = ?worker_id, "Cleaning up futures and shutting down..."); + for pending in pending_futures { + pending.abort(); + } + return; + }, + Some(task) => { + pending_futures.push(task); + }, + } + } + + // Begin benchmarking. + let _ = ready_tx.send(()); + + // Wait for all tasks to complete. + let timings = join_all(pending_futures) + .await + .into_iter() + .collect::>() + .expect("Join tasks"); + + info!(worker_id = worker_id, "Benchmark completed for worker."); + + let total_duration = timings.execute_wait_runtime + timings.producer_wait_runtime; + let producer_wait_pct = (timings.producer_wait_runtime.as_secs_f32() + / total_duration.as_secs_f32()) + * 100.0; + + if producer_wait_pct >= config.producer_wait_warning_threshold { + warn!( + worker_id = worker_id, + producer_wait_pct = producer_wait_pct, + request_execute_wait_duration = ?timings.execute_wait_runtime, + producer_wait_duration = ?timings.producer_wait_runtime, + total_runtime_duration = ?total_duration, + "The system spent {producer_wait_pct:.2}% of it's runtime waiting for the producer.\ + Results may not be accurate." + ); + } +} + +async fn create_worker_connection( + worker_id: usize, + connector: &ReWrkConnector, + shutdown: ShutdownHandle, + sample_factory: SampleFactory, + validator: Arc, + producer: ProducerBatches, +) -> Option { + let connect_result = connector.connect_timeout(CONNECT_TIMEOUT).await; + let conn = match connect_result { + Err(e) => { + // We check this to prevent spam of the logs. + if !shutdown.should_abort() { + error!(worker_id = worker_id, error = ?e, "Failed to connect to server due to error, aborting."); + shutdown.set_abort(); + } + return None; + }, + Ok(None) => { + // We check this to prevent spam of the logs. + if !shutdown.should_abort() { + error!(worker_id = worker_id, "Worker failed to connect to server within {CONNECT_TIMEOUT:?}, aborting."); + shutdown.set_abort(); + } + return None; + }, + Ok(Some(conn)) => conn, + }; + + let mut connection = WorkerConnection::new( + conn, + sample_factory, + validator, + producer, + shutdown.clone(), + ); + + let fut = async move { + while !shutdown.should_abort() { + let can_continue = connection.execute_next_batch().await; + + if !can_continue { + break; + } + } + + // Submit the remaining sample. + connection.submit_sample(0); + + connection.timings + }; + + Some(tokio::spawn(fut)) +} + +#[derive(Default, Clone)] +pub struct ShutdownHandle { + /// A signal flag telling all workers to shutdown. + should_stop: Arc, +} + +impl ShutdownHandle { + /// Checks if the worker should abort processing. + pub fn should_abort(&self) -> bool { + self.should_stop.load(Ordering::Relaxed) + } + + /// Sets the abort flag across workers. + pub fn set_abort(&self) { + self.should_stop.store(true, Ordering::Relaxed); + } +} + +pub struct WorkerConnection { + /// The ReWrk benchmarking connection. + conn: ReWrkConnection, + /// The sample factory for producing metric samples. + sample_factory: SampleFactory, + /// The current sample being populated with metrics. + sample: Sample, + /// The selected validator for the benchmark. + validator: Arc, + /// The request batch producer. + producer: ProducerBatches, + /// The point in time when the last sample was submitted to + /// the collectors. + last_sent_sample: Instant, + /// A signal flag telling all workers to shutdown. + shutdown: ShutdownHandle, + /// Internal timings which are useful for debugging. + timings: RuntimeTimings, + /// A check for if the first batch has been received already. + /// + /// This is so that timings can be adjusted while waiting for + /// benchmarking to start, which would otherwise skew results. + is_first_batch: bool, +} + +impl WorkerConnection { + /// Create a new worker instance + fn new( + conn: ReWrkConnection, + sample_factory: SampleFactory, + validator: Arc, + producer: ProducerBatches, + shutdown: ShutdownHandle, + ) -> Self { + let sample = sample_factory.new_sample(0); + let last_sent_sample = Instant::now(); + + Self { + conn, + sample_factory, + sample, + validator, + producer, + last_sent_sample, + shutdown, + timings: RuntimeTimings::default(), + is_first_batch: true, + } + } + + /// Sets the abort flag across workers. + fn set_abort(&self) { + self.shutdown.set_abort() + } + + /// Submit the current sample to the collectors and create a new + /// sample with a given tag. + fn submit_sample(&mut self, next_sample_tag: usize) -> bool { + let new_sample = self.sample_factory.new_sample(next_sample_tag); + let old_sample = mem::replace(&mut self.sample, new_sample); + if self.sample_factory.submit_sample(old_sample).is_err() { + return false; + } + self.last_sent_sample = Instant::now(); + true + } + + /// Gets the next batch from the producer and submits it to be executed. + /// + /// The method returns if more batches are possibly available. + async fn execute_next_batch(&mut self) -> bool { + let producer_start = Instant::now(); + let batch = match self.producer.recv_async().await { + Ok(batch) => batch, + // We've completed all batches. + Err(_) => return false, + }; + let producer_elapsed = producer_start.elapsed(); + + if self.is_first_batch { + self.is_first_batch = false; + } else { + self.timings.producer_wait_runtime += producer_elapsed; + } + + let execute_start = Instant::now(); + self.execute_batch(batch).await; + self.timings.execute_wait_runtime += execute_start.elapsed(); + + true + } + + /// Executes a batch of requests to measure the metrics. + async fn execute_batch(&mut self, batch: Batch) { + if self.sample.tag() != batch.tag { + let success = self.submit_sample(batch.tag); + + if !success { + self.set_abort(); + return; + } + } + + for request in batch.requests { + let result = self.send(request).await; + + match result { + Ok(should_continue) if !should_continue => { + self.set_abort(); + return; + }, + Err(e) => { + error!(error = ?e, "Worker encountered an error while benchmarking, aborting..."); + self.set_abort(); + return; + }, + _ => {}, + } + } + } + + /// Send a HTTP request and record the relevant metrics + async fn send(&mut self, request: Request) -> Result { + let read_transfer_start = self.conn.usage().get_received_count(); + let write_transfer_start = self.conn.usage().get_written_count(); + let start = Instant::now(); + + let (head, body) = match self.conn.execute_req(request).await { + Ok(resp) => resp, + Err(e) => { + if e.is_body_write_aborted() || e.is_closed() || e.is_connect() { + self.sample.record_error(ValidationError::ConnectionAborted); + return Ok(false); + } else if e.is_incomplete_message() + || e.is_parse() + || e.is_parse_too_large() + || e.is_parse_status() + { + self.sample.record_error(ValidationError::InvalidBody( + Cow::Borrowed("invalid-http-body"), + )); + } else if e.is_timeout() { + self.sample.record_error(ValidationError::Timeout); + } else { + return Err(e); + } + + return Ok(true); + }, + }; + + let elapsed_time = start.elapsed(); + let read_transfer_end = self.conn.usage().get_received_count(); + let write_transfer_end = self.conn.usage().get_written_count(); + + if let Err(e) = self.validator.validate(head, body) { + self.sample.record_error(e); + } else { + self.sample.record_latency(elapsed_time); + self.sample.record_read_transfer( + read_transfer_start, + read_transfer_end, + elapsed_time, + ); + self.sample.record_write_transfer( + write_transfer_start, + write_transfer_end, + elapsed_time, + ); + } + + // Submit the sample if it's window interval has elapsed. + if self.sample_factory.should_submit(self.last_sent_sample) { + let batch_tag = self.sample.tag(); + let success = self.submit_sample(batch_tag); + return Ok(success); + } + + Ok(true) + } +} diff --git a/rewrk-core/src/utils/io_usage.rs b/rewrk-core/src/utils/io_usage.rs new file mode 100644 index 0000000..94a8dfc --- /dev/null +++ b/rewrk-core/src/utils/io_usage.rs @@ -0,0 +1,92 @@ +use std::io; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +#[derive(Clone, Default)] +/// A utility for wrapping streams and measuring the number of +/// bytes being passed through the wrapped stream. +pub(crate) struct IoUsageTracker { + received: Arc, + written: Arc, +} + +impl IoUsageTracker { + /// Create a new usage tracker. + pub(crate) fn new() -> Self { + Self::default() + } + + /// Wrap an existing stream with the usage tracker. + pub(crate) fn wrap_stream(&self, stream: I) -> RecordStream { + RecordStream::new(stream, self.clone()) + } + + /// Get the current received usage count. + pub(crate) fn get_received_count(&self) -> u64 { + self.received.load(Ordering::SeqCst) + } + /// Get the current written usage count. + pub(crate) fn get_written_count(&self) -> u64 { + self.written.load(Ordering::SeqCst) + } +} + +pin_project! { + pub(crate) struct RecordStream { + #[pin] + inner: I, + usage: IoUsageTracker, + } +} + +impl RecordStream { + fn new(inner: I, usage: IoUsageTracker) -> Self { + Self { inner, usage } + } +} + +impl AsyncRead for RecordStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.project(); + let poll_result = this.inner.poll_read(cx, buf); + + this.usage + .received + .fetch_add(buf.filled().len() as u64, Ordering::SeqCst); + + poll_result + } +} + +impl AsyncWrite for RecordStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.usage + .written + .fetch_add(buf.len() as u64, Ordering::SeqCst); + self.project().inner.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().inner.poll_shutdown(cx) + } +} diff --git a/rewrk-core/src/utils/mod.rs b/rewrk-core/src/utils/mod.rs new file mode 100644 index 0000000..be0edcd --- /dev/null +++ b/rewrk-core/src/utils/mod.rs @@ -0,0 +1,5 @@ +mod io_usage; +mod timings; + +pub(crate) use io_usage::IoUsageTracker; +pub(crate) use timings::RuntimeTimings; diff --git a/rewrk-core/src/utils/timings.rs b/rewrk-core/src/utils/timings.rs new file mode 100644 index 0000000..8740d2e --- /dev/null +++ b/rewrk-core/src/utils/timings.rs @@ -0,0 +1,39 @@ +use std::ops::{Add, AddAssign}; +use std::time::Duration; + +#[derive(Debug, Default)] +pub struct RuntimeTimings { + /// The total runtime duration waiting on the producer. + pub producer_wait_runtime: Duration, + /// The total runtime duration waiting on the requests to execute. + pub execute_wait_runtime: Duration, +} + +impl Add for RuntimeTimings { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + Self { + producer_wait_runtime: self.producer_wait_runtime + + rhs.producer_wait_runtime, + execute_wait_runtime: self.execute_wait_runtime + rhs.execute_wait_runtime, + } + } +} + +impl AddAssign for RuntimeTimings { + fn add_assign(&mut self, rhs: Self) { + self.producer_wait_runtime += rhs.producer_wait_runtime; + self.execute_wait_runtime += rhs.execute_wait_runtime; + } +} + +impl FromIterator for RuntimeTimings { + fn from_iter>(iter: T) -> Self { + let mut total = Self::default(); + for slf in iter { + total += slf; + } + total + } +} diff --git a/rewrk-core/src/validator.rs b/rewrk-core/src/validator.rs new file mode 100644 index 0000000..bb4c52f --- /dev/null +++ b/rewrk-core/src/validator.rs @@ -0,0 +1,78 @@ +use std::borrow::Cow; +use std::fmt::Debug; + +use http::response::Parts; +use hyper::body::Bytes; + +#[derive(Debug, thiserror::Error, Clone)] +/// The provided request is invalid and should not be counted. +pub enum ValidationError { + #[error("The returned status code is not valid: {0}")] + /// The returned status code is not valid + InvalidStatus(u16), + #[error("The body of the request does not match the expected structure: {0}")] + /// The body of the request does not match the expected structure + InvalidBody(Cow<'static, str>), + #[error("The request is missing a required header: {0}")] + /// The request is missing a required header + MissingHeader(Cow<'static, str>), + #[error("The request contained a header, but it was invalid: {0}")] + /// The request contained a header, but it was invalid + InvalidHeader(Cow<'static, str>), + #[error("The connection was aborted by the remote serve.")] + /// The connection was aborted by the remote server + ConnectionAborted, + #[error("The connection took to long to respond")] + /// The connection took to long to respond + Timeout, + #[error("A validation error rejected the request: {0}")] + /// A validation error rejected the request + Other(Cow<'static, str>), +} + +/// A validating utility for checking responses returned by the webserver are correct. +/// +/// It's important that these operations are light weight as they are called on the same +/// runtime as the request runtime which may block operations. +/// +/// # Example +/// +/// This example is just the [DefaultValidator] implementation, it can do as much or +/// as little as you'd like but it's important that it does not block heavily as it +/// will reduce the overall throughput of the worker thread. +/// +/// ``` +/// use http::response::Parts; +/// use hyper::body::Bytes; +/// use rewrk_core::{ResponseValidator, ValidationError}; +/// +/// #[derive(Debug)] +/// pub struct DefaultValidator; +/// +/// impl ResponseValidator for DefaultValidator { +/// fn validate(&self, head: Parts, _body: Bytes) -> Result<(), ValidationError> { +/// if head.status.is_success() { +/// Ok(()) +/// } else { +/// Err(ValidationError::InvalidStatus(head.status.as_u16())) +/// } +/// } +/// } +/// ``` +pub trait ResponseValidator: Send + Sync + 'static { + fn validate(&self, head: Parts, body: Bytes) -> Result<(), ValidationError>; +} + +#[derive(Debug)] +/// The default validator handler. +pub struct DefaultValidator; + +impl ResponseValidator for DefaultValidator { + fn validate(&self, head: Parts, _body: Bytes) -> Result<(), ValidationError> { + if head.status.is_success() { + Ok(()) + } else { + Err(ValidationError::InvalidStatus(head.status.as_u16())) + } + } +} diff --git a/rewrk-core/tests/basic_benchmark.rs b/rewrk-core/tests/basic_benchmark.rs new file mode 100644 index 0000000..85e0921 --- /dev/null +++ b/rewrk-core/tests/basic_benchmark.rs @@ -0,0 +1,101 @@ +use axum::routing::get; +use axum::Router; +use http::{Method, Request, Uri}; +use hyper::Body; +use rewrk_core::{ + Batch, + HttpProtocol, + Producer, + ReWrkBenchmark, + RequestBatch, + Sample, + SampleCollector, +}; + +static ADDR: &str = "127.0.0.1:19999"; + +#[tokio::test] +async fn test_basic_benchmark() { + let _ = tracing_subscriber::fmt::try_init(); + + tokio::spawn(run_server()); + + let uri = Uri::builder() + .scheme("http") + .authority(ADDR) + .path_and_query("/") + .build() + .expect("Create URI"); + + let mut benchmarker = ReWrkBenchmark::create( + uri, + 1, + HttpProtocol::HTTP1, + BasicProducer::default(), + BasicCollector::default(), + ) + .await + .expect("Create benchmark"); + benchmarker.set_num_workers(1); + benchmarker.run().await; + + let mut collector = benchmarker.consume_collector().await; + let sample = collector.samples.remove(0); + assert_eq!(sample.tag(), 0); + assert_eq!(sample.latency().len(), 1); + assert_eq!(sample.read_transfer().len(), 1); + assert_eq!(sample.write_transfer().len(), 1); +} + +async fn run_server() { + // build our application with a single route + let app = Router::new().route("/", get(|| async { "Hello, World!" })); + + axum::Server::bind(&ADDR.parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); +} + +#[derive(Default, Clone)] +pub struct BasicProducer { + count: usize, +} + +#[rewrk_core::async_trait] +impl Producer for BasicProducer { + fn ready(&mut self) { + self.count = 1; + } + + async fn create_batch(&mut self) -> anyhow::Result { + if self.count > 0 { + self.count -= 1; + + let uri = Uri::builder().path_and_query("/").build()?; + let request = Request::builder() + .method(Method::GET) + .uri(uri) + .body(Body::empty())?; + Ok(RequestBatch::Batch(Batch { + tag: 0, + requests: vec![request], + })) + } else { + Ok(RequestBatch::End) + } + } +} + +#[derive(Default)] +pub struct BasicCollector { + samples: Vec, +} + +#[rewrk_core::async_trait] +impl SampleCollector for BasicCollector { + async fn process_sample(&mut self, sample: Sample) -> anyhow::Result<()> { + self.samples.push(sample); + Ok(()) + } +} diff --git a/rewrk-core/tests/http2_benchmark.rs b/rewrk-core/tests/http2_benchmark.rs new file mode 100644 index 0000000..37c8723 --- /dev/null +++ b/rewrk-core/tests/http2_benchmark.rs @@ -0,0 +1,101 @@ +use axum::routing::get; +use axum::Router; +use http::{Method, Request, Uri}; +use hyper::Body; +use rewrk_core::{ + Batch, + HttpProtocol, + Producer, + ReWrkBenchmark, + RequestBatch, + Sample, + SampleCollector, +}; + +static ADDR: &str = "127.0.0.1:20000"; + +#[tokio::test] +async fn test_basic_benchmark() { + let _ = tracing_subscriber::fmt::try_init(); + + tokio::spawn(run_server()); + + let uri = Uri::builder() + .scheme("http") + .authority(ADDR) + .path_and_query("/") + .build() + .expect("Create URI"); + + let mut benchmarker = ReWrkBenchmark::create( + uri, + 1, + HttpProtocol::HTTP2, + BasicProducer::default(), + BasicCollector::default(), + ) + .await + .expect("Create benchmark"); + benchmarker.set_num_workers(1); + benchmarker.run().await; + + let mut collector = benchmarker.consume_collector().await; + let sample = collector.samples.remove(0); + assert_eq!(sample.tag(), 0); + assert_eq!(sample.latency().len(), 1); + assert_eq!(sample.read_transfer().len(), 1); + assert_eq!(sample.write_transfer().len(), 1); +} + +async fn run_server() { + // build our application with a single route + let app = Router::new().route("/", get(|| async { "Hello, World!" })); + + axum::Server::bind(&ADDR.parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); +} + +#[derive(Default, Clone)] +pub struct BasicProducer { + count: usize, +} + +#[rewrk_core::async_trait] +impl Producer for BasicProducer { + fn ready(&mut self) { + self.count = 1; + } + + async fn create_batch(&mut self) -> anyhow::Result { + if self.count > 0 { + self.count -= 1; + + let uri = Uri::builder().path_and_query("/").build()?; + let request = Request::builder() + .method(Method::GET) + .uri(uri) + .body(Body::empty())?; + Ok(RequestBatch::Batch(Batch { + tag: 0, + requests: vec![request], + })) + } else { + Ok(RequestBatch::End) + } + } +} + +#[derive(Default)] +pub struct BasicCollector { + samples: Vec, +} + +#[rewrk_core::async_trait] +impl SampleCollector for BasicCollector { + async fn process_sample(&mut self, sample: Sample) -> anyhow::Result<()> { + self.samples.push(sample); + Ok(()) + } +} diff --git a/rewrk-core/tests/timed_benchmark.rs b/rewrk-core/tests/timed_benchmark.rs new file mode 100644 index 0000000..b09b4e7 --- /dev/null +++ b/rewrk-core/tests/timed_benchmark.rs @@ -0,0 +1,118 @@ +use std::time::{Duration, Instant}; + +use axum::routing::get; +use axum::Router; +use http::{Method, Request, Uri}; +use hyper::Body; +use rewrk_core::{ + Batch, + HttpProtocol, + Producer, + ReWrkBenchmark, + RequestBatch, + Sample, + SampleCollector, +}; + +static ADDR: &str = "127.0.0.1:19999"; + +#[tokio::test] +async fn test_basic_benchmark() { + let _ = tracing_subscriber::fmt::try_init(); + + tokio::spawn(run_server()); + + let uri = Uri::builder() + .scheme("http") + .authority(ADDR) + .path_and_query("/") + .build() + .expect("Create URI"); + + let mut benchmarker = ReWrkBenchmark::create( + uri, + 1, + HttpProtocol::HTTP1, + TimedProducer::default(), + BasicCollector::default(), + ) + .await + .expect("Create benchmark"); + benchmarker.set_sample_window(Duration::from_secs(30)); + benchmarker.set_num_workers(1); + + let start = Instant::now(); + benchmarker.run().await; + assert!(start.elapsed() >= Duration::from_secs(10)); + + let mut collector = benchmarker.consume_collector().await; + let sample = collector.samples.remove(0); + assert_eq!(sample.tag(), 0); + dbg!( + sample.latency().len(), + sample.latency().min(), + sample.latency().max(), + sample.latency().stdev(), + ); +} + +async fn run_server() { + // build our application with a single route + let app = Router::new().route("/", get(|| async { "Hello, World!" })); + + axum::Server::bind(&ADDR.parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); +} + +#[derive(Clone)] +pub struct TimedProducer { + start: Instant, +} + +impl Default for TimedProducer { + fn default() -> Self { + Self { + start: Instant::now(), + } + } +} + +#[rewrk_core::async_trait] +impl Producer for TimedProducer { + fn ready(&mut self) { + self.start = Instant::now(); + } + + async fn create_batch(&mut self) -> anyhow::Result { + if self.start.elapsed() >= Duration::from_secs(10) { + return Ok(RequestBatch::End); + } + + let uri = Uri::builder().path_and_query("/").build()?; + let requests = (0..500) + .map(|_| { + Request::builder() + .method(Method::GET) + .uri(uri.clone()) + .body(Body::empty()) + }) + .collect::, _>>()?; + + Ok(RequestBatch::Batch(Batch { tag: 0, requests })) + } +} + +#[derive(Default)] +pub struct BasicCollector { + samples: Vec, +} + +#[rewrk_core::async_trait] +impl SampleCollector for BasicCollector { + async fn process_sample(&mut self, sample: Sample) -> anyhow::Result<()> { + self.samples.push(sample); + Ok(()) + } +}