Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object_store: full HTTP range support #5222

Merged
merged 15 commits into from
Jan 5, 2024
10 changes: 9 additions & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::deserialize_rfc1123;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutResult,
Result, RetryConfig,
Expand Down Expand Up @@ -356,6 +356,14 @@ impl GetClient for AzureClient {
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
// As of 2024-01-02, Azure does not support suffix requests,
// so we should fail fast here rather than sending one
if let Some(GetRange::Suffix(_)) = options.range.as_ref() {
return Err(crate::Error::NotSupported {
source: "Azure does not support suffix range requests".into(),
});
}

let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match options.head {
Expand Down
69 changes: 59 additions & 10 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult, GetResultPayload, Result};
use crate::{GetOptions, GetRange, GetResult, GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use hyper::header::CONTENT_RANGE;
Expand Down Expand Up @@ -49,6 +49,12 @@ pub trait GetClientExt {
impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
if let Some(r) = range.as_ref() {
r.is_valid().map_err(|e| crate::Error::Generic {
store: T::STORE,
source: Box::new(e),
})?;
}
let response = self.get_request(location, options).await?;
get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
store: T::STORE,
Expand Down Expand Up @@ -94,6 +100,11 @@ enum GetResultError {
source: crate::client::header::Error,
},

#[snafu(context(false))]
InvalidRangeRequest {
source: crate::util::InvalidGetRange,
},

#[snafu(display("Received non-partial response when range requested"))]
NotPartial,

Expand All @@ -115,7 +126,7 @@ enum GetResultError {

fn get_result<T: GetClient>(
location: &Path,
range: Option<Range<usize>>,
range: Option<GetRange>,
response: Response,
) -> Result<GetResult, GetResultError> {
let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;
Expand All @@ -135,21 +146,24 @@ fn get_result<T: GetClient>(
let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?;
let actual = value.range;

// Update size to reflect full size of object (#5272)
meta.size = value.size;

let expected = expected.as_range(meta.size)?;

ensure!(
actual == expected,
UnexpectedRangeSnafu { expected, actual }
);

// Update size to reflect full size of object (#5272)
meta.size = value.size;
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
.map_err(|source| crate::Error::Generic {
store: T::STORE,
source: Box::new(source),
})
Expand Down Expand Up @@ -220,20 +234,22 @@ mod tests {
let bytes = res.bytes().await.unwrap();
assert_eq!(bytes.len(), 12);

let get_range = GetRange::from(2..3);

let resp = make_response(
12,
Some(2..3),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-2/12"),
);
let res = get_result::<TestClient>(&path, Some(2..3), resp).unwrap();
let res = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap();
assert_eq!(res.meta.size, 12);
assert_eq!(res.range, 2..3);
let bytes = res.bytes().await.unwrap();
assert_eq!(bytes.len(), 1);

let resp = make_response(12, Some(2..3), StatusCode::OK, None);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
assert_eq!(
err.to_string(),
"Received non-partial response when range requested"
Expand All @@ -245,7 +261,7 @@ mod tests {
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-3/12"),
);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
assert_eq!(err.to_string(), "Requested 2..3, got 2..4");

let resp = make_response(
Expand All @@ -254,17 +270,50 @@ mod tests {
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-2/*"),
);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
assert_eq!(
err.to_string(),
"Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\""
);

let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, None);
let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
assert_eq!(
err.to_string(),
"Content-Range header not present in partial response"
);

let resp = make_response(
2,
Some(2..3),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-3/2"),
);
let err = get_result::<TestClient>(&path, Some(get_range.clone()), resp).unwrap_err();
assert_eq!(
err.to_string(),
"InvalidRangeRequest: Wanted range starting at 2, but object was only 2 bytes long"
);

let resp = make_response(
6,
Some(2..6),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-5/6"),
);
let res = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)), resp).unwrap();
assert_eq!(res.meta.size, 6);
assert_eq!(res.range, 2..6);
let bytes = res.bytes().await.unwrap();
assert_eq!(bytes.len(), 4);

let resp = make_response(
6,
Some(2..6),
StatusCode::PARTIAL_CONTENT,
Some("bytes 2-3/6"),
);
let err = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)), resp).unwrap_err();
assert_eq!(err.to_string(), "Requested 2..6, got 2..4");
}
}
3 changes: 1 addition & 2 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,7 @@ impl GetOptionsExt for RequestBuilder {
use hyper::header::*;

if let Some(range) = options.range {
let range = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
self = self.header(RANGE, range);
self = self.header(RANGE, range.to_string());
}

if let Some(tag) = options.if_match {
Expand Down
67 changes: 63 additions & 4 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ mod parse;
mod util;

pub use parse::{parse_url, parse_url_opts};
pub use util::GetRange;

use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -580,10 +581,12 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;

/// Return the bytes that are stored at the specified location
/// in the given byte range
/// in the given byte range.
///
/// See [`GetRange::Bounded`] for more details on how `range` gets interpreted
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let options = GetOptions {
range: Some(range.clone()),
range: Some(range.into()),
..Default::default()
};
self.get_opts(location, options).await?.bytes().await
Expand Down Expand Up @@ -913,7 +916,7 @@ pub struct GetOptions {
/// otherwise returning [`Error::NotModified`]
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
pub range: Option<Range<usize>>,
pub range: Option<GetRange>,
/// Request a particular object version
pub version: Option<String>,
/// Request transfer of no content
Expand Down Expand Up @@ -1307,7 +1310,7 @@ mod tests {
assert_eq!(bytes, expected_data.slice(range.clone()));

let opts = GetOptions {
range: Some(2..5),
range: Some(GetRange::Bounded(2..5)),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
Expand All @@ -1323,6 +1326,62 @@ mod tests {
// Should be a non-fatal error
out_of_range_result.unwrap_err();

let opts = GetOptions {
range: Some(GetRange::Bounded(2..100)),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
assert_eq!(result.range, 2..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"bitrary data".as_ref());

let opts = GetOptions {
range: Some(GetRange::Suffix(2)),
..Default::default()
};
match storage.get_opts(&location, opts).await {
Ok(result) => {
assert_eq!(result.range, 12..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"ta".as_ref());
}
Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
}

let opts = GetOptions {
range: Some(GetRange::Suffix(100)),
..Default::default()
};
match storage.get_opts(&location, opts).await {
Ok(result) => {
assert_eq!(result.range, 0..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"arbitrary data".as_ref());
}
Err(Error::NotSupported { .. }) => {}
Err(e) => panic!("{e}"),
}

let opts = GetOptions {
range: Some(GetRange::Offset(3)),
..Default::default()
};
let result = storage.get_opts(&location, opts).await.unwrap();
assert_eq!(result.range, 3..14);
assert_eq!(result.meta.size, 14);
let bytes = result.bytes().await.unwrap();
assert_eq!(bytes, b"itrary data".as_ref());

let opts = GetOptions {
range: Some(GetRange::Offset(100)),
..Default::default()
};
storage.get_opts(&location, opts).await.unwrap_err();

let ranges = vec![0..1, 2..3, 0..5];
let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
for (range, bytes) in ranges.iter().zip(bytes) {
Expand Down
13 changes: 12 additions & 1 deletion object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
util::InvalidGetRange,
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutMode, PutOptions, PutResult, Result,
};
Expand Down Expand Up @@ -111,6 +112,11 @@ pub(crate) enum Error {
actual: usize,
},

#[snafu(display("Requested range was invalid"))]
InvalidRange {
source: InvalidGetRange,
},

#[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
UnableToCopyFile {
from: PathBuf,
Expand Down Expand Up @@ -424,9 +430,14 @@ impl ObjectStore for LocalFileSystem {
let meta = convert_metadata(metadata, location)?;
options.check_preconditions(&meta)?;

let range = match options.range {
Some(r) => r.as_range(meta.size).context(InvalidRangeSnafu)?,
None => 0..meta.size,
};

Ok(GetResult {
payload: GetResultPayload::File(file, path),
range: options.range.unwrap_or(0..meta.size),
range,
meta,
})
})
Expand Down
Loading
Loading