Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): redesign the Connect trait #1428

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/client/compat.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
//! Wrappers to build compatibility with the `http` crate.

use std::io;

use futures::{Future, Poll, Stream};
use http;
use tokio_service::Service;

use client::{Connect, Client, FutureResponse};
use client::{Connect2, Client, FutureResponse};
use error::Error;
use proto::Body;

Expand All @@ -19,7 +21,9 @@ pub(super) fn client<C, B>(client: Client<C, B>) -> CompatClient<C, B> {
}

impl<C, B> Service for CompatClient<C, B>
where C: Connect,
where C: Connect2<Error=io::Error>,
C::Transport: 'static,
C::Future: 'static,
B: Stream<Error=Error> + 'static,
B::Item: AsRef<[u8]>,
{
Expand Down
217 changes: 174 additions & 43 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! Contains the `Connect2` trait, and supporting types.
use std::error::Error as StdError;
use std::fmt;
use std::io;
use std::mem;
use std::sync::Arc;
//use std::net::SocketAddr;

use futures::{Future, Poll, Async};
use futures::future::{Executor, ExecuteError};
Expand All @@ -16,31 +16,79 @@ use tokio_service::Service;
use Uri;

use super::dns;
use self::http_connector::HttpConnectorBlockingTask;

/// Connect to a destination, returning an IO transport.
pub trait Connect2 {
/// The connected IO Stream.
type Transport: AsyncRead + AsyncWrite;
/// An error occured when trying to connect.
type Error;
/// A Future that will resolve to the connected Transport.
type Future: Future<Item=(Self::Transport, Connected), Error=Self::Error>;
/// Connect to a destination.
fn connect(&self, dst: Destination) -> Self::Future;
}

/// A connector creates an Io to a remote address..
///
/// This trait is not implemented directly, and only exists to make
/// the intent clearer. A connector should implement `Service` with
/// `Request=Uri` and `Response: Io` instead.
pub trait Connect: Service<Request=Uri, Error=io::Error> + 'static {
/// The connected Io Stream.
type Output: AsyncRead + AsyncWrite + 'static;
/// A Future that will resolve to the connected Stream.
type Future: Future<Item=Self::Output, Error=io::Error> + 'static;
/// Connect to a remote address.
fn connect(&self, Uri) -> <Self as Connect>::Future;
/// A set of properties to describe where and how to try to connect.
#[derive(Debug)]
pub struct Destination {
pub(super) alpn: Alpn,
pub(super) uri: Uri,
}

impl<T> Connect for T
where T: Service<Request=Uri, Error=io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error=io::Error>,
{
type Output = T::Response;
type Future = T::Future;
/// Extra information about the connected transport.
#[derive(Debug)]
pub struct Connected {
alpn: Alpn,
is_proxy: bool,
}

fn connect(&self, url: Uri) -> <Self as Connect>::Future {
self.call(url)
#[derive(Debug)]
pub(super) enum Alpn {
Http1,
H2,
}

impl Destination {
/// Get a reference to the requested `Uri`.
pub fn uri(&self) -> &Uri {
&self.uri
}

/// Returns whether this connection must negotiate HTTP/2 via ALPN.
pub fn h2(&self) -> bool {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect users to be able to configure a hyper Client, stating whether to allow both HTTP/1 and HTTP/2, or to require only HTTP/2. To allow both, ALPN needs to send both protocols. A connector would query the Destination to determine what protocols it should try for.

I wasn't actually sure what to name the methods. They are meant to represent the desire of the user, but some desires may be wants or preferences, versus musts. Maybe some combination in this list is better?

  • must_h2
  • can_http1
  • can_http2
  • prefer_h2

match self.alpn {
Alpn::Http1 => false,
Alpn::H2 => true,
}
}
}

impl Connected {
/// Create new `Connected` type with empty metadata.
pub fn new() -> Connected {
Connected {
alpn: Alpn::Http1,
is_proxy: false,
}
}

/// Set that the connected transport is to an HTTP proxy.
///
/// This setting will affect if HTTP/1 requests written on the transport
/// will have the request-target in absolute-form or origin-form (such as
/// `GET http://hyper.rs/guide HTTP/1.1` or `GET /guide HTTP/1.1`).
pub fn proxy(mut self) -> Connected {
self.is_proxy = true;
self
}

/// Set that the connected transport negotiated HTTP/2 as it's
/// next protocol.
pub fn h2(mut self) -> Connected {
self.alpn = Alpn::H2;
self
}
}

Expand Down Expand Up @@ -96,6 +144,8 @@ impl fmt::Debug for HttpConnector {
}
}

// deprecated, will be gone in 0.12
#[doc(hidden)]
impl Service for HttpConnector {
type Request = Uri;
type Response = TcpStream;
Expand Down Expand Up @@ -258,23 +308,27 @@ impl ConnectingTcp {
}
}

/// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
work: oneshot::Execute<dns::Work>
}
// Make this Future unnameable outside of this crate.
mod http_connector {
use super::*;
// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
pub(super) work: oneshot::Execute<dns::Work>
}

impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
}
}
}

impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();
impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
}
}
}

Expand All @@ -288,20 +342,97 @@ impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
}
}

/*
impl<S: SslClient> HttpsConnector<S> {
/// Create a new connector using the provided SSL implementation.
pub fn new(s: S) -> HttpsConnector<S> {
HttpsConnector {
http: HttpConnector::default(),
ssl: s,
#[doc(hidden)]
#[deprecated(since="0.11.16", note="Use the Connect2 trait, which will become Connect in 0.12")]
pub trait Connect: Service<Request=Uri, Error=io::Error> + 'static {
/// The connected Io Stream.
type Output: AsyncRead + AsyncWrite + 'static;
/// A Future that will resolve to the connected Stream.
type Future: Future<Item=Self::Output, Error=io::Error> + 'static;
/// Connect to a remote address.
fn connect(&self, Uri) -> <Self as Connect>::Future;
}

#[doc(hidden)]
#[allow(deprecated)]
impl<T> Connect for T
where T: Service<Request=Uri, Error=io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error=io::Error>,
{
type Output = T::Response;
type Future = T::Future;

fn connect(&self, url: Uri) -> <Self as Connect>::Future {
self.call(url)
}
}

#[doc(hidden)]
#[allow(deprecated)]
impl<T> Connect2 for T
where
T: Connect,
{
type Transport = <T as Connect>::Output;
type Error = io::Error;
type Future = ConnectToConnect2Future<<T as Connect>::Future>;

fn connect(&self, dst: Destination) -> <Self as Connect2>::Future {
ConnectToConnect2Future {
inner: <Self as Connect>::connect(self, dst.uri),
}
}
}
*/

#[doc(hidden)]
#[deprecated(since="0.11.16")]
#[allow(missing_debug_implementations)]
pub struct ConnectToConnect2Future<F> {
inner: F,
}

#[allow(deprecated)]
impl<F> Future for ConnectToConnect2Future<F>
where
F: Future,
{
type Item = (F::Item, Connected);
type Error = F::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
.map(|async| async.map(|t| (t, Connected::new())))
}
}

// even though deprecated, we need to make sure the HttpConnector still
// implements Connect (and Service apparently...)

#[allow(deprecated)]
fn _assert_http_connector() {
fn assert_connect<T>()
where
T: Connect2<
Transport=TcpStream,
Error=io::Error,
Future=ConnectToConnect2Future<HttpConnecting>
>,
T: Connect<Output=TcpStream, Future=HttpConnecting>,
T: Service<
Request=Uri,
Response=TcpStream,
Future=HttpConnecting,
Error=io::Error
>,
{}

assert_connect::<HttpConnector>();
}

#[cfg(test)]
mod tests {
#![allow(deprecated)]
use std::io;
use tokio::reactor::Core;
use super::{Connect, HttpConnector};
Expand Down
27 changes: 20 additions & 7 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ use version::HttpVersion;

pub use proto::response::Response;
pub use proto::request::Request;
pub use self::connect::{HttpConnector, Connect};
pub use self::connect::{Connect2, HttpConnector};
#[allow(deprecated)]
pub use self::connect::Connect;

use self::background::{bg, Background};
use self::connect::Destination;

mod connect;
pub mod connect;
mod dns;
mod pool;
#[cfg(feature = "compat")]
Expand Down Expand Up @@ -99,7 +102,9 @@ impl<C, B> Client<C, B> {
}

impl<C, B> Client<C, B>
where C: Connect,
where C: Connect2<Error=io::Error>,
C::Transport: 'static,
C::Future: 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
Expand Down Expand Up @@ -149,7 +154,9 @@ impl Future for FutureResponse {
}

impl<C, B> Service for Client<C, B>
where C: Connect,
where C: Connect2<Error=io::Error>,
C::Transport: 'static,
C::Future: 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
Expand Down Expand Up @@ -195,8 +202,12 @@ where C: Connect,
let executor = self.executor.clone();
let pool = self.pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.and_then(move |io| {
let dst = Destination {
uri: url,
alpn: self::connect::Alpn::Http1,
};
self.connector.connect(dst)
.and_then(move |(io, _connected)| {
let (tx, rx) = mpsc::channel(0);
let tx = HyperClient {
tx: RefCell::new(tx),
Expand Down Expand Up @@ -409,7 +420,9 @@ impl<C, B> Config<C, B> {
}

impl<C, B> Config<C, B>
where C: Connect,
where C: Connect2<Error=io::Error>,
C::Transport: 'static,
C::Future: 'static,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
Expand Down