Skip to content

Commit

Permalink
fix(client): ensure idle connection is pooled before response body fi…
Browse files Browse the repository at this point in the history
…nishes
  • Loading branch information
seanmonstar committed Apr 3, 2018
1 parent bc9283e commit 7fe9710
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 3 deletions.
4 changes: 4 additions & 0 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ impl<B> SendRequest<B>
self.dispatch.poll_ready()
}

pub(super) fn is_ready(&self) -> bool {
self.dispatch.is_ready()
}

pub(super) fn is_closed(&self) -> bool {
self.dispatch.is_closed()
}
Expand Down
4 changes: 4 additions & 0 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ impl<T, U> Sender<T, U> {
}
}

pub fn is_ready(&self) -> bool {
self.giver.is_wanting()
}

pub fn is_closed(&self) -> bool {
self.giver.is_canceled()
}
Expand Down
14 changes: 12 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::time::Duration;

use futures::{Async, Future, Poll, Stream};
use futures::future::{self, Executor};
use futures::sync::oneshot;
#[cfg(feature = "compat")]
use http;
use tokio::reactor::Handle;
Expand Down Expand Up @@ -244,7 +245,7 @@ where C: Connect,
ClientError::Normal(err)
}
})
.map(move |res| {
.map(move |mut res| {
// when pooled is dropped, it will try to insert back into the
// pool. To delay that, spawn a future that completes once the
// sender is ready again.
Expand All @@ -253,11 +254,20 @@ where C: Connect,
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if let Ok(Async::NotReady) = pooled.tx.poll_ready() {
if pooled.tx.is_ready() {
drop(pooled);
} else if let Some(body) = res.body_mut() {
let (delayed_tx, delayed_rx) = oneshot::channel();
body.delayed_eof(delayed_rx);
// If the executor doesn't have room, oh well. Things will likely
// be blowing up soon, but this specific task isn't required.
let _ = executor.execute(future::poll_fn(move || {
pooled.tx.poll_ready().map_err(|_| ())
}).then(move |_| {
// At this point, `pooled` is dropped, and had a chance
// to insert into the pool (if conn was idle)
drop(delayed_tx);
Ok(())
}));
}

Expand Down
4 changes: 4 additions & 0 deletions src/client/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl Giver {
}
}

pub fn is_wanting(&self) -> bool {
self.inner.state.load(Ordering::SeqCst) == STATE_WANT
}

pub fn is_canceled(&self) -> bool {
self.inner.state.load(Ordering::SeqCst) == STATE_CLOSED
}
Expand Down
66 changes: 65 additions & 1 deletion src/proto/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures::sync::{mpsc, oneshot};
use tokio_proto;
use std::borrow::Cow;

use common::Never;
use super::Chunk;

#[cfg(feature = "tokio-proto")]
Expand All @@ -17,6 +18,15 @@ pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
#[must_use = "streams do nothing unless polled"]
pub struct Body {
kind: Kind,
/// Allow the client to pass a future to delay the `Body` from returning
/// EOF. This allows the `Client` to try to put the idle connection
/// back into the pool before the body is "finished".
///
/// The reason for this is so that creating a new request after finishing
/// streaming the body of a response could sometimes result in creating
/// a brand new connection, since the pool didn't know about the idle
/// connection yet.
delayed_eof: Option<DelayEof>,
}

#[derive(Debug)]
Expand All @@ -31,6 +41,17 @@ enum Kind {
Empty,
}

type DelayEofUntil = oneshot::Receiver<Never>;

enum DelayEof {
/// Initial state, stream hasn't seen EOF yet.
NotEof(DelayEofUntil),
/// Transitions to this state once we've seen `poll` try to
/// return EOF (`None`). This future is then polled, and
/// when it completes, the Body finally returns EOF (`None`).
Eof(DelayEofUntil),
}

//pub(crate)
#[derive(Debug)]
pub struct ChunkSender {
Expand Down Expand Up @@ -72,6 +93,49 @@ impl Body {
fn new(kind: Kind) -> Body {
Body {
kind: kind,
delayed_eof: None,
}
}

pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
self.delayed_eof = Some(DelayEof::NotEof(fut));
}

fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.delayed_eof.take() {
Some(DelayEof::NotEof(mut delay)) => {
match self.poll_inner() {
ok @ Ok(Async::Ready(Some(..))) |
ok @ Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::NotEof(delay));
ok
},
Ok(Async::Ready(None)) => match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::NotReady)
},
Err(_done) => {
Ok(Async::Ready(None))
},
},
Err(e) => Err(e),
}
},
Some(DelayEof::Eof(mut delay)) => {
match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::NotReady)
},
Err(_done) => {
Ok(Async::Ready(None))
},
}
},
None => self.poll_inner(),
}
}

Expand Down Expand Up @@ -104,7 +168,7 @@ impl Stream for Body {

#[inline]
fn poll(&mut self) -> Poll<Option<Chunk>, ::Error> {
self.poll_inner()
self.poll_eof()
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/proto/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ impl<B> Response<B> {
/// Read the body.
#[inline]
pub fn body_ref(&self) -> Option<&B> { self.body.as_ref() }

pub(crate) fn body_mut(&mut self) -> Option<&mut B> { self.body.as_mut() }
}

impl Response<Body> {
Expand Down
2 changes: 2 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,9 +741,11 @@ mod dispatch_impl {
client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok);
res.body().concat2()
/*
}).and_then(|_| {
Timeout::new(Duration::from_secs(1), &handle).unwrap()
.from_err()
*/
})
};
// client is dropped
Expand Down

0 comments on commit 7fe9710

Please sign in to comment.