Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object_store: suffix requests #5206

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{
AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
};
use crate::client::get::GetClient;
use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient};
use crate::client::header::HeaderConfig;
use crate::client::header::{get_put_result, get_version};
use crate::client::list::ListClient;
Expand All @@ -29,9 +29,10 @@ use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
ListResponse,
};
use crate::client::GetOptionsExt;
use crate::client::{with_suffix_header, GetOptionsExt};
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::HttpRange;
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, Result, RetryConfig,
};
Expand Down Expand Up @@ -665,6 +666,39 @@ impl GetClient for S3Client {
}
}

#[async_trait]
impl GetSuffixClient for S3Client {
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
let credential = self.config.get_credential().await?;
let url = self.config.path_url(location);
let method = Method::GET;

let builder = self.client.request(method, url);

// if let Some(v) = &options.version {
// builder = builder.query(&[("versionId", v)])
// }

let response = with_suffix_header(builder, nbytes)
.with_aws_sigv4(
credential.as_deref(),
&self.config.region,
"s3",
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: location.as_ref(),
})?;

response_to_get_result::<Self>(response, location, Some(HttpRange::new_suffix(nbytes)))?
.bytes()
.await
}
}

#[async_trait]
impl ListClient for S3Client {
/// Make an S3 List request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html>
Expand Down
6 changes: 5 additions & 1 deletion object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tokio::io::AsyncWrite;
use url::Url;

use crate::aws::client::S3Client;
use crate::client::get::GetClientExt;
use crate::client::get::{GetClientExt, GetSuffixClient};
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
Expand Down Expand Up @@ -216,6 +216,10 @@ impl ObjectStore for AmazonS3 {
.await
}

async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
self.client.get_suffix(location, nbytes).await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.client.get_opts(location, options).await
}
Expand Down
4 changes: 4 additions & 0 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ impl ObjectStore for ChunkedStore {
self.inner.get_range(location, range).await
}

async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
self.inner.get_suffix(location, nbytes).await
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.inner.head(location).await
}
Expand Down
61 changes: 40 additions & 21 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::util::{concrete_range, HttpRange};
use crate::{Error, GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use reqwest::Response;

Expand All @@ -40,30 +42,47 @@ pub trait GetClientExt {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
}

pub(crate) fn response_to_get_result<T: GetClient>(
response: Response,
location: &Path,
range: Option<HttpRange>,
) -> Result<GetResult> {
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})?;

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range: concrete_range(range, meta.size),
payload: GetResultPayload::Stream(stream),
meta,
})
}

#[async_trait]
impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let range = options.range.clone().map(HttpRange::from);
let response = self.get_request(location, options).await?;
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})?;

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range: range.unwrap_or(0..meta.size),
payload: GetResultPayload::Stream(stream),
meta,
})
response_to_get_result::<T>(response, location, range)
}
}

/// This trait is a bodge to allow suffix requests without breaking the user-facing API.
///
/// See https://github.com/apache/arrow-rs/issues/4611 for discussion.
#[async_trait]
pub trait GetSuffixClient {
/// Get the last `nbytes` of a resource.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes>;
}
5 changes: 5 additions & 0 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,11 @@ impl ClientOptions {
}
}

pub(crate) fn with_suffix_header(builder: RequestBuilder, nbytes: usize) -> RequestBuilder {
let range = format!("bytes=-{nbytes}");
builder.header(hyper::header::RANGE, range)
}

pub trait GetOptionsExt {
fn with_get_options(self, options: GetOptions) -> Self;
}
Expand Down
40 changes: 38 additions & 2 deletions object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::get::GetClient;
use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient};
use crate::client::header::{get_put_result, get_version, HeaderConfig};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
ListResponse,
};
use crate::client::GetOptionsExt;
use crate::client::{with_suffix_header, GetOptionsExt};
use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE};
use crate::multipart::PartId;
use crate::path::{Path, DELIMITER};
use crate::util::HttpRange;
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutResult, Result,
RetryConfig,
Expand Down Expand Up @@ -462,6 +463,41 @@ impl GetClient for GoogleCloudStorageClient {
}
}

#[async_trait]
impl GetSuffixClient for GoogleCloudStorageClient {
async fn get_suffix(&self, path: &Path, nbytes: usize) -> Result<Bytes> {
let credential = self.get_credential().await?;
let url = self.object_url(path);

let method = Method::GET;

let mut request = self.client.request(method, url);

// if let Some(version) = &options.version {
// request = request.query(&[("generation", version)]);
// }

if !credential.bearer.is_empty() {
request = request.bearer_auth(&credential.bearer);
}

let response = with_suffix_header(request, nbytes)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;

response_to_get_result::<Self>(
response,
path,
Some(HttpRange::new_suffix(nbytes)),
)?
.bytes()
.await
}
}

#[async_trait]
impl ListClient for GoogleCloudStorageClient {
/// Perform a list request <https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
Expand Down
6 changes: 5 additions & 1 deletion object_store/src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use client::GoogleCloudStorageClient;
use futures::stream::BoxStream;
use tokio::io::AsyncWrite;

use crate::client::get::GetClientExt;
use crate::client::get::{GetClientExt, GetSuffixClient};
use crate::client::list::ListClientExt;
use crate::multipart::MultiPartStore;
pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
Expand Down Expand Up @@ -139,6 +139,10 @@ impl ObjectStore for GoogleCloudStorage {
self.client.get_opts(location, options).await
}

async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
self.client.get_suffix(location, nbytes).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete_request(location).await
}
Expand Down
43 changes: 40 additions & 3 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::get::GetClient;
use crate::client::get::{response_to_get_result, GetClient, GetSuffixClient};
use crate::client::header::HeaderConfig;
use crate::client::retry::{self, RetryConfig, RetryExt};
use crate::client::GetOptionsExt;
use crate::client::{with_suffix_header, GetOptionsExt};
use crate::path::{Path, DELIMITER};
use crate::util::deserialize_rfc1123;
use crate::util::{deserialize_rfc1123, HttpRange};
use crate::{ClientOptions, GetOptions, ObjectMeta, Result};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
Expand Down Expand Up @@ -322,6 +322,43 @@ impl GetClient for Client {
}
}

#[async_trait]
impl GetSuffixClient for Client {
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
let url = self.path_url(location);
let method = Method::GET;
let builder = self.client.request(method, url);

let res = with_suffix_header(builder, nbytes)
.send_retry(&self.retry_config)
.await
.map_err(|source| match source.status() {
// Some stores return METHOD_NOT_ALLOWED for get on directories
Some(StatusCode::NOT_FOUND | StatusCode::METHOD_NOT_ALLOWED) => {
crate::Error::NotFound {
source: Box::new(source),
path: location.to_string(),
}
}
_ => Error::Request { source }.into(),
})?;

// We expect a 206 Partial Content response if a range was requested
// a 200 OK response would indicate the server did not fulfill the request
if res.status() != StatusCode::PARTIAL_CONTENT {
return Err(crate::Error::NotSupported {
source: Box::new(Error::RangeNotSupported {
href: location.to_string(),
}),
});
}

response_to_get_result::<Self>(res, location, Some(HttpRange::new_suffix(nbytes)))?
.bytes()
.await
}
}

/// The response returned by a PROPFIND request, i.e. list
#[derive(Deserialize, Default)]
pub struct MultiStatus {
Expand Down
6 changes: 5 additions & 1 deletion object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::get::GetClientExt;
use crate::client::get::{GetClientExt, GetSuffixClient};
use crate::client::header::get_etag;
use crate::http::client::Client;
use crate::path::Path;
Expand Down Expand Up @@ -130,6 +130,10 @@ impl ObjectStore for HttpStore {
self.client.get_opts(location, options).await
}

async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
self.client.get_suffix(location, nbytes).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.client.delete(location).await
}
Expand Down
22 changes: 22 additions & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,24 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
self.get_opts(location, options).await?.bytes().await
}

/// Return the last `nbytes` of the resource at the specified location.
///
/// The default implementation uses 2 requests: one to find the full length,
/// and one to get the requested bytes.
/// Many stores implement suffix requests directly; these should override the default.
///
/// If you already know the size of the resource, you should use a regular range request, like
/// `my_object_store.get_range(my_location, size.saturating_sub(nbytes)..size)`.
async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
let options = GetOptions {
head: true,
..Default::default()
};
let size = self.get_opts(location, options).await?.meta.size;
self.get_range(location, size.saturating_sub(nbytes)..size)
.await
}

/// Return the bytes that are stored at the specified location
/// in the given byte ranges
async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
Expand Down Expand Up @@ -779,6 +797,10 @@ macro_rules! as_ref_impl {
self.as_ref().get_range(location, range).await
}

async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
self.as_ref().get_suffix(location, nbytes).await
}

async fn get_ranges(
&self,
location: &Path,
Expand Down
5 changes: 5 additions & 0 deletions object_store/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
self.inner.get_range(location, range).await
}

async fn get_suffix(&self, location: &Path, nbytes: usize) -> Result<Bytes> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_suffix(location, nbytes).await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.get_ranges(location, ranges).await
Expand Down
Loading