diff --git a/Cargo.toml b/Cargo.toml index aacf04653a..71d7084184 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,17 +22,17 @@ include = [ [dependencies] base64 = "0.4" bytes = "0.4" -futures = "0.1.7" +futures = "0.1.11" futures-cpupool = "0.1" httparse = "1.0" language-tags = "0.2" log = "0.3" mime = "0.2" -relay = "0.1" time = "0.1" -tokio-core = "0.1" +tokio-core = "0.1.6" tokio-proto = "0.1" tokio-service = "0.1" +tokio-io = "0.1" unicase = "1.0" url = "1.0" diff --git a/src/client/connect.rs b/src/client/connect.rs index 1312bd95ea..f176a4d34c 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -3,7 +3,7 @@ use std::io; //use std::net::SocketAddr; use futures::{Future, Poll, Async}; -use tokio::io::Io; +use tokio_io::{AsyncRead, AsyncWrite}; use tokio::reactor::Handle; use tokio::net::{TcpStream, TcpStreamNew}; use tokio_service::Service; @@ -18,7 +18,7 @@ use super::dns; /// `Request=Url` and `Response: Io` instead. pub trait Connect: Service + 'static { /// The connected Io Stream. - type Output: Io + 'static; + type Output: AsyncRead + AsyncWrite + 'static; /// A Future that will resolve to the connected Stream. type Future: Future + 'static; /// Connect to a remote address. @@ -27,7 +27,7 @@ pub trait Connect: Service + 'static { impl Connect for T where T: Service + 'static, - T::Response: Io, + T::Response: AsyncRead + AsyncWrite, T::Future: Future, { type Output = T::Response; diff --git a/src/client/mod.rs b/src/client/mod.rs index 923afea216..ffc7b6156d 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -11,8 +11,8 @@ use std::rc::Rc; use std::time::Duration; use futures::{Poll, Async, Future, Stream}; -use relay; -use tokio::io::Io; +use futures::unsync::oneshot; +use tokio_io::{AsyncRead, AsyncWrite}; use tokio::reactor::Handle; use tokio_proto::BindClient; use tokio_proto::streaming::Message; @@ -149,12 +149,12 @@ where C: Connect, let pool_key = Rc::new(url[..::url::Position::BeforePath].to_owned()); self.connector.connect(url) .map(move |io| { - let (tx, rx) = relay::channel(); + let (tx, rx) = oneshot::channel(); let client = HttpClient { client_rx: RefCell::new(Some(rx)), }.bind_client(&handle, io); let pooled = pool.pooled(pool_key, client); - tx.complete(pooled.clone()); + drop(tx.send(pooled.clone())); pooled }) }; @@ -207,11 +207,11 @@ impl fmt::Debug for Client { type TokioClient = ClientProxy, Message, ::Error>; struct HttpClient { - client_rx: RefCell>>>>, + client_rx: RefCell>>>>, } impl ClientProto for HttpClient -where T: Io + 'static, +where T: AsyncRead + AsyncWrite + 'static, B: Stream + 'static, B::Item: AsRef<[u8]>, { @@ -232,12 +232,12 @@ where T: Io + 'static, } struct BindingClient { - rx: relay::Receiver>>, + rx: oneshot::Receiver>>, io: Option, } impl Future for BindingClient -where T: Io + 'static, +where T: AsyncRead + AsyncWrite + 'static, B: Stream, B::Item: AsRef<[u8]>, { diff --git a/src/client/pool.rs b/src/client/pool.rs index cb7dc97e50..32df227f92 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -7,7 +7,7 @@ use std::rc::Rc; use std::time::{Duration, Instant}; use futures::{Future, Async, Poll}; -use relay; +use futures::unsync::oneshot; use http::{KeepAlive, KA}; @@ -18,7 +18,7 @@ pub struct Pool { struct PoolInner { enabled: bool, idle: HashMap, Vec>>, - parked: HashMap, VecDeque>>>, + parked: HashMap, VecDeque>>>, timeout: Option, } @@ -44,31 +44,33 @@ impl Pool { fn put(&mut self, key: Rc, entry: Entry) { trace!("Pool::put {:?}", key); + let mut inner = self.inner.borrow_mut(); + //let inner = &mut *inner; let mut remove_parked = false; - let tx = self.inner.borrow_mut().parked.get_mut(&key).and_then(|parked| { - let mut ret = None; + let mut entry = Some(entry); + if let Some(parked) = inner.parked.get_mut(&key) { while let Some(tx) = parked.pop_front() { - if !tx.is_canceled() { - ret = Some(tx); - break; + match tx.send(entry.take().unwrap()) { + Ok(()) => break, + Err(e) => { + trace!("Pool::put removing canceled parked {:?}", key); + entry = Some(e); + } } - trace!("Pool::put removing canceled parked {:?}", key); } remove_parked = parked.is_empty(); - ret - }); + } if remove_parked { - self.inner.borrow_mut().parked.remove(&key); + inner.parked.remove(&key); } - if let Some(tx) = tx { - trace!("Pool::put found parked {:?}", key); - tx.complete(entry); - } else { - self.inner.borrow_mut() - .idle.entry(key) - .or_insert(Vec::new()) - .push(entry); + match entry { + Some(entry) => { + inner.idle.entry(key) + .or_insert(Vec::new()) + .push(entry); + } + None => trace!("Pool::put found parked {:?}", key), } } @@ -100,7 +102,7 @@ impl Pool { } } - fn park(&mut self, key: Rc, tx: relay::Sender>) { + fn park(&mut self, key: Rc, tx: oneshot::Sender>) { trace!("Pool::park {:?}", key); self.inner.borrow_mut() .parked.entry(key) @@ -191,7 +193,7 @@ struct Entry { pub struct Checkout { key: Rc, pool: Pool, - parked: Option>>, + parked: Option>>, } impl Future for Checkout { @@ -247,7 +249,7 @@ impl Future for Checkout { Some(entry) => Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))), None => { if self.parked.is_none() { - let (tx, mut rx) = relay::channel(); + let (tx, mut rx) = oneshot::channel(); let _ = rx.poll(); // park this task self.pool.park(self.key.clone(), tx); self.parked = Some(rx); @@ -279,6 +281,7 @@ mod tests { use std::rc::Rc; use std::time::Duration; use futures::{Async, Future}; + use futures::future; use http::KeepAlive; use super::Pool; @@ -297,7 +300,7 @@ mod tests { #[test] fn test_pool_checkout_returns_none_if_expired() { - ::futures::lazy(|| { + future::lazy(|| { let pool = Pool::new(true, Some(Duration::from_secs(1))); let key = Rc::new("foo".to_string()); let mut pooled = pool.pooled(key.clone(), 41); @@ -339,7 +342,7 @@ mod tests { let pooled1 = pool.pooled(key.clone(), 41); let mut pooled = pooled1.clone(); - let checkout = pool.checkout(&key).join(::futures::lazy(move || { + let checkout = pool.checkout(&key).join(future::lazy(move || { // the checkout future will park first, // and then this lazy future will be polled, which will insert // the pooled back into the pool diff --git a/src/http/conn.rs b/src/http/conn.rs index 41ac3e624e..ff9aeca014 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -5,7 +5,7 @@ use std::time::Instant; use futures::{Poll, Async, AsyncSink, Stream, Sink, StartSend}; use futures::task::Task; -use tokio::io::Io; +use tokio_io::{AsyncRead, AsyncWrite}; use tokio_proto::streaming::pipeline::{Frame, Transport}; use header::{ContentLength, TransferEncoding}; @@ -16,7 +16,7 @@ use version::HttpVersion; /// This handles a connection, which will have been established over an -/// `Io` (like a socket), and will likely include multiple +/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple /// `Transaction`s over HTTP. /// /// The connection will determine when a message begins and ends as well as @@ -29,7 +29,7 @@ pub struct Conn { } impl Conn -where I: Io, +where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, K: KeepAlive @@ -155,7 +155,7 @@ where I: Io, } fn maybe_park_read(&mut self) { - if self.io.poll_read().is_ready() { + if !self.io.is_read_blocked() { // the Io object is ready to read, which means it will never alert // us that it is ready until we drain it. However, we're currently // finished reading, so we need to park the task to be able to @@ -350,7 +350,7 @@ where I: Io, } impl Stream for Conn -where I: Io, +where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, K: KeepAlive, @@ -385,7 +385,7 @@ where I: Io, } impl Sink for Conn -where I: Io, +where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, K: KeepAlive, @@ -450,10 +450,15 @@ where I: Io, trace!("Conn::flush = {:?}", ret); ret } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + try_ready!(self.poll_complete()); + self.io.io_mut().shutdown() + } } impl Transport for Conn -where I: Io + 'static, +where I: AsyncRead + AsyncWrite + 'static, B: AsRef<[u8]> + 'static, T: Http1Transaction + 'static, K: KeepAlive + 'static, @@ -665,6 +670,7 @@ impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a, #[cfg(test)] mod tests { use futures::{Async, Future, Stream, Sink}; + use futures::future; use tokio_proto::streaming::pipeline::Frame; use http::{self, MessageHead, ServerTransaction}; @@ -705,7 +711,7 @@ mod tests { #[test] fn test_conn_parse_partial() { - let _: Result<(), ()> = ::futures::lazy(|| { + let _: Result<(), ()> = future::lazy(|| { let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec(); let io = AsyncIo::new_buf(good_message, 10); let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); @@ -772,7 +778,7 @@ mod tests { #[test] fn test_conn_body_write_length() { - let _: Result<(), ()> = ::futures::lazy(|| { + let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 0); let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); let max = ::http::io::MAX_BUFFER_SIZE + 4096; @@ -800,7 +806,7 @@ mod tests { #[test] fn test_conn_body_write_chunked() { - let _: Result<(), ()> = ::futures::lazy(|| { + let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); conn.state.writing = Writing::Body(Encoder::chunked(), None); @@ -813,7 +819,7 @@ mod tests { #[test] fn test_conn_body_flush() { - let _: Result<(), ()> = ::futures::lazy(|| { + let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5); let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); conn.state.writing = Writing::Body(Encoder::length(1024 * 1024), None); @@ -829,7 +835,7 @@ mod tests { #[test] fn test_conn_parking() { use std::sync::Arc; - use futures::task::Unpark; + use futures::executor::Unpark; struct Car { permit: bool, @@ -847,7 +853,7 @@ mod tests { } // test that once writing is done, unparks - let f = ::futures::lazy(|| { + let f = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); conn.state.reading = Reading::KeepAlive; @@ -861,7 +867,7 @@ mod tests { // test that flushing when not waiting on read doesn't unpark - let f = ::futures::lazy(|| { + let f = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); conn.state.writing = Writing::KeepAlive; @@ -872,7 +878,7 @@ mod tests { // test that flushing and writing isn't done doesn't unpark - let f = ::futures::lazy(|| { + let f = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default()); conn.state.reading = Reading::KeepAlive; diff --git a/src/http/h1/decode.rs b/src/http/h1/decode.rs index 89a41b0e93..e22b7cb168 100644 --- a/src/http/h1/decode.rs +++ b/src/http/h1/decode.rs @@ -295,7 +295,7 @@ mod tests { let (a, b) = self.split_at(n); let mut buf = BytesMut::from(a); *self = b; - Ok(buf.drain_to(n).freeze()) + Ok(buf.split_to(n).freeze()) } else { Ok(Bytes::new()) } diff --git a/src/http/h1/parse.rs b/src/http/h1/parse.rs index 947813ab82..780ea3c6ca 100644 --- a/src/http/h1/parse.rs +++ b/src/http/h1/parse.rs @@ -55,7 +55,7 @@ impl Http1Transaction for ServerTransaction { }; let mut headers = Headers::with_capacity(headers_len); - let slice = buf.drain_to(len).freeze(); + let slice = buf.split_to(len).freeze(); let path = slice.slice(path.0, path.1); // path was found to be utf8 by httparse let path = unsafe { ByteStr::from_utf8_unchecked(path) }; @@ -171,7 +171,7 @@ impl Http1Transaction for ClientTransaction { }; let mut headers = Headers::with_capacity(headers_len); - let slice = buf.drain_to(len).freeze(); + let slice = buf.split_to(len).freeze(); headers.extend(HeadersAsBytesIter { headers: headers_indices[..headers_len].iter(), slice: slice, diff --git a/src/http/io.rs b/src/http/io.rs index 6548d77426..2dbaea088b 100644 --- a/src/http/io.rs +++ b/src/http/io.rs @@ -3,8 +3,7 @@ use std::fmt; use std::io::{self, Write}; use std::ptr; -use futures::Async; -use tokio::io::Io; +use tokio_io::{AsyncRead, AsyncWrite}; use http::{Http1Transaction, h1, MessageHead, ParseResult, DebugTruncate}; use bytes::{BytesMut, Bytes}; @@ -14,6 +13,7 @@ pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; pub struct Buffered { io: T, + read_blocked: bool, read_buf: BytesMut, write_buf: WriteBuf, } @@ -27,12 +27,13 @@ impl fmt::Debug for Buffered { } } -impl Buffered { +impl Buffered { pub fn new(io: T) -> Buffered { Buffered { io: io, read_buf: BytesMut::with_capacity(0), write_buf: WriteBuf::new(), + read_blocked: false, } } @@ -49,14 +50,10 @@ impl Buffered { _ => break, } } - self.read_buf.drain_to(i); + self.read_buf.split_to(i); } } - pub fn poll_read(&mut self) -> Async<()> { - self.io.poll_read() - } - pub fn parse(&mut self) -> ::Result>> { self.reserve_read_buf(); match self.read_from_io() { @@ -88,8 +85,17 @@ impl Buffered { fn read_from_io(&mut self) -> io::Result { use bytes::BufMut; + self.read_blocked = false; unsafe { - let n = try!(self.io.read(self.read_buf.bytes_mut())); + let n = match self.io.read(self.read_buf.bytes_mut()) { + Ok(n) => n, + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + self.read_blocked = true; + } + return Err(e) + } + }; self.read_buf.advance_mut(n); Ok(n) } @@ -112,10 +118,13 @@ impl Buffered { self.write_buf.buffer(buf.as_ref()) } - #[cfg(test)] pub fn io_mut(&mut self) -> &mut T { &mut self.io } + + pub fn is_read_blocked(&self) -> bool { + self.read_blocked + } } impl Write for Buffered { @@ -146,17 +155,17 @@ pub trait MemRead { fn read_mem(&mut self, len: usize) -> io::Result; } -impl MemRead for Buffered { +impl MemRead for Buffered { fn read_mem(&mut self, len: usize) -> io::Result { trace!("Buffered.read_mem read_buf={}, wanted={}", self.read_buf.len(), len); if !self.read_buf.is_empty() { let n = ::std::cmp::min(len, self.read_buf.len()); trace!("Buffered.read_mem read_buf is not empty, slicing {}", n); - Ok(self.read_buf.drain_to(n).freeze()) + Ok(self.read_buf.split_to(n).freeze()) } else { self.reserve_read_buf(); let n = try!(self.read_from_io()); - Ok(self.read_buf.drain_to(::std::cmp::min(len, n)).freeze()) + Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()) } } } diff --git a/src/lib.rs b/src/lib.rs index b29aed7c86..c6d65121d7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,10 +21,10 @@ extern crate httparse; #[cfg_attr(test, macro_use)] extern crate language_tags; #[macro_use] extern crate log; #[macro_use] pub extern crate mime; -extern crate relay; extern crate base64; extern crate time; #[macro_use] extern crate tokio_core as tokio; +extern crate tokio_io; extern crate tokio_proto; extern crate tokio_service; extern crate unicase; diff --git a/src/mock.rs b/src/mock.rs index 934732ad8d..6416640ec1 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -1,8 +1,8 @@ use std::cmp; use std::io::{self, Read, Write}; -use futures::Async; -use tokio::io::Io; +use futures::Poll; +use tokio_io::{AsyncRead, AsyncWrite}; #[derive(Debug)] pub struct Buf { @@ -123,21 +123,12 @@ impl Write for AsyncIo { } } -impl Io for AsyncIo { - fn poll_read(&mut self) -> Async<()> { - if self.bytes_until_block == 0 { - Async::NotReady - } else { - Async::Ready(()) - } - } +impl AsyncRead for AsyncIo { +} - fn poll_write(&mut self) -> Async<()> { - if self.bytes_until_block == 0 { - Async::NotReady - } else { - Async::Ready(()) - } +impl AsyncWrite for AsyncIo { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) } } diff --git a/src/server/mod.rs b/src/server/mod.rs index eb87a622db..4656ab8f33 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -13,9 +13,10 @@ use std::time::Duration; use futures::future; use futures::task::{self, Task}; -use futures::{Future, Map, Stream, Poll, Async, Sink, StartSend, AsyncSink}; +use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink}; +use futures::future::Map; -use tokio::io::Io; +use tokio_io::{AsyncRead, AsyncWrite}; use tokio::reactor::{Core, Handle, Timeout}; use tokio::net::TcpListener; use tokio_proto::BindServer; @@ -124,7 +125,7 @@ impl + 'static> Http { service: S) where S: Service, Error = ::Error> + 'static, Bd: Stream + 'static, - I: Io + 'static, + I: AsyncRead + AsyncWrite + 'static, { self.bind_server(handle, io, HttpService { inner: service, @@ -165,8 +166,8 @@ pub struct __ProtoBindTransport { } impl ServerProto for Http -where T: Io + 'static, - B: AsRef<[u8]> + 'static, + where T: AsyncRead + AsyncWrite + 'static, + B: AsRef<[u8]> + 'static, { type Request = __ProtoRequest; type RequestBody = http::Chunk; @@ -189,8 +190,8 @@ where T: Io + 'static, } impl Sink for __ProtoTransport -where T: Io + 'static, - B: AsRef<[u8]>, + where T: AsyncRead + AsyncWrite + 'static, + B: AsRef<[u8]> + 'static, { type SinkItem = Frame<__ProtoResponse, B, ::Error>; type SinkError = io::Error; @@ -224,9 +225,16 @@ where T: Io + 'static, fn poll_complete(&mut self) -> Poll<(), io::Error> { self.0.poll_complete() } + + fn close(&mut self) -> Poll<(), io::Error> { + self.0.close() + } } -impl> Stream for __ProtoTransport { +impl Stream for __ProtoTransport + where T: AsyncRead + AsyncWrite + 'static, + B: AsRef<[u8]> + 'static, +{ type Item = Frame<__ProtoRequest, http::Chunk, ::Error>; type Error = io::Error; @@ -246,7 +254,10 @@ impl> Stream for __ProtoTransport { } } -impl + 'static> Transport for __ProtoTransport { +impl Transport for __ProtoTransport + where T: AsyncRead + AsyncWrite + 'static, + B: AsRef<[u8]> + 'static, +{ fn tick(&mut self) { self.0.tick() } @@ -256,7 +267,9 @@ impl + 'static> Transport for __ProtoTransport Future for __ProtoBindTransport { +impl Future for __ProtoBindTransport + where T: AsyncRead + AsyncWrite + 'static, +{ type Item = __ProtoTransport; type Error = io::Error;