Redesign pooling mechanic (#565)
Introduce PoolReturner, a handle on an agent and a PoolKey that is
capable of returning a Stream to a Pool. Make Streams keep track of
their own PoolReturner, instead of having PoolReturnRead keep track of
that information.
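
As a standalone model of that ownership shape (a sketch only; MiniPool, MiniReturner and MiniStream are invented names, not the crate's types), each stream carries an optional handle back to its pool:

use std::sync::{Arc, Mutex};

// Invented stand-ins for Agent/ConnectionPool/Stream, showing only the ownership shape.
struct MiniPool {
    idle: Mutex<Vec<MiniStream>>,
}

struct MiniReturner {
    // None means "never pool this stream" (the equivalent of PoolReturner::none()).
    pool: Option<Arc<MiniPool>>,
}

struct MiniStream {
    returner: MiniReturner,
}

impl MiniStream {
    // The stream puts itself back; body readers no longer need an Agent and a PoolKey.
    fn return_to_pool(self) {
        if let Some(pool) = self.returner.pool.clone() {
            pool.idle.lock().unwrap().push(self);
        }
    }
}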

For the LimitedRead code path, get rid of PoolReturnRead. Instead,
LimitedRead is responsible for returning its Stream to the Pool after
its second-to-last read. In other words, LimitedRead will return the
stream if the next read is guaranteed to return Ok(0).
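
A sketch of what that read path looks like (illustrative only; the real LimitedRead wraps a Stream and differs in detail, and the field names here are assumptions):

use std::io::{self, Read};

// Simplified LimitedRead: `inner` is Some until the wrapped reader has been
// handed back to the pool.
struct LimitedReadSketch<R> {
    inner: Option<R>,
    remaining: usize,
}

impl<R: Read> Read for LimitedReadSketch<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.remaining == 0 {
            return Ok(0);
        }
        let reader = match self.inner.as_mut() {
            Some(r) => r,
            None => return Ok(0),
        };
        let limit = buf.len().min(self.remaining);
        let n = reader.read(&mut buf[..limit])?;
        self.remaining -= n;
        if self.remaining == 0 {
            // This was the second-to-last read: the next read is guaranteed to
            // return Ok(0), so hand the stream back to the pool right away
            // (in the real code, via the Stream's PoolReturner).
            if let Some(stream) = self.inner.take() {
                // Stand-in for stream.return_to_pool(); here we just drop it.
                drop(stream);
            }
        }
        Ok(n)
    }
}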

Constructing a LimitedRead of size 0 is always wrong, because we could
always just return the stream immediately. Change the size argument to
NonZeroUsize to enforce that.
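
The enforcement itself comes from std::num::NonZeroUsize, whose constructor rejects zero, so a zero-length LimitedRead simply cannot be built (the 500-byte figure below just mirrors the read_exact test in this diff):

use std::num::NonZeroUsize;

fn main() {
    // A Content-Length of 0 never reaches LimitedRead: NonZeroUsize::new(0) is
    // None, so the caller must handle the empty body (and return the stream)
    // before constructing the reader.
    assert!(NonZeroUsize::new(0).is_none());

    let size = NonZeroUsize::new(500).expect("500 is non-zero");
    assert_eq!(size.get(), 500);
}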

Remove the Done trait, which was only used for LimitedRead. It tried to
ensure we returned the stream to the pool on exact reads, but was not
reliable.

This does not yet move the ChunkDecoder code path away from
PoolReturnRead. That requires a little more work.

Part 1 of #559. Fixes #555.
jsha authored Dec 4, 2022
1 parent d8225b2 commit 9083d69
Showing 4 changed files with 198 additions and 114 deletions.
135 changes: 64 additions & 71 deletions src/pool.rs
@@ -3,11 +3,9 @@ use std::collections::{HashMap, VecDeque};
use std::io::{self, Read};
use std::sync::Mutex;

use crate::response::LimitedRead;
use crate::stream::Stream;
use crate::{Agent, Proxy};

use chunked_transfer::Decoder;
use log::debug;
use url::Url;

@@ -124,7 +122,7 @@ impl ConnectionPool {
}
}

fn add(&self, key: &PoolKey, stream: Stream) {
pub(crate) fn add(&self, key: &PoolKey, stream: Stream) {
if self.noop() {
return;
}
@@ -188,7 +186,7 @@
}

#[derive(PartialEq, Clone, Eq, Hash)]
struct PoolKey {
pub(crate) struct PoolKey {
scheme: String,
hostname: String,
port: Option<u16>,
@@ -218,27 +216,55 @@ impl PoolKey {
proxy,
}
}

pub(crate) fn from_parts(scheme: &str, hostname: &str, port: u16) -> Self {
PoolKey {
scheme: scheme.to_string(),
hostname: hostname.to_string(),
port: Some(port),
proxy: None,
}
}
}

#[derive(Clone, Debug)]
pub(crate) struct PoolReturner {
inner: Option<(Agent, PoolKey)>,
}

impl PoolReturner {
/// A PoolReturner that returns to the given Agent's Pool.
pub(crate) fn new(agent: Agent, pool_key: PoolKey) -> Self {
Self {
inner: Some((agent, pool_key)),
}
}

/// A PoolReturner that does nothing
pub(crate) fn none() -> Self {
Self { inner: None }
}

pub(crate) fn return_to_pool(&self, stream: Stream) {
if let Some((agent, pool_key)) = &self.inner {
agent.state.pool.add(pool_key, stream);
}
}
}

/// Read wrapper that returns a stream to the pool once the
/// read is exhausted (reached a 0).
///
/// *Internal API*
pub(crate) struct PoolReturnRead<R: Read + Sized + Into<Stream>> {
// the agent where we want to return the stream.
agent: Agent,
// wrapped reader around the same stream. It's an Option because we `take()` it
// upon returning the stream to the Agent.
reader: Option<R>,
// Key under which to store the stream when we're done.
key: PoolKey,
}

impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
pub fn new(agent: &Agent, url: &Url, reader: R) -> Self {
pub fn new(reader: R) -> Self {
PoolReturnRead {
agent: agent.clone(),
key: PoolKey::new(url, agent.config.proxy.clone()),
reader: Some(reader),
}
}
@@ -247,13 +273,8 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
// guard we only do this once.
if let Some(reader) = self.reader.take() {
// bring back stream here to either go into pool or dealloc
let mut stream = reader.into();

// ensure stream can be reused
stream.reset()?;

// insert back into pool
self.agent.state.pool.add(&self.key, stream);
let stream: Stream = reader.into();
stream.return_to_pool()?;
}

Ok(())
@@ -267,33 +288,12 @@ impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
}
}

// Done allows a reader to indicate it is done (next read will return Ok(0))
// without actually performing a read. This is useful so LimitedRead can
// inform PoolReturnRead to return a stream to the pool even if the user
// never read past the end of the response (For instance because their
// application is handling length information on its own).
pub(crate) trait Done {
fn done(&self) -> bool;
}

impl<R: Read> Done for LimitedRead<R> {
fn done(&self) -> bool {
self.remaining() == 0
}
}

impl<R: Read> Done for Decoder<R> {
fn done(&self) -> bool {
false
}
}

impl<R: Read + Sized + Done + Into<Stream>> Read for PoolReturnRead<R> {
impl<R: Read + Sized + Into<Stream>> Read for PoolReturnRead<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let amount = self.do_read(buf)?;
// only if the underlying reader is exhausted can we send a new
// request to the same socket. hence, we only return it now.
if amount == 0 || self.reader.as_ref().map(|r| r.done()).unwrap_or_default() {
if amount == 0 {
self.return_connection()?;
}
Ok(amount)
@@ -313,8 +313,8 @@ mod tests {
struct NoopStream;

impl NoopStream {
fn stream() -> Stream {
Stream::new(NoopStream, remote_addr_for_test())
fn stream(pool_returner: PoolReturner) -> Stream {
Stream::new(NoopStream, remote_addr_for_test(), pool_returner)
}
}

@@ -360,7 +360,7 @@ mod tests {
proxy: None,
});
for key in poolkeys.clone() {
pool.add(&key, NoopStream::stream());
pool.add(&key, NoopStream::stream(PoolReturner::none()));
}
assert_eq!(pool.len(), pool.max_idle_connections);

@@ -385,7 +385,7 @@
};

for _ in 0..pool.max_idle_connections_per_host * 2 {
pool.add(&poolkey, NoopStream::stream())
pool.add(&poolkey, NoopStream::stream(PoolReturner::none()))
}
assert_eq!(pool.len(), pool.max_idle_connections_per_host);

@@ -404,38 +404,39 @@
let url = Url::parse("zzz:///example.com").unwrap();
let pool_key = PoolKey::new(&url, None);

pool.add(&pool_key, NoopStream::stream());
pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
assert_eq!(pool.len(), 1);

let pool_key = PoolKey::new(&url, Some(Proxy::new("localhost:9999").unwrap()));

pool.add(&pool_key, NoopStream::stream());
pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
assert_eq!(pool.len(), 2);

let pool_key = PoolKey::new(
&url,
Some(Proxy::new("user:password@localhost:9999").unwrap()),
);

pool.add(&pool_key, NoopStream::stream());
pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
assert_eq!(pool.len(), 3);
}

// Test that a stream gets returned to the pool if it was wrapped in a LimitedRead, and
// user reads the exact right number of bytes (but never gets a read of 0 bytes).
#[test]
fn read_exact() {
use crate::response::LimitedRead;

let url = Url::parse("https:///example.com").unwrap();

let mut out_buf = [0u8; 500];

let agent = Agent::new();
let stream = NoopStream::stream();
let limited_read = LimitedRead::new(stream, 500);

let mut pool_return_read = PoolReturnRead::new(&agent, &url, limited_read);
let pool_key = PoolKey::new(&url, None);
let stream = NoopStream::stream(PoolReturner::new(agent.clone(), pool_key));
let mut limited_read = LimitedRead::new(stream, std::num::NonZeroUsize::new(500).unwrap());

pool_return_read.read_exact(&mut out_buf).unwrap();
limited_read.read_exact(&mut out_buf).unwrap();

assert_eq!(agent.state.pool.len(), 1);
}
@@ -448,6 +449,7 @@ mod tests {
fn read_exact_chunked_gzip() {
use crate::response::Compression;
use chunked_transfer::Decoder as ChunkDecoder;
use std::io::Cursor;

let gz_body = vec![
b'E', b'\r', b'\n', // 14 first chunk
@@ -464,28 +466,19 @@
b'\r', b'\n', //
];

println!("{:?}", gz_body);

impl ReadWrite for io::Cursor<Vec<u8>> {
fn socket(&self) -> Option<&std::net::TcpStream> {
None
}
}

impl From<io::Cursor<Vec<u8>>> for Stream {
fn from(c: io::Cursor<Vec<u8>>) -> Self {
Stream::new(c, "1.1.1.1:8080".parse().unwrap())
}
}

let agent = Agent::new();
let url = Url::parse("https://example.com").unwrap();

assert_eq!(agent.state.pool.len(), 0);

let chunked = ChunkDecoder::new(io::Cursor::new(gz_body));
let ro = crate::test::TestStream::new(Cursor::new(gz_body), std::io::sink());
let stream = Stream::new(
ro,
"1.1.1.1:4343".parse().unwrap(),
PoolReturner::new(agent.clone(), PoolKey::from_parts("http", "1.1.1.1", 8080)),
);

let chunked = ChunkDecoder::new(stream);
let pool_return_read: Box<(dyn Read + Send + Sync + 'static)> =
Box::new(PoolReturnRead::new(&agent, &url, chunked));
Box::new(PoolReturnRead::new(chunked));

let compression = Compression::Gzip;
let mut stream = compression.wrap_reader(pool_return_read);