Skip to content

Commit

Permalink
Merge branch 'hyperium:master' into malformed-400-reset
Browse files Browse the repository at this point in the history
  • Loading branch information
franfastly authored May 29, 2024
2 parents 6ce9bbf + f161f7c commit 2e62b78
Show file tree
Hide file tree
Showing 16 changed files with 485 additions and 81 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ jobs:
run: ./ci/h2spec.sh
if: matrix.rust == 'stable'

unexpected-cfgs:
runs-on: ubuntu-latest
needs: [style]
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@nightly
- uses: Swatinem/rust-cache@v2
- run: cargo check --all-features
env:
RUSTFLAGS: >-
-D unexpected_cfgs
--cfg h2_internal_check_unexpected_cfgs
--check-cfg=cfg(h2_internal_check_unexpected_cfgs,fuzzing)
#clippy_check:
# runs-on: ubuntu-latest
# steps:
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 0.4.5 (May 17, 2024)

* Fix race condition that sometimes hung connections during shutdown.
* Fix pseudo header construction for CONNECT and OPTIONS requests.

# 0.4.4 (April 3, 2024)

* Limit number of CONTINUATION frames for misbehaving connections.
Expand Down
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
## Getting Help ##

If you have a question about the h2 library or have encountered problems using it, you may
[file an issue][issue] or ask a question on the [Tokio Gitter][gitter].
[file an issue][issue] or ask a question on the [Tokio Discord][discord].

## Submitting a Pull Request ##

Expand Down Expand Up @@ -81,4 +81,4 @@ Describe the testing you've done to validate your change. Performance-related
changes should include before- and after- benchmark results.

[issue]: https://github.com/hyperium/h2/issues/new
[gitter]: https://gitter.im/tokio-rs/tokio
[discord]: https://discord.gg/tokio
12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "h2"
# When releasing to crates.io:
# - Update CHANGELOG.md.
# - Create git tag
version = "0.4.4"
version = "0.4.5"
license = "MIT"
authors = [
"Carl Lerche <me@carllerche.com>",
Expand Down Expand Up @@ -39,9 +39,9 @@ members = [
]

[dependencies]
atomic-waker = "1.0.0"
futures-core = { version = "0.3", default-features = false }
futures-sink = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false }
tokio-util = { version = "0.7.1", features = ["codec", "io"] }
tokio = { version = "1", features = ["io-util"] }
bytes = "1"
Expand All @@ -66,8 +66,12 @@ serde_json = "1.0.0"
# Examples
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] }
env_logger = { version = "0.10", default-features = false }
tokio-rustls = "0.24"
webpki-roots = "0.25"
tokio-rustls = "0.26"
webpki-roots = "0.26"

[package.metadata.docs.rs]
features = ["stream"]

[[bench]]
name = "main"
harness = false
148 changes: 148 additions & 0 deletions benches/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use bytes::Bytes;
use h2::{
client,
server::{self, SendResponse},
RecvStream,
};
use http::Request;

use std::{
error::Error,
time::{Duration, Instant},
};

use tokio::net::{TcpListener, TcpStream};

const NUM_REQUESTS_TO_SEND: usize = 100_000;

// The actual server.
async fn server(addr: &str) -> Result<(), Box<dyn Error + Send + Sync>> {
let listener = TcpListener::bind(addr).await?;

loop {
if let Ok((socket, _peer_addr)) = listener.accept().await {
tokio::spawn(async move {
if let Err(e) = serve(socket).await {
println!(" -> err={:?}", e);
}
});
}
}
}

async fn serve(socket: TcpStream) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut connection = server::handshake(socket).await?;
while let Some(result) = connection.accept().await {
let (request, respond) = result?;
tokio::spawn(async move {
if let Err(e) = handle_request(request, respond).await {
println!("error while handling request: {}", e);
}
});
}
Ok(())
}

async fn handle_request(
mut request: Request<RecvStream>,
mut respond: SendResponse<Bytes>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let body = request.body_mut();
while let Some(data) = body.data().await {
let data = data?;
let _ = body.flow_control().release_capacity(data.len());
}
let response = http::Response::new(());
let mut send = respond.send_response(response, false)?;
send.send_data(Bytes::from_static(b"pong"), true)?;

Ok(())
}

// The benchmark
async fn send_requests(addr: &str) -> Result<(), Box<dyn Error>> {
let tcp = loop {
let Ok(tcp) = TcpStream::connect(addr).await else {
continue;
};
break tcp;
};
let (client, h2) = client::handshake(tcp).await?;
// Spawn a task to run the conn...
tokio::spawn(async move {
if let Err(e) = h2.await {
println!("GOT ERR={:?}", e);
}
});

let mut handles = Vec::with_capacity(NUM_REQUESTS_TO_SEND);
for _i in 0..NUM_REQUESTS_TO_SEND {
let mut client = client.clone();
let task = tokio::spawn(async move {
let request = Request::builder().body(()).unwrap();

let instant = Instant::now();
let (response, _) = client.send_request(request, true).unwrap();
let response = response.await.unwrap();
let mut body = response.into_body();
while let Some(_chunk) = body.data().await {}
instant.elapsed()
});
handles.push(task);
}

let instant = Instant::now();
let mut result = Vec::with_capacity(NUM_REQUESTS_TO_SEND);
for handle in handles {
result.push(handle.await.unwrap());
}
let mut sum = Duration::new(0, 0);
for r in result.iter() {
sum = sum.checked_add(*r).unwrap();
}

println!("Overall: {}ms.", instant.elapsed().as_millis());
println!("Fastest: {}ms", result.iter().min().unwrap().as_millis());
println!("Slowest: {}ms", result.iter().max().unwrap().as_millis());
println!(
"Avg : {}ms",
sum.div_f64(NUM_REQUESTS_TO_SEND as f64).as_millis()
);
Ok(())
}

fn main() {
let _ = env_logger::try_init();
let addr = "127.0.0.1:5928";
println!("H2 running in current-thread runtime at {addr}:");
std::thread::spawn(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(server(addr)).unwrap();
});

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(send_requests(addr)).unwrap();

let addr = "127.0.0.1:5929";
println!("H2 running in multi-thread runtime at {addr}:");
std::thread::spawn(|| {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap();
rt.block_on(server(addr)).unwrap();
});

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(send_requests(addr)).unwrap();
}
13 changes: 2 additions & 11 deletions examples/akamai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use http::{Method, Request};
use tokio::net::TcpStream;
use tokio_rustls::TlsConnector;

use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore, ServerName};
use tokio_rustls::rustls::{pki_types::ServerName, RootCertStore};

use std::error::Error;
use std::net::ToSocketAddrs;
Expand All @@ -15,17 +15,8 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
let _ = env_logger::try_init();

let tls_client_config = std::sync::Arc::new({
let mut root_store = RootCertStore::empty();
root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
OwnedTrustAnchor::from_subject_spki_name_constraints(
ta.subject,
ta.spki,
ta.name_constraints,
)
}));

let root_store = RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let mut c = tokio_rustls::rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_store)
.with_no_client_auth();
c.alpn_protocols.push(ALPN_H2.as_bytes().to_owned());
Expand Down
9 changes: 7 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1428,7 +1428,12 @@ where

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.maybe_close_connection_if_no_streams();
self.inner.poll(cx).map_err(Into::into)
let result = self.inner.poll(cx).map_err(Into::into);
if result.is_pending() && !self.inner.has_streams_or_other_references() {
tracing::trace!("last stream closed during poll, wake again");
cx.waker().wake_by_ref();
}
result
}
}

Expand Down Expand Up @@ -1487,7 +1492,7 @@ impl ResponseFuture {
impl PushPromises {
/// Get the next `PushPromise`.
pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
crate::poll_fn(move |cx| self.poll_push_promise(cx)).await
}

#[doc(hidden)]
Expand Down
Loading

0 comments on commit 2e62b78

Please sign in to comment.