Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Common Listing and Retrieval Functionality #4220

Merged
merged 6 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 103 additions & 121 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET};
use crate::client::list::ListResponse;
use crate::client::pagination::stream_paginated;
use crate::client::get::GetClient;
use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::UploadPart;
use crate::path::DELIMITER;
use crate::util::format_prefix;
use crate::{
BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, Path, Result,
RetryConfig, StreamExt,
ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
Expand Down Expand Up @@ -169,40 +169,6 @@ impl S3Client {
self.config.credentials.get_credential().await
}

/// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
pub async fn get_request(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved to be a trait implementation

&self,
path: &Path,
options: GetOptions,
head: bool,
) -> Result<Response> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match head {
true => Method::HEAD,
false => Method::GET,
};

let builder = self.client.request(method, url);

let response = builder
.with_get_options(options)
.with_aws_sigv4(
credential.as_ref(),
&self.config.region,
"s3",
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;

Ok(response)
}

/// Make an S3 PUT request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
pub async fn put_request<T: Serialize + ?Sized + Sync>(
&self,
Expand Down Expand Up @@ -302,88 +268,6 @@ impl S3Client {
Ok(())
}

/// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
async fn list_request(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just moved

&self,
prefix: Option<&str>,
delimiter: bool,
token: Option<&str>,
offset: Option<&str>,
) -> Result<(ListResult, Option<String>)> {
let credential = self.get_credential().await?;
let url = self.config.bucket_endpoint.clone();

let mut query = Vec::with_capacity(4);

if let Some(token) = token {
query.push(("continuation-token", token))
}

if delimiter {
query.push(("delimiter", DELIMITER))
}

query.push(("list-type", "2"));

if let Some(prefix) = prefix {
query.push(("prefix", prefix))
}

if let Some(offset) = offset {
query.push(("start-after", offset))
}

let response = self
.client
.request(Method::GET, &url)
.query(&query)
.with_aws_sigv4(
credential.as_ref(),
&self.config.region,
"s3",
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.await
.context(ListRequestSnafu)?
.bytes()
.await
.context(ListResponseBodySnafu)?;

let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
.context(InvalidListResponseSnafu)?;
let token = response.next_continuation_token.take();

Ok((response.try_into()?, token))
}

/// Perform a list operation automatically handling pagination
pub fn list_paginated(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now implemented by ListClientExt

&self,
prefix: Option<&Path>,
delimiter: bool,
offset: Option<&Path>,
) -> BoxStream<'_, Result<ListResult>> {
let offset = offset.map(|x| x.to_string());
let prefix = format_prefix(prefix);
stream_paginated(
(prefix, offset),
move |(prefix, offset), token| async move {
let (r, next_token) = self
.list_request(
prefix.as_deref(),
delimiter,
token.as_deref(),
offset.as_deref(),
)
.await?;
Ok((r, (prefix, offset), next_token))
},
)
.boxed()
}

pub async fn create_multipart(&self, location: &Path) -> Result<MultipartId> {
let credential = self.get_credential().await?;
let url = format!("{}?uploads=", self.config.path_url(location),);
Expand Down Expand Up @@ -451,6 +335,104 @@ impl S3Client {
}
}

#[async_trait]
impl GetClient for S3Client {
const STORE: &'static str = STORE;

/// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
async fn get_request(
&self,
path: &Path,
options: GetOptions,
head: bool,
) -> Result<Response> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match head {
true => Method::HEAD,
false => Method::GET,
};

let builder = self.client.request(method, url);

let response = builder
.with_get_options(options)
.with_aws_sigv4(
credential.as_ref(),
&self.config.region,
"s3",
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;

Ok(response)
}
}

#[async_trait]
impl ListClient for S3Client {
/// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
async fn list_request(
&self,
prefix: Option<&str>,
delimiter: bool,
token: Option<&str>,
offset: Option<&str>,
) -> Result<(ListResult, Option<String>)> {
let credential = self.get_credential().await?;
let url = self.config.bucket_endpoint.clone();

let mut query = Vec::with_capacity(4);

if let Some(token) = token {
query.push(("continuation-token", token))
}

if delimiter {
query.push(("delimiter", DELIMITER))
}

query.push(("list-type", "2"));

if let Some(prefix) = prefix {
query.push(("prefix", prefix))
}

if let Some(offset) = offset {
query.push(("start-after", offset))
}

let response = self
.client
.request(Method::GET, &url)
.query(&query)
.with_aws_sigv4(
credential.as_ref(),
&self.config.region,
"s3",
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.await
.context(ListRequestSnafu)?
.bytes()
.await
.context(ListResponseBodySnafu)?;

let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
.context(InvalidListResponseSnafu)?;
let token = response.next_continuation_token.take();

Ok((response.try_into()?, token))
}
}

fn encode_path(path: &Path) -> PercentEncode<'_> {
utf8_percent_encode(path.as_ref(), &STRICT_PATH_ENCODE_SET)
}
63 changes: 8 additions & 55 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::collections::BTreeSet;
use std::str::FromStr;
use std::sync::Arc;
use tokio::io::AsyncWrite;
Expand All @@ -48,7 +46,8 @@ use url::Url;
pub use crate::aws::checksum::Checksum;
use crate::aws::client::{S3Client, S3Config};
use crate::aws::credential::{InstanceCredentialProvider, WebIdentityProvider};
use crate::client::header::header_meta;
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::{
ClientConfigKey, CredentialProvider, StaticCredentialProvider,
TokenCredentialProvider,
Expand All @@ -57,7 +56,7 @@ use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart};
use crate::{
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
ObjectStore, Path, Result, RetryConfig, StreamExt,
ObjectStore, Path, Result, RetryConfig,
};

mod checksum;
Expand Down Expand Up @@ -138,11 +137,6 @@ enum Error {

#[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
RegionParse { bucket: String },

#[snafu(display("Failed to parse headers: {}", source))]
Header {
source: crate::client::header::Error,
},
}

impl From<Error> for super::Error {
Expand Down Expand Up @@ -244,24 +238,11 @@ impl ObjectStore for AmazonS3 {
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let response = self.client.get_request(location, options, false).await?;
let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
store: STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult::Stream(stream))
self.client.get_opts(location, options).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions::default();
// Extract meta from headers
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
let response = self.client.get_request(location, options, true).await?;
Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
self.client.head(location).await
}

async fn delete(&self, location: &Path) -> Result<()> {
Expand All @@ -272,47 +253,19 @@ impl ObjectStore for AmazonS3 {
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let stream = self
.client
.list_paginated(prefix, false, None)
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
.boxed();

Ok(stream)
self.client.list(prefix).await
}

async fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let stream = self
.client
.list_paginated(prefix, false, Some(offset))
.map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
.boxed();

Ok(stream)
self.client.list_with_offset(prefix, offset).await
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let mut stream = self.client.list_paginated(prefix, true, None);

let mut common_prefixes = BTreeSet::new();
let mut objects = Vec::new();

while let Some(result) = stream.next().await {
let response = result?;
common_prefixes.extend(response.common_prefixes.into_iter());
objects.extend(response.objects.into_iter());
}

Ok(ListResult {
common_prefixes: common_prefixes.into_iter().collect(),
objects,
})
self.client.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
Expand Down
Loading