Skip to content

Commit

Permalink
Merge pull request #448 from mlalic/http2-initial
Browse files Browse the repository at this point in the history
feat(client): initial support for HTTP/2
  • Loading branch information
seanmonstar committed Jun 2, 2015
2 parents 0cd7e9d + 6504936 commit 486a219
Show file tree
Hide file tree
Showing 11 changed files with 1,507 additions and 188 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ unicase = "0.1"
url = "0.2"
traitobject = "0.0.1"
typeable = "0.1"
solicit = "0.2"

[dev-dependencies]
env_logger = "*"
Expand Down
34 changes: 34 additions & 0 deletions examples/client_http2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#![deny(warnings)]
extern crate hyper;

extern crate env_logger;

use std::env;
use std::io;

use hyper::Client;
use hyper::header::Connection;
use hyper::http2;

fn main() {
env_logger::init().unwrap();

let url = match env::args().nth(1) {
Some(url) => url,
None => {
println!("Usage: client <url>");
return;
}
};

let mut client = Client::with_protocol(http2::new_protocol());

// `Connection: Close` is not a valid header for HTTP/2, but the client handles it gracefully.
let mut res = client.get(&*url)
.header(Connection::close())
.send().unwrap();

println!("Response: {}", res.status);
println!("Headers:\n{}", res.headers);
io::copy(&mut res, &mut io::stdout()).unwrap();
}
54 changes: 16 additions & 38 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ pub mod pool;
pub mod request;
pub mod response;

use message::Protocol;
use http11::Http11Protocol;

/// A Client to use additional features with Requests.
///
/// Clients can handle things such as: redirect policy, connection pooling.
pub struct Client {
connector: Connector,
protocol: Box<Protocol + Send>,
redirect_policy: RedirectPolicy,
}

Expand All @@ -77,15 +80,20 @@ impl Client {
/// Create a new client with a specific connector.
pub fn with_connector<C, S>(connector: C) -> Client
where C: NetworkConnector<Stream=S> + Send + 'static, S: NetworkStream + Send {
Client::with_protocol(Http11Protocol::with_connector(connector))
}

/// Create a new client with a specific `Protocol`.
pub fn with_protocol<P: Protocol + Send + 'static>(protocol: P) -> Client {
Client {
connector: with_connector(connector),
protocol: Box::new(protocol),
redirect_policy: Default::default()
}
}

/// Set the SSL verifier callback for use with OpenSSL.
pub fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.connector.set_ssl_verifier(verifier);
self.protocol.set_ssl_verifier(verifier);
}

/// Set the RedirectPolicy.
Expand Down Expand Up @@ -131,44 +139,10 @@ impl Client {
}
}

fn with_connector<C: NetworkConnector<Stream=S> + Send + 'static, S: NetworkStream + Send>(c: C) -> Connector {
Connector(Box::new(ConnAdapter(c)))
}

impl Default for Client {
fn default() -> Client { Client::new() }
}

struct ConnAdapter<C: NetworkConnector + Send>(C);

impl<C: NetworkConnector<Stream=S> + Send, S: NetworkStream + Send> NetworkConnector for ConnAdapter<C> {
type Stream = Box<NetworkStream + Send>;
#[inline]
fn connect(&self, host: &str, port: u16, scheme: &str)
-> ::Result<Box<NetworkStream + Send>> {
Ok(try!(self.0.connect(host, port, scheme)).into())
}
#[inline]
fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.0.set_ssl_verifier(verifier);
}
}

struct Connector(Box<NetworkConnector<Stream=Box<NetworkStream + Send>> + Send>);

impl NetworkConnector for Connector {
type Stream = Box<NetworkStream + Send>;
#[inline]
fn connect(&self, host: &str, port: u16, scheme: &str)
-> ::Result<Box<NetworkStream + Send>> {
Ok(try!(self.0.connect(host, port, scheme)).into())
}
#[inline]
fn set_ssl_verifier(&mut self, verifier: ContextVerifier) {
self.0.set_ssl_verifier(verifier);
}
}

/// Options for an individual Request.
///
/// One of these will be built for you if you use one of the convenience
Expand Down Expand Up @@ -229,7 +203,11 @@ impl<'a, U: IntoUrl> RequestBuilder<'a, U> {
};

loop {
let mut req = try!(Request::with_connector(method.clone(), url.clone(), &client.connector));
let message = {
let (host, port) = try!(get_host_and_port(&url));
try!(client.protocol.new_message(&host, port, &*url.scheme))
};
let mut req = try!(Request::with_message(method.clone(), url.clone(), message));
headers.as_ref().map(|headers| req.headers_mut().extend(headers.iter()));

match (can_have_body, body.as_ref()) {
Expand Down
134 changes: 47 additions & 87 deletions src/client/request.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
//! Client Requests
use std::marker::PhantomData;
use std::io::{self, Write, BufWriter};
use std::io::{self, Write};

use url::Url;

use method::{self, Method};
use header::Headers;
use header::{self, Host};
use header::Host;
use net::{NetworkStream, NetworkConnector, HttpConnector, Fresh, Streaming};
use http::{HttpWriter, LINE_ENDING};
use http::HttpWriter::{ThroughWriter, ChunkedWriter, SizedWriter, EmptyWriter};
use version;
use client::{Response, get_host_and_port};

use message::{HttpMessage, RequestHead};
use http11::Http11Message;


/// A client request to a remote server.
/// The W type tracks the state of the request, Fresh vs Streaming.
Expand All @@ -23,7 +24,7 @@ pub struct Request<W> {
/// The HTTP version of this request.
pub version: version::HttpVersion,

body: HttpWriter<BufWriter<Box<NetworkStream + Send>>>,
message: Box<HttpMessage>,
headers: Headers,
method: method::Method,

Expand All @@ -41,22 +42,12 @@ impl<W> Request<W> {
}

impl Request<Fresh> {
/// Create a new client request.
pub fn new(method: method::Method, url: Url) -> ::Result<Request<Fresh>> {
let mut conn = HttpConnector(None);
Request::with_connector(method, url, &mut conn)
}

/// Create a new client request with a specific underlying NetworkStream.
pub fn with_connector<C, S>(method: method::Method, url: Url, connector: &C)
-> ::Result<Request<Fresh>> where
C: NetworkConnector<Stream=S>,
S: Into<Box<NetworkStream + Send>> {
/// Create a new `Request<Fresh>` that will use the given `HttpMessage` for its communication
/// with the server. This implies that the given `HttpMessage` instance has already been
/// properly initialized by the caller (e.g. a TCP connection's already established).
pub fn with_message(method: method::Method, url: Url, message: Box<HttpMessage>)
-> ::Result<Request<Fresh>> {
let (host, port) = try!(get_host_and_port(&url));

let stream = try!(connector.connect(&*host, port, &*url.scheme)).into();
let stream = ThroughWriter(BufWriter::new(stream));

let mut headers = Headers::new();
headers.set(Host {
hostname: host,
Expand All @@ -68,77 +59,43 @@ impl Request<Fresh> {
headers: headers,
url: url,
version: version::HttpVersion::Http11,
body: stream,
message: message,
_marker: PhantomData,
})
}

/// Consume a Fresh Request, writing the headers and method,
/// returning a Streaming Request.
pub fn start(mut self) -> ::Result<Request<Streaming>> {
let mut uri = self.url.serialize_path().unwrap();
if let Some(ref q) = self.url.query {
uri.push('?');
uri.push_str(&q[..]);
}

debug!("request line: {:?} {:?} {:?}", self.method, uri, self.version);
try!(write!(&mut self.body, "{} {} {}{}",
self.method, uri, self.version, LINE_ENDING));


let stream = match self.method {
Method::Get | Method::Head => {
debug!("headers={:?}", self.headers);
try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING));
EmptyWriter(self.body.into_inner())
},
_ => {
let mut chunked = true;
let mut len = 0;

match self.headers.get::<header::ContentLength>() {
Some(cl) => {
chunked = false;
len = **cl;
},
None => ()
};

// can't do in match above, thanks borrowck
if chunked {
let encodings = match self.headers.get_mut::<header::TransferEncoding>() {
Some(&mut header::TransferEncoding(ref mut encodings)) => {
//TODO: check if chunked is already in encodings. use HashSet?
encodings.push(header::Encoding::Chunked);
false
},
None => true
};

if encodings {
self.headers.set::<header::TransferEncoding>(
header::TransferEncoding(vec![header::Encoding::Chunked]))
}
}
/// Create a new client request.
pub fn new(method: method::Method, url: Url) -> ::Result<Request<Fresh>> {
let mut conn = HttpConnector(None);
Request::with_connector(method, url, &mut conn)
}

debug!("headers={:?}", self.headers);
try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING));
/// Create a new client request with a specific underlying NetworkStream.
pub fn with_connector<C, S>(method: method::Method, url: Url, connector: &C)
-> ::Result<Request<Fresh>> where
C: NetworkConnector<Stream=S>,
S: Into<Box<NetworkStream + Send>> {
let (host, port) = try!(get_host_and_port(&url));
let stream = try!(connector.connect(&*host, port, &*url.scheme)).into();

if chunked {
ChunkedWriter(self.body.into_inner())
} else {
SizedWriter(self.body.into_inner(), len)
}
}
};
Request::with_message(method, url, Box::new(Http11Message::with_stream(stream)))
}

Ok(Request {
method: self.method,
/// Consume a Fresh Request, writing the headers and method,
/// returning a Streaming Request.
pub fn start(mut self) -> ::Result<Request<Streaming>> {
let head = try!(self.message.set_outgoing(RequestHead {
headers: self.headers,
method: self.method,
url: self.url,
}));

Ok(Request {
method: head.method,
headers: head.headers,
url: head.url,
version: self.version,
body: stream,
message: self.message,
_marker: PhantomData,
})
}
Expand All @@ -153,20 +110,19 @@ impl Request<Streaming> {
///
/// Consumes the Request.
pub fn send(self) -> ::Result<Response> {
let raw = try!(self.body.end()).into_inner().unwrap(); // end() already flushes
Response::new(raw)
Response::with_message(self.message)
}
}

impl Write for Request<Streaming> {
#[inline]
fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
self.body.write(msg)
self.message.write(msg)
}

#[inline]
fn flush(&mut self) -> io::Result<()> {
self.body.flush()
self.message.flush()
}
}

Expand All @@ -180,11 +136,15 @@ mod tests {
use header::{ContentLength,TransferEncoding,Encoding};
use url::form_urlencoded;
use super::Request;
use http11::Http11Message;

fn run_request(req: Request<Fresh>) -> Vec<u8> {
let req = req.start().unwrap();
let stream = *req.body.end().unwrap()
.into_inner().unwrap().downcast::<MockStream>().ok().unwrap();
let message = req.message;
let mut message = message.downcast::<Http11Message>().ok().unwrap();
message.flush_outgoing().unwrap();
let stream = *message
.into_inner().downcast::<MockStream>().ok().unwrap();
stream.write
}

Expand Down
Loading

0 comments on commit 486a219

Please sign in to comment.