Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove lifetime from Context, ResponseFuture, InterceptorFuture #405

Merged
merged 8 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 95 additions & 94 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
error::{Error, ErrorKind},
handler::{RequestHandler, ResponseBodyReader},
headers::HasHeaders,
interceptor::{self, Interceptor, InterceptorObj},
interceptor::{self, Interceptor, InterceptorFuture, InterceptorObj},
parsing::header_to_curl_string,
};
use futures_lite::{
Expand Down Expand Up @@ -651,6 +651,11 @@ impl HttpClient {
self.inner.cookie_jar.as_ref()
}

/// Get the configured interceptors for this HTTP client.
pub(crate) fn interceptors(&self) -> &[InterceptorObj] {
&self.inner.interceptors
}

/// Send a GET request to the given URI.
///
/// To customize the request further, see [`HttpClient::send`]. To execute
Expand Down Expand Up @@ -682,7 +687,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::get`].
pub fn get_async<U>(&self, uri: U) -> ResponseFuture<'_>
pub fn get_async<U>(&self, uri: U) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -724,7 +729,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::head`].
pub fn head_async<U>(&self, uri: U) -> ResponseFuture<'_>
pub fn head_async<U>(&self, uri: U) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -770,7 +775,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::post`].
pub fn post_async<U, B>(&self, uri: U, body: B) -> ResponseFuture<'_>
pub fn post_async<U, B>(&self, uri: U, body: B) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -818,7 +823,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::put`].
pub fn put_async<U, B>(&self, uri: U, body: B) -> ResponseFuture<'_>
pub fn put_async<U, B>(&self, uri: U, body: B) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -850,7 +855,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::delete`].
pub fn delete_async<U>(&self, uri: U) -> ResponseFuture<'_>
pub fn delete_async<U>(&self, uri: U) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -997,7 +1002,7 @@ impl HttpClient {
/// # Ok(()) }
/// ```
#[inline]
pub fn send_async<B>(&self, request: Request<B>) -> ResponseFuture<'_>
pub fn send_async<B>(&self, request: Request<B>) -> ResponseFuture
where
B: Into<AsyncBody>,
{
Expand All @@ -1014,10 +1019,7 @@ impl HttpClient {
}

/// Actually send the request. All the public methods go through here.
async fn send_async_inner(
&self,
mut request: Request<AsyncBody>,
) -> Result<Response<AsyncBody>, Error> {
fn send_async_inner(&self, mut request: Request<AsyncBody>) -> InterceptorFuture<Error> {
// Populate request config, creating if necessary.
if let Some(config) = request.extensions_mut().get_mut::<RequestConfig>() {
// Merge request configuration with defaults.
Expand All @@ -1029,11 +1031,85 @@ impl HttpClient {
}

let ctx = interceptor::Context {
invoker: Arc::new(self),
interceptors: &self.inner.interceptors,
client: self.clone(),
interceptor_offset: 0,
};

ctx.send(request).await
ctx.send(request)
}

pub(crate) fn invoke(&self, mut request: Request<AsyncBody>) -> InterceptorFuture<Error> {
let client = self.clone();

Box::pin(async move {
let is_head_request = request.method() == http::Method::HEAD;

// Set default user agent if not specified.
request
.headers_mut()
.entry(http::header::USER_AGENT)
.or_insert(USER_AGENT.parse().unwrap());

// Check if automatic decompression is enabled; we'll need to know
// this later after the response is sent.
let is_automatic_decompression = request
.extensions()
.get::<RequestConfig>()
.unwrap()
.automatic_decompression
.unwrap_or(false);

// Create and configure a curl easy handle to fulfil the request.
let (easy, future) = client
.create_easy_handle(request)
.map_err(Error::from_any)?;

// Send the request to the agent to be executed.
client.inner.agent.submit_request(easy)?;

// Await for the response headers.
let response = future.await?;

// If a Content-Length header is present, include that information in
// the body as well.
let body_len = response.content_length().filter(|_| {
// If automatic decompression is enabled, and will likely be
// selected, then the value of Content-Length does not indicate
// the uncompressed body length and merely the compressed data
// length. If it looks like we are in this scenario then we
// ignore the Content-Length, since it can only cause confusion
// when included with the body.
if is_automatic_decompression {
if let Some(value) = response.headers().get(http::header::CONTENT_ENCODING) {
if value != "identity" {
return false;
}
}
}

true
});

// Convert the reader into an opaque Body.
Ok(response.map(|reader| {
if is_head_request {
AsyncBody::empty()
} else {
let body = ResponseBody {
inner: reader,
// Extend the lifetime of the agent by including a reference
// to its handle in the response body.
_client: client,
};

if let Some(len) = body_len {
AsyncBody::from_reader_sized(body, len)
} else {
AsyncBody::from_reader(body)
}
}
}))
})
}

fn create_easy_handle(
Expand Down Expand Up @@ -1159,81 +1235,6 @@ impl HttpClient {
}
}

impl crate::interceptor::Invoke for &HttpClient {
fn invoke(
&self,
mut request: Request<AsyncBody>,
) -> crate::interceptor::InterceptorFuture<'_, Error> {
Box::pin(async move {
let is_head_request = request.method() == http::Method::HEAD;

// Set default user agent if not specified.
request
.headers_mut()
.entry(http::header::USER_AGENT)
.or_insert(USER_AGENT.parse().unwrap());

// Check if automatic decompression is enabled; we'll need to know
// this later after the response is sent.
let is_automatic_decompression = request
.extensions()
.get::<RequestConfig>()
.unwrap()
.automatic_decompression
.unwrap_or(false);

// Create and configure a curl easy handle to fulfil the request.
let (easy, future) = self.create_easy_handle(request).map_err(Error::from_any)?;

// Send the request to the agent to be executed.
self.inner.agent.submit_request(easy)?;

// Await for the response headers.
let response = future.await?;

// If a Content-Length header is present, include that information in
// the body as well.
let body_len = response.content_length().filter(|_| {
// If automatic decompression is enabled, and will likely be
// selected, then the value of Content-Length does not indicate
// the uncompressed body length and merely the compressed data
// length. If it looks like we are in this scenario then we
// ignore the Content-Length, since it can only cause confusion
// when included with the body.
if is_automatic_decompression {
if let Some(value) = response.headers().get(http::header::CONTENT_ENCODING) {
if value != "identity" {
return false;
}
}
}

true
});

// Convert the reader into an opaque Body.
Ok(response.map(|reader| {
if is_head_request {
AsyncBody::empty()
} else {
let body = ResponseBody {
inner: reader,
// Extend the lifetime of the agent by including a reference
// to its handle in the response body.
_client: (*self).clone(),
};

if let Some(len) = body_len {
AsyncBody::from_reader_sized(body, len)
} else {
AsyncBody::from_reader(body)
}
}
}))
})
}
}

impl fmt::Debug for HttpClient {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("HttpClient").finish()
Expand All @@ -1242,12 +1243,12 @@ impl fmt::Debug for HttpClient {

/// A future for a request being executed.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ResponseFuture<'c>(Pin<Box<dyn Future<Output = <Self as Future>::Output> + 'c + Send>>);
pub struct ResponseFuture(Pin<Box<dyn Future<Output = <Self as Future>::Output> + 'static + Send>>);

impl<'c> ResponseFuture<'c> {
impl ResponseFuture {
fn new<F>(future: F) -> Self
where
F: Future<Output = <Self as Future>::Output> + Send + 'c,
F: Future<Output = <Self as Future>::Output> + Send + 'static,
{
ResponseFuture(Box::pin(future))
}
Expand All @@ -1257,15 +1258,15 @@ impl<'c> ResponseFuture<'c> {
}
}

impl Future for ResponseFuture<'_> {
impl Future for ResponseFuture {
type Output = Result<Response<AsyncBody>, Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.as_mut().poll(cx)
}
}

impl<'c> fmt::Debug for ResponseFuture<'c> {
impl fmt::Debug for ResponseFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ResponseFuture").finish()
}
Expand Down
70 changes: 35 additions & 35 deletions src/cookies/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,49 +28,49 @@ impl CookieInterceptor {
impl Interceptor for CookieInterceptor {
type Err = Error;

fn intercept<'a>(
&'a self,
fn intercept(
&self,
mut request: Request<AsyncBody>,
ctx: Context<'a>,
) -> InterceptorFuture<'a, Self::Err> {
Box::pin(async move {
// Determine the cookie jar to use for this request. If one is
// attached to this specific request, use it, otherwise use the
// default one.
let jar = request
.extensions()
.get::<CookieJar>()
.cloned()
.or_else(|| self.cookie_jar.clone());

if let Some(jar) = jar.as_ref() {
// Get the outgoing cookie header.
let mut cookie_string = request
.headers_mut()
.remove(http::header::COOKIE)
.map(|value| value.as_bytes().to_vec())
.unwrap_or_default();
ctx: Context,
) -> InterceptorFuture<Self::Err> {
// Determine the cookie jar to use for this request. If one is
// attached to this specific request, use it, otherwise use the
// default one.
let jar = request
.extensions()
.get::<CookieJar>()
.cloned()
.or_else(|| self.cookie_jar.clone());

// Append cookies in the jar to the cookie header value.
for cookie in jar.get_for_uri(request.uri()) {
if !cookie_string.is_empty() {
cookie_string.extend_from_slice(b"; ");
}
if let Some(jar) = jar.as_ref() {
// Get the outgoing cookie header.
let mut cookie_string = request
.headers_mut()
.remove(http::header::COOKIE)
.map(|value| value.as_bytes().to_vec())
.unwrap_or_default();

cookie_string.extend_from_slice(cookie.name().as_bytes());
cookie_string.push(b'=');
cookie_string.extend_from_slice(cookie.value().as_bytes());
// Append cookies in the jar to the cookie header value.
for cookie in jar.get_for_uri(request.uri()) {
if !cookie_string.is_empty() {
cookie_string.extend_from_slice(b"; ");
}

if !cookie_string.is_empty() {
if let Ok(header_value) = cookie_string.try_into() {
request
.headers_mut()
.insert(http::header::COOKIE, header_value);
}
cookie_string.extend_from_slice(cookie.name().as_bytes());
cookie_string.push(b'=');
cookie_string.extend_from_slice(cookie.value().as_bytes());
}

if !cookie_string.is_empty() {
if let Ok(header_value) = cookie_string.try_into() {
request
.headers_mut()
.insert(http::header::COOKIE, header_value);
}
}
}

Box::pin(async move {
let request_uri = request.uri().clone();
let mut response = ctx.send(request).await?;

Expand Down
Loading