Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce hyper_util::server::conn::auto::upgrade::downcast #147

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ hyper = "1.4.0"
futures-util = { version = "0.3.16", default-features = false }
http = "1.0"
http-body = "1.0.0"
bytes = "1"
bytes = "1.7.1"
pin-project-lite = "0.2.4"
futures-channel = { version = "0.3", optional = true }
socket2 = { version = "0.5", optional = true, features = ["all"] }
Expand Down
4 changes: 2 additions & 2 deletions src/common/rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use std::{
/// Combine a buffer with an IO, rewinding reads to use the buffer.
#[derive(Debug)]
pub(crate) struct Rewind<T> {
pre: Option<Bytes>,
inner: T,
pub(crate) pre: Option<Bytes>,
pub(crate) inner: T,
}

impl<T> Rewind<T> {
Expand Down
6 changes: 6 additions & 0 deletions src/server/conn/auto.rs → src/server/conn/auto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Http1 or Http2 connection.

pub mod upgrade;

use futures_util::ready;
use hyper::service::HttpService;
use std::future::Future;
Expand Down Expand Up @@ -163,6 +165,10 @@ impl<E> Builder<E> {
/// Bind a connection together with a [`Service`], with the ability to
/// handle HTTP upgrades. This requires that the IO object implements
/// `Send`.
///
/// Note that if you ever want to use [`hyper::upgrade::Upgraded::downcast`]
/// with this crate, you'll need to use [`hyper_util::server::conn::auto::upgrade::downcast`]
/// instead. See the documentation of the latter to understand why.
pub fn serve_connection_with_upgrades<I, S, B>(
&self,
io: I,
Expand Down
66 changes: 66 additions & 0 deletions src/server/conn/auto/upgrade.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//! Upgrade utilities.

use bytes::{Bytes, BytesMut};
use hyper::{
rt::{Read, Write},
upgrade::Upgraded,
};

use crate::common::rewind::Rewind;

/// Tries to downcast the internal trait object to the type passed.
///
/// On success, returns the downcasted parts. On error, returns the Upgraded back.
/// This is a kludge to work around the fact that the machinery provided by
/// [`hyper_util::server::con::auto`] wraps the inner `T` with a private type
/// that is not reachable from outside the crate.
///
/// This kludge will be removed when this machinery is added back to the main
/// `hyper` code.
pub fn downcast<T>(upgraded: Upgraded) -> Result<Parts<T>, Upgraded>
where
T: Read + Write + Unpin + 'static,
{
let hyper::upgrade::Parts {
io: rewind,
mut read_buf,
..
} = upgraded.downcast::<Rewind<T>>()?;

if let Some(pre) = rewind.pre {
read_buf = if read_buf.is_empty() {
pre
} else {
let mut buf = BytesMut::from(read_buf);

buf.extend_from_slice(&pre);

buf.freeze()
};
}

Ok(Parts {
io: rewind.inner,
read_buf,
})
}

/// The deconstructed parts of an [`Upgraded`] type.
///
/// Includes the original IO type, and a read buffer of bytes that the
/// HTTP state machine may have already read before completing an upgrade.
#[derive(Debug)]
#[non_exhaustive]
pub struct Parts<T> {
/// The original IO object used before the upgrade.
pub io: T,
/// A buffer of bytes that have been read but not processed as HTTP.
///
/// For instance, if the `Connection` is used for an HTTP upgrade request,
/// it is possible the server sent back the first bytes of the new protocol
/// along with the response upgrade.
///
/// You will want to check for any existing bytes if you plan to continue
/// communicating on the IO object.
pub read_buf: Bytes,
}