Skip to content

Commit

Permalink
fix(client): never call connect if idle connection is available
Browse files Browse the repository at this point in the history
The HttpConnector's connect future was lazy, but if any custom connector
did not use a lazy future, then a connect would always be started, even
if an idle connection was available.
  • Loading branch information
seanmonstar committed Feb 28, 2018
1 parent ad77630 commit 13741f5
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 27 deletions.
35 changes: 19 additions & 16 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,22 +182,25 @@ where C: Connect,
let pool = self.pool.clone();
let pool_key = Rc::new(domain.to_string());
let h1_writev = self.h1_writev;
self.connector.connect(url)
.and_then(move |io| {
let (tx, rx) = dispatch::channel();
let tx = HyperClient {
tx: tx,
should_close: Cell::new(true),
};
let pooled = pool.pooled(pool_key, tx);
let mut conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
if !h1_writev {
conn.set_write_strategy_flatten();
}
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?;
Ok(pooled)
})
let connector = self.connector.clone();
future::lazy(move || {
connector.connect(url)
.and_then(move |io| {
let (tx, rx) = dispatch::channel();
let tx = HyperClient {
tx: tx,
should_close: Cell::new(true),
};
let pooled = pool.pooled(pool_key, tx);
let mut conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
if !h1_writev {
conn.set_write_strategy_flatten();
}
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?;
Ok(pooled)
})
})
};

let race = checkout.select(connect)
Expand Down
97 changes: 86 additions & 11 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ mod dispatch_impl {
let handle = core.handle();
let closes = Arc::new(AtomicUsize::new(0));
let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &core.handle()), closes.clone()))
.connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &core.handle()), closes.clone()))
.build(&handle);

let (tx1, rx1) = oneshot::channel();
Expand Down Expand Up @@ -784,7 +784,7 @@ mod dispatch_impl {

let res = {
let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone()))
.build(&handle);
client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok);
Expand Down Expand Up @@ -834,7 +834,7 @@ mod dispatch_impl {
let uri = format!("http://{}/a", addr).parse().unwrap();

let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone()))
.build(&handle);
let res = client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok);
Expand Down Expand Up @@ -883,7 +883,7 @@ mod dispatch_impl {

let res = {
let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone()))
.build(&handle);
client.get(uri)
};
Expand Down Expand Up @@ -927,7 +927,7 @@ mod dispatch_impl {

let res = {
let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone()))
.build(&handle);
// notably, havent read body yet
client.get(uri)
Expand Down Expand Up @@ -966,7 +966,7 @@ mod dispatch_impl {
let uri = format!("http://{}/a", addr).parse().unwrap();

let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone()))
.keep_alive(false)
.build(&handle);
let res = client.get(uri).and_then(move |res| {
Expand Down Expand Up @@ -1005,7 +1005,7 @@ mod dispatch_impl {
let uri = format!("http://{}/a", addr).parse().unwrap();

let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone()))
.build(&handle);
let res = client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok);
Expand Down Expand Up @@ -1095,7 +1095,7 @@ mod dispatch_impl {
let uri = format!("http://{}/a", addr).parse().unwrap();

let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes.clone()))
.executor(handle.clone());
let res = client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok);
Expand All @@ -1110,7 +1110,79 @@ mod dispatch_impl {
assert_eq!(closes.load(Ordering::Relaxed), 1);
}

struct DebugConnector(HttpConnector, Arc<AtomicUsize>);
#[test]
fn idle_conn_prevents_connect_call() {
let _ = pretty_env_logger::try_init();

let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let connector = DebugConnector::new(&handle);
let connects = connector.connects.clone();

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
let mut buf = [0; 4096];
sock.read(&mut buf).expect("read 1");
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap();
let _ = tx1.send(());

sock.read(&mut buf).expect("read 2");
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap();
let _ = tx2.send(());
});

let uri: hyper::Uri = format!("http://{}/a", addr).parse().unwrap();

let client = Client::configure()
.connector(connector)
.build(&handle);

let res = client.get(uri.clone()).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok);
res.body().concat2()
});
let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
core.run(res.join(rx).map(|r| r.0)).unwrap();
assert_eq!(connects.load(Ordering::Relaxed), 1);

let res2 = client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok);
res.body().concat2()
});
let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
core.run(res2.join(rx).map(|r| r.0)).unwrap();

assert_eq!(connects.load(Ordering::Relaxed), 1);
}


struct DebugConnector {
http: HttpConnector,
closes: Arc<AtomicUsize>,
connects: Arc<AtomicUsize>,
}

impl DebugConnector {
fn new(handle: &Handle) -> DebugConnector {
let http = HttpConnector::new(1, handle);
DebugConnector::with_http_and_closes(http, Arc::new(AtomicUsize::new(0)))
}

fn with_http_and_closes(http: HttpConnector, closes: Arc<AtomicUsize>) -> DebugConnector {
DebugConnector {
http: http,
closes: closes,
connects: Arc::new(AtomicUsize::new(0)),
}
}
}

impl Service for DebugConnector {
type Request = Uri;
Expand All @@ -1119,8 +1191,11 @@ mod dispatch_impl {
type Future = Box<Future<Item = DebugStream, Error = io::Error>>;

fn call(&self, uri: Uri) -> Self::Future {
let counter = self.1.clone();
Box::new(self.0.call(uri).map(move |s| DebugStream(s, counter)))
self.connects.fetch_add(1, Ordering::SeqCst);
let closes = self.closes.clone();
Box::new(self.http.call(uri).map(move |s| {
DebugStream(s, closes)
}))
}
}

Expand Down

0 comments on commit 13741f5

Please sign in to comment.