Skip to content

Commit

Permalink
refactor(lib): convert usage of tokio_core::io to tokio_io
Browse files Browse the repository at this point in the history
This commit updates to the most recent versions (released today) of the various
Tokio libraries in use. Namely the `tokio_core::io` module has now been
deprecated in favor of an external `tokio-io` crate. This commit pulls in that
crate and uses the `AsyncRead + AsyncWrite` abstraction instead of `Io` from
tokio-core.

BREAKING CHANGE: Any external types that were using that had implemented `Io` will need to 
  implement `AsyncRead + AsyncWrite` from tokio_io.
  • Loading branch information
alexcrichton authored and seanmonstar committed Mar 18, 2017
1 parent 34509ef commit 8554904
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 96 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ include = [
[dependencies]
base64 = "0.4"
bytes = "0.4"
futures = "0.1.7"
futures = "0.1.11"
futures-cpupool = "0.1"
httparse = "1.0"
language-tags = "0.2"
log = "0.3"
mime = "0.2"
relay = "0.1"
time = "0.1"
tokio-core = "0.1"
tokio-core = "0.1.6"
tokio-proto = "0.1"
tokio-service = "0.1"
tokio-io = "0.1"
unicase = "1.0"
url = "1.0"

Expand Down
6 changes: 3 additions & 3 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::io;
//use std::net::SocketAddr;

use futures::{Future, Poll, Async};
use tokio::io::Io;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio::net::{TcpStream, TcpStreamNew};
use tokio_service::Service;
Expand All @@ -18,7 +18,7 @@ use super::dns;
/// `Request=Url` and `Response: Io` instead.
pub trait Connect: Service<Request=Url, Error=io::Error> + 'static {
/// The connected Io Stream.
type Output: Io + 'static;
type Output: AsyncRead + AsyncWrite + 'static;
/// A Future that will resolve to the connected Stream.
type Future: Future<Item=Self::Output, Error=io::Error> + 'static;
/// Connect to a remote address.
Expand All @@ -27,7 +27,7 @@ pub trait Connect: Service<Request=Url, Error=io::Error> + 'static {

impl<T> Connect for T
where T: Service<Request=Url, Error=io::Error> + 'static,
T::Response: Io,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error=io::Error>,
{
type Output = T::Response;
Expand Down
16 changes: 8 additions & 8 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use std::rc::Rc;
use std::time::Duration;

use futures::{Poll, Async, Future, Stream};
use relay;
use tokio::io::Io;
use futures::unsync::oneshot;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio_proto::BindClient;
use tokio_proto::streaming::Message;
Expand Down Expand Up @@ -149,12 +149,12 @@ where C: Connect,
let pool_key = Rc::new(url[..::url::Position::BeforePath].to_owned());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = relay::channel();
let (tx, rx) = oneshot::channel();
let client = HttpClient {
client_rx: RefCell::new(Some(rx)),
}.bind_client(&handle, io);
let pooled = pool.pooled(pool_key, client);
tx.complete(pooled.clone());
drop(tx.send(pooled.clone()));
pooled
})
};
Expand Down Expand Up @@ -207,11 +207,11 @@ impl<C, B> fmt::Debug for Client<C, B> {
type TokioClient<B> = ClientProxy<Message<http::RequestHead, B>, Message<http::ResponseHead, TokioBody>, ::Error>;

struct HttpClient<B> {
client_rx: RefCell<Option<relay::Receiver<Pooled<TokioClient<B>>>>>,
client_rx: RefCell<Option<oneshot::Receiver<Pooled<TokioClient<B>>>>>,
}

impl<T, B> ClientProto<T> for HttpClient<B>
where T: Io + 'static,
where T: AsyncRead + AsyncWrite + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
Expand All @@ -232,12 +232,12 @@ where T: Io + 'static,
}

struct BindingClient<T, B> {
rx: relay::Receiver<Pooled<TokioClient<B>>>,
rx: oneshot::Receiver<Pooled<TokioClient<B>>>,
io: Option<T>,
}

impl<T, B> Future for BindingClient<T, B>
where T: Io + 'static,
where T: AsyncRead + AsyncWrite + 'static,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
Expand Down
51 changes: 27 additions & 24 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::rc::Rc;
use std::time::{Duration, Instant};

use futures::{Future, Async, Poll};
use relay;
use futures::unsync::oneshot;

use http::{KeepAlive, KA};

Expand All @@ -18,7 +18,7 @@ pub struct Pool<T> {
struct PoolInner<T> {
enabled: bool,
idle: HashMap<Rc<String>, Vec<Entry<T>>>,
parked: HashMap<Rc<String>, VecDeque<relay::Sender<Entry<T>>>>,
parked: HashMap<Rc<String>, VecDeque<oneshot::Sender<Entry<T>>>>,
timeout: Option<Duration>,
}

Expand All @@ -44,31 +44,33 @@ impl<T: Clone> Pool<T> {

fn put(&mut self, key: Rc<String>, entry: Entry<T>) {
trace!("Pool::put {:?}", key);
let mut inner = self.inner.borrow_mut();
//let inner = &mut *inner;
let mut remove_parked = false;
let tx = self.inner.borrow_mut().parked.get_mut(&key).and_then(|parked| {
let mut ret = None;
let mut entry = Some(entry);
if let Some(parked) = inner.parked.get_mut(&key) {
while let Some(tx) = parked.pop_front() {
if !tx.is_canceled() {
ret = Some(tx);
break;
match tx.send(entry.take().unwrap()) {
Ok(()) => break,
Err(e) => {
trace!("Pool::put removing canceled parked {:?}", key);
entry = Some(e);
}
}
trace!("Pool::put removing canceled parked {:?}", key);
}
remove_parked = parked.is_empty();
ret
});
}
if remove_parked {
self.inner.borrow_mut().parked.remove(&key);
inner.parked.remove(&key);
}

if let Some(tx) = tx {
trace!("Pool::put found parked {:?}", key);
tx.complete(entry);
} else {
self.inner.borrow_mut()
.idle.entry(key)
.or_insert(Vec::new())
.push(entry);
match entry {
Some(entry) => {
inner.idle.entry(key)
.or_insert(Vec::new())
.push(entry);
}
None => trace!("Pool::put found parked {:?}", key),
}
}

Expand Down Expand Up @@ -100,7 +102,7 @@ impl<T: Clone> Pool<T> {
}
}

fn park(&mut self, key: Rc<String>, tx: relay::Sender<Entry<T>>) {
fn park(&mut self, key: Rc<String>, tx: oneshot::Sender<Entry<T>>) {
trace!("Pool::park {:?}", key);
self.inner.borrow_mut()
.parked.entry(key)
Expand Down Expand Up @@ -191,7 +193,7 @@ struct Entry<T> {
pub struct Checkout<T> {
key: Rc<String>,
pool: Pool<T>,
parked: Option<relay::Receiver<Entry<T>>>,
parked: Option<oneshot::Receiver<Entry<T>>>,
}

impl<T: Clone> Future for Checkout<T> {
Expand Down Expand Up @@ -247,7 +249,7 @@ impl<T: Clone> Future for Checkout<T> {
Some(entry) => Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))),
None => {
if self.parked.is_none() {
let (tx, mut rx) = relay::channel();
let (tx, mut rx) = oneshot::channel();
let _ = rx.poll(); // park this task
self.pool.park(self.key.clone(), tx);
self.parked = Some(rx);
Expand Down Expand Up @@ -279,6 +281,7 @@ mod tests {
use std::rc::Rc;
use std::time::Duration;
use futures::{Async, Future};
use futures::future;
use http::KeepAlive;
use super::Pool;

Expand All @@ -297,7 +300,7 @@ mod tests {

#[test]
fn test_pool_checkout_returns_none_if_expired() {
::futures::lazy(|| {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_secs(1)));
let key = Rc::new("foo".to_string());
let mut pooled = pool.pooled(key.clone(), 41);
Expand Down Expand Up @@ -339,7 +342,7 @@ mod tests {
let pooled1 = pool.pooled(key.clone(), 41);

let mut pooled = pooled1.clone();
let checkout = pool.checkout(&key).join(::futures::lazy(move || {
let checkout = pool.checkout(&key).join(future::lazy(move || {
// the checkout future will park first,
// and then this lazy future will be polled, which will insert
// the pooled back into the pool
Expand Down
36 changes: 21 additions & 15 deletions src/http/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Instant;

use futures::{Poll, Async, AsyncSink, Stream, Sink, StartSend};
use futures::task::Task;
use tokio::io::Io;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_proto::streaming::pipeline::{Frame, Transport};

use header::{ContentLength, TransferEncoding};
Expand All @@ -16,7 +16,7 @@ use version::HttpVersion;


/// This handles a connection, which will have been established over an
/// `Io` (like a socket), and will likely include multiple
/// `AsyncRead + AsyncWrite` (like a socket), and will likely include multiple
/// `Transaction`s over HTTP.
///
/// The connection will determine when a message begins and ends as well as
Expand All @@ -29,7 +29,7 @@ pub struct Conn<I, B, T, K = KA> {
}

impl<I, B, T, K> Conn<I, B, T, K>
where I: Io,
where I: AsyncRead + AsyncWrite,
B: AsRef<[u8]>,
T: Http1Transaction,
K: KeepAlive
Expand Down Expand Up @@ -155,7 +155,7 @@ where I: Io,
}

fn maybe_park_read(&mut self) {
if self.io.poll_read().is_ready() {
if !self.io.is_read_blocked() {
// the Io object is ready to read, which means it will never alert
// us that it is ready until we drain it. However, we're currently
// finished reading, so we need to park the task to be able to
Expand Down Expand Up @@ -350,7 +350,7 @@ where I: Io,
}

impl<I, B, T, K> Stream for Conn<I, B, T, K>
where I: Io,
where I: AsyncRead + AsyncWrite,
B: AsRef<[u8]>,
T: Http1Transaction,
K: KeepAlive,
Expand Down Expand Up @@ -385,7 +385,7 @@ where I: Io,
}

impl<I, B, T, K> Sink for Conn<I, B, T, K>
where I: Io,
where I: AsyncRead + AsyncWrite,
B: AsRef<[u8]>,
T: Http1Transaction,
K: KeepAlive,
Expand Down Expand Up @@ -450,10 +450,15 @@ where I: Io,
trace!("Conn::flush = {:?}", ret);
ret
}

fn close(&mut self) -> Poll<(), Self::SinkError> {
try_ready!(self.poll_complete());
self.io.io_mut().shutdown()
}
}

impl<I, B, T, K> Transport for Conn<I, B, T, K>
where I: Io + 'static,
where I: AsyncRead + AsyncWrite + 'static,
B: AsRef<[u8]> + 'static,
T: Http1Transaction + 'static,
K: KeepAlive + 'static,
Expand Down Expand Up @@ -665,6 +670,7 @@ impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a,
#[cfg(test)]
mod tests {
use futures::{Async, Future, Stream, Sink};
use futures::future;
use tokio_proto::streaming::pipeline::Frame;

use http::{self, MessageHead, ServerTransaction};
Expand Down Expand Up @@ -705,7 +711,7 @@ mod tests {

#[test]
fn test_conn_parse_partial() {
let _: Result<(), ()> = ::futures::lazy(|| {
let _: Result<(), ()> = future::lazy(|| {
let good_message = b"GET / HTTP/1.1\r\nHost: foo.bar\r\n\r\n".to_vec();
let io = AsyncIo::new_buf(good_message, 10);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
Expand Down Expand Up @@ -772,7 +778,7 @@ mod tests {

#[test]
fn test_conn_body_write_length() {
let _: Result<(), ()> = ::futures::lazy(|| {
let _: Result<(), ()> = future::lazy(|| {
let io = AsyncIo::new_buf(vec![], 0);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
let max = ::http::io::MAX_BUFFER_SIZE + 4096;
Expand Down Expand Up @@ -800,7 +806,7 @@ mod tests {

#[test]
fn test_conn_body_write_chunked() {
let _: Result<(), ()> = ::futures::lazy(|| {
let _: Result<(), ()> = future::lazy(|| {
let io = AsyncIo::new_buf(vec![], 4096);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.writing = Writing::Body(Encoder::chunked(), None);
Expand All @@ -813,7 +819,7 @@ mod tests {

#[test]
fn test_conn_body_flush() {
let _: Result<(), ()> = ::futures::lazy(|| {
let _: Result<(), ()> = future::lazy(|| {
let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.writing = Writing::Body(Encoder::length(1024 * 1024), None);
Expand All @@ -829,7 +835,7 @@ mod tests {
#[test]
fn test_conn_parking() {
use std::sync::Arc;
use futures::task::Unpark;
use futures::executor::Unpark;

struct Car {
permit: bool,
Expand All @@ -847,7 +853,7 @@ mod tests {
}

// test that once writing is done, unparks
let f = ::futures::lazy(|| {
let f = future::lazy(|| {
let io = AsyncIo::new_buf(vec![], 4096);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.reading = Reading::KeepAlive;
Expand All @@ -861,7 +867,7 @@ mod tests {


// test that flushing when not waiting on read doesn't unpark
let f = ::futures::lazy(|| {
let f = future::lazy(|| {
let io = AsyncIo::new_buf(vec![], 4096);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.writing = Writing::KeepAlive;
Expand All @@ -872,7 +878,7 @@ mod tests {


// test that flushing and writing isn't done doesn't unpark
let f = ::futures::lazy(|| {
let f = future::lazy(|| {
let io = AsyncIo::new_buf(vec![], 4096);
let mut conn = Conn::<_, http::Chunk, ServerTransaction>::new(io, Default::default());
conn.state.reading = Reading::KeepAlive;
Expand Down
2 changes: 1 addition & 1 deletion src/http/h1/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ mod tests {
let (a, b) = self.split_at(n);
let mut buf = BytesMut::from(a);
*self = b;
Ok(buf.drain_to(n).freeze())
Ok(buf.split_to(n).freeze())
} else {
Ok(Bytes::new())
}
Expand Down
Loading

0 comments on commit 8554904

Please sign in to comment.