Skip to content

Commit

Permalink
refactor(http_util): Adopt reqwest's redirect support (#2390)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored Jun 1, 2023
1 parent 60c7576 commit 00885f8
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 260 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ suppaftp = { version = "4.5", default-features = false, features = [
], optional = true }
tokio = "1.27"
tracing = { version = "0.1", optional = true }
url = { version = "2.2" } # version should follow reqwest
uuid = { version = "1", features = ["serde", "v4"] }

[dev-dependencies]
Expand Down
175 changes: 2 additions & 173 deletions core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,12 @@ use std::mem;
use std::str::FromStr;

use futures::TryStreamExt;
use http::Request;
use http::Response;
use http::{header, Request, StatusCode};
use log::debug;
use reqwest::redirect::Policy;
use url::{ParseError, Url};

use super::body::IncomingAsyncBody;
use super::parse_content_length;
use super::AsyncBody;
use crate::raw::parse_location;
use crate::Error;
use crate::ErrorKind;
use crate::Result;
Expand Down Expand Up @@ -62,8 +58,6 @@ impl HttpClient {
builder = builder.no_brotli();
// Make sure we don't enable auto deflate decompress.
builder = builder.no_deflate();
// Redirect will be handled by ourselves.
builder = builder.redirect(Policy::none());

#[cfg(feature = "trust-dns")]
let builder = builder.trust_dns(true);
Expand Down Expand Up @@ -91,7 +85,7 @@ impl HttpClient {
.client
.request(
parts.method,
Url::from_str(&url).expect("input request url must be valid"),
reqwest::Url::from_str(&url).expect("input request url must be valid"),
)
.version(parts.version)
.headers(parts.headers);
Expand Down Expand Up @@ -155,169 +149,4 @@ impl HttpClient {

Ok(resp)
}

/// Send a request in async way with handling redirection logic.
/// Now we only support redirect GET request.
/// # Arguments
/// * `times` - how many times do you want to send request when we need to handle redirection
pub async fn send_with_redirect(
&self,
req: Request<AsyncBody>,
times: usize,
) -> Result<Response<IncomingAsyncBody>> {
if req.method() != http::Method::GET {
// for now we only handle redirection for GET request
// and please note that we don't support stream request either.
return Err(Error::new(
ErrorKind::Unsupported,
"redirect for unsupported HTTP method",
)
.with_operation("http_util::Client::send_with_redirect_async")
.with_context("method", req.method().as_str()));
}

let mut prev_req = self.clone_request(&req);
let mut prev_resp = self.send(req).await?;
let mut retries = 0;

let resp = loop {
let status = prev_resp.status();
// for now we only handle 302/308 for 3xx status
// notice that our redirect logic may not follow the HTTP standard
let should_redirect = match status {
StatusCode::FOUND => {
// theoretically we need to handle following status also:
// - StatusCode::MOVED_PERMANENTLY
// - StatusCode::SEE_OTHER
let mut new_req = self.clone_request(&prev_req);
for header in &[
header::TRANSFER_ENCODING,
header::CONTENT_ENCODING,
header::CONTENT_TYPE,
header::CONTENT_LENGTH,
] {
new_req.headers_mut().remove(header);
}
// see https://www.rfc-editor.org/rfc/rfc9110.html#section-15.4.2
// theoretically for 301, 302 and 303 should change
// original http method to GET except HEAD
// even though we only support GET request for now,
// just in case we support other HTTP method in the future
// add method modification logic here
match new_req.method() {
&http::Method::GET | &http::Method::HEAD => {}
_ => *new_req.method_mut() = http::Method::GET,
}
Some(new_req)
}
// theoretically we need to handle following status also:
// - StatusCode::PERMANENT_REDIRECT
StatusCode::TEMPORARY_REDIRECT => Some(self.clone_request(&prev_req)),
_ => None,
};

retries += 1;
if retries > times || should_redirect.is_none() {
// exceeds maximum retry times or no need to redirect request
// just return last response
debug!("no need to redirect or reach the maximum retry times");
break prev_resp;
}
debug!(
"it is the {} time for http client to retry. maximum times: {}",
retries, times
);

if let Some(mut redirect_req) = should_redirect {
let prev_url_str = redirect_req.uri().to_string();
let prev_url = Url::parse(&prev_url_str).map_err(|e| {
Error::new(ErrorKind::Unexpected, "url is not valid")
.with_context("url", &prev_url_str)
.set_source(e)
})?;

let loc = parse_location(prev_resp.headers())?
// no location means invalid redirect response
.ok_or_else(|| {
debug!(
"no location headers in response, url: {}, headers: {:?}",
&prev_url_str,
&prev_resp.headers()
);
Error::new(
ErrorKind::Unexpected,
"no location header in redirect response",
)
.with_context("method", redirect_req.method().as_str())
.with_context("url", &prev_url_str)
})?;

// one url with origin and path
let loc_url = Url::parse(loc).or_else(|err| {
match err {
ParseError::RelativeUrlWithoutBase => {
debug!("redirected location is relative url, will join it to original base url. loc: {}", loc);
let url = prev_url.clone().join(loc).map_err(|err| {
Error::new(ErrorKind::Unexpected, "invalid redirect base url and path")
.with_context("base", &prev_url_str)
.with_context("path", loc)
.set_source(err)
})?;
Ok(url)
}
err => {
Err(
Error::new(ErrorKind::Unexpected, "invalid location header")
.with_context("location", loc)
.set_source(err)
)
}
}
})?;

debug!("redirecting '{}' to '{}'", &prev_url_str, loc_url.as_str());
self.remove_sensitive_headers(&mut redirect_req, &loc_url, &prev_url);
// change the request uri
*redirect_req.uri_mut() = loc_url.as_str().parse().map_err(|err| {
Error::new(ErrorKind::Unexpected, "new redirect url is invalid")
.with_context("loc", loc_url.as_str())
.set_source(err)
})?;
prev_req = self.clone_request(&redirect_req);
prev_resp = self.send(redirect_req).await?;
}
};
Ok(resp)
}
}

impl HttpClient {
fn clone_request(&self, req: &Request<AsyncBody>) -> Request<AsyncBody> {
let (mut parts, body) = Request::new(match req.body() {
AsyncBody::Empty => AsyncBody::Empty,
AsyncBody::Bytes(bytes) => AsyncBody::Bytes(bytes.clone()),
})
.into_parts();

// we just ignore extensions of request, because we won't use it
parts.method = req.method().clone();
parts.uri = req.uri().clone();
parts.version = req.version();
parts.headers = req.headers().clone();

Request::from_parts(parts, body)
}

fn remove_sensitive_headers(&self, req: &mut Request<AsyncBody>, next: &Url, previous: &Url) {
let cross_host = next.host_str() != previous.host_str()
|| next.port_or_known_default() != previous.port_or_known_default();
if cross_host {
let headers = req.headers_mut();
headers.remove(header::AUTHORIZATION);
headers.remove(header::COOKIE);
headers.remove("cookie2");
headers.remove(header::PROXY_AUTHORIZATION);
headers.remove(header::WWW_AUTHENTICATE);
}
}
}
41 changes: 5 additions & 36 deletions core/src/services/onedrive/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,31 +93,13 @@ impl Accessor for OnedriveBackend {

let status = resp.status();

if status.is_redirection() {
let headers = resp.headers();
let location = parse_location(headers)?;
match location {
None => {
return Err(Error::new(
ErrorKind::ContentIncomplete,
"redirect location not found in response",
));
}
Some(location) => {
let resp = self.onedrive_get_redirection(location).await?;
let meta = parse_into_metadata(path, resp.headers())?;
Ok((RpRead::with_metadata(meta), resp.into_body()))
}
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let meta = parse_into_metadata(path, resp.headers())?;
Ok((RpRead::with_metadata(meta), resp.into_body()))
}
} else {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let meta = parse_into_metadata(path, resp.headers())?;
Ok((RpRead::with_metadata(meta), resp.into_body()))
}

_ => Err(parse_error(resp).await?),
}
_ => Err(parse_error(resp).await?),
}
}

Expand Down Expand Up @@ -279,19 +261,6 @@ impl OnedriveBackend {
self.client.send(req).await
}

async fn onedrive_get_redirection(&self, url: &str) -> Result<Response<IncomingAsyncBody>> {
let mut req = Request::get(url);

let auth_header_content = format!("Bearer {}", self.access_token);
req = req.header(header::AUTHORIZATION, auth_header_content);

let req = req
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

self.client.send(req).await
}

pub async fn onedrive_upload_simple(
&self,
path: &str,
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/webdav/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ impl WebdavBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;

self.client.send_with_redirect(req, 5).await
self.client.send(req).await
}

pub async fn webdav_put(
Expand Down
53 changes: 5 additions & 48 deletions core/src/services/webhdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,32 +268,21 @@ impl WebhdfsBackend {
url += format!("&{auth}").as_str();
}

let req = Request::put(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
let mut req = Request::put(&url);

// mkdir does not redirect
if path.ends_with('/') {
return Ok(req);
return req.body(AsyncBody::Empty).map_err(new_request_build_error);
}

let resp = self.client.send(req).await?;

// should be a 307 TEMPORARY_REDIRECT
if resp.status() != StatusCode::TEMPORARY_REDIRECT {
return Err(parse_error(resp).await?);
}
let re_url = self.follow_redirect(resp)?;

let mut re_builder = Request::put(re_url);
if let Some(size) = size {
re_builder = re_builder.header(CONTENT_LENGTH, size.to_string());
req = req.header(CONTENT_LENGTH, size.to_string());
}
if let Some(content_type) = content_type {
re_builder = re_builder.header(CONTENT_TYPE, content_type);
req = req.header(CONTENT_TYPE, content_type);
}

re_builder.body(body).map_err(new_request_build_error)
req.body(body).map_err(new_request_build_error)
}

async fn webhdfs_open_request(
Expand Down Expand Up @@ -358,17 +347,6 @@ impl WebhdfsBackend {
range: BytesRange,
) -> Result<Response<IncomingAsyncBody>> {
let req = self.webhdfs_open_request(path, &range).await?;
let resp = self.client.send(req).await?;

// webhdfs namenode will redirect us to datanode for data transfer.
if resp.status() != StatusCode::TEMPORARY_REDIRECT {
return Err(parse_error(resp).await?);
}

let location = self.follow_redirect(resp)?;
let req = Request::get(&location)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.client.send(req).await
}

Expand Down Expand Up @@ -409,27 +387,6 @@ impl WebhdfsBackend {
self.client.send(req).await
}

/// get redirect destination from 307 TEMPORARY_REDIRECT http response
fn follow_redirect(&self, resp: Response<IncomingAsyncBody>) -> Result<String> {
let location = parse_location(resp.headers())?.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"webhdfs expect to have redirect location but got none",
)
})?;

let location = if location.starts_with('/') {
// location starts with `/` means it's a relative path to current
// endpoint. We should prepend the endpoint to it so that we can
// send request to the correct location.
format!("{}/{location}", self.endpoint)
} else {
location.to_string()
};

Ok(location)
}

async fn check_root(&self) -> Result<()> {
let resp = self.webhdfs_get_file_status("/").await?;
match resp.status() {
Expand Down

0 comments on commit 00885f8

Please sign in to comment.