ref: Cleanup object_store code a bit (#501)
Move `upload` and `download` to the root `ObjectStoreRegistry`.

Move `object_store_key` to a separate file and make that an
implementation detail.

This is part of the cleanup now that most of #465 has landed.
bjchambers authored Jul 12, 2023
2 parents f9fec6d + 6fe96c9, commit b05855f
Showing 7 changed files with 284 additions and 306 deletions.
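
The headline change is the direction of the `download` call: it moves off `ObjectStoreUrl` and onto the root `ObjectStoreRegistry`. Below is a minimal sketch of the new shape, with stub types standing in for the crate's real ones; the names and signatures are inferred from the call sites in this diff, not the actual declarations.

```rust
use std::path::Path;

// Stub types standing in for the crate's real ones; this block sketches the
// call direction only, not the actual declarations.
pub struct ObjectStoreUrl(String);
pub struct ObjectStoreRegistry;
#[derive(Debug)]
pub struct DownloadError;

impl ObjectStoreRegistry {
    /// New shape: the registry owns `download` and takes the URL as an argument.
    pub async fn download(&self, url: ObjectStoreUrl, path: &Path) -> Result<(), DownloadError> {
        // Real code would resolve the store for `url`'s key and stream the object to `path`.
        let _ = (url, path);
        Ok(())
    }
}
```

Call sites accordingly flip from `url.download(object_stores, path)` to `object_stores.download(url, path)`, as the first two hunks below show.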
crates/sparrow-runtime/src/metadata/raw_metadata.rs (+7, -13)
```diff
@@ -139,28 +139,22 @@ impl RawMetadata {
         let object_store_url = ObjectStoreUrl::from_str(path)
             .change_context_lazy(|| Error::ObjectStore(path.to_owned()))?;
 
-        if object_store_url.is_local() {
-            let path = object_store_url
-                .path()
-                .change_context_lazy(|| Error::ObjectStore(path.to_owned()))?
-                .to_string();
-            let path = format!("/{}", path);
-            let file = file_from_path(std::path::Path::new(&path))
+        if let Some(local_path) = object_store_url.local_path() {
+            let file = file_from_path(local_path)
                 .into_report()
                 .change_context_lazy(|| Error::LocalFile)?;
             Self::try_from_csv_reader(file)
         } else {
             let download_file = NamedTempFile::new()
                 .into_report()
                 .change_context_lazy(|| Error::Download)?;
-            object_store_url
-                .download(object_stores, download_file.path())
+            object_stores
+                .download(object_store_url, download_file.path())
                 .await
                 .change_context_lazy(|| Error::Download)?;
-            let file = file_from_path(download_file.path())
-                .into_report()
-                .change_context_lazy(|| Error::Download)?;
-            Self::try_from_csv_reader(file)
+            // Pass the download_file (which implements read) directly.
+            // The file will be deleted when the reader completes.
+            Self::try_from_csv_reader(download_file)
         }
     }
 
```
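The new branch replaces the hand-rolled path reconstruction (note the old `format!("/{}", path)`) with a `local_path` accessor. The diff doesn't show its definition; here is a plausible sketch, assuming `ObjectStoreUrl` wraps a `url::Url` in a field named `url` (both the field name and the return type are guesses):

```rust
use std::path::PathBuf;

impl ObjectStoreUrl {
    /// Return the local filesystem path for `file://` URLs, `None` otherwise.
    /// Sketch only: `Url::to_file_path` already handles the leading-slash
    /// logic the old code did by hand.
    pub fn local_path(&self) -> Option<PathBuf> {
        if self.url.scheme() == "file" {
            self.url.to_file_path().ok()
        } else {
            None
        }
    }
}
```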
crates/sparrow-runtime/src/prepare.rs (+3, -5)
```diff
@@ -102,12 +102,10 @@ pub async fn prepared_batches<'a>(
             // For CSV we need to download the file (for now) to perform inference.
             // We could improve this by looking at the size and creating an in-memory
             // buffer and/or looking at a prefix of the file...
-            url.download(object_stores, local_file.path())
+            object_stores
+                .download(url, local_file.path())
                 .await
-                .change_context_lazy(|| Error::DownloadingObject {
-                    url,
-                    local: local_file.path().to_owned(),
-                })?;
+                .change_context(Error::DownloadingObject)?;
 
             // Transfer the local file to the reader. When the CSV reader
             // completes the reader will be dropped, and the file deleted.
```
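The switch from `change_context_lazy` to `change_context` follows from the new context being a cheap unit variant: the lazy form takes a closure and defers construction to the error path, which mattered when the variant carried an owned URL and path. Below is a self-contained sketch in the error-stack 0.x `IntoReport` style used elsewhere in this diff; the `DownloadingObject` type here is a stand-in, and per-call details can still be attached lazily:

```rust
use error_stack::{Context, IntoReport, ResultExt};

#[derive(Debug)]
struct DownloadingObject;

impl std::fmt::Display for DownloadingObject {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("downloading object to prepare")
    }
}

impl Context for DownloadingObject {}

fn read_downloaded(path: &std::path::Path) -> error_stack::Result<Vec<u8>, DownloadingObject> {
    std::fs::read(path)
        .into_report()
        // Unit variant: construction is free, so the eager form is fine.
        .change_context(DownloadingObject)
        // Details that used to live in the variant can still be attached:
        .attach_printable_lazy(|| format!("path: {}", path.display()))
}
```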
crates/sparrow-runtime/src/prepare/error.rs (+3, -24)
```diff
@@ -1,15 +1,7 @@
 use std::path::PathBuf;
 
-use url::Url;
-
-use crate::stores::ObjectStoreUrl;
-
 #[derive(derive_more::Display, Debug)]
 pub enum Error {
-    #[display(fmt = "unable to open '{path:?}'")]
-    OpenFile { path: PathBuf },
-    #[display(fmt = "unable to create '{path:?}'")]
-    CreateFile { path: PathBuf },
     #[display(fmt = "internal error")]
     Internal,
     #[display(fmt = "failed to create Parquet file reader")]
@@ -24,8 +16,6 @@ pub enum Error {
     PreparingColumn,
     #[display(fmt = "sorting batch")]
     SortingBatch,
-    #[display(fmt = "determine metadata")]
-    DetermineMetadata,
     #[display(fmt = "invalid schema provided")]
     ReadSchema,
     #[display(fmt = "failed to write to '{_0}'")]
@@ -39,16 +29,8 @@ pub enum Error {
         slice_plan: String,
         table_config: String,
     },
-    #[display(fmt = "unsupported source {_0:?}")]
-    UnsupportedOutputPath(&'static str),
-    #[display(fmt = "invalid input path")]
-    InvalidInputPath,
-    #[display(fmt = "failed to read input")]
-    ReadInput,
-    #[display(fmt = "failed to upload result")]
-    UploadResult,
-    #[display(fmt = "downloading object {url} to path {}", "local.display()")]
-    DownloadingObject { url: ObjectStoreUrl, local: PathBuf },
+    #[display(fmt = "downloading object to prepare")]
+    DownloadingObject,
     #[display(fmt = "invalid url: {_0}")]
     InvalidUrl(String),
 }
@@ -58,10 +40,7 @@ impl error_stack::Context for Error {}
 impl sparrow_core::ErrorCode for Error {
     fn error_code(&self) -> tonic::Code {
         match self {
-            Self::MissingField(_) | Self::IncorrectSlicePlan { .. } | Self::InvalidInputPath => {
-                tonic::Code::InvalidArgument
-            }
-            Self::UnsupportedOutputPath(_) => tonic::Code::Unimplemented,
+            Self::MissingField(_) | Self::IncorrectSlicePlan { .. } => tonic::Code::InvalidArgument,
             _ => tonic::Code::Internal,
         }
     }
```
crates/sparrow-runtime/src/stores.rs (+3, -2)
```diff
@@ -1,7 +1,8 @@
 mod object_meta_ext;
+mod object_store_key;
 pub mod object_store_url;
-mod object_stores;
+mod registry;
 
 pub use object_meta_ext::ObjectMetaExt;
 pub use object_store_url::ObjectStoreUrl;
-pub use object_stores::ObjectStoreRegistry;
+pub use registry::ObjectStoreRegistry;
```
crates/sparrow-runtime/src/stores/object_store_key.rs (new file, +179)

```rust
use itertools::Itertools;
use url::Url;

#[derive(derive_more::Display, Debug)]
pub enum Error {
    #[display(fmt = "missing host in URL '{_0}'")]
    MissingHost(Url),
    #[display(fmt = "unsupported scheme '{}' in URL '{_0}'", "_0.scheme()")]
    UnsupportedScheme(Url),
    #[display(fmt = "invalid path '{}' in URL '{_0}'", "_0.path()")]
    InvalidPath(Url),
    #[display(fmt = "unsupported host '{}' in URL '{_0}'", "_0.host_str().unwrap_or_default()")]
    UnsupportedHost(Url),
}

impl error_stack::Context for Error {}

#[derive(Debug, Hash, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
pub(super) enum ObjectStoreKey {
    Local,
    Memory,
    Aws {
        bucket: String,
        region: Option<String>,
        virtual_hosted_style_request: bool,
    },
    Gcs {
        bucket: String,
    },
}

impl ObjectStoreKey {
    pub(super) fn from_url(url: &Url) -> error_stack::Result<Self, Error> {
        match url.scheme() {
            "file" => Ok(Self::Local),
            "mem" => Ok(Self::Memory),
            // `s3` is the traditional scheme for reading from S3.
            // `s3a` is the protocol designed for scalability with Hadoop in mind.
            // See: https://aws.amazon.com/blogs/opensource/community-collaboration-the-s3a-story/
            "s3" | "s3a" => {
                let bucket = url
                    .host_str()
                    .ok_or_else(|| Error::MissingHost(url.clone()))?
                    .to_owned();
                // For traditional S3 paths, the `host` should be just the bucket.
                // We use this as the key. The creation of the S3 object store will
                // parse out the bucket and other parts of the URL as needed.
                Ok(Self::Aws {
                    bucket,
                    region: None,
                    virtual_hosted_style_request: false,
                })
            }
            "https" => {
                let host = url
                    .host_str()
                    .ok_or_else(|| Error::MissingHost(url.clone()))?;

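                // `splitn(4, '.')` yields at most four pieces, so the final piece
                // keeps any remaining dots. That is why path-style hosts match as
                // ("s3", bucket, "amazonaws", "com") while virtual-hosted hosts
                // match as (bucket, "s3", region, "amazonaws.com"):
                //   "s3.bucket.amazonaws.com"        -> ("s3", "bucket", "amazonaws", "com")
                //   "bucket.s3.region.amazonaws.com" -> ("bucket", "s3", "region", "amazonaws.com")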
                match host.splitn(4, '.').collect_tuple() {
                    Some(("s3", bucket, "amazonaws", "com")) => Ok(Self::Aws {
                        bucket: bucket.to_owned(),
                        region: None,
                        virtual_hosted_style_request: false,
                    }),
                    Some((bucket, "s3", region, "amazonaws.com")) => Ok(Self::Aws {
                        bucket: bucket.to_owned(),
                        region: Some(region.to_owned()),
                        virtual_hosted_style_request: true,
                    }),
                    Some((bucket, "storage", "googleapis", "com")) => Ok(Self::Gcs {
                        bucket: bucket.to_owned(),
                    }),
                    Some(("storage", "cloud", "google", "com")) => {
                        let mut path = url
                            .path_segments()
                            .ok_or_else(|| Error::InvalidPath(url.clone()))?;
                        let bucket = path.next().ok_or_else(|| Error::InvalidPath(url.clone()))?;
                        Ok(Self::Gcs {
                            bucket: bucket.to_owned(),
                        })
                    }
                    _ => error_stack::bail!(Error::UnsupportedHost(url.clone())),
                }
            }
            "gs" => {
                let bucket = url
                    .host_str()
                    .ok_or_else(|| Error::MissingHost(url.clone()))?
                    .to_owned();
                Ok(Self::Gcs { bucket })
            }
            _ => {
                error_stack::bail!(Error::UnsupportedScheme(url.clone()))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn key_from_url(url: &str) -> error_stack::Result<ObjectStoreKey, Error> {
        let url = Url::parse(url).expect("valid URL");
        ObjectStoreKey::from_url(&url)
    }

    #[test]
    fn test_local_urls() {
        assert_eq!(key_from_url("file:///foo").unwrap(), ObjectStoreKey::Local);
        assert_eq!(key_from_url("FILE:///foo").unwrap(), ObjectStoreKey::Local);
    }

    #[test]
    fn test_memory_urls() {
        assert_eq!(key_from_url("mem:///foo").unwrap(), ObjectStoreKey::Memory);
        assert_eq!(key_from_url("mem:foo").unwrap(), ObjectStoreKey::Memory);
    }

    #[test]
    fn test_aws_urls() {
        assert_eq!(
            key_from_url("s3://bucket/path").unwrap(),
            ObjectStoreKey::Aws {
                bucket: "bucket".to_owned(),
                region: None,
                virtual_hosted_style_request: false,
            }
        );
        assert_eq!(
            key_from_url("s3a://bucket/foo").unwrap(),
            ObjectStoreKey::Aws {
                bucket: "bucket".to_owned(),
                region: None,
                virtual_hosted_style_request: false,
            }
        );
        assert_eq!(
            key_from_url("https://s3.bucket.amazonaws.com/foo").unwrap(),
            ObjectStoreKey::Aws {
                bucket: "bucket".to_owned(),
                region: None,
                virtual_hosted_style_request: false,
            }
        );
        assert_eq!(
            key_from_url("https://bucket.s3.region.amazonaws.com/foo").unwrap(),
            ObjectStoreKey::Aws {
                bucket: "bucket".to_owned(),
                region: Some("region".to_owned()),
                virtual_hosted_style_request: true,
            }
        );
    }

    #[test]
    fn test_gcp_urls() {
        assert_eq!(
            key_from_url("gs://bucket/path").unwrap(),
            ObjectStoreKey::Gcs {
                bucket: "bucket".to_owned()
            }
        );

        assert_eq!(
            key_from_url("https://bucket.storage.googleapis.com/path").unwrap(),
            ObjectStoreKey::Gcs {
                bucket: "bucket".to_owned()
            }
        );

        assert_eq!(
            key_from_url("https://storage.cloud.google.com/bucket/path").unwrap(),
            ObjectStoreKey::Gcs {
                bucket: "bucket".to_owned()
            }
        );
    }
}
```
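
With `ObjectStoreKey` now private to the `stores` module, it can serve purely as a cache key. The following is a hypothetical sketch of how a registry might use it; the real `registry.rs` is not shown in this diff, and `create_store` here is an illustrative stub:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use object_store::ObjectStore;
use url::Url;

// Stores are cached per `ObjectStoreKey`, so every URL that resolves to the
// same scheme/bucket reuses one client.
#[derive(Default)]
struct Registry {
    stores: RwLock<HashMap<ObjectStoreKey, Arc<dyn ObjectStore>>>,
}

impl Registry {
    fn object_store(&self, url: &Url) -> error_stack::Result<Arc<dyn ObjectStore>, Error> {
        let key = ObjectStoreKey::from_url(url)?;
        if let Some(store) = self.stores.read().unwrap().get(&key) {
            return Ok(store.clone());
        }
        let store = create_store(&key);
        self.stores.write().unwrap().insert(key, store.clone());
        Ok(store)
    }
}

// Hypothetical constructor; real code would build AWS/GCS clients from the key.
fn create_store(key: &ObjectStoreKey) -> Arc<dyn ObjectStore> {
    match key {
        ObjectStoreKey::Memory => Arc::new(object_store::memory::InMemory::new()),
        ObjectStoreKey::Local => Arc::new(object_store::local::LocalFileSystem::new()),
        _ => unimplemented!("cloud stores need builders and credentials"),
    }
}
```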