Skip to content

Commit

Permalink
object_store: add support for using proxy_url for connection testing
Browse files Browse the repository at this point in the history
  • Loading branch information
sum12 committed Nov 14, 2022
1 parent e44cb5b commit 1843c0d
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 8 deletions.
23 changes: 21 additions & 2 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use reqwest::Client;
use reqwest::{Client, Proxy};
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::BTreeSet;
use std::ops::Range;
Expand Down Expand Up @@ -120,6 +120,9 @@ enum Error {

#[snafu(display("Error reading token file: {}", source))]
ReadTokenFile { source: std::io::Error },

#[snafu(display("Unable to use proxy url: {}", source))]
ProxyUrlError { source: reqwest::Error },
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -363,6 +366,7 @@ pub struct AmazonS3Builder {
virtual_hosted_style_request: bool,
metadata_endpoint: Option<String>,
profile: Option<String>,
proxy_url: Option<String>,
}

impl AmazonS3Builder {
Expand Down Expand Up @@ -537,6 +541,12 @@ impl AmazonS3Builder {
self
}

/// Set the proxy_url to be used by the underlying client
pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
self.proxy_url = Some(proxy_url.into());
self
}

/// Set the AWS profile name, see <https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html>
///
/// This makes use of [aws-config] to provide credentials and therefore requires
Expand Down Expand Up @@ -590,7 +600,16 @@ impl AmazonS3Builder {
let endpoint = format!("https://sts.{}.amazonaws.com", region);

// Disallow non-HTTPs requests
let client = Client::builder().https_only(true).build().unwrap();
let clientbuilder = Client::builder().https_only(true);
let client = if let Some(url) = self.proxy_url {
let pr: Proxy = Proxy::all(&url)
.map_err(|source| Error::ProxyUrlError { source })?;
clientbuilder.proxy(pr)
} else {
clientbuilder
}
.build()
.unwrap();

Box::new(WebIdentityProvider {
cache: Default::default(),
Expand Down
17 changes: 12 additions & 5 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use chrono::{DateTime, TimeZone, Utc};
use itertools::Itertools;
use reqwest::{
header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE},
Client as ReqwestClient, Method, Response, StatusCode,
Client as ReqwestClient, Method, Proxy, Response, StatusCode,
};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -124,6 +124,7 @@ pub struct AzureConfig {
pub allow_http: bool,
pub service: Url,
pub is_emulator: bool,
pub proxy_url: Option<String>,
}

impl AzureConfig {
Expand All @@ -149,10 +150,16 @@ pub(crate) struct AzureClient {
impl AzureClient {
/// create a new instance of [AzureClient]
pub fn new(config: AzureConfig) -> Self {
let client = reqwest::ClientBuilder::new()
.https_only(!config.allow_http)
.build()
.unwrap();
let builder = ReqwestClient::builder();
let client = if let Some(url) = config.proxy_url.as_ref() {
let pr: Proxy = Proxy::all(url).unwrap();
builder.proxy(pr)
} else {
builder
}
.https_only(!config.allow_http)
.build()
.unwrap();

Self { config, client }
}
Expand Down
22 changes: 22 additions & 0 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ enum Error {

#[snafu(display("Azure credential error: {}", source), context(false))]
Credential { source: credential::Error },

#[snafu(display("Unable to use proxy url"))]
ProxyUrlError { source: reqwest::Error },
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -360,6 +363,7 @@ pub struct MicrosoftAzureBuilder {
use_emulator: bool,
retry_config: RetryConfig,
allow_http: bool,
proxy_url: Option<String>,
}

impl Debug for MicrosoftAzureBuilder {
Expand Down Expand Up @@ -500,6 +504,12 @@ impl MicrosoftAzureBuilder {
self
}

/// Set the proxy_url to be used by the underlying client
pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
self.proxy_url = Some(proxy_url.into());
self
}

/// Configure a connection to container with given name on Microsoft Azure
/// Blob store.
pub fn build(self) -> Result<MicrosoftAzure> {
Expand All @@ -516,8 +526,19 @@ impl MicrosoftAzureBuilder {
retry_config,
allow_http,
authority_host,
proxy_url,
} = self;

// check if proxy_url is usable
if let Some(url) = proxy_url.as_ref() {
let pr = reqwest::Proxy::all(url)
.map_err(|source| Error::ProxyUrlError { source })?;
let _ = reqwest::ClientBuilder::new()
.proxy(pr)
.build()
.map_err(|source| Error::ProxyUrlError { source })?;
}

let container = container_name.ok_or(Error::MissingContainerName {})?;

let (is_emulator, allow_http, storage_url, auth, account) = if use_emulator {
Expand Down Expand Up @@ -567,6 +588,7 @@ impl MicrosoftAzureBuilder {
container,
credentials: auth,
is_emulator,
proxy_url,
};

let client = Arc::new(client::AzureClient::new(config));
Expand Down
27 changes: 26 additions & 1 deletion object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
use reqwest::header::RANGE;
use reqwest::Proxy;
use reqwest::{header, Client, Method, Response, StatusCode};
use snafu::{ResultExt, Snafu};
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -122,6 +123,9 @@ enum Error {

#[snafu(display("GCP credential error: {}", source))]
Credential { source: credential::Error },

#[snafu(display("Unable to use proxy url: {}", source))]
ProxyUrlError { source: reqwest::Error },
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -741,6 +745,7 @@ pub struct GoogleCloudStorageBuilder {
service_account_path: Option<String>,
client: Option<Client>,
retry_config: RetryConfig,
proxy_url: Option<String>,
}

impl GoogleCloudStorageBuilder {
Expand Down Expand Up @@ -782,6 +787,14 @@ impl GoogleCloudStorageBuilder {
self
}

/// Set proxy url used for connection
///
/// if a pre-built client is provided with this builder then the proxy_url is just ignored
pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
self.proxy_url = Some(proxy_url.into());
self
}

/// Configure a connection to Google Cloud Storage, returning a
/// new [`GoogleCloudStorage`] and consuming `self`
pub fn build(self) -> Result<GoogleCloudStorage> {
Expand All @@ -790,12 +803,24 @@ impl GoogleCloudStorageBuilder {
service_account_path,
client,
retry_config,
proxy_url,
} = self;

let bucket_name = bucket_name.ok_or(Error::MissingBucketName {})?;
let service_account_path =
service_account_path.ok_or(Error::MissingServiceAccountPath)?;
let client = client.unwrap_or_else(Client::new);
let client = client.unwrap_or(match proxy_url {
Some(url) => {
let pr: Proxy =
Proxy::all(&url).map_err(|source| Error::ProxyUrlError { source })?;

Client::builder()
.proxy(pr)
.build()
.map_err(|source| Error::ProxyUrlError { source })?
}
_ => Client::new(),
});

let credentials = reader_credentials_file(service_account_path)?;

Expand Down

0 comments on commit 1843c0d

Please sign in to comment.