Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace middleware API with interceptors API #206

Merged
merged 10 commits into from
Oct 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
toolchain: "1.45.0"
default: true

- run: cargo test --features cookies,psl,spnego
- run: cargo test --features cookies,psl,spnego,unstable-interceptors

- run: cargo run --release --example simple

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ spnego = ["curl-sys/spnego"]
static-curl = ["curl/static-curl"]
static-ssl = ["curl/static-ssl"]
text-decoding = ["encoding_rs", "mime"]
middleware-api-preview = []
unstable-interceptors = []

[dependencies]
bytes = "0.5"
Expand Down
152 changes: 75 additions & 77 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
config::*,
handler::{RequestHandler, ResponseBodyReader},
headers,
middleware::Middleware,
interceptor::{self, Interceptor, InterceptorObj},
task::Join,
Body, Error,
};
Expand Down Expand Up @@ -63,7 +63,7 @@ lazy_static! {
pub struct HttpClientBuilder {
agent_builder: AgentBuilder,
defaults: http::Extensions,
middleware: Vec<Box<dyn Middleware>>,
interceptors: Vec<InterceptorObj>,
default_headers: HeaderMap<HeaderValue>,
error: Option<Error>,
}
Expand Down Expand Up @@ -95,7 +95,7 @@ impl HttpClientBuilder {
Self {
agent_builder: AgentBuilder::default(),
defaults,
middleware: Vec::new(),
interceptors: Vec::new(),
default_headers: HeaderMap::new(),
error: None,
}
Expand All @@ -109,24 +109,25 @@ impl HttpClientBuilder {
/// feature is enabled.
#[cfg(feature = "cookies")]
pub fn cookies(self) -> Self {
self.middleware_impl(crate::cookies::CookieJar::default())
self.interceptor_impl(crate::cookies::CookieJar::default())
}

/// Add a middleware layer to the client.
/// Add a request interceptor to the client.
///
/// # Availability
///
/// This method is only available when the
/// [`middleware-api-preview`](index.html#middleware-api-preview) feature is
/// [`unstable-interceptors`](index.html#unstable-interceptors) feature is
/// enabled.
#[cfg(feature = "middleware-api-preview")]
pub fn middleware(self, middleware: impl Middleware) -> Self {
self.middleware_impl(middleware)
#[cfg(feature = "unstable-interceptors")]
#[inline]
pub fn interceptor(self, interceptor: impl Interceptor + 'static) -> Self {
self.interceptor_impl(interceptor)
}

#[allow(unused)]
fn middleware_impl(mut self, middleware: impl Middleware) -> Self {
self.middleware.push(Box::new(middleware));
pub(crate) fn interceptor_impl(mut self, interceptor: impl Interceptor + 'static) -> Self {
self.interceptors.push(InterceptorObj::new(interceptor));
self
}

Expand Down Expand Up @@ -382,7 +383,7 @@ impl HttpClientBuilder {
Ok(HttpClient {
agent: Arc::new(self.agent_builder.spawn()?),
defaults: self.defaults,
middleware: self.middleware,
interceptors: self.interceptors,
default_headers: self.default_headers,
})
}
Expand Down Expand Up @@ -494,11 +495,14 @@ impl<'a, K: Copy, V: Copy> HeaderPair<K, V> for &'a (K, V) {
pub struct HttpClient {
/// This is how we talk to our background agent thread.
agent: Arc<agent::Handle>,

/// Map of config values that should be used to configure execution if not
/// specified in a request.
defaults: http::Extensions,
/// Any middleware implementations that requests should pass through.
middleware: Vec<Box<dyn Middleware>>,

/// Registered interceptors that requests should pass through.
interceptors: Vec<InterceptorObj>,

/// Default headers to add to every request.
default_headers: HeaderMap<HeaderValue>,
}
Expand Down Expand Up @@ -798,77 +802,71 @@ impl HttpClient {
}

/// Actually send the request. All the public methods go through here.
async fn send_async_inner(&self, mut request: Request<Body>) -> Result<Response<Body>, Error> {
async fn send_async_inner(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
let span = tracing::debug_span!(
"send_async",
method = ?request.method(),
uri = ?request.uri(),
);

async move {
// We are checking here if header already contains the key, simply ignore it.
// In case the key wasn't present in parts.headers ensure that
// we have all the headers from default headers.
for name in self.default_headers.keys() {
if !request.headers().contains_key(name) {
for v in self.default_headers.get_all(name).iter() {
request.headers_mut().append(name, v.clone());
let cx = interceptor::Context {
invoker: Arc::new(move |mut request| {
Box::pin(async move {
// We are checking here if header already contains the key, simply ignore it.
// In case the key wasn't present in parts.headers ensure that
// we have all the headers from default headers.
for name in self.default_headers.keys() {
if !request.headers().contains_key(name) {
for v in self.default_headers.get_all(name).iter() {
request.headers_mut().append(name, v.clone());
}
}
}
}
}

// Set default user agent if not specified.
request
.headers_mut()
.entry(http::header::USER_AGENT)
.or_insert(USER_AGENT.parse().unwrap());

// Apply any request middleware, starting with the outermost one.
for middleware in self.middleware.iter().rev() {
request = middleware.filter_request(request);
}

// Create and configure a curl easy handle to fulfil the request.
let (easy, future) = self.create_easy_handle(request)?;

// Send the request to the agent to be executed.
self.agent.submit_request(easy)?;

// Await for the response headers.
let response = future.await?;

// If a Content-Length header is present, include that information in
// the body as well.
let content_length = response
.headers()
.get(http::header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse().ok());

// Convert the reader into an opaque Body.
let mut response = response.map(|reader| {
let body = ResponseBody {
inner: reader,
// Extend the lifetime of the agent by including a reference
// to its handle in the response body.
_agent: self.agent.clone(),
};

if let Some(len) = content_length {
Body::from_reader_sized(body, len)
} else {
Body::from_reader(body)
}
});

// Apply response middleware, starting with the innermost
// one.
for middleware in self.middleware.iter() {
response = middleware.filter_response(response);
}

Ok(response)
}.instrument(span).await
// Set default user agent if not specified.
request
.headers_mut()
.entry(http::header::USER_AGENT)
.or_insert(USER_AGENT.parse().unwrap());

// Create and configure a curl easy handle to fulfil the request.
let (easy, future) = self.create_easy_handle(request)?;

// Send the request to the agent to be executed.
self.agent.submit_request(easy)?;

// Await for the response headers.
let response = future.await?;

// If a Content-Length header is present, include that information in
// the body as well.
let content_length = response
.headers()
.get(http::header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse().ok());

// Convert the reader into an opaque Body.
Ok(response.map(|reader| {
let body = ResponseBody {
inner: reader,
// Extend the lifetime of the agent by including a reference
// to its handle in the response body.
_agent: self.agent.clone(),
};

if let Some(len) = content_length {
Body::from_reader_sized(body, len)
} else {
Body::from_reader(body)
}
}))
}.instrument(span.clone()))
}),
interceptors: &self.interceptors,
};

cx.send(request).await
}

fn create_easy_handle(
Expand Down
19 changes: 16 additions & 3 deletions src/cookies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
//!
//! Everything in this module requires the `cookies` feature to be enabled.

use crate::{middleware::Middleware, response::ResponseExt, Body};
use crate::{
Error,
response::ResponseExt,
Body,
interceptor::{Interceptor, InterceptorFuture, Context},
};
use chrono::prelude::*;
use chrono::Duration;
use http::{Request, Response, Uri};
Expand Down Expand Up @@ -263,9 +268,7 @@ impl CookieJar {
Some(values.join("; "))
}
}
}

impl Middleware for CookieJar {
fn filter_request(&self, mut request: Request<Body>) -> Request<Body> {
if let Some(header) = self.get_cookies(request.uri()) {
request
Expand Down Expand Up @@ -306,6 +309,16 @@ impl Middleware for CookieJar {
}
}

impl Interceptor for CookieJar {
type Err = Error;

fn intercept<'a>(&'a self, request: Request<Body>, cx: Context<'a>) -> InterceptorFuture<'a, Self::Err> {
Box::pin(async move {
Ok(self.filter_response(cx.send(self.filter_request(request)).await?))
})
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
42 changes: 42 additions & 0 deletions src/interceptor/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::{Body, Error};
use super::{Interceptor, InterceptorFuture, InterceptorObj};
use http::{Request, Response};
use std::{
fmt,
sync::Arc,
};

/// Execution context for an interceptor.
pub struct Context<'a> {
pub(crate) invoker: Arc<dyn (Fn(Request<Body>) -> InterceptorFuture<'a, Error>) + Send + Sync + 'a>,
pub(crate) interceptors: &'a [InterceptorObj],
}

impl Context<'_> {
/// Send a request asynchronously, executing the next interceptor in the
/// chain, if any.
pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>, Error> {
if let Some(interceptor) = self.interceptors.first() {
let inner_context = Self {
invoker: self.invoker.clone(),
interceptors: &self.interceptors[1..],
};

match interceptor.intercept(request, inner_context).await {
Ok(response) => Ok(response),

// TODO: Introduce a new error variant for errors caused by an
// interceptor. This is a temporary hack.
Err(e) => Err(Error::Curl(e.to_string())),
}
} else {
(self.invoker)(request).await
}
}
}

impl fmt::Debug for Context<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Context").finish()
}
}
Loading