diff --git a/Cargo.toml b/Cargo.toml index 815289bb75..75900aa168 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ language-tags = "0.2" log = "0.3" mime = "0.3.2" percent-encoding = "1.0" +relay = "0.1" time = "0.1" tokio-core = "0.1.6" tokio-proto = "0.1" diff --git a/src/client/pool.rs b/src/client/pool.rs index c86657923a..24b39eb752 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -7,7 +7,7 @@ use std::rc::Rc; use std::time::{Duration, Instant}; use futures::{Future, Async, Poll}; -use futures::unsync::oneshot; +use relay; use http::{KeepAlive, KA}; @@ -17,8 +17,19 @@ pub struct Pool { struct PoolInner { enabled: bool, + // These are internal Conns sitting in the event loop in the KeepAlive + // state, waiting to receive a new Request to send on the socket. idle: HashMap, Vec>>, - parked: HashMap, VecDeque>>>, + // These are outstanding Checkouts that are waiting for a socket to be + // able to send a Request one. This is used when "racing" for a new + // connection. + // + // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait + // for the Pool to receive an idle Conn. When a Conn becomes idle, + // this list is checked for any parked Checkouts, and tries to notify + // them that the Conn could be used instead of waiting for a brand new + // connection. + parked: HashMap, VecDeque>>>, timeout: Option, } @@ -50,6 +61,12 @@ impl Pool { let mut entry = Some(entry); if let Some(parked) = inner.parked.get_mut(&key) { while let Some(tx) = parked.pop_front() { + if tx.is_canceled() { + trace!("Pool::put removing canceled parked {:?}", key); + } else { + tx.complete(entry.take().unwrap()); + } + /* match tx.send(entry.take().unwrap()) { Ok(()) => break, Err(e) => { @@ -57,6 +74,7 @@ impl Pool { entry = Some(e); } } + */ } remove_parked = parked.is_empty(); } @@ -74,6 +92,7 @@ impl Pool { } } + pub fn pooled(&self, key: Rc, value: T) -> Pooled { trace!("Pool::pooled {:?}", key); Pooled { @@ -102,7 +121,7 @@ impl Pool { } } - fn park(&mut self, key: Rc, tx: oneshot::Sender>) { + fn park(&mut self, key: Rc, tx: relay::Sender>) { trace!("Pool::park {:?}", key); self.inner.borrow_mut() .parked.entry(key) @@ -111,6 +130,24 @@ impl Pool { } } +impl Pool { + fn clean_parked(&mut self, key: &Rc) { + trace!("Pool::clean_parked {:?}", key); + let mut inner = self.inner.borrow_mut(); + + let mut remove_parked = false; + if let Some(parked) = inner.parked.get_mut(key) { + parked.retain(|tx| { + !tx.is_canceled() + }); + remove_parked = parked.is_empty(); + } + if remove_parked { + inner.parked.remove(key); + } + } +} + impl Clone for Pool { fn clone(&self) -> Pool { Pool { @@ -204,7 +241,7 @@ enum TimedKA { pub struct Checkout { key: Rc, pool: Pool, - parked: Option>>, + parked: Option>>, } impl Future for Checkout { @@ -260,7 +297,7 @@ impl Future for Checkout { Some(entry) => Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))), None => { if self.parked.is_none() { - let (tx, mut rx) = oneshot::channel(); + let (tx, mut rx) = relay::channel(); let _ = rx.poll(); // park this task self.pool.park(self.key.clone(), tx); self.parked = Some(rx); @@ -271,6 +308,13 @@ impl Future for Checkout { } } +impl Drop for Checkout { + fn drop(&mut self) { + self.parked.take(); + self.pool.clean_parked(&self.key); + } +} + struct Expiration(Option); impl Expiration { @@ -364,4 +408,30 @@ mod tests { })).map(|(entry, _)| entry); assert_eq!(*checkout.wait().unwrap(), *pooled1); } + + #[test] + fn test_pool_checkout_drop_cleans_up_parked() { + future::lazy(|| { + let pool = Pool::new(true, Some(Duration::from_secs(10))); + let key = Rc::new("localhost:12345".to_string()); + let _pooled1 = pool.pooled(key.clone(), 41); + let mut checkout1 = pool.checkout(&key); + let mut checkout2 = pool.checkout(&key); + + // first poll needed to get into Pool's parked + checkout1.poll().unwrap(); + assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1); + checkout2.poll().unwrap(); + assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 2); + + // on drop, clean up Pool + drop(checkout1); + assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1); + + drop(checkout2); + assert!(pool.inner.borrow().parked.get(&key).is_none()); + + ::futures::future::ok::<(), ()>(()) + }).wait().unwrap(); + } } diff --git a/src/lib.rs b/src/lib.rs index 1742886589..13e66a0320 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ extern crate language_tags; #[macro_use] extern crate log; pub extern crate mime; #[macro_use] extern crate percent_encoding; +extern crate relay; extern crate time; extern crate tokio_core as tokio; #[macro_use] extern crate tokio_io;