Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make client able to use non-Send executor #3184

Merged
merged 28 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d88ec30
create example
Ruben2424 Mar 9, 2023
44587d6
make anonymous futures structs
Ruben2424 May 21, 2023
55753a9
remove exec and fix warnings
Ruben2424 Jun 6, 2023
ad57f1c
remove bound
Ruben2424 Jun 6, 2023
4132aa6
fmt
Ruben2424 Jun 6, 2023
4138669
use right future
Ruben2424 Jun 6, 2023
ce4619e
fix features ci
Ruben2424 Jun 6, 2023
4ac835a
fix ffi
Ruben2424 Jun 8, 2023
27eb1e2
Merge branch 'master' into issue-3017
Ruben2424 Jun 8, 2023
ed95666
move client example to single_threaded example
Ruben2424 Jun 8, 2023
beb3ee4
Merge branch 'issue-3017' of https://github.com/Ruben2424/hyper into …
Ruben2424 Jun 8, 2023
c4a51b3
remove from cargo toml
Ruben2424 Jun 8, 2023
7cbfc72
use pin_project_lite
Ruben2424 Jun 9, 2023
d3821a9
deny warnings
Ruben2424 Jun 9, 2023
2394949
error bounds
Ruben2424 Jun 9, 2023
8e0f907
fix test
Ruben2424 Jun 9, 2023
6d44c29
sealed ExecutorClient
Ruben2424 Jun 9, 2023
c7d59fe
better error message
Ruben2424 Jun 9, 2023
0269396
make it work also for io types
Ruben2424 Jun 9, 2023
3a78a57
improve example
Ruben2424 Jun 12, 2023
be5c654
fmt
Ruben2424 Jun 12, 2023
b39d5d4
Merge branch 'master' into issue-3017
Ruben2424 Jun 16, 2023
15be46f
fix merge fail
Ruben2424 Jun 16, 2023
8391604
fix error bounds
Ruben2424 Jun 16, 2023
a276efc
Merge branch 'master' into issue-3017
Ruben2424 Jun 19, 2023
3dd0579
Merge branch 'master' into issue-3017
Ruben2424 Jun 29, 2023
98b0599
Merge branch 'issue-3017' of https://github.com/Ruben2424/hyper into …
Ruben2424 Jun 29, 2023
25ff95c
Merge branch 'master' into issue-3017
Ruben2424 Jun 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ h2 = { version = "0.3.9", optional = true }
itoa = "1"
tracing = { version = "0.1", default-features = false, features = ["std"] }
pin-project-lite = "0.2.4"
pin-project = "1.0.12"
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
tokio = { version = "1", features = ["sync"] }
want = "0.3"

Expand Down Expand Up @@ -112,6 +113,11 @@ name = "client"
path = "examples/client.rs"
required-features = ["full"]

[[example]]
name = "client_single_thread"
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
path = "examples/client_http2_single_threaded.rs"
required-features = ["full"]

[[example]]
name = "client_json"
path = "examples/client_json.rs"
Expand Down
132 changes: 132 additions & 0 deletions examples/client_http2_single_threaded.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#![deny(warnings)]
#![warn(rust_2018_idioms)]
use std::env;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use http_body_util::BodyExt;
use hyper::body::{Body as HttpBody, Frame};
use hyper::Error;
use hyper::Request;
use tokio::io::{self, AsyncWriteExt as _};
use tokio::net::TcpStream;

struct Body {
// Our Body type is !Send and !Sync:
_marker: PhantomData<*const ()>,
data: Option<Bytes>,
}

impl From<String> for Body {
fn from(a: String) -> Self {
Body {
_marker: PhantomData,
data: Some(a.into()),
}
}
}

impl HttpBody for Body {
type Data = Bytes;
type Error = Error;

fn poll_frame(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Poll::Ready(self.get_mut().data.take().map(|d| Ok(Frame::data(d))))
}
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
pretty_env_logger::init();

// Configure a runtime that runs everything on the current thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");

// Combine it with a `LocalSet, which means it can spawn !Send futures...
let local = tokio::task::LocalSet::new();
local.block_on(&rt, init())
}

async fn init() -> Result<(), Box<dyn std::error::Error>> {
// Some simple CLI args requirements...
let url = match env::args().nth(1) {
Some(url) => url,
None => {
println!("Usage: client <url>");
return Ok(());
}
};

// HTTPS requires picking a TLS implementation, so give a better
// warning if the user tries to request an 'https' URL.
let url = url.parse::<hyper::Uri>().unwrap();
if url.scheme_str() != Some("http") {
println!("This example only works with 'http' URLs.");
return Ok(());
}

fetch_url(url).await
}

async fn fetch_url(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>> {
let host = url.host().expect("uri has no host");
let port = url.port_u16().unwrap_or(80);
let addr = format!("{}:{}", host, port);
let stream = TcpStream::connect(addr).await?;

let (mut sender, conn) = hyper::client::conn::http2::handshake(LocalExec, stream).await?;
tokio::task::spawn_local(async move {
if let Err(err) = conn.await {
println!("Connection failed: {:?}", err);
}
});

let authority = url.authority().unwrap().clone();

let req = Request::builder()
.uri(url)
.header(hyper::header::HOST, authority.as_str())
.body(Body::from("test".to_string()))?;

let mut res = sender.send_request(req).await?;

println!("Response: {}", res.status());
println!("Headers: {:#?}\n", res.headers());

// Stream the body, writing each chunk to stdout as we get it
// (instead of buffering and printing at the end).
while let Some(next) = res.frame().await {
let frame = next?;
if let Some(chunk) = frame.data_ref() {
io::stdout().write_all(&chunk).await?;
}
}

println!("\n\nDone!");

Ok(())
}

// NOTE: This part is only needed for HTTP/2. HTTP/1 doesn't need an executor.
//
// Since the Server needs to spawn some background tasks, we needed
// to configure an Executor that can spawn !Send futures...
#[derive(Clone, Copy, Debug)]
struct LocalExec;

impl<F> hyper::rt::Executor<F> for LocalExec
where
F: std::future::Future + 'static, // not requiring `Send`
{
fn execute(&self, fut: F) {
// This will spawn into the currently running `LocalSet`.
tokio::task::spawn_local(fut);
}
}
69 changes: 41 additions & 28 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use tokio::io::{AsyncRead, AsyncWrite};

use super::super::dispatch;
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::exec::ExecutorClient;
use crate::common::time::Time;
use crate::common::{
exec::{BoxSendFuture, Exec},
Expand All @@ -26,7 +27,9 @@ pub struct SendRequest<B> {

impl<B> Clone for SendRequest<B> {
fn clone(&self) -> SendRequest<B> {
SendRequest { dispatch: self.dispatch.clone() }
SendRequest {
dispatch: self.dispatch.clone(),
}
}
}

Expand All @@ -35,20 +38,25 @@ impl<B> Clone for SendRequest<B> {
/// In most cases, this should just be spawned into an executor, so that it
/// can process incoming and outgoing messages, notice hangups, and the like.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
pub struct Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Send + 'static,
T: AsyncRead + AsyncWrite + Send + 'static + Unpin,
B: Body + 'static,
E: ExecutorClient<B, T> + Unpin,
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
{
inner: (PhantomData<T>, proto::h2::ClientTask<B>),
inner: (PhantomData<T>, proto::h2::ClientTask<B, E, T>),
}

/// A builder to configure an HTTP connection.
///
/// After setting options, the builder is used to create a handshake future.
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
pub struct Builder<Ex>
where
Ex: Executor<BoxSendFuture> + Send + Sync + 'static + Clone,
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
{
pub(super) exec: Ex,
pub(super) timer: Time,
h2_builder: proto::h2::client::Config,
}
Expand All @@ -60,13 +68,15 @@ pub struct Builder {
pub async fn handshake<E, T, B>(
exec: E,
io: T,
) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
) -> crate::Result<(SendRequest<B>, Connection<T, B, E>)>
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: ExecutorClient<B, T> + Unpin + Clone,
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line just above, B::Error: Into<..> should be enough. Is it not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it required to add something here at all? Because currently, this makes it stricter in that not only is it Into, but it must also be Error. That means Box<dyn Error> no longer qualifies (since Box<dyn Error>: Error is not true for confusing reasons).

{
Builder::new(exec).handshake(io).await
}
Expand Down Expand Up @@ -189,12 +199,14 @@ impl<B> fmt::Debug for SendRequest<B> {

// ===== impl Connection

impl<T, B> Connection<T, B>
impl<T, B, E> Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Body + Unpin + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: ExecutorClient<B, T> + Unpin,
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
{
/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
///
Expand All @@ -210,22 +222,27 @@ where
}
}

impl<T, B> fmt::Debug for Connection<T, B>
impl<T, B, E> fmt::Debug for Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static+ Unpin,
B: Body + 'static,
E: ExecutorClient<B, T> + Unpin,
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection").finish()
}
}

impl<T, B> Future for Connection<T, B>
impl<T, B, E> Future for Connection<T, B, E>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Body + Send + 'static,
B: Body + 'static + Unpin,
B::Data: Send,
E: Unpin,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: ExecutorClient<B, T> + 'static + Send + Sync + Unpin,
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
{
type Output = crate::Result<()>;

Expand All @@ -240,22 +257,22 @@ where

// ===== impl Builder

impl Builder {
impl<Ex> Builder<Ex>
where
Ex: Executor<BoxSendFuture> + Send + Sync + 'static + Clone,
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
{
/// Creates a new connection builder.
#[inline]
pub fn new<E>(exec: E) -> Builder
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
{
pub fn new(exec: Ex) -> Builder<Ex> {
Builder {
exec: Exec::new(exec),
exec,
timer: Time::Empty,
h2_builder: Default::default(),
}
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
pub fn timer<M>(&mut self, timer: M) -> &mut Builder<Ex>
where
M: Timer + Send + Sync + 'static,
{
Expand Down Expand Up @@ -284,10 +301,7 @@ impl Builder {
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
pub fn initial_connection_window_size(
&mut self,
sz: impl Into<Option<u32>>,
) -> &mut Self {
pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
if let Some(sz) = sz.into() {
self.h2_builder.adaptive_window = false;
self.h2_builder.initial_conn_window_size = sz;
Expand Down Expand Up @@ -329,10 +343,7 @@ impl Builder {
/// Pass `None` to disable HTTP2 keep-alive.
///
/// Default is currently disabled.
pub fn keep_alive_interval(
&mut self,
interval: impl Into<Option<Duration>>,
) -> &mut Self {
pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
self.h2_builder.keep_alive_interval = interval.into();
self
}
Expand Down Expand Up @@ -395,12 +406,14 @@ impl Builder {
pub fn handshake<T, B>(
&self,
io: T,
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B, Ex>)>> + '_
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
Ex: ExecutorClient<B, T> + Unpin,
<B as http_body::Body>::Error: std::error::Error + Send + Sync + 'static,
{
let opts = self.clone();

Expand Down
Loading