Skip to content

Commit

Permalink
feat(client): adds HttpInfo to responses when HttpConnector is used
Browse files Browse the repository at this point in the history
- Adds `client::connect::Connected::extra()`, which allows connectors to
  specify arbitrary custom information about a connected transport.

If a connector provides this extra value, it will be set in the
`Response` extensions.

Closes #1402
  • Loading branch information
seanmonstar committed Jul 5, 2018
1 parent e06dc52 commit 18015ca
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 24 deletions.
1 change: 0 additions & 1 deletion examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ static PHRASE: &'static [u8] = b"Hello World!";

fn main() {
pretty_env_logger::init();

let addr = ([127, 0, 0, 1], 3000).into();

// new_service is run for each connection, creating a 'service'
Expand Down
138 changes: 123 additions & 15 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
//! establishes connections over TCP.
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
use std::error::Error as StdError;
use std::fmt;
use std::mem;

use bytes::{BufMut, BytesMut};
use futures::Future;
use http::{uri, Uri};
use http::{uri, Response, Uri};
use tokio_io::{AsyncRead, AsyncWrite};

#[cfg(feature = "runtime")] pub use self::http::HttpConnector;
#[cfg(feature = "runtime")] pub use self::http::{HttpConnector, HttpInfo};

/// Connect to a destination, returning an IO transport.
///
Expand Down Expand Up @@ -46,8 +47,12 @@ pub struct Destination {
pub struct Connected {
//alpn: Alpn,
pub(super) is_proxied: bool,
pub(super) extra: Option<Extra>,
}

pub(super) struct Extra(Box<FnClone>);


/*TODO: when HTTP1 Upgrades to H2 are added, this will be needed
#[derive(Debug)]
pub(super) enum Alpn {
Expand Down Expand Up @@ -234,8 +239,8 @@ impl Connected {
/// Create new `Connected` type with empty metadata.
pub fn new() -> Connected {
Connected {
//alpn: Alpn::Http1,
is_proxied: false,
extra: None,
}
}

Expand All @@ -251,6 +256,15 @@ impl Connected {
self
}

/// Set extra connection information to be set in the extensions of every `Response`.
pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected {
self.extra = Some(Extra(Box::new(move |res: &mut Response<::Body>| {
let e = extra.clone();
res.extensions_mut().insert(e);
})));
self
}

/*
/// Set that the connected transport negotiated HTTP/2 as it's
/// next protocol.
Expand All @@ -259,6 +273,54 @@ impl Connected {
self
}
*/

// Don't public expose that `Connected` is `Clone`, unsure if we want to
// keep that contract...
pub(super) fn clone(&self) -> Connected {
Connected {
is_proxied: self.is_proxied,
extra: self.extra.clone(),
}
}
}

// ===== impl Extra =====

impl Extra {
pub(super) fn set(&self, res: &mut Response<::Body>) {
self.0.call(res);
}
}

impl Clone for Extra {
fn clone(&self) -> Extra {
Extra(self.0.clone_fn())
}
}

impl fmt::Debug for Extra {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Extra")
.finish()
}
}

trait FnClone: Send + Sync {
fn clone_fn(&self) -> Box<FnClone>;
fn call(&self, res: &mut Response<::Body>);
}

impl<F> FnClone for F
where
F: Fn(&mut Response<::Body>) + Clone + Send + Sync + 'static
{
fn clone_fn(&self) -> Box<FnClone> {
Box::new(self.clone())
}

fn call(&self, res: &mut Response<::Body>) {
(*self)(res)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -436,14 +498,50 @@ mod http {
/// A connector for the `http` scheme.
///
/// Performs DNS resolution in a thread pool, and then connects over TCP.
///
/// # Note
///
/// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes
/// transport information such as the remote socket address used.
#[derive(Clone)]
pub struct HttpConnector {
executor: HttpConnectExecutor,
enforce_http: bool,
handle: Option<Handle>,
keep_alive_timeout: Option<Duration>,
nodelay: bool,
local_address: Option<IpAddr>,
nodelay: bool,
}

/// Extra information about the transport when an HttpConnector is used.
///
/// # Example
///
/// ```rust
/// use hyper::client::{Client, connect::HttpInfo};
/// use hyper::rt::Future;
///
/// let client = Client::new();
///
/// let fut = client.get("http://example.local".parse().unwrap())
/// .inspect(|resp| {
/// let info = resp
/// .extensions()
/// .get::<HttpInfo>()
/// .expect("HttpConnector sets HttpInfo");
///
/// println!("remote addr = {}", info.remote_addr());
/// });
/// ```
///
/// # Note
///
/// If a different connector is used besides [`HttpConnector`](HttpConnector),
/// this value will not exist in the extensions. Consult that specific
/// connector to see what "extra" information it might provide to responses.
#[derive(Clone, Debug)]
pub struct HttpInfo {
remote_addr: SocketAddr,
}

impl HttpConnector {
Expand Down Expand Up @@ -600,6 +698,7 @@ mod http {
}
}
}

/// A Future representing work to connect to a URL.
#[must_use = "futures do nothing unless polled"]
pub struct HttpConnecting {
Expand Down Expand Up @@ -640,16 +739,12 @@ mod http {
}
},
State::Resolving(ref mut future, local_addr) => {
match try!(future.poll()) {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
local_addr: local_addr,
current: None,
})
}
};
let addrs = try_ready!(future.poll());
state = State::Connecting(ConnectingTcp {
addrs: addrs,
local_addr: local_addr,
current: None,
});
},
State::Connecting(ref mut c) => {
let sock = try_ready!(c.poll(&self.handle));
Expand All @@ -660,7 +755,13 @@ mod http {

sock.set_nodelay(self.nodelay)?;

return Ok(Async::Ready((sock, Connected::new())));
let extra = HttpInfo {
remote_addr: sock.peer_addr()?,
};
let connected = Connected::new()
.extra(extra);

return Ok(Async::Ready((sock, connected)));
},
State::Error(ref mut e) => return Err(e.take().expect("polled more than once")),
}
Expand Down Expand Up @@ -710,6 +811,13 @@ mod http {
}
}

impl HttpInfo {
/// Get the remote address of the transport used.
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}
}

// Make this Future unnameable outside of this crate.
mod http_connector {
use super::*;
Expand Down
22 changes: 15 additions & 7 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ use http::uri::Scheme;
use body::{Body, Payload};
use common::Exec;
use common::lazy as hyper_lazy;
use self::connect::{Connect, Destination};
use self::connect::{Connect, Connected, Destination};
use self::pool::{Pool, Poolable, Reservation};

#[cfg(feature = "runtime")] pub use self::connect::HttpConnector;
Expand Down Expand Up @@ -304,7 +304,7 @@ where C: Connect + Sync + 'static,
})
.map(move |tx| {
pool.pooled(connecting, PoolClient {
is_proxied: connected.is_proxied,
conn_info: connected,
tx: match ver {
Ver::Http1 => PoolTx::Http1(tx),
Ver::Http2 => PoolTx::Http2(tx.into_http2()),
Expand Down Expand Up @@ -387,7 +387,7 @@ where C: Connect + Sync + 'static,
// CONNECT always sends origin-form, so check it first...
if req.method() == &Method::CONNECT {
authority_form(req.uri_mut());
} else if pooled.is_proxied {
} else if pooled.conn_info.is_proxied {
absolute_form(req.uri_mut());
} else {
origin_form(req.uri_mut());
Expand All @@ -401,6 +401,14 @@ where C: Connect + Sync + 'static,

let fut = pooled.send_request_retryable(req);

let extra_info = pooled.conn_info.extra.clone();
let fut = fut.map(move |mut res| {
if let Some(extra) = extra_info {
extra.set(&mut res);
}
res
});

// As of futures@0.1.21, there is a race condition in the mpsc
// channel, such that sending when the receiver is closing can
// result in the message being stuck inside the queue. It won't
Expand Down Expand Up @@ -584,7 +592,7 @@ where
}

struct PoolClient<B> {
is_proxied: bool,
conn_info: Connected,
tx: PoolTx<B>,
}

Expand Down Expand Up @@ -644,17 +652,17 @@ where
match self.tx {
PoolTx::Http1(tx) => {
Reservation::Unique(PoolClient {
is_proxied: self.is_proxied,
conn_info: self.conn_info,
tx: PoolTx::Http1(tx),
})
},
PoolTx::Http2(tx) => {
let b = PoolClient {
is_proxied: self.is_proxied,
conn_info: self.conn_info.clone(),
tx: PoolTx::Http2(tx.clone()),
};
let a = PoolClient {
is_proxied: self.is_proxied,
conn_info: self.conn_info,
tx: PoolTx::Http2(tx),
};
Reservation::Shared(a, b)
Expand Down
12 changes: 11 additions & 1 deletion tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,17 @@ macro_rules! test {

let rx = rx.expect("thread panicked");

rt.block_on(res.join(rx).map(|r| r.0))
rt.block_on(res.join(rx).map(|r| r.0)).map(move |mut resp| {
// Always check that HttpConnector has set the "extra" info...
let extra = resp
.extensions_mut()
.remove::<::hyper::client::connect::HttpInfo>()
.expect("HttpConnector should set HttpInfo");

assert_eq!(extra.remote_addr(), addr, "HttpInfo should have server addr");

resp
})
});
}

Expand Down

0 comments on commit 18015ca

Please sign in to comment.