diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index 865e0a1a939c..41b7cbd58f98 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -25,7 +25,7 @@ use crate::client::retry::RetryExt; use crate::client::GetOptionsExt; use crate::multipart::PartId; use crate::path::DELIMITER; -use crate::util::deserialize_rfc1123; +use crate::util::{deserialize_rfc1123, GetRange}; use crate::{ ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutResult, Result, RetryConfig, @@ -441,6 +441,14 @@ impl GetClient for AzureClient { /// /// async fn get_request(&self, path: &Path, options: GetOptions) -> Result { + // As of 2024-01-02, Azure does not support suffix requests, + // so we should fail fast here rather than sending one + if let Some(GetRange::Suffix(_)) = options.range.as_ref() { + return Err(crate::Error::NotSupported { + source: "Azure does not support suffix range requests".into(), + }); + } + let credential = self.get_credential().await?; let url = self.config.path_url(path); let method = match options.head { diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs index b7e7f24b29c2..2e399e523ed4 100644 --- a/object_store/src/client/get.rs +++ b/object_store/src/client/get.rs @@ -19,7 +19,7 @@ use std::ops::Range; use crate::client::header::{header_meta, HeaderConfig}; use crate::path::Path; -use crate::{Error, GetOptions, GetResult, GetResultPayload, Result}; +use crate::{GetOptions, GetRange, GetResult, GetResultPayload, Result}; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; use hyper::header::CONTENT_RANGE; @@ -49,6 +49,12 @@ pub trait GetClientExt { impl GetClientExt for T { async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let range = options.range.clone(); + if let Some(r) = range.as_ref() { + r.is_valid().map_err(|e| crate::Error::Generic { + store: T::STORE, + source: Box::new(e), + })?; + } let 
response = self.get_request(location, options).await?; get_result::(location, range, response).map_err(|e| crate::Error::Generic { store: T::STORE, @@ -94,6 +100,11 @@ enum GetResultError { source: crate::client::header::Error, }, + #[snafu(context(false))] + InvalidRangeRequest { + source: crate::util::InvalidGetRange, + }, + #[snafu(display("Received non-partial response when range requested"))] NotPartial, @@ -115,7 +126,7 @@ enum GetResultError { fn get_result( location: &Path, - range: Option>, + range: Option, response: Response, ) -> Result { let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?; @@ -135,13 +146,16 @@ fn get_result( let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?; let actual = value.range; + // Update size to reflect full size of object (#5272) + meta.size = value.size; + + let expected = expected.as_range(meta.size)?; + ensure!( actual == expected, UnexpectedRangeSnafu { expected, actual } ); - // Update size to reflect full size of object (#5272) - meta.size = value.size; actual } else { 0..meta.size @@ -149,7 +163,7 @@ fn get_result( let stream = response .bytes_stream() - .map_err(|source| Error::Generic { + .map_err(|source| crate::Error::Generic { store: T::STORE, source: Box::new(source), }) @@ -220,20 +234,22 @@ mod tests { let bytes = res.bytes().await.unwrap(); assert_eq!(bytes.len(), 12); + let get_range = GetRange::from(2..3); + let resp = make_response( 12, Some(2..3), StatusCode::PARTIAL_CONTENT, Some("bytes 2-2/12"), ); - let res = get_result::(&path, Some(2..3), resp).unwrap(); + let res = get_result::(&path, Some(get_range.clone()), resp).unwrap(); assert_eq!(res.meta.size, 12); assert_eq!(res.range, 2..3); let bytes = res.bytes().await.unwrap(); assert_eq!(bytes.len(), 1); let resp = make_response(12, Some(2..3), StatusCode::OK, None); - let err = get_result::(&path, Some(2..3), resp).unwrap_err(); + let err = get_result::(&path, Some(get_range.clone()), 
resp).unwrap_err(); assert_eq!( err.to_string(), "Received non-partial response when range requested" @@ -245,7 +261,7 @@ mod tests { StatusCode::PARTIAL_CONTENT, Some("bytes 2-3/12"), ); - let err = get_result::(&path, Some(2..3), resp).unwrap_err(); + let err = get_result::(&path, Some(get_range.clone()), resp).unwrap_err(); assert_eq!(err.to_string(), "Requested 2..3, got 2..4"); let resp = make_response( @@ -254,17 +270,50 @@ mod tests { StatusCode::PARTIAL_CONTENT, Some("bytes 2-2/*"), ); - let err = get_result::(&path, Some(2..3), resp).unwrap_err(); + let err = get_result::(&path, Some(get_range.clone()), resp).unwrap_err(); assert_eq!( err.to_string(), "Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\"" ); let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, None); - let err = get_result::(&path, Some(2..3), resp).unwrap_err(); + let err = get_result::(&path, Some(get_range.clone()), resp).unwrap_err(); assert_eq!( err.to_string(), "Content-Range header not present in partial response" ); + + let resp = make_response( + 2, + Some(2..3), + StatusCode::PARTIAL_CONTENT, + Some("bytes 2-3/2"), + ); + let err = get_result::(&path, Some(get_range.clone()), resp).unwrap_err(); + assert_eq!( + err.to_string(), + "InvalidRangeRequest: Wanted range starting at 2, but object was only 2 bytes long" + ); + + let resp = make_response( + 6, + Some(2..6), + StatusCode::PARTIAL_CONTENT, + Some("bytes 2-5/6"), + ); + let res = get_result::(&path, Some(GetRange::Suffix(4)), resp).unwrap(); + assert_eq!(res.meta.size, 6); + assert_eq!(res.range, 2..6); + let bytes = res.bytes().await.unwrap(); + assert_eq!(bytes.len(), 4); + + let resp = make_response( + 6, + Some(2..6), + StatusCode::PARTIAL_CONTENT, + Some("bytes 2-3/6"), + ); + let err = get_result::(&path, Some(GetRange::Suffix(4)), resp).unwrap_err(); + assert_eq!(err.to_string(), "Requested 2..6, got 2..4"); } } diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs 
index 2baf586127c6..4a78927d0988 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -594,8 +594,7 @@ impl GetOptionsExt for RequestBuilder { use hyper::header::*; if let Some(range) = options.range { - let range = format!("bytes={}-{}", range.start, range.end.saturating_sub(1)); - self = self.header(RANGE, range); + self = self.header(RANGE, range.to_string()); } if let Some(tag) = options.if_match { diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 8fc47b2c5de7..53a535612e09 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -499,6 +499,7 @@ mod parse; mod util; pub use parse::{parse_url, parse_url_opts}; +pub use util::GetRange; use crate::path::Path; #[cfg(not(target_arch = "wasm32"))] @@ -580,10 +581,12 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { async fn get_opts(&self, location: &Path, options: GetOptions) -> Result; /// Return the bytes that are stored at the specified location - /// in the given byte range + /// in the given byte range. 
+ /// + /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted async fn get_range(&self, location: &Path, range: Range) -> Result { let options = GetOptions { - range: Some(range.clone()), + range: Some(range.into()), ..Default::default() }; self.get_opts(location, options).await?.bytes().await @@ -913,7 +916,7 @@ pub struct GetOptions { /// otherwise returning [`Error::NotModified`] /// /// - pub range: Option>, + pub range: Option, /// Request a particular object version pub version: Option, /// Request transfer of no content @@ -1308,7 +1311,7 @@ mod tests { assert_eq!(bytes, expected_data.slice(range.clone())); let opts = GetOptions { - range: Some(2..5), + range: Some(GetRange::Bounded(2..5)), ..Default::default() }; let result = storage.get_opts(&location, opts).await.unwrap(); @@ -1324,6 +1327,62 @@ mod tests { // Should be a non-fatal error out_of_range_result.unwrap_err(); + let opts = GetOptions { + range: Some(GetRange::Bounded(2..100)), + ..Default::default() + }; + let result = storage.get_opts(&location, opts).await.unwrap(); + assert_eq!(result.range, 2..14); + assert_eq!(result.meta.size, 14); + let bytes = result.bytes().await.unwrap(); + assert_eq!(bytes, b"bitrary data".as_ref()); + + let opts = GetOptions { + range: Some(GetRange::Suffix(2)), + ..Default::default() + }; + match storage.get_opts(&location, opts).await { + Ok(result) => { + assert_eq!(result.range, 12..14); + assert_eq!(result.meta.size, 14); + let bytes = result.bytes().await.unwrap(); + assert_eq!(bytes, b"ta".as_ref()); + } + Err(Error::NotSupported { .. }) => {} + Err(e) => panic!("{e}"), + } + + let opts = GetOptions { + range: Some(GetRange::Suffix(100)), + ..Default::default() + }; + match storage.get_opts(&location, opts).await { + Ok(result) => { + assert_eq!(result.range, 0..14); + assert_eq!(result.meta.size, 14); + let bytes = result.bytes().await.unwrap(); + assert_eq!(bytes, b"arbitrary data".as_ref()); + } + Err(Error::NotSupported { .. 
}) => {} + Err(e) => panic!("{e}"), + } + + let opts = GetOptions { + range: Some(GetRange::Offset(3)), + ..Default::default() + }; + let result = storage.get_opts(&location, opts).await.unwrap(); + assert_eq!(result.range, 3..14); + assert_eq!(result.meta.size, 14); + let bytes = result.bytes().await.unwrap(); + assert_eq!(bytes, b"itrary data".as_ref()); + + let opts = GetOptions { + range: Some(GetRange::Offset(100)), + ..Default::default() + }; + storage.get_opts(&location, opts).await.unwrap_err(); + let ranges = vec![0..1, 2..3, 0..5]; let bytes = storage.get_ranges(&location, &ranges).await.unwrap(); for (range, bytes) in ranges.iter().zip(bytes) { diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 71b96f058c79..e985ff070cd4 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -19,6 +19,7 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, + util::InvalidGetRange, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result, }; @@ -111,6 +112,11 @@ pub(crate) enum Error { actual: usize, }, + #[snafu(display("Requested range was invalid"))] + InvalidRange { + source: InvalidGetRange, + }, + #[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))] UnableToCopyFile { from: PathBuf, @@ -424,9 +430,14 @@ impl ObjectStore for LocalFileSystem { let meta = convert_metadata(metadata, location)?; options.check_preconditions(&meta)?; + let range = match options.range { + Some(r) => r.as_range(meta.size).context(InvalidRangeSnafu)?, + None => 0..meta.size, + }; + Ok(GetResult { payload: GetResultPayload::File(file, path), - range: options.range.unwrap_or(0..meta.size), + range, meta, }) }) diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 382300123846..41cfcc490da6 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -16,9 +16,10 @@ // under the 
License. //! An in-memory object store implementation +use crate::util::InvalidGetRange; use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutMode, - PutOptions, PutResult, Result, UpdateVersion, + path::Path, GetRange, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, + PutMode, PutOptions, PutResult, Result, UpdateVersion, }; use crate::{GetOptions, MultipartId}; use async_trait::async_trait; @@ -26,7 +27,7 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{stream::BoxStream, StreamExt}; use parking_lot::RwLock; -use snafu::{ensure, OptionExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use std::collections::BTreeMap; use std::collections::BTreeSet; use std::io; @@ -43,13 +44,8 @@ enum Error { #[snafu(display("No data in memory found. Location: {path}"))] NoDataInMemory { path: String }, - #[snafu(display( - "Requested range {}..{} is out of bounds for object with length {}", range.start, range.end, len - ))] - OutOfRange { range: Range, len: usize }, - - #[snafu(display("Invalid range: {}..{}", range.start, range.end))] - BadRange { range: Range }, + #[snafu(display("Invalid range: {source}"))] + Range { source: InvalidGetRange }, #[snafu(display("Object already exists at that location: {path}"))] AlreadyExists { path: String }, @@ -220,10 +216,8 @@ impl ObjectStore for InMemory { let (range, data) = match options.range { Some(range) => { - let len = entry.data.len(); - ensure!(range.end <= len, OutOfRangeSnafu { range, len }); - ensure!(range.start <= range.end, BadRangeSnafu { range }); - (range.clone(), entry.data.slice(range)) + let r = range.as_range(entry.data.len()).context(RangeSnafu)?; + (r.clone(), entry.data.slice(r)) } None => (0..entry.data.len(), entry.data), }; @@ -241,14 +235,11 @@ impl ObjectStore for InMemory { ranges .iter() .map(|range| { - let range = range.clone(); - let len = entry.data.len(); - ensure!( - range.end <= entry.data.len(), - OutOfRangeSnafu { 
range, len } - ); - ensure!(range.start <= range.end, BadRangeSnafu { range }); - Ok(entry.data.slice(range)) + let r = GetRange::Bounded(range.clone()) + .as_range(entry.data.len()) + .context(RangeSnafu)?; + + Ok(entry.data.slice(r)) }) .collect() } diff --git a/object_store/src/util.rs b/object_store/src/util.rs index fd86ba7366b0..a19d5aab4b5b 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -16,9 +16,15 @@ // under the License. //! Common logic for interacting with remote object stores +use std::{ + fmt::Display, + ops::{Range, RangeBounds}, +}; + use super::Result; use bytes::Bytes; use futures::{stream::StreamExt, Stream, TryStreamExt}; +use snafu::Snafu; #[cfg(any(feature = "azure", feature = "http"))] pub static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT"; @@ -98,12 +104,12 @@ pub const OBJECT_STORE_COALESCE_PARALLEL: usize = 10; /// * Make multiple `fetch` requests in parallel (up to maximum of 10) /// pub async fn coalesce_ranges( - ranges: &[std::ops::Range], + ranges: &[Range], fetch: F, coalesce: usize, ) -> Result, E> where - F: Send + FnMut(std::ops::Range) -> Fut, + F: Send + FnMut(Range) -> Fut, E: Send, Fut: std::future::Future> + Send, { @@ -124,13 +130,13 @@ where let start = range.start - fetch_range.start; let end = range.end - fetch_range.start; - fetch_bytes.slice(start..end) + fetch_bytes.slice(start..end.min(fetch_bytes.len())) }) .collect()) } /// Returns a sorted list of ranges that cover `ranges` -fn merge_ranges(ranges: &[std::ops::Range], coalesce: usize) -> Vec> { +fn merge_ranges(ranges: &[Range], coalesce: usize) -> Vec> { if ranges.is_empty() { return vec![]; } @@ -167,6 +173,119 @@ fn merge_ranges(ranges: &[std::ops::Range], coalesce: usize) -> Vec), + /// Request all bytes starting from a given byte offset + Offset(usize), + /// Request up to the last n bytes + Suffix(usize), +} + +#[derive(Debug, Snafu)] +pub(crate) enum InvalidGetRange { + #[snafu(display( + "Wanted range starting at {requested}, but 
object was only {length} bytes long" + ))] + StartTooLarge { requested: usize, length: usize }, + + #[snafu(display("Range started at {start} and ended at {end}"))] + Inconsistent { start: usize, end: usize }, +} + +impl GetRange { + pub(crate) fn is_valid(&self) -> Result<(), InvalidGetRange> { + match self { + Self::Bounded(r) if r.end <= r.start => { + return Err(InvalidGetRange::Inconsistent { + start: r.start, + end: r.end, + }); + } + _ => (), + }; + Ok(()) + } + + /// Convert to a [`Range`] if valid. + pub(crate) fn as_range(&self, len: usize) -> Result, InvalidGetRange> { + self.is_valid()?; + match self { + Self::Bounded(r) => { + if r.start >= len { + Err(InvalidGetRange::StartTooLarge { + requested: r.start, + length: len, + }) + } else if r.end > len { + Ok(r.start..len) + } else { + Ok(r.clone()) + } + } + Self::Offset(o) => { + if *o >= len { + Err(InvalidGetRange::StartTooLarge { + requested: *o, + length: len, + }) + } else { + Ok(*o..len) + } + } + Self::Suffix(n) => Ok(len.saturating_sub(*n)..len), + } + } +} + +impl Display for GetRange { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Bounded(r) => write!(f, "bytes={}-{}", r.start, r.end - 1), + Self::Offset(o) => write!(f, "bytes={o}-"), + Self::Suffix(n) => write!(f, "bytes=-{n}"), + } + } +} + +impl> From for GetRange { + fn from(value: T) -> Self { + use std::ops::Bound::*; + let first = match value.start_bound() { + Included(i) => *i, + Excluded(i) => i + 1, + Unbounded => 0, + }; + match value.end_bound() { + Included(i) => Self::Bounded(first..(i + 1)), + Excluded(i) => Self::Bounded(first..*i), + Unbounded => Self::Offset(first), + } + } +} + #[cfg(test)] mod tests { use crate::Error; @@ -269,4 +388,59 @@ mod tests { } } } + + #[test] + fn getrange_str() { + assert_eq!(GetRange::Offset(0).to_string(), "bytes=0-"); + assert_eq!(GetRange::Bounded(10..19).to_string(), "bytes=10-18"); + assert_eq!(GetRange::Suffix(10).to_string(), "bytes=-10"); + 
} + + #[test] + fn getrange_from() { + assert_eq!(Into::::into(10..15), GetRange::Bounded(10..15),); + assert_eq!(Into::::into(10..=15), GetRange::Bounded(10..16),); + assert_eq!(Into::::into(10..), GetRange::Offset(10),); + assert_eq!(Into::::into(..=15), GetRange::Bounded(0..16)); + } + + #[test] + fn test_as_range() { + let range = GetRange::Bounded(2..5); + assert_eq!(range.as_range(5).unwrap(), 2..5); + + let range = range.as_range(4).unwrap(); + assert_eq!(range, 2..4); + + let range = GetRange::Bounded(3..3); + let err = range.as_range(2).unwrap_err().to_string(); + assert_eq!(err, "Range started at 3 and ended at 3"); + + let range = GetRange::Bounded(2..2); + let err = range.as_range(3).unwrap_err().to_string(); + assert_eq!(err, "Range started at 2 and ended at 2"); + + let range = GetRange::Suffix(3); + assert_eq!(range.as_range(3).unwrap(), 0..3); + assert_eq!(range.as_range(2).unwrap(), 0..2); + + let range = GetRange::Suffix(0); + assert_eq!(range.as_range(0).unwrap(), 0..0); + + let range = GetRange::Offset(2); + let err = range.as_range(2).unwrap_err().to_string(); + assert_eq!( + err, + "Wanted range starting at 2, but object was only 2 bytes long" + ); + + let err = range.as_range(1).unwrap_err().to_string(); + assert_eq!( + err, + "Wanted range starting at 2, but object was only 1 bytes long" + ); + + let range = GetRange::Offset(1); + assert_eq!(range.as_range(2).unwrap(), 1..2); + } } diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs index 85231a5a5b9b..f73d78578f08 100644 --- a/object_store/tests/get_range_file.rs +++ b/object_store/tests/get_range_file.rs @@ -93,4 +93,29 @@ async fn test_get_range() { let data = store.get_range(&path, range.clone()).await.unwrap(); assert_eq!(&data[..], &expected[range]) } + + let over_range = 0..(expected.len() * 2); + let data = store.get_range(&path, over_range.clone()).await.unwrap(); + assert_eq!(&data[..], expected) +} + +/// Test that, when requesting a range 
which overhangs the end of the resource, +/// the resulting [GetResult::range] reports the returned range, +/// not the requested. +#[tokio::test] +async fn test_get_opts_over_range() { + let tmp = tempdir().unwrap(); + let store = MyStore(LocalFileSystem::new_with_prefix(tmp.path()).unwrap()); + let path = Path::from("foo"); + + let expected = Bytes::from_static(b"hello world"); + store.put(&path, expected.clone()).await.unwrap(); + + let opts = GetOptions { + range: Some(GetRange::Bounded(0..(expected.len() * 2))), + ..Default::default() + }; + let res = store.get_opts(&path, opts).await.unwrap(); + assert_eq!(res.range, 0..expected.len()); + assert_eq!(res.bytes().await.unwrap(), expected); }