From 5bb1c14131511942966ac44b4ff3f9d603ea290e Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Mon, 12 Dec 2022 12:09:11 +0100 Subject: [PATCH 1/4] feat(object_store): add PrefixObjectStore --- object_store/src/lib.rs | 1 + object_store/src/prefix.rs | 291 +++++++++++++++++++++++++++++++++++++ 2 files changed, 292 insertions(+) create mode 100644 object_store/src/prefix.rs diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index ec41f381228b..0cd56612ee45 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -170,6 +170,7 @@ pub mod limit; pub mod local; pub mod memory; pub mod path; +pub mod prefix; pub mod throttle; #[cfg(any(feature = "gcp", feature = "aws", feature = "azure"))] diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs new file mode 100644 index 000000000000..b22977b3a51a --- /dev/null +++ b/object_store/src/prefix.rs @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! An object store wrapper handling a constant path prefix +use bytes::Bytes; +use futures::{stream::BoxStream, StreamExt, TryStreamExt}; +use std::ops::Range; +use tokio::io::AsyncWrite; + +use crate::path::{Path, DELIMITER}; +use crate::{ + GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, + Result as ObjectStoreResult, +}; + +/// Store wrapper that applies a constant prefix to all paths handled by the store. +#[derive(Debug, Clone)] +pub struct PrefixObjectStore { + prefix: Path, + inner: T, +} + +impl std::fmt::Display for PrefixObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "PrefixObjectStore({})", self.prefix.as_ref()) + } +} + +impl PrefixObjectStore { + /// Create a new instance of [`PrefixObjectStore`] + pub fn new(store: T, prefix: impl Into) -> Self { + Self { + prefix: prefix.into(), + inner: store, + } + } + + /// Create the full path from a path relative to prefix + fn full_path(&self, location: &Path) -> ObjectStoreResult { + let path: &str = location.as_ref(); + let stripped = match self.prefix.as_ref() { + "" => path.to_string(), + p => format!("{}/{}", p, path), + }; + Ok(Path::parse(stripped.trim_end_matches(DELIMITER))?) + } + + /// Strip the constant prefix from a given path + fn strip_prefix(&self, path: &Path) -> Option { + let path: &str = path.as_ref(); + let stripped = match self.prefix.as_ref() { + "" => path, + p => path.strip_prefix(p)?.strip_prefix(DELIMITER)?, + }; + Path::parse(stripped).ok() + } +} + +#[async_trait::async_trait] +impl ObjectStore for PrefixObjectStore { + /// Save the provided bytes to the specified location. + async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> { + let full_path = self.full_path(location)?; + self.inner.put(&full_path, bytes).await + } + + /// Return the bytes that are stored at the specified location. + async fn get(&self, location: &Path) -> ObjectStoreResult { + let full_path = self.full_path(location)?; + self.inner.get(&full_path).await + } + + /// Return the bytes that are stored at the specified location + /// in the given byte range + async fn get_range( + &self, + location: &Path, + range: Range, + ) -> ObjectStoreResult { + let full_path = self.full_path(location)?; + self.inner.get_range(&full_path, range).await + } + + /// Return the metadata for the specified location + async fn head(&self, location: &Path) -> ObjectStoreResult { + let full_path = self.full_path(location)?; + self.inner.head(&full_path).await.map(|meta| ObjectMeta { + last_modified: meta.last_modified, + size: meta.size, + location: self.strip_prefix(&meta.location).unwrap_or(meta.location), + }) + } + + /// Delete the object at the specified location. + async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { + let full_path = self.full_path(location)?; + self.inner.delete(&full_path).await + } + + /// List all the objects with the given prefix. + /// + /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of + /// `foo/bar_baz/x`. + async fn list( + &self, + prefix: Option<&Path>, + ) -> ObjectStoreResult>> { + let prefix = prefix.and_then(|p| self.full_path(p).ok()); + Ok(self + .inner + .list(Some(&prefix.unwrap_or_else(|| self.prefix.clone()))) + .await? + .map_ok(|meta| ObjectMeta { + last_modified: meta.last_modified, + size: meta.size, + location: self.strip_prefix(&meta.location).unwrap_or(meta.location), + }) + .boxed()) + } + + /// List objects with the given prefix and an implementation specific + /// delimiter. Returns common prefixes (directories) in addition to object + /// metadata. + /// + /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of + /// `foo/bar_baz/x`. + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> ObjectStoreResult { + let prefix = prefix.and_then(|p| self.full_path(p).ok()); + self.inner + .list_with_delimiter(Some(&prefix.unwrap_or_else(|| self.prefix.clone()))) + .await + .map(|lst| ListResult { + common_prefixes: lst + .common_prefixes + .iter() + .map(|p| self.strip_prefix(p).unwrap_or_else(|| p.clone())) + .collect(), + objects: lst + .objects + .iter() + .map(|meta| ObjectMeta { + last_modified: meta.last_modified, + size: meta.size, + location: self + .strip_prefix(&meta.location) + .unwrap_or_else(|| meta.location.clone()), + }) + .collect(), + }) + } + + /// Copy an object from one path to another in the same object store. + /// + /// If there exists an object at the destination, it will be overwritten. + async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + let full_from = self.full_path(from)?; + let full_to = self.full_path(to)?; + self.inner.copy(&full_from, &full_to).await + } + + /// Copy an object from one path to another, only if destination is empty. + /// + /// Will return an error if the destination already has an object. + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + let full_from = self.full_path(from)?; + let full_to = self.full_path(to)?; + self.inner.copy_if_not_exists(&full_from, &full_to).await + } + + /// Move an object from one path to another in the same object store. + /// + /// Will return an error if the destination already has an object. + async fn rename_if_not_exists( + &self, + from: &Path, + to: &Path, + ) -> ObjectStoreResult<()> { + let full_from = self.full_path(from)?; + let full_to = self.full_path(to)?; + self.inner.rename_if_not_exists(&full_from, &full_to).await + } + + async fn put_multipart( + &self, + location: &Path, + ) -> ObjectStoreResult<(MultipartId, Box)> { + let full_path = self.full_path(location)?; + self.inner.put_multipart(&full_path).await + } + + async fn abort_multipart( + &self, + location: &Path, + multipart_id: &MultipartId, + ) -> ObjectStoreResult<()> { + let full_path = self.full_path(location)?; + self.inner.abort_multipart(&full_path, multipart_id).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::local::LocalFileSystem; + use crate::test_util::flatten_list_stream; + use crate::tests::{ + copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter, + put_get_delete_list, rename_and_copy, stream_get, + }; + + use tempfile::TempDir; + + #[tokio::test] + async fn prefix_test() { + let root = TempDir::new().unwrap(); + let inner = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + let integration = PrefixObjectStore::new(inner, "prefix"); + + put_get_delete_list(&integration).await; + list_uses_directories_correctly(&integration).await; + list_with_delimiter(&integration).await; + rename_and_copy(&integration).await; + copy_if_not_exists(&integration).await; + stream_get(&integration).await; + } + + #[tokio::test] + async fn prefix_test_applies_prefix() { + let tmpdir = TempDir::new().unwrap(); + let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap(); + + let location = Path::from("prefix/test_file.json"); + let data = Bytes::from("arbitrary data"); + let expected_data = data.clone(); + + local.put(&location, data).await.unwrap(); + + let prefix = PrefixObjectStore::new(local, "prefix"); + let location_prefix = Path::from("test_file.json"); + + let content_list = flatten_list_stream(&prefix, None).await.unwrap(); + assert_eq!(content_list, &[location_prefix.clone()]); + + let root = Path::from("/"); + let content_list = flatten_list_stream(&prefix, Some(&root)).await.unwrap(); + assert_eq!(content_list, &[location_prefix.clone()]); + + let read_data = prefix + .get(&location_prefix) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(&*read_data, expected_data); + + let target_prefix = Path::from("/test_written.json"); + prefix + .put(&target_prefix, expected_data.clone()) + .await + .unwrap(); + + prefix.delete(&location_prefix).await.unwrap(); + + let local = LocalFileSystem::new_with_prefix(tmpdir.path()).unwrap(); + + let err = local.get(&location).await.unwrap_err(); + assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err); + + let location = Path::from("prefix/test_written.json"); + let read_data = local.get(&location).await.unwrap().bytes().await.unwrap(); + assert_eq!(&*read_data, expected_data) + } +} From 6a7824ffbed779bd0ee1042e0dab558a9dfe0a93 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Mon, 12 Dec 2022 13:20:45 +0100 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- object_store/src/prefix.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index b22977b3a51a..0aa3a34edb24 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -56,7 +56,7 @@ impl PrefixObjectStore { "" => path.to_string(), p => format!("{}/{}", p, path), }; - Ok(Path::parse(stripped.trim_end_matches(DELIMITER))?) + Ok(Path::parse(stripped)?) } /// Strip the constant prefix from a given path @@ -150,18 +150,16 @@ impl ObjectStore for PrefixObjectStore { common_prefixes: lst .common_prefixes .iter() - .map(|p| self.strip_prefix(p).unwrap_or_else(|| p.clone())) + .filter_map(|p| self.strip_prefix(p)) .collect(), objects: lst .objects .iter() - .map(|meta| ObjectMeta { + .filter_map(|meta| Some(ObjectMeta { last_modified: meta.last_modified, size: meta.size, - location: self - .strip_prefix(&meta.location) - .unwrap_or_else(|| meta.location.clone()), - }) + location: self.strip_prefix(&meta.location)? + })) .collect(), }) } From 278340424bbb2317a28aff6e259b788ea1561e3a Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Mon, 12 Dec 2022 14:02:42 +0100 Subject: [PATCH 3/4] chore: PR comments --- object_store/src/prefix.rs | 43 ++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 0aa3a34edb24..521431f03f03 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -21,7 +21,7 @@ use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use std::ops::Range; use tokio::io::AsyncWrite; -use crate::path::{Path, DELIMITER}; +use crate::path::Path; use crate::{ GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result as ObjectStoreResult, @@ -61,12 +61,7 @@ impl PrefixObjectStore { /// Strip the constant prefix from a given path fn strip_prefix(&self, path: &Path) -> Option { - let path: &str = path.as_ref(); - let stripped = match self.prefix.as_ref() { - "" => path, - p => path.strip_prefix(p)?.strip_prefix(DELIMITER)?, - }; - Path::parse(stripped).ok() + Some(path.prefix_match(&self.prefix)?.collect()) } } @@ -119,17 +114,17 @@ impl ObjectStore for PrefixObjectStore { &self, prefix: Option<&Path>, ) -> ObjectStoreResult>> { - let prefix = prefix.and_then(|p| self.full_path(p).ok()); - Ok(self - .inner - .list(Some(&prefix.unwrap_or_else(|| self.prefix.clone()))) - .await? - .map_ok(|meta| ObjectMeta { - last_modified: meta.last_modified, - size: meta.size, - location: self.strip_prefix(&meta.location).unwrap_or(meta.location), - }) - .boxed()) + Ok(match &prefix.and_then(|p| self.full_path(p).ok()) { + Some(p) => self.inner.list(Some(p)), + None => self.inner.list(Some(&self.prefix)), + } + .await? + .map_ok(|meta| ObjectMeta { + last_modified: meta.last_modified, + size: meta.size, + location: self.strip_prefix(&meta.location).unwrap_or(meta.location), + }) + .boxed()) } /// List objects with the given prefix and an implementation specific @@ -155,11 +150,13 @@ impl ObjectStore for PrefixObjectStore { objects: lst .objects .iter() - .filter_map(|meta| Some(ObjectMeta { - last_modified: meta.last_modified, - size: meta.size, - location: self.strip_prefix(&meta.location)? - })) + .filter_map(|meta| { + Some(ObjectMeta { + last_modified: meta.last_modified, + size: meta.size, + location: self.strip_prefix(&meta.location)?, + }) + }) .collect(), }) } From 7e24eedc727b8da0a0ba0950debefbc095a84237 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Mon, 12 Dec 2022 15:35:03 +0100 Subject: [PATCH 4/4] refactor: infallible full_path --- object_store/src/prefix.rs | 61 +++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 521431f03f03..d61fc22271a2 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -50,13 +50,8 @@ impl PrefixObjectStore { } /// Create the full path from a path relative to prefix - fn full_path(&self, location: &Path) -> ObjectStoreResult { - let path: &str = location.as_ref(); - let stripped = match self.prefix.as_ref() { - "" => path.to_string(), - p => format!("{}/{}", p, path), - }; - Ok(Path::parse(stripped)?) + fn full_path(&self, location: &Path) -> Path { + self.prefix.parts().chain(location.parts()).collect() } /// Strip the constant prefix from a given path @@ -69,13 +64,13 @@ impl PrefixObjectStore { impl ObjectStore for PrefixObjectStore { /// Save the provided bytes to the specified location. async fn put(&self, location: &Path, bytes: Bytes) -> ObjectStoreResult<()> { - let full_path = self.full_path(location)?; + let full_path = self.full_path(location); self.inner.put(&full_path, bytes).await } /// Return the bytes that are stored at the specified location. async fn get(&self, location: &Path) -> ObjectStoreResult { - let full_path = self.full_path(location)?; + let full_path = self.full_path(location); self.inner.get(&full_path).await } @@ -86,13 +81,13 @@ impl ObjectStore for PrefixObjectStore { location: &Path, range: Range, ) -> ObjectStoreResult { - let full_path = self.full_path(location)?; + let full_path = self.full_path(location); self.inner.get_range(&full_path, range).await } /// Return the metadata for the specified location async fn head(&self, location: &Path) -> ObjectStoreResult { - let full_path = self.full_path(location)?; + let full_path = self.full_path(location); self.inner.head(&full_path).await.map(|meta| ObjectMeta { last_modified: meta.last_modified, size: meta.size, @@ -102,7 +97,7 @@ impl ObjectStore for PrefixObjectStore { /// Delete the object at the specified location. async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { - let full_path = self.full_path(location)?; + let full_path = self.full_path(location); self.inner.delete(&full_path).await } @@ -114,17 +109,16 @@ impl ObjectStore for PrefixObjectStore { &self, prefix: Option<&Path>, ) -> ObjectStoreResult>> { - Ok(match &prefix.and_then(|p| self.full_path(p).ok()) { - Some(p) => self.inner.list(Some(p)), - None => self.inner.list(Some(&self.prefix)), - } - .await? - .map_ok(|meta| ObjectMeta { - last_modified: meta.last_modified, - size: meta.size, - location: self.strip_prefix(&meta.location).unwrap_or(meta.location), - }) - .boxed()) + Ok(self + .inner + .list(Some(&self.full_path(prefix.unwrap_or(&Path::from("/"))))) + .await? + .map_ok(|meta| ObjectMeta { + last_modified: meta.last_modified, + size: meta.size, + location: self.strip_prefix(&meta.location).unwrap_or(meta.location), + }) + .boxed()) } /// List objects with the given prefix and an implementation specific @@ -137,9 +131,10 @@ impl ObjectStore for PrefixObjectStore { &self, prefix: Option<&Path>, ) -> ObjectStoreResult { - let prefix = prefix.and_then(|p| self.full_path(p).ok()); self.inner - .list_with_delimiter(Some(&prefix.unwrap_or_else(|| self.prefix.clone()))) + .list_with_delimiter(Some( + &self.full_path(prefix.unwrap_or(&Path::from("/"))), + )) .await .map(|lst| ListResult { common_prefixes: lst @@ -165,8 +160,8 @@ impl ObjectStore for PrefixObjectStore { /// /// If there exists an object at the destination, it will be overwritten. async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - let full_from = self.full_path(from)?; - let full_to = self.full_path(to)?; + let full_from = self.full_path(from); + let full_to = self.full_path(to); self.inner.copy(&full_from, &full_to).await } @@ -174,8 +169,8 @@ impl ObjectStore for PrefixObjectStore { /// /// Will return an error if the destination already has an object. async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { - let full_from = self.full_path(from)?; - let full_to = self.full_path(to)?; + let full_from = self.full_path(from); + let full_to = self.full_path(to); self.inner.copy_if_not_exists(&full_from, &full_to).await } @@ -187,8 +182,8 @@ impl ObjectStore for PrefixObjectStore { from: &Path, to: &Path, ) -> ObjectStoreResult<()> { - let full_from = self.full_path(from)?; - let full_to = self.full_path(to)?; + let full_from = self.full_path(from); + let full_to = self.full_path(to); self.inner.rename_if_not_exists(&full_from, &full_to).await } @@ -196,7 +191,7 @@ impl ObjectStore for PrefixObjectStore { &self, location: &Path, ) -> ObjectStoreResult<(MultipartId, Box)> { - let full_path = self.full_path(location)?; + let full_path = self.full_path(location); self.inner.put_multipart(&full_path).await } @@ -205,7 +200,7 @@ impl ObjectStore for PrefixObjectStore { location: &Path, multipart_id: &MultipartId, ) -> ObjectStoreResult<()> { - let full_path = self.full_path(location)?; + let full_path = self.full_path(location); self.inner.abort_multipart(&full_path, multipart_id).await } }