From 7fe9710a98650efc37f35bb21b19926c015f0631 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 3 Apr 2018 14:25:39 -0700 Subject: [PATCH] fix(client): ensure idle connection is pooled before response body finishes --- src/client/conn.rs | 4 +++ src/client/dispatch.rs | 4 +++ src/client/mod.rs | 14 +++++++-- src/client/signal.rs | 4 +++ src/proto/body.rs | 66 +++++++++++++++++++++++++++++++++++++++++- src/proto/response.rs | 2 ++ tests/client.rs | 2 ++ 7 files changed, 93 insertions(+), 3 deletions(-) diff --git a/src/client/conn.rs b/src/client/conn.rs index f6a139b53e..5f44a11e3a 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -126,6 +126,10 @@ impl SendRequest self.dispatch.poll_ready() } + pub(super) fn is_ready(&self) -> bool { + self.dispatch.is_ready() + } + pub(super) fn is_closed(&self) -> bool { self.dispatch.is_closed() } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index efc720d8df..cc5e345710 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -45,6 +45,10 @@ impl Sender { } } + pub fn is_ready(&self) -> bool { + self.giver.is_wanting() + } + pub fn is_closed(&self) -> bool { self.giver.is_canceled() } diff --git a/src/client/mod.rs b/src/client/mod.rs index eb8c10eadc..96a2b216bb 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -9,6 +9,7 @@ use std::time::Duration; use futures::{Async, Future, Poll, Stream}; use futures::future::{self, Executor}; +use futures::sync::oneshot; #[cfg(feature = "compat")] use http; use tokio::reactor::Handle; @@ -244,7 +245,7 @@ where C: Connect, ClientError::Normal(err) } }) - .map(move |res| { + .map(move |mut res| { // when pooled is dropped, it will try to insert back into the // pool. To delay that, spawn a future that completes once the // sender is ready again. @@ -253,11 +254,20 @@ where C: Connect, // for a new request to start. // // It won't be ready if there is a body to stream. - if let Ok(Async::NotReady) = pooled.tx.poll_ready() { + if pooled.tx.is_ready() { + drop(pooled); + } else if let Some(body) = res.body_mut() { + let (delayed_tx, delayed_rx) = oneshot::channel(); + body.delayed_eof(delayed_rx); // If the executor doesn't have room, oh well. Things will likely // be blowing up soon, but this specific task isn't required. let _ = executor.execute(future::poll_fn(move || { pooled.tx.poll_ready().map_err(|_| ()) + }).then(move |_| { + // At this point, `pooled` is dropped, and had a chance + // to insert into the pool (if conn was idle) + drop(delayed_tx); + Ok(()) })); } diff --git a/src/client/signal.rs b/src/client/signal.rs index 2ddf67f7a5..7fd6bb4490 100644 --- a/src/client/signal.rs +++ b/src/client/signal.rs @@ -88,6 +88,10 @@ impl Giver { } } + pub fn is_wanting(&self) -> bool { + self.inner.state.load(Ordering::SeqCst) == STATE_WANT + } + pub fn is_canceled(&self) -> bool { self.inner.state.load(Ordering::SeqCst) == STATE_CLOSED } diff --git a/src/proto/body.rs b/src/proto/body.rs index 8f23378285..281c883b61 100644 --- a/src/proto/body.rs +++ b/src/proto/body.rs @@ -7,6 +7,7 @@ use futures::sync::{mpsc, oneshot}; use tokio_proto; use std::borrow::Cow; +use common::Never; use super::Chunk; #[cfg(feature = "tokio-proto")] @@ -17,6 +18,15 @@ pub type BodySender = mpsc::Sender>; #[must_use = "streams do nothing unless polled"] pub struct Body { kind: Kind, + /// Allow the client to pass a future to delay the `Body` from returning + /// EOF. This allows the `Client` to try to put the idle connection + /// back into the pool before the body is "finished". + /// + /// The reason for this is so that creating a new request after finishing + /// streaming the body of a response could sometimes result in creating + /// a brand new connection, since the pool didn't know about the idle + /// connection yet. + delayed_eof: Option, } #[derive(Debug)] @@ -31,6 +41,17 @@ enum Kind { Empty, } +type DelayEofUntil = oneshot::Receiver; + +enum DelayEof { + /// Initial state, stream hasn't seen EOF yet. + NotEof(DelayEofUntil), + /// Transitions to this state once we've seen `poll` try to + /// return EOF (`None`). This future is then polled, and + /// when it completes, the Body finally returns EOF (`None`). + Eof(DelayEofUntil), +} + //pub(crate) #[derive(Debug)] pub struct ChunkSender { @@ -72,6 +93,49 @@ impl Body { fn new(kind: Kind) -> Body { Body { kind: kind, + delayed_eof: None, + } + } + + pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { + self.delayed_eof = Some(DelayEof::NotEof(fut)); + } + + fn poll_eof(&mut self) -> Poll, ::Error> { + match self.delayed_eof.take() { + Some(DelayEof::NotEof(mut delay)) => { + match self.poll_inner() { + ok @ Ok(Async::Ready(Some(..))) | + ok @ Ok(Async::NotReady) => { + self.delayed_eof = Some(DelayEof::NotEof(delay)); + ok + }, + Ok(Async::Ready(None)) => match delay.poll() { + Ok(Async::Ready(never)) => match never {}, + Ok(Async::NotReady) => { + self.delayed_eof = Some(DelayEof::Eof(delay)); + Ok(Async::NotReady) + }, + Err(_done) => { + Ok(Async::Ready(None)) + }, + }, + Err(e) => Err(e), + } + }, + Some(DelayEof::Eof(mut delay)) => { + match delay.poll() { + Ok(Async::Ready(never)) => match never {}, + Ok(Async::NotReady) => { + self.delayed_eof = Some(DelayEof::Eof(delay)); + Ok(Async::NotReady) + }, + Err(_done) => { + Ok(Async::Ready(None)) + }, + } + }, + None => self.poll_inner(), } } @@ -104,7 +168,7 @@ impl Stream for Body { #[inline] fn poll(&mut self) -> Poll, ::Error> { - self.poll_inner() + self.poll_eof() } } diff --git a/src/proto/response.rs b/src/proto/response.rs index 0154cb15f1..4c9c4001b4 100644 --- a/src/proto/response.rs +++ b/src/proto/response.rs @@ -102,6 +102,8 @@ impl Response { /// Read the body. #[inline] pub fn body_ref(&self) -> Option<&B> { self.body.as_ref() } + + pub(crate) fn body_mut(&mut self) -> Option<&mut B> { self.body.as_mut() } } impl Response { diff --git a/tests/client.rs b/tests/client.rs index 72783b3dc0..8e52d87324 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -741,9 +741,11 @@ mod dispatch_impl { client.get(uri).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::Ok); res.body().concat2() + /* }).and_then(|_| { Timeout::new(Duration::from_secs(1), &handle).unwrap() .from_err() + */ }) }; // client is dropped