Skip to content

Commit

Permalink
object_store: add support for using proxy_url for connection testing
Browse files Browse the repository at this point in the history
  • Loading branch information
sum12 committed Nov 27, 2022
1 parent e44cb5b commit a1c7906
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 19 deletions.
23 changes: 17 additions & 6 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ pub(crate) enum Error {

#[snafu(display("Got invalid multipart response: {}", source))]
InvalidMultipartResponse { source: quick_xml::de::DeError },

#[snafu(display("Unable to use proxy url: {}", source))]
ProxyUrl { source: reqwest::Error },
}

impl From<Error> for crate::Error {
Expand Down Expand Up @@ -201,6 +204,7 @@ pub struct S3Config {
pub credentials: Box<dyn CredentialProvider>,
pub retry_config: RetryConfig,
pub allow_http: bool,
pub proxy_url: Option<String>,
}

impl S3Config {
Expand All @@ -216,13 +220,20 @@ pub(crate) struct S3Client {
}

impl S3Client {
pub fn new(config: S3Config) -> Self {
let client = reqwest::ClientBuilder::new()
.https_only(!config.allow_http)
.build()
.unwrap();
pub fn new(config: S3Config) -> Result<Self> {
let builder = reqwest::ClientBuilder::new().https_only(!config.allow_http);
let client = match &config.proxy_url {
Some(ref url) => {
let pr = reqwest::Proxy::all(url)
.map_err(|source| Error::ProxyUrl { source })?;
builder.proxy(pr)
}
_ => builder,
}
.build()
.unwrap();

Self { config, client }
Ok(Self { config, client })
}

/// Returns the config
Expand Down
58 changes: 54 additions & 4 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use reqwest::Client;
use reqwest::{Client, Proxy};
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::BTreeSet;
use std::ops::Range;
Expand Down Expand Up @@ -120,6 +120,9 @@ enum Error {

#[snafu(display("Error reading token file: {}", source))]
ReadTokenFile { source: std::io::Error },

#[snafu(display("Unable to use proxy url: {}", source))]
ProxyUrl { source: reqwest::Error },
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -363,6 +366,7 @@ pub struct AmazonS3Builder {
virtual_hosted_style_request: bool,
metadata_endpoint: Option<String>,
profile: Option<String>,
proxy_url: Option<String>,
}

impl AmazonS3Builder {
Expand Down Expand Up @@ -537,6 +541,12 @@ impl AmazonS3Builder {
self
}

/// Set the proxy_url to be used by the underlying client
pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
self.proxy_url = Some(proxy_url.into());
self
}

/// Set the AWS profile name, see <https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html>
///
/// This makes use of [aws-config] to provide credentials and therefore requires
Expand All @@ -561,6 +571,14 @@ impl AmazonS3Builder {
let bucket = self.bucket_name.context(MissingBucketNameSnafu)?;
let region = self.region.context(MissingRegionSnafu)?;

let clientbuilder = match self.proxy_url {
Some(ref url) => {
let pr: Proxy =
Proxy::all(url).map_err(|source| Error::ProxyUrl { source })?;
Client::builder().proxy(pr)
}
None => Client::builder(),
};
let credentials = match (self.access_key_id, self.secret_access_key, self.token) {
(Some(key_id), Some(secret_key), token) => {
info!("Using Static credential provider");
Expand Down Expand Up @@ -590,7 +608,7 @@ impl AmazonS3Builder {
let endpoint = format!("https://sts.{}.amazonaws.com", region);

// Disallow non-HTTPs requests
let client = Client::builder().https_only(true).build().unwrap();
let client = clientbuilder.https_only(true).build().unwrap();

Box::new(WebIdentityProvider {
cache: Default::default(),
Expand All @@ -611,7 +629,7 @@ impl AmazonS3Builder {
info!("Using Instance credential provider");

// The instance metadata endpoint is access over HTTP
let client = Client::builder().https_only(false).build().unwrap();
let client = clientbuilder.https_only(false).build().unwrap();

Box::new(InstanceCredentialProvider {
cache: Default::default(),
Expand Down Expand Up @@ -653,9 +671,10 @@ impl AmazonS3Builder {
credentials,
retry_config: self.retry_config,
allow_http: self.allow_http,
proxy_url: self.proxy_url,
};

let client = Arc::new(S3Client::new(config));
let client = Arc::new(S3Client::new(config).unwrap());

Ok(AmazonS3 { client })
}
Expand Down Expand Up @@ -898,4 +917,35 @@ mod tests {
let err = integration.delete(&location).await.unwrap_err();
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
}

#[tokio::test]
async fn s3_test_proxy_url() {
let s3 = AmazonS3Builder::new()
.with_access_key_id("access_key_id")
.with_secret_access_key("secret_access_key")
.with_region("region")
.with_bucket_name("bucket_name")
.with_allow_http(true)
.with_proxy_url("https://example.com")
.build();

assert!(s3.is_ok());

let s3 = AmazonS3Builder::new()
.with_access_key_id("access_key_id")
.with_secret_access_key("secret_access_key")
.with_region("region")
.with_bucket_name("bucket_name")
.with_allow_http(true)
.with_proxy_url("asdf://example.com")
.build();

assert!(match s3 {
Err(crate::Error::Generic { source, .. }) => matches!(
source.downcast_ref(),
Some(crate::aws::Error::ProxyUrl { .. })
),
_ => false,
})
}
}
25 changes: 18 additions & 7 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use chrono::{DateTime, TimeZone, Utc};
use itertools::Itertools;
use reqwest::{
header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE},
Client as ReqwestClient, Method, Response, StatusCode,
Client as ReqwestClient, Method, Proxy, Response, StatusCode,
};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -82,6 +82,9 @@ pub(crate) enum Error {
Authorization {
source: crate::azure::credential::Error,
},

#[snafu(display("Unable to use proxy url: {}", source))]
ProxyUrl { source: reqwest::Error },
}

impl From<Error> for crate::Error {
Expand Down Expand Up @@ -124,6 +127,7 @@ pub struct AzureConfig {
pub allow_http: bool,
pub service: Url,
pub is_emulator: bool,
pub proxy_url: Option<String>,
}

impl AzureConfig {
Expand All @@ -148,13 +152,20 @@ pub(crate) struct AzureClient {

impl AzureClient {
/// create a new instance of [AzureClient]
pub fn new(config: AzureConfig) -> Self {
let client = reqwest::ClientBuilder::new()
.https_only(!config.allow_http)
.build()
.unwrap();
pub fn new(config: AzureConfig) -> Result<Self> {
let builder = ReqwestClient::builder();

let client = if let Some(url) = config.proxy_url.as_ref() {
let pr = Proxy::all(url).map_err(|source| Error::ProxyUrl { source });
builder.proxy(pr.unwrap())
} else {
builder
}
.https_only(!config.allow_http)
.build()
.unwrap();

Self { config, client }
Ok(Self { config, client })
}

/// Returns the config
Expand Down
11 changes: 10 additions & 1 deletion object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ pub struct MicrosoftAzureBuilder {
use_emulator: bool,
retry_config: RetryConfig,
allow_http: bool,
proxy_url: Option<String>,
}

impl Debug for MicrosoftAzureBuilder {
Expand Down Expand Up @@ -500,6 +501,12 @@ impl MicrosoftAzureBuilder {
self
}

/// Set the proxy_url to be used by the underlying client
pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
self.proxy_url = Some(proxy_url.into());
self
}

/// Configure a connection to container with given name on Microsoft Azure
/// Blob store.
pub fn build(self) -> Result<MicrosoftAzure> {
Expand All @@ -516,6 +523,7 @@ impl MicrosoftAzureBuilder {
retry_config,
allow_http,
authority_host,
proxy_url,
} = self;

let container = container_name.ok_or(Error::MissingContainerName {})?;
Expand Down Expand Up @@ -567,9 +575,10 @@ impl MicrosoftAzureBuilder {
container,
credentials: auth,
is_emulator,
proxy_url,
};

let client = Arc::new(client::AzureClient::new(config));
let client = Arc::new(client::AzureClient::new(config)?);

Ok(MicrosoftAzure { client })
}
Expand Down
55 changes: 54 additions & 1 deletion object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
use reqwest::header::RANGE;
use reqwest::Proxy;
use reqwest::{header, Client, Method, Response, StatusCode};
use snafu::{ResultExt, Snafu};
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -122,6 +123,9 @@ enum Error {

#[snafu(display("GCP credential error: {}", source))]
Credential { source: credential::Error },

#[snafu(display("Unable to use proxy url: {}", source))]
ProxyUrl { source: reqwest::Error },
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -741,6 +745,7 @@ pub struct GoogleCloudStorageBuilder {
service_account_path: Option<String>,
client: Option<Client>,
retry_config: RetryConfig,
proxy_url: Option<String>,
}

impl GoogleCloudStorageBuilder {
Expand Down Expand Up @@ -782,6 +787,12 @@ impl GoogleCloudStorageBuilder {
self
}

/// Set proxy url used for connection
pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
self.proxy_url = Some(proxy_url.into());
self
}

/// Configure a connection to Google Cloud Storage, returning a
/// new [`GoogleCloudStorage`] and consuming `self`
pub fn build(self) -> Result<GoogleCloudStorage> {
Expand All @@ -790,12 +801,24 @@ impl GoogleCloudStorageBuilder {
service_account_path,
client,
retry_config,
proxy_url,
} = self;

let bucket_name = bucket_name.ok_or(Error::MissingBucketName {})?;
let service_account_path =
service_account_path.ok_or(Error::MissingServiceAccountPath)?;
let client = client.unwrap_or_else(Client::new);

let client = match (proxy_url, client) {
(_, Some(client)) => client,
(Some(url), None) => {
let pr = Proxy::all(&url).map_err(|source| Error::ProxyUrl { source })?;
Client::builder()
.proxy(pr)
.build()
.map_err(|source| Error::ProxyUrl { source })?
}
(None, None) => Client::new(),
};

let credentials = reader_credentials_file(service_account_path)?;

Expand Down Expand Up @@ -1015,4 +1038,34 @@ mod test {
err
)
}

#[tokio::test]
async fn gcs_test_proxy_url() {
use std::io::Write;
use tempfile::NamedTempFile;
let mut tfile = NamedTempFile::new().unwrap();
let creds = r#"{"private_key": "private_key", "client_email":"client_email", "disable_oauth":true}"#;
write!(tfile, "{}", creds).unwrap();
let service_account_path = tfile.path();
let gcs = GoogleCloudStorageBuilder::new()
.with_service_account_path(service_account_path.to_str().unwrap())
.with_bucket_name("foo")
.with_proxy_url("https://example.com")
.build();
assert!(dbg!(gcs).is_ok());

let gcs = GoogleCloudStorageBuilder::new()
.with_service_account_path(service_account_path.to_str().unwrap())
.with_bucket_name("foo")
.with_proxy_url("asdf://example.com")
.build();

assert!(match gcs {
Err(ObjectStoreError::Generic { source, .. }) => matches!(
source.downcast_ref(),
Some(crate::gcp::Error::ProxyUrl { .. })
),
_ => false,
})
}
}

0 comments on commit a1c7906

Please sign in to comment.