Skip to content

Commit

Permalink
fix(client): don't error on read before writing request
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Jan 10, 2018
1 parent 5ce269a commit 7976023
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 18 deletions.
18 changes: 16 additions & 2 deletions src/proto/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,21 @@ where I: AsyncRead + AsyncWrite,
ret
}

pub fn maybe_park_read(&mut self) {
pub fn read_keep_alive(&mut self) -> Result<(), ::Error> {
debug_assert!(!self.can_read_head() && !self.can_read_body());

trace!("Conn::read_keep_alive");

if T::should_read_first() || !self.state.is_idle() {
self.maybe_park_read();
} else {
self.try_empty_read()?;
}

Ok(())
}

fn maybe_park_read(&mut self) {
if !self.io.is_read_blocked() {
// the Io object is ready to read, which means it will never alert
// us that it is ready until we drain it. However, we're currently
Expand All @@ -258,7 +272,7 @@ where I: AsyncRead + AsyncWrite,
//
// This should only be called for Clients wanting to enter the idle
// state.
pub fn try_empty_read(&mut self) -> io::Result<()> {
fn try_empty_read(&mut self) -> io::Result<()> {
assert!(!self.can_read_head() && !self.can_read_body());

if !self.io.read_buf().is_empty() {
Expand Down
77 changes: 61 additions & 16 deletions src/proto/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ where
self.conn.disable_keep_alive()
}

fn poll2(&mut self) -> Poll<(), ::Error> {
self.poll_read()?;
self.poll_write()?;
self.poll_flush()?;

if self.is_done() {
try_ready!(self.conn.shutdown());
trace!("Dispatch::poll done");
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}

fn poll_read(&mut self) -> Poll<(), ::Error> {
loop {
if self.is_closing {
Expand Down Expand Up @@ -163,12 +177,8 @@ where
} else {
// just drop, the body will close automatically
}
} else if !T::should_read_first() {
self.conn.try_empty_read()?;
return Ok(Async::NotReady);
} else {
self.conn.maybe_park_read();
return Ok(Async::Ready(()));
return self.conn.read_keep_alive().map(Async::Ready);
}
}
}
Expand Down Expand Up @@ -266,17 +276,13 @@ where
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
trace!("Dispatcher::poll");
self.poll_read()?;
self.poll_write()?;
self.poll_flush()?;

if self.is_done() {
try_ready!(self.conn.shutdown());
trace!("Dispatch::poll done");
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
self.poll2().or_else(|e| {
// An error means we're shutting down either way.
// We just try to give the error to the user,
// and close the connection with an Ok. If we
// cannot give it to the user, then return the Err.
self.dispatch.recv_msg(Err(e)).map(Async::Ready)
})
}
}

Expand Down Expand Up @@ -399,6 +405,9 @@ where
if let Some(cb) = self.callback.take() {
let _ = cb.send(Err(err));
Ok(())
} else if let Ok(Async::Ready(Some(ClientMsg::Request(_, _, cb)))) = self.rx.poll() {
let _ = cb.send(Err(err));
Ok(())
} else {
Err(err)
}
Expand All @@ -424,3 +433,39 @@ where
self.callback.is_none()
}
}

#[cfg(test)]
mod tests {
use futures::Sink;

use super::*;
use mock::AsyncIo;
use proto::ClientTransaction;

#[test]
fn client_read_response_before_writing_request() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
::futures::lazy(|| {
let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 100);
let (mut tx, rx) = mpsc::channel(0);
let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io, Default::default());
let mut dispatcher = Dispatcher::new(Client::new(rx), conn);

let req = RequestHead {
version: ::HttpVersion::Http11,
subject: ::proto::RequestLine::default(),
headers: Default::default(),
};
let (res_tx, res_rx) = oneshot::channel();
tx.start_send(ClientMsg::Request(req, None::<::Body>, res_tx)).unwrap();

dispatcher.poll().expect("dispatcher poll 1");
dispatcher.poll().expect("dispatcher poll 2");
let _res = res_rx.wait()
.expect("callback poll")
.expect("callback response");
Ok::<(), ()>(())
}).wait().unwrap();
}
}

0 comments on commit 7976023

Please sign in to comment.