Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Remove lifetime from Context, ResponseFuture, InterceptorFuture #405

Merged
merged 8 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl AsyncBody {
impl AsyncRead for AsyncBody {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
cx: &mut Context,
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match &mut self.0 {
Expand Down
64 changes: 33 additions & 31 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
error::{Error, ErrorKind},
handler::{RequestHandler, ResponseBodyReader},
headers::HasHeaders,
interceptor::{self, Interceptor, InterceptorObj},
interceptor::{self, Interceptor, InterceptorFuture, InterceptorObj},
parsing::header_to_curl_string,
};
use futures_lite::{
Expand All @@ -21,8 +21,7 @@ use futures_lite::{
};
use http::{
header::{HeaderMap, HeaderName, HeaderValue},
Request,
Response,
Request, Response,
};
use once_cell::sync::Lazy;
use std::{
Expand Down Expand Up @@ -651,6 +650,11 @@ impl HttpClient {
self.inner.cookie_jar.as_ref()
}

/// Get the configured interceptors for this HTTP client.
pub(crate) fn interceptors(&self) -> &[InterceptorObj] {
&self.inner.interceptors
}

/// Send a GET request to the given URI.
///
/// To customize the request further, see [`HttpClient::send`]. To execute
Expand Down Expand Up @@ -682,7 +686,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::get`].
pub fn get_async<U>(&self, uri: U) -> ResponseFuture<'_>
pub fn get_async<U>(&self, uri: U) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -724,7 +728,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::head`].
pub fn head_async<U>(&self, uri: U) -> ResponseFuture<'_>
pub fn head_async<U>(&self, uri: U) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -770,7 +774,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::post`].
pub fn post_async<U, B>(&self, uri: U, body: B) -> ResponseFuture<'_>
pub fn post_async<U, B>(&self, uri: U, body: B) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -818,7 +822,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::put`].
pub fn put_async<U, B>(&self, uri: U, body: B) -> ResponseFuture<'_>
pub fn put_async<U, B>(&self, uri: U, body: B) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -850,7 +854,7 @@ impl HttpClient {
///
/// To customize the request further, see [`HttpClient::send_async`]. To
/// execute the request synchronously, see [`HttpClient::delete`].
pub fn delete_async<U>(&self, uri: U) -> ResponseFuture<'_>
pub fn delete_async<U>(&self, uri: U) -> ResponseFuture
where
http::Uri: TryFrom<U>,
<http::Uri as TryFrom<U>>::Error: Into<http::Error>,
Expand Down Expand Up @@ -997,7 +1001,7 @@ impl HttpClient {
/// # Ok(()) }
/// ```
#[inline]
pub fn send_async<B>(&self, request: Request<B>) -> ResponseFuture<'_>
pub fn send_async<B>(&self, request: Request<B>) -> ResponseFuture
where
B: Into<AsyncBody>,
{
Expand All @@ -1014,10 +1018,7 @@ impl HttpClient {
}

/// Actually send the request. All the public methods go through here.
async fn send_async_inner(
&self,
mut request: Request<AsyncBody>,
) -> Result<Response<AsyncBody>, Error> {
fn send_async_inner(&self, mut request: Request<AsyncBody>) -> InterceptorFuture<Error> {
// Populate request config, creating if necessary.
if let Some(config) = request.extensions_mut().get_mut::<RequestConfig>() {
// Merge request configuration with defaults.
Expand All @@ -1029,11 +1030,11 @@ impl HttpClient {
}

let ctx = interceptor::Context {
invoker: Arc::new(self),
interceptors: &self.inner.interceptors,
client: self.clone(),
interceptor_offset: 0,
};

ctx.send(request).await
ctx.send(request)
}

fn create_easy_handle(
Expand Down Expand Up @@ -1159,11 +1160,10 @@ impl HttpClient {
}
}

impl crate::interceptor::Invoke for &HttpClient {
fn invoke(
&self,
mut request: Request<AsyncBody>,
) -> crate::interceptor::InterceptorFuture<'_, Error> {
impl interceptor::Invoke for HttpClient {
fn invoke(&self, mut request: Request<AsyncBody>) -> InterceptorFuture<Error> {
let client = self.clone();

Box::pin(async move {
let is_head_request = request.method() == http::Method::HEAD;

Expand All @@ -1183,10 +1183,12 @@ impl crate::interceptor::Invoke for &HttpClient {
.unwrap_or(false);

// Create and configure a curl easy handle to fulfil the request.
let (easy, future) = self.create_easy_handle(request).map_err(Error::from_any)?;
let (easy, future) = client
.create_easy_handle(request)
.map_err(Error::from_any)?;

// Send the request to the agent to be executed.
self.inner.agent.submit_request(easy)?;
client.inner.agent.submit_request(easy)?;

// Await for the response headers.
let response = future.await?;
Expand Down Expand Up @@ -1220,7 +1222,7 @@ impl crate::interceptor::Invoke for &HttpClient {
inner: reader,
// Extend the lifetime of the agent by including a reference
// to its handle in the response body.
_client: (*self).clone(),
_client: client,
};

if let Some(len) = body_len {
Expand All @@ -1242,12 +1244,12 @@ impl fmt::Debug for HttpClient {

/// A future for a request being executed.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ResponseFuture<'c>(Pin<Box<dyn Future<Output = <Self as Future>::Output> + 'c + Send>>);
pub struct ResponseFuture(Pin<Box<dyn Future<Output = <Self as Future>::Output> + 'static + Send>>);

impl<'c> ResponseFuture<'c> {
impl ResponseFuture {
fn new<F>(future: F) -> Self
where
F: Future<Output = <Self as Future>::Output> + Send + 'c,
F: Future<Output = <Self as Future>::Output> + Send + 'static,
{
ResponseFuture(Box::pin(future))
}
Expand All @@ -1257,15 +1259,15 @@ impl<'c> ResponseFuture<'c> {
}
}

impl Future for ResponseFuture<'_> {
impl Future for ResponseFuture {
type Output = Result<Response<AsyncBody>, Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.0.as_mut().poll(cx)
}
}

impl<'c> fmt::Debug for ResponseFuture<'c> {
impl fmt::Debug for ResponseFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ResponseFuture").finish()
}
Expand All @@ -1281,7 +1283,7 @@ struct ResponseBody {
impl AsyncRead for ResponseBody {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let inner = Pin::new(&mut self.inner);
Expand Down
12 changes: 7 additions & 5 deletions src/cookies/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ impl CookieInterceptor {
impl Interceptor for CookieInterceptor {
type Err = Error;

fn intercept<'a>(
&'a self,
fn intercept(
&self,
mut request: Request<AsyncBody>,
ctx: Context<'a>,
) -> InterceptorFuture<'a, Self::Err> {
ctx: Context,
) -> InterceptorFuture<Self::Err> {
let cookie_jar = self.cookie_jar.clone();

Box::pin(async move {
// Determine the cookie jar to use for this request. If one is
// attached to this specific request, use it, otherwise use the
Expand All @@ -41,7 +43,7 @@ impl Interceptor for CookieInterceptor {
.extensions()
.get::<CookieJar>()
.cloned()
.or_else(|| self.cookie_jar.clone());
.or(cookie_jar);

if let Some(jar) = jar.as_ref() {
// Get the outgoing cookie header.
Expand Down
14 changes: 8 additions & 6 deletions src/default_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ impl From<HeaderMap<HeaderValue>> for DefaultHeadersInterceptor {
impl Interceptor for DefaultHeadersInterceptor {
type Err = Error;

fn intercept<'a>(
&'a self,
fn intercept(
&self,
mut request: Request<AsyncBody>,
ctx: Context<'a>,
) -> InterceptorFuture<'a, Self::Err> {
ctx: Context,
) -> InterceptorFuture<Self::Err> {
let headers = self.headers.clone();

Box::pin(async move {
// We are checking here if header already contains the key, simply
// ignore it. In case the key wasn't present in parts.headers ensure
// that we have all the headers from default headers.
for name in self.headers.keys() {
for name in headers.keys() {
if !request.headers().contains_key(name) {
for v in self.headers.get_all(name).iter() {
for v in headers.get_all(name).iter() {
request.headers_mut().append(name, v.clone());
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ pub(crate) struct ResponseBodyReader {
impl AsyncRead for ResponseBodyReader {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let inner = Pin::new(&mut self.inner);
Expand Down
32 changes: 16 additions & 16 deletions src/interceptor/context.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,37 @@
use super::{Interceptor, InterceptorFuture, InterceptorObj};
use crate::{body::AsyncBody, error::Error};
use http::{Request, Response};
use std::{fmt, sync::Arc};
use super::{Interceptor, InterceptorFuture};
use crate::{body::AsyncBody, error::Error, HttpClient};
use http::Request;
use std::fmt;

/// Execution context for an interceptor.
pub struct Context<'a> {
pub(crate) invoker: Arc<dyn Invoke + Send + Sync + 'a>,
pub(crate) interceptors: &'a [InterceptorObj],
pub struct Context {
pub(crate) client: HttpClient,
pub(crate) interceptor_offset: usize,
}

impl<'a> Context<'a> {
impl Context {
/// Send a request asynchronously, executing the next interceptor in the
/// chain, if any.
pub async fn send(&self, request: Request<AsyncBody>) -> Result<Response<AsyncBody>, Error> {
if let Some(interceptor) = self.interceptors.first() {
pub fn send(&self, request: Request<AsyncBody>) -> InterceptorFuture<Error> {
if let Some(interceptor) = self.client.interceptors().get(self.interceptor_offset) {
let inner_context = Self {
invoker: self.invoker.clone(),
interceptors: &self.interceptors[1..],
client: self.client.clone(),
interceptor_offset: self.interceptor_offset + 1,
};

interceptor.intercept(request, inner_context).await
interceptor.intercept(request, inner_context)
} else {
self.invoker.invoke(request).await
self.client.invoke(request)
}
}
}

impl fmt::Debug for Context<'_> {
impl fmt::Debug for Context {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Context").finish()
}
}

pub(crate) trait Invoke {
fn invoke(&self, request: Request<AsyncBody>) -> InterceptorFuture<'_, Error>;
fn invoke(&self, request: Request<AsyncBody>) -> InterceptorFuture<Error>;
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
}
21 changes: 7 additions & 14 deletions src/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ macro_rules! interceptor {
($request:ident, $ctx:ident, $body:expr) => {{
async fn interceptor(
mut $request: $crate::http::Request<$crate::AsyncBody>,
$ctx: $crate::interceptor::Context<'_>,
$ctx: $crate::interceptor::Context,
) -> Result<$crate::http::Response<$crate::AsyncBody>, $crate::Error> {
(move || async move { $body })().await.map_err(Into::into)
}
Expand All @@ -60,20 +60,17 @@ pub trait Interceptor: Send + Sync {
///
/// The returned future is allowed to borrow the interceptor for the
/// duration of its execution.
fn intercept<'a>(
&'a self,
request: Request<AsyncBody>,
ctx: Context<'a>,
) -> InterceptorFuture<'a, Self::Err>;
fn intercept(&self, request: Request<AsyncBody>, ctx: Context) -> InterceptorFuture<Self::Err>;
sagebind marked this conversation as resolved.
Show resolved Hide resolved
}

/// The type of future returned by an interceptor.
pub type InterceptorFuture<'a, E> = Pin<Box<dyn Future<Output = InterceptorResult<E>> + Send + 'a>>;
pub type InterceptorFuture<E> =
Pin<Box<dyn Future<Output = InterceptorResult<E>> + Send + 'static>>;

/// Creates an interceptor from an arbitrary closure or function.
pub fn from_fn<F, E>(f: F) -> InterceptorFn<F>
where
F: for<'a> private::AsyncFn2<Request<AsyncBody>, Context<'a>, Output = InterceptorResult<E>>
F: for<'a> private::AsyncFn2<Request<AsyncBody>, Context, Output = InterceptorResult<E>>
+ Send
+ Sync
+ 'static,
Expand All @@ -89,18 +86,14 @@ pub struct InterceptorFn<F>(F);
impl<E, F> Interceptor for InterceptorFn<F>
where
E: Error + Send + Sync + 'static,
F: for<'a> private::AsyncFn2<Request<AsyncBody>, Context<'a>, Output = InterceptorResult<E>>
F: for<'a> private::AsyncFn2<Request<AsyncBody>, Context, Output = InterceptorResult<E>>
+ Send
+ Sync
+ 'static,
{
type Err = E;

fn intercept<'a>(
&self,
request: Request<AsyncBody>,
ctx: Context<'a>,
) -> InterceptorFuture<'a, Self::Err> {
fn intercept(&self, request: Request<AsyncBody>, ctx: Context) -> InterceptorFuture<Self::Err> {
Box::pin(self.0.call(request, ctx))
}
}
Expand Down
Loading