From 7d1f154cb7b4db4a029b52857c377000a3f23419 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 16 Jun 2015 11:02:36 -0700 Subject: [PATCH] feat(net): add socket timeouts to Server and Client While these methods are marked unstable in libstd, this is behind a feature flag, `timeouts`. The Client and Server both have `set_read_timeout` and `set_write_timeout` methods, that will affect all connections with that entity. BREAKING CHANGE: Any custom implementation of NetworkStream must now implement `set_read_timeout` and `set_write_timeout`, so those will break. Most users who only use the provided streams should work with no changes needed. Closes #315 --- Cargo.toml | 4 +- src/client/mod.rs | 43 ++++++++++++++++++++-- src/client/pool.rs | 15 ++++++++ src/client/request.rs | 17 +++++++++ src/http/h1.rs | 47 ++++++++++++++++++++++-- src/http/h2.rs | 15 ++++++++ src/http/message.rs | 13 +++++-- src/lib.rs | 1 + src/mock.rs | 60 +++++++++++++++++++++--------- src/net.rs | 59 +++++++++++++++++++++++++++--- src/server/mod.rs | 85 +++++++++++++++++++++++++++++++++++-------- 11 files changed, 310 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5a1551701b..a3895a34f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,5 +47,5 @@ env_logger = "*" default = ["ssl"] ssl = ["openssl", "cookie/secure"] serde-serialization = ["serde"] -nightly = [] - +timeouts = [] +nightly = ["timeouts"] diff --git a/src/client/mod.rs b/src/client/mod.rs index dd72dd90d8..955320c370 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -59,13 +59,16 @@ use std::default::Default; use std::io::{self, copy, Read}; use std::iter::Extend; +#[cfg(feature = "timeouts")] +use std::time::Duration; + use url::UrlParser; use url::ParseError as UrlError; use header::{Headers, Header, HeaderFormat}; use header::{ContentLength, Location}; use method::Method; -use net::{NetworkConnector, NetworkStream}; +use net::{NetworkConnector, NetworkStream, Fresh}; use {Url}; use Error; @@ -87,7 +90,9 @@ pub struct Client { protocol: Box, redirect_policy: RedirectPolicy, #[cfg(feature = "timeouts")] - read_timeout: Option + read_timeout: Option, + #[cfg(feature = "timeouts")] + write_timeout: Option, } impl Client { @@ -108,11 +113,23 @@ impl Client { Client::with_protocol(Http11Protocol::with_connector(connector)) } + #[cfg(not(feature = "timeouts"))] + /// Create a new client with a specific `Protocol`. + pub fn with_protocol(protocol: P) -> Client { + Client { + protocol: Box::new(protocol), + redirect_policy: Default::default(), + } + } + + #[cfg(feature = "timeouts")] /// Create a new client with a specific `Protocol`. pub fn with_protocol(protocol: P) -> Client { Client { protocol: Box::new(protocol), - redirect_policy: Default::default() + redirect_policy: Default::default(), + read_timeout: None, + write_timeout: None, } } @@ -127,6 +144,12 @@ impl Client { self.read_timeout = dur; } + /// Set the write timeout value for all requests. + #[cfg(feature = "timeouts")] + pub fn set_write_timeout(&mut self, dur: Option) { + self.write_timeout = dur; + } + /// Build a Get request. pub fn get(&self, url: U) -> RequestBuilder { self.request(Method::Get, url) @@ -236,6 +259,20 @@ impl<'a, U: IntoUrl> RequestBuilder<'a, U> { let mut req = try!(Request::with_message(method.clone(), url.clone(), message)); headers.as_ref().map(|headers| req.headers_mut().extend(headers.iter())); + #[cfg(not(feature = "timeouts"))] + fn set_timeouts(_req: &mut Request, _client: &Client) -> ::Result<()> { + Ok(()) + } + + #[cfg(feature = "timeouts")] + fn set_timeouts(req: &mut Request, client: &Client) -> ::Result<()> { + try!(req.set_write_timeout(client.write_timeout)); + try!(req.set_read_timeout(client.read_timeout)); + Ok(()) + } + + try!(set_timeouts(&mut req, &client)); + match (can_have_body, body.as_ref()) { (true, Some(body)) => match body.size() { Some(size) => req.headers_mut().set(ContentLength(size)), diff --git a/src/client/pool.rs b/src/client/pool.rs index 0962469566..d2fe6fa592 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -5,6 +5,9 @@ use std::io::{self, Read, Write}; use std::net::{SocketAddr, Shutdown}; use std::sync::{Arc, Mutex}; +#[cfg(feature = "timeouts")] +use std::time::Duration; + use net::{NetworkConnector, NetworkStream, DefaultConnector}; /// The `NetworkConnector` that behaves as a connection pool used by hyper's `Client`. @@ -153,6 +156,18 @@ impl NetworkStream for PooledStream { self.inner.as_mut().unwrap().1.peer_addr() } + #[cfg(feature = "timeouts")] + #[inline] + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.inner.as_ref().unwrap().1.set_read_timeout(dur) + } + + #[cfg(feature = "timeouts")] + #[inline] + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.inner.as_ref().unwrap().1.set_write_timeout(dur) + } + #[inline] fn close(&mut self, how: Shutdown) -> io::Result<()> { self.is_closed = true; diff --git a/src/client/request.rs b/src/client/request.rs index 407a0706c8..3f84dda41b 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -2,6 +2,9 @@ use std::marker::PhantomData; use std::io::{self, Write}; +#[cfg(feature = "timeouts")] +use std::time::Duration; + use url::Url; use method::{self, Method}; @@ -39,6 +42,20 @@ impl Request { /// Read the Request method. #[inline] pub fn method(&self) -> method::Method { self.method.clone() } + + /// Set the write timeout. + #[cfg(feature = "timeouts")] + #[inline] + pub fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.message.set_write_timeout(dur) + } + + /// Set the read timeout. + #[cfg(feature = "timeouts")] + #[inline] + pub fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.message.set_read_timeout(dur) + } } impl Request { diff --git a/src/http/h1.rs b/src/http/h1.rs index 06bb804580..fbc830a036 100644 --- a/src/http/h1.rs +++ b/src/http/h1.rs @@ -4,6 +4,8 @@ use std::cmp::min; use std::fmt; use std::io::{self, Write, BufWriter, BufRead, Read}; use std::net::Shutdown; +#[cfg(feature = "timeouts")] +use std::time::Duration; use httparse; @@ -192,6 +194,19 @@ impl HttpMessage for Http11Message { }) } + #[cfg(feature = "timeouts")] + #[inline] + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.get_ref().set_read_timeout(dur) + } + + #[cfg(feature = "timeouts")] + #[inline] + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.get_ref().set_write_timeout(dur) + } + + #[inline] fn close_connection(&mut self) -> ::Result<()> { try!(self.get_mut().close(Shutdown::Both)); Ok(()) @@ -214,13 +229,27 @@ impl Http11Message { /// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the /// `Http11Message`. - pub fn get_mut(&mut self) -> &mut Box { + pub fn get_ref(&self) -> &(NetworkStream + Send) { if self.stream.is_some() { - self.stream.as_mut().unwrap() + &**self.stream.as_ref().unwrap() } else if self.writer.is_some() { - self.writer.as_mut().unwrap().get_mut().get_mut() + &**self.writer.as_ref().unwrap().get_ref().get_ref() } else if self.reader.is_some() { - self.reader.as_mut().unwrap().get_mut().get_mut() + &**self.reader.as_ref().unwrap().get_ref().get_ref() + } else { + panic!("Http11Message lost its underlying stream somehow"); + } + } + + /// Gets a mutable reference to the underlying `NetworkStream`, regardless of the state of the + /// `Http11Message`. + pub fn get_mut(&mut self) -> &mut (NetworkStream + Send) { + if self.stream.is_some() { + &mut **self.stream.as_mut().unwrap() + } else if self.writer.is_some() { + &mut **self.writer.as_mut().unwrap().get_mut().get_mut() + } else if self.reader.is_some() { + &mut **self.reader.as_mut().unwrap().get_mut().get_mut() } else { panic!("Http11Message lost its underlying stream somehow"); } @@ -344,6 +373,16 @@ impl HttpReader { } } + /// Gets a borrowed reference to the underlying Reader. + pub fn get_ref(&self) -> &R { + match *self { + SizedReader(ref r, _) => r, + ChunkedReader(ref r, _) => r, + EofReader(ref r) => r, + EmptyReader(ref r) => r, + } + } + /// Gets a mutable reference to the underlying Reader. pub fn get_mut(&mut self) -> &mut R { match *self { diff --git a/src/http/h2.rs b/src/http/h2.rs index 7d6d9f683d..84af8ff573 100644 --- a/src/http/h2.rs +++ b/src/http/h2.rs @@ -4,6 +4,8 @@ use std::io::{self, Write, Read, Cursor}; use std::net::Shutdown; use std::ascii::AsciiExt; use std::mem; +#[cfg(feature = "timeouts")] +use std::time::Duration; use http::{ Protocol, @@ -398,6 +400,19 @@ impl HttpMessage for Http2Message where S: CloneableStream { Ok(head) } + #[cfg(feature = "timeouts")] + #[inline] + fn set_read_timeout(&self, _dur: Option) -> io::Result<()> { + Ok(()) + } + + #[cfg(feature = "timeouts")] + #[inline] + fn set_write_timeout(&self, _dur: Option) -> io::Result<()> { + Ok(()) + } + + #[inline] fn close_connection(&mut self) -> ::Result<()> { Ok(()) } diff --git a/src/http/message.rs b/src/http/message.rs index 2e09fd24b7..f0f2d9b21e 100644 --- a/src/http/message.rs +++ b/src/http/message.rs @@ -1,12 +1,16 @@ //! Defines the `HttpMessage` trait that serves to encapsulate the operations of a single //! request-response cycle on any HTTP connection. -use std::fmt::Debug; use std::any::{Any, TypeId}; +use std::fmt::Debug; use std::io::{Read, Write}; - use std::mem; +#[cfg(feature = "timeouts")] +use std::io; +#[cfg(feature = "timeouts")] +use std::time::Duration; + use typeable::Typeable; use header::Headers; @@ -62,7 +66,10 @@ pub trait HttpMessage: Write + Read + Send + Any + Typeable + Debug { fn get_incoming(&mut self) -> ::Result; /// Set the read timeout duration for this message. #[cfg(feature = "timeouts")] - fn set_read_timeout(&self, dur: Option) -> ::Result<()>; + fn set_read_timeout(&self, dur: Option) -> io::Result<()>; + /// Set the write timeout duration for this message. + #[cfg(feature = "timeouts")] + fn set_write_timeout(&self, dur: Option) -> io::Result<()>; /// Closes the underlying HTTP connection. fn close_connection(&mut self) -> ::Result<()>; } diff --git a/src/lib.rs b/src/lib.rs index 60fde80c11..24c96d9d58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ #![cfg_attr(test, deny(missing_docs))] #![cfg_attr(test, deny(warnings))] #![cfg_attr(all(test, feature = "nightly"), feature(test))] +#![cfg_attr(feature = "timeouts", feature(duration, socket_timeout))] //! # Hyper //! diff --git a/src/mock.rs b/src/mock.rs index d9d3d64824..25ae9e8c7c 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -4,6 +4,10 @@ use std::io::{self, Read, Write, Cursor}; use std::cell::RefCell; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; +#[cfg(feature = "timeouts")] +use std::time::Duration; +#[cfg(feature = "timeouts")] +use std::cell::Cell; use solicit::http::HttpScheme; use solicit::http::transport::TransportStream; @@ -13,18 +17,14 @@ use solicit::http::connection::{HttpConnection, EndStream, DataChunk}; use header::Headers; use net::{NetworkStream, NetworkConnector}; +#[derive(Clone)] pub struct MockStream { pub read: Cursor>, pub write: Vec, -} - -impl Clone for MockStream { - fn clone(&self) -> MockStream { - MockStream { - read: Cursor::new(self.read.get_ref().clone()), - write: self.write.clone() - } - } + #[cfg(feature = "timeouts")] + pub read_timeout: Cell>, + #[cfg(feature = "timeouts")] + pub write_timeout: Cell> } impl fmt::Debug for MockStream { @@ -41,16 +41,24 @@ impl PartialEq for MockStream { impl MockStream { pub fn new() -> MockStream { + MockStream::with_input(b"") + } + + #[cfg(not(feature = "timeouts"))] + pub fn with_input(input: &[u8]) -> MockStream { MockStream { - read: Cursor::new(vec![]), - write: vec![], + read: Cursor::new(input.to_vec()), + write: vec![] } } + #[cfg(feature = "timeouts")] pub fn with_input(input: &[u8]) -> MockStream { MockStream { read: Cursor::new(input.to_vec()), - write: vec![] + write: vec![], + read_timeout: Cell::new(None), + write_timeout: Cell::new(None), } } } @@ -75,6 +83,18 @@ impl NetworkStream for MockStream { fn peer_addr(&mut self) -> io::Result { Ok("127.0.0.1:1337".parse().unwrap()) } + + #[cfg(feature = "timeouts")] + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.read_timeout.set(dur); + Ok(()) + } + + #[cfg(feature = "timeouts")] + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.write_timeout.set(dur); + Ok(()) + } } /// A wrapper around a `MockStream` that allows one to clone it and keep an independent copy to the @@ -114,6 +134,16 @@ impl NetworkStream for CloneableMockStream { fn peer_addr(&mut self) -> io::Result { self.inner.lock().unwrap().peer_addr() } + + #[cfg(feature = "timeouts")] + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.inner.lock().unwrap().set_read_timeout(dur) + } + + #[cfg(feature = "timeouts")] + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.inner.lock().unwrap().set_write_timeout(dur) + } } impl CloneableMockStream { @@ -147,7 +177,6 @@ macro_rules! mock_connector ( fn connect(&self, host: &str, port: u16, scheme: &str) -> $crate::Result<::mock::MockStream> { use std::collections::HashMap; - use std::io::Cursor; debug!("MockStream::connect({:?}, {:?}, {:?})", host, port, scheme); let mut map = HashMap::new(); $(map.insert($url, $res);)* @@ -156,10 +185,7 @@ macro_rules! mock_connector ( let key = format!("{}://{}", scheme, host); // ignore port for now match map.get(&*key) { - Some(&res) => Ok($crate::mock::MockStream { - write: vec![], - read: Cursor::new(res.to_owned().into_bytes()), - }), + Some(&res) => Ok($crate::mock::MockStream::with_input(res.as_bytes())), None => panic!("{:?} doesn't know url {}", stringify!($name), key) } } diff --git a/src/net.rs b/src/net.rs index 5e35d86cab..b5ea35827d 100644 --- a/src/net.rs +++ b/src/net.rs @@ -8,6 +8,9 @@ use std::mem; #[cfg(feature = "openssl")] pub use self::openssl::Openssl; +#[cfg(feature = "timeouts")] +use std::time::Duration; + use typeable::Typeable; use traitobject; @@ -21,8 +24,6 @@ pub enum Streaming {} pub trait NetworkListener: Clone { /// The stream produced for each connection. type Stream: NetworkStream + Send + Clone; - /// Listens on a socket. - //fn listen(&mut self, addr: To) -> io::Result; /// Returns an iterator of streams. fn accept(&mut self) -> ::Result; @@ -30,9 +31,6 @@ pub trait NetworkListener: Clone { /// Get the address this Listener ended up listening on. fn local_addr(&mut self) -> io::Result; - /// Closes the Acceptor, so no more incoming connections will be handled. -// fn close(&mut self) -> io::Result<()>; - /// Returns an iterator over incoming connections. fn incoming(&mut self) -> NetworkConnections { NetworkConnections(self) @@ -53,6 +51,12 @@ impl<'a, N: NetworkListener + 'a> Iterator for NetworkConnections<'a, N> { pub trait NetworkStream: Read + Write + Any + Send + Typeable { /// Get the remote address of the underlying connection. fn peer_addr(&mut self) -> io::Result; + /// Set the maximum time to wait for a read to complete. + #[cfg(feature = "timeouts")] + fn set_read_timeout(&self, dur: Option) -> io::Result<()>; + /// Set the maximum time to wait for a write to complete. + #[cfg(feature = "timeouts")] + fn set_write_timeout(&self, dur: Option) -> io::Result<()>; /// This will be called when Stream should no longer be kept alive. #[inline] fn close(&mut self, _how: Shutdown) -> io::Result<()> { @@ -222,6 +226,18 @@ impl NetworkStream for HttpStream { self.0.peer_addr() } + #[cfg(feature = "timeouts")] + #[inline] + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.0.set_read_timeout(dur) + } + + #[cfg(feature = "timeouts")] + #[inline] + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.0.set_write_timeout(dur) + } + #[inline] fn close(&mut self, how: Shutdown) -> io::Result<()> { match self.0.shutdown(how) { @@ -312,6 +328,24 @@ impl NetworkStream for HttpsStream { } } + #[cfg(feature = "timeouts")] + #[inline] + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + match *self { + HttpsStream::Http(ref inner) => inner.0.set_read_timeout(dur), + HttpsStream::Https(ref inner) => inner.set_read_timeout(dur) + } + } + + #[cfg(feature = "timeouts")] + #[inline] + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + match *self { + HttpsStream::Http(ref inner) => inner.0.set_read_timeout(dur), + HttpsStream::Https(ref inner) => inner.set_read_timeout(dur) + } + } + #[inline] fn close(&mut self, how: Shutdown) -> io::Result<()> { match *self { @@ -397,6 +431,9 @@ mod openssl { use std::net::{SocketAddr, Shutdown}; use std::path::Path; use std::sync::Arc; + #[cfg(feature = "timeouts")] + use std::time::Duration; + use openssl::ssl::{Ssl, SslContext, SslStream, SslMethod, SSL_VERIFY_NONE}; use openssl::ssl::error::StreamError as SslIoError; use openssl::ssl::error::SslError; @@ -475,6 +512,18 @@ mod openssl { self.get_mut().peer_addr() } + #[cfg(feature = "timeouts")] + #[inline] + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.get_ref().set_read_timeout(dur) + } + + #[cfg(feature = "timeouts")] + #[inline] + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.get_ref().set_write_timeout(dur) + } + fn close(&mut self, how: Shutdown) -> io::Result<()> { self.get_mut().close(how) } diff --git a/src/server/mod.rs b/src/server/mod.rs index 60352c6c55..2061dcc235 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -108,10 +108,13 @@ //! `Request` object, that no longer has `headers_mut()`, but does //! implement `Write`. use std::fmt; -use std::io::{ErrorKind, BufWriter, Write}; +use std::io::{self, ErrorKind, BufWriter, Write}; use std::net::{SocketAddr, ToSocketAddrs}; use std::thread::{self, JoinHandle}; +#[cfg(feature = "timeouts")] +use std::time::Duration; + use num_cpus; pub use self::request::Request; @@ -143,8 +146,20 @@ mod listener; #[derive(Debug)] pub struct Server { listener: L, + _timeouts: Timeouts, +} + +#[cfg(feature = "timeouts")] +#[derive(Clone, Copy, Default, Debug)] +struct Timeouts { + read: Option, + write: Option, } +#[cfg(not(feature = "timeouts"))] +#[derive(Clone, Copy, Default, Debug)] +struct Timeouts; + macro_rules! try_option( ($e:expr) => {{ match $e { @@ -159,9 +174,22 @@ impl Server { #[inline] pub fn new(listener: L) -> Server { Server { - listener: listener + listener: listener, + _timeouts: Timeouts::default(), } } + + #[cfg(feature = "timeouts")] + pub fn set_read_timeout(&mut self, dur: Option) { + self._timeouts.read = dur; + } + + #[cfg(feature = "timeouts")] + pub fn set_write_timeout(&mut self, dur: Option) { + self._timeouts.write = dur; + } + + } impl Server { @@ -183,24 +211,25 @@ impl Server> { impl Server { /// Binds to a socket and starts handling connections. pub fn handle(self, handler: H) -> ::Result { - with_listener(handler, self.listener, num_cpus::get() * 5 / 4) + self.handle_threads(handler, num_cpus::get() * 5 / 4) } /// Binds to a socket and starts handling connections with the provided /// number of threads. pub fn handle_threads(self, handler: H, threads: usize) -> ::Result { - with_listener(handler, self.listener, threads) + handle(self, handler, threads) } } -fn with_listener(handler: H, mut listener: L, threads: usize) -> ::Result +fn handle(mut server: Server, handler: H, threads: usize) -> ::Result where H: Handler + 'static, L: NetworkListener + Send + 'static { - let socket = try!(listener.local_addr()); + let socket = try!(server.listener.local_addr()); debug!("threads = {:?}", threads); - let pool = ListenerPool::new(listener); - let work = move |mut stream| Worker(&handler).handle_connection(&mut stream); + let pool = ListenerPool::new(server.listener); + let worker = Worker::new(handler, server._timeouts); + let work = move |mut stream| worker.handle_connection(&mut stream); let guard = thread::spawn(move || pool.accept(work, threads)); @@ -210,12 +239,28 @@ L: NetworkListener + Send + 'static { }) } -struct Worker<'a, H: Handler + 'static>(&'a H); +struct Worker { + handler: H, + _timeouts: Timeouts, +} + +impl Worker { -impl<'a, H: Handler + 'static> Worker<'a, H> { + fn new(handler: H, timeouts: Timeouts) -> Worker { + Worker { + handler: handler, + _timeouts: timeouts, + } + } fn handle_connection(&self, mut stream: &mut S) where S: NetworkStream + Clone { debug!("Incoming stream"); + + if let Err(e) = self.set_timeouts(stream) { + error!("set_timeouts error: {:?}", e); + return; + } + let addr = match stream.peer_addr() { Ok(addr) => addr, Err(e) => { @@ -233,6 +278,17 @@ impl<'a, H: Handler + 'static> Worker<'a, H> { debug!("keep_alive loop ending for {}", addr); } + #[cfg(not(feature = "timeouts"))] + fn set_timeouts(&self, _: &mut S) -> io::Result<()> where S: NetworkStream { + Ok(()) + } + + #[cfg(feature = "timeouts")] + fn set_timeouts(&self, s: &mut S) -> io::Result<()> where S: NetworkStream { + try!(s.set_read_timeout(self._timeouts.read)); + s.set_write_timeout(self._timeouts.write) + } + fn keep_alive_loop(&self, mut rdr: BufReader<&mut NetworkStream>, mut wrt: W, addr: SocketAddr) { let mut keep_alive = true; @@ -268,7 +324,7 @@ impl<'a, H: Handler + 'static> Worker<'a, H> { { let mut res = Response::new(&mut wrt, &mut res_headers); res.version = version; - self.0.handle(req, res); + self.handler.handle(req, res); } // if the request was keep-alive, we need to check that the server agrees @@ -284,7 +340,7 @@ impl<'a, H: Handler + 'static> Worker<'a, H> { fn handle_expect(&self, req: &Request, wrt: &mut W) -> bool { if req.version == Http11 && req.headers.get() == Some(&Expect::Continue) { - let status = self.0.check_continue((&req.method, &req.uri, &req.headers)); + let status = self.handler.check_continue((&req.method, &req.uri, &req.headers)); match write!(wrt, "{} {}\r\n\r\n", Http11, status) { Ok(..) => (), Err(e) => { @@ -327,7 +383,6 @@ impl Listening { pub fn close(&mut self) -> ::Result<()> { let _ = self._guard.take(); debug!("closing server"); - //try!(self.acceptor.close()); Ok(()) } } @@ -379,7 +434,7 @@ mod tests { res.start().unwrap().end().unwrap(); } - Worker(&handle).handle_connection(&mut mock); + Worker::new(handle, Default::default()).handle_connection(&mut mock); let cont = b"HTTP/1.1 100 Continue\r\n\r\n"; assert_eq!(&mock.write[..cont.len()], cont); let res = b"HTTP/1.1 200 OK\r\n"; @@ -408,7 +463,7 @@ mod tests { 1234567890\ "); - Worker(&Reject).handle_connection(&mut mock); + Worker::new(Reject, Default::default()).handle_connection(&mut mock); assert_eq!(mock.write, &b"HTTP/1.1 417 Expectation Failed\r\n\r\n"[..]); } }