From 9220e7dec994f0f190c7a19928440c007fa939ad Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Wed, 5 Jul 2023 12:18:33 -0700 Subject: [PATCH 1/2] feat: Use object_store for metadata This is part of #465. This uses `object_store` to retrieve the metadata files. Additionally, it uses a `select_all` so that all metadata files are read concurrently. --- crates/sparrow-catalog/src/update.rs | 27 +++---- .../src/update/execute_example.rs | 3 - crates/sparrow-main/src/batch.rs | 4 - crates/sparrow-main/src/materialize.rs | 4 - .../sparrow-main/src/serve/compute_service.rs | 1 - .../tests/e2e/fixture/query_fixture.rs | 19 +---- crates/sparrow-runtime/src/execute.rs | 16 ++-- .../src/execute/key_hash_inverse.rs | 73 +++++++++++-------- .../src/metadata/prepared_metadata.rs | 2 +- wren/compute/helpers.go | 2 +- 10 files changed, 68 insertions(+), 83 deletions(-) diff --git a/crates/sparrow-catalog/src/update.rs b/crates/sparrow-catalog/src/update.rs index 3b8fa80f8..f7ac46df5 100644 --- a/crates/sparrow-catalog/src/update.rs +++ b/crates/sparrow-catalog/src/update.rs @@ -3,7 +3,6 @@ use std::path::PathBuf; use error_stack::{IntoReport, ResultExt}; use futures::TryStreamExt; use itertools::Itertools; -use sparrow_runtime::s3::S3Helper; use tracing::{error, info, info_span}; use crate::list_doc_files; @@ -29,12 +28,10 @@ pub(super) async fn update_doc_structs( options: &UpdateOptions, doc_root: PathBuf, ) -> error_stack::Result, Error> { - let s3_helper = S3Helper::new().await; - if let Some(example) = &options.example { let doc_path = doc_root.join(format!("{example}.toml")); error_stack::ensure!(doc_path.is_file(), Error::NonFile(doc_path)); - let changed = update_doc_struct(command, doc_path.clone(), s3_helper) + let changed = update_doc_struct(command, doc_path.clone()) .await .attach_printable_lazy(|| DocFile(doc_path.clone()))?; if changed { @@ -50,17 +47,14 @@ pub(super) async fn update_doc_structs( .map_err(|e| error_stack::report!(e).change_context(Error::ListingFiles)); let changed = file_stream - .map_ok(move |doc_path| { - let s3_helper = s3_helper.clone(); - async move { - let changed = update_doc_struct(command, doc_path.clone(), s3_helper) - .await - .change_context(Error::UpdatingDocs)?; - if changed { - Ok(Some(doc_path)) - } else { - Ok(None) - } + .map_ok(move |doc_path| async move { + let changed = update_doc_struct(command, doc_path.clone()) + .await + .change_context(Error::UpdatingDocs)?; + if changed { + Ok(Some(doc_path)) + } else { + Ok(None) } }) .try_buffer_unordered(4) @@ -107,7 +101,6 @@ impl error_stack::Context for Error {} async fn update_doc_struct( command: UpdateCommand, doc_path: PathBuf, - s3_helper: S3Helper, ) -> error_stack::Result { let input = tokio::fs::read_to_string(&doc_path) .await @@ -139,7 +132,7 @@ async fn update_doc_struct( let span = info_span!("Execute example", function = ?catalog_entry.name, index); let _enter = span.enter(); - let output_csv = execute_example::execute_example(example, s3_helper.clone()) + let output_csv = execute_example::execute_example(example) .await .change_context(Error::ExecuteExample(index))?; example.output_csv = Some(output_csv); diff --git a/crates/sparrow-catalog/src/update/execute_example.rs b/crates/sparrow-catalog/src/update/execute_example.rs index c2a74d08e..ff39e314e 100644 --- a/crates/sparrow-catalog/src/update/execute_example.rs +++ b/crates/sparrow-catalog/src/update/execute_example.rs @@ -13,7 +13,6 @@ use sparrow_api::kaskada::v1alpha::{ }; use 
sparrow_compiler::InternalCompileOptions; use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader; -use sparrow_runtime::s3::S3Helper; use sparrow_runtime::PreparedMetadata; use tempfile::NamedTempFile; use uuid::Uuid; @@ -45,7 +44,6 @@ impl error_stack::Context for Error {} /// Execute the example and return the result as a CSV string. pub(super) async fn execute_example( example: &FunctionExample, - s3_helper: S3Helper, ) -> error_stack::Result { // 1. Prepare the file let mut preparer = ExampleInputPreparer::new(); @@ -106,7 +104,6 @@ pub(super) async fn execute_example( changed_since: None, final_result_time: None, }, - s3_helper, None, None, FlightRecordHeader::default(), diff --git a/crates/sparrow-main/src/batch.rs b/crates/sparrow-main/src/batch.rs index c1c6f33c8..436d88ed2 100644 --- a/crates/sparrow-main/src/batch.rs +++ b/crates/sparrow-main/src/batch.rs @@ -7,7 +7,6 @@ use sparrow_api::kaskada::v1alpha::execute_request::Limits; use sparrow_api::kaskada::v1alpha::{CompileRequest, ExecuteRequest, FenlDiagnostics}; use sparrow_compiler::CompilerOptions; use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader; -use sparrow_runtime::s3::S3Helper; use tracing::{info, info_span}; use crate::script::{Schema, Script, ScriptPath}; @@ -112,8 +111,6 @@ impl BatchCommand { }; if !self.compile_only { - let s3_helper = S3Helper::new().await; - if !self.output_dir.exists() { tokio::fs::create_dir_all(&self.output_dir) .await @@ -133,7 +130,6 @@ impl BatchCommand { changed_since: None, final_result_time: None, }, - s3_helper, None, self.flight_record_path, FlightRecordHeader::default(), diff --git a/crates/sparrow-main/src/materialize.rs b/crates/sparrow-main/src/materialize.rs index a8eaa44d5..b35a1f8a7 100644 --- a/crates/sparrow-main/src/materialize.rs +++ b/crates/sparrow-main/src/materialize.rs @@ -8,7 +8,6 @@ use sparrow_api::kaskada::v1alpha::{destination, CompileRequest, ExecuteRequest, use sparrow_compiler::CompilerOptions; use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader; -use sparrow_runtime::s3::S3Helper; use tracing::{info, info_span}; use crate::script::{Schema, Script, ScriptPath}; @@ -109,8 +108,6 @@ impl MaterializeCommand { error_stack::bail!(Error::InvalidQuery(diagnostics)); }; - let s3_helper = S3Helper::new().await; - // Note: it might be cleaner to create a separate entry point for materialize, but for now it's ok. 
let result_stream = sparrow_runtime::execute::execute( ExecuteRequest { @@ -122,7 +119,6 @@ impl MaterializeCommand { changed_since: None, final_result_time: None, }, - s3_helper, Some(script.bounded_lateness_ns), self.flight_record_path, FlightRecordHeader::default(), diff --git a/crates/sparrow-main/src/serve/compute_service.rs b/crates/sparrow-main/src/serve/compute_service.rs index 991e2972c..6455a6be9 100644 --- a/crates/sparrow-main/src/serve/compute_service.rs +++ b/crates/sparrow-main/src/serve/compute_service.rs @@ -272,7 +272,6 @@ async fn execute_impl( let progress_stream = sparrow_runtime::execute::execute( request, - s3_helper.clone(), None, flight_record_local_path, flight_record_header, diff --git a/crates/sparrow-main/tests/e2e/fixture/query_fixture.rs b/crates/sparrow-main/tests/e2e/fixture/query_fixture.rs index a97963dbb..a8b6d158e 100644 --- a/crates/sparrow-main/tests/e2e/fixture/query_fixture.rs +++ b/crates/sparrow-main/tests/e2e/fixture/query_fixture.rs @@ -19,7 +19,6 @@ use sparrow_api::kaskada::v1alpha::{ }; use sparrow_compiler::InternalCompileOptions; use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader; -use sparrow_runtime::s3::S3Helper; use crate::DataFixture; @@ -263,11 +262,6 @@ impl QueryFixture { return Err(compile_result.fenl_diagnostics.unwrap_or_default().into()); }; - // TODO: If this is expensive to construct each time in tests (it shouldn't be) - // we could put it in a `[dynamic]` static, and clone it. Or we could have a - // special "for test" version. - let s3_helper = S3Helper::new().await; - let destination = ObjectStoreDestination { output_prefix_uri: format!("file:///{}", output_dir.display()), file_type: output_format.into(), @@ -285,15 +279,10 @@ impl QueryFixture { ..self.execute_request.clone() }; - let mut stream = sparrow_runtime::execute::execute( - request, - s3_helper, - None, - None, - FlightRecordHeader::default(), - ) - .await? - .boxed(); + let mut stream = + sparrow_runtime::execute::execute(request, None, None, FlightRecordHeader::default()) + .await? + .boxed(); let mut output_files = Vec::new(); let mut snapshots = Vec::new(); diff --git a/crates/sparrow-runtime/src/execute.rs b/crates/sparrow-runtime/src/execute.rs index 638504c3c..064361bfe 100644 --- a/crates/sparrow-runtime/src/execute.rs +++ b/crates/sparrow-runtime/src/execute.rs @@ -43,7 +43,6 @@ const STORE_PATH_PREFIX: &str = "compute_snapshot_"; /// execute response. pub async fn execute( request: ExecuteRequest, - s3_helper: S3Helper, bounded_lateness_ns: Option, _flight_record_local_path: Option, _flight_record_header: FlightRecordHeader, @@ -87,6 +86,8 @@ pub async fn execute( .into_report() .change_context(Error::internal_msg("create data context"))?; + let s3_helper = S3Helper::new().await; + // If the snapshot config exists, sparrow should attempt to resume from state, // and store new state. Create a new storage path for the local store to // exist. 
@@ -156,6 +157,8 @@ pub async fn execute( None }; + let object_stores = ObjectStoreRegistry::default(); + let primary_grouping_key_type = plan .primary_grouping_key_type .to_owned() @@ -177,9 +180,8 @@ pub async fn execute( .change_context(Error::internal_msg("get primary grouping ID"))?; key_hash_inverse - .add_from_data_context(&data_context, primary_group_id, s3_helper.clone()) + .add_from_data_context(&data_context, primary_group_id, &object_stores) .await - .into_report() .change_context(Error::internal_msg("initialize key hash inverse"))?; let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::new(key_hash_inverse)); @@ -202,7 +204,7 @@ pub async fn execute( let context = OperationContext { plan, plan_hash, - object_stores: ObjectStoreRegistry::default(), + object_stores, data_context, compute_store, key_hash_inverse, @@ -298,10 +300,10 @@ pub async fn materialize( .into_report() .change_context(Error::internal_msg("get primary grouping ID"))?; + let object_stores = ObjectStoreRegistry::default(); key_hash_inverse - .add_from_data_context(&data_context, primary_group_id, s3_helper.clone()) + .add_from_data_context(&data_context, primary_group_id, &object_stores) .await - .into_report() .change_context(Error::internal_msg("initialize key hash inverse"))?; let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::new(key_hash_inverse)); @@ -315,7 +317,7 @@ pub async fn materialize( let context = OperationContext { plan, plan_hash, - object_stores: ObjectStoreRegistry::default(), + object_stores, data_context, compute_store: snapshot_compute_store, key_hash_inverse, diff --git a/crates/sparrow-runtime/src/execute/key_hash_inverse.rs b/crates/sparrow-runtime/src/execute/key_hash_inverse.rs index 1316b2d0f..a078d7196 100644 --- a/crates/sparrow-runtime/src/execute/key_hash_inverse.rs +++ b/crates/sparrow-runtime/src/execute/key_hash_inverse.rs @@ -1,18 +1,19 @@ -use std::path::PathBuf; +use std::str::FromStr; use anyhow::Context; use arrow::array::{Array, ArrayRef, PrimitiveArray, UInt64Array}; use arrow::datatypes::{DataType, UInt64Type}; +use error_stack::{IntoReportCompat, ResultExt}; +use futures::TryStreamExt; use hashbrown::HashMap; -use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use sparrow_arrow::downcast::downcast_primitive_array; use sparrow_compiler::DataContext; use sparrow_instructions::{ComputeStore, StoreKey}; use sparrow_plan::GroupId; -use tempfile::NamedTempFile; -use crate::s3::{self, S3Helper, S3Object}; +use crate::read::ParquetFile; +use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl}; /// Stores the mapping from key hash u64 to the position in the keys array. /// @@ -36,6 +37,18 @@ impl std::fmt::Debug for KeyHashInverse { } } +#[derive(derive_more::Display, Debug)] +pub enum Error { + #[display(fmt = "invalid metadata url '{_0}'")] + InvalidMetadataUrl(String), + #[display(fmt = "failed to open metadata")] + OpeningMetadata, + #[display(fmt = "failed to read metadata")] + ReadingMetadata, +} + +impl error_stack::Context for Error {} + impl KeyHashInverse { /// Restores the KeyHashInverse from the compute store. 
pub fn restore_from(store: &ComputeStore) -> anyhow::Result { @@ -68,32 +81,37 @@ impl KeyHashInverse { &mut self, data_context: &DataContext, primary_grouping: GroupId, - s3: S3Helper, - ) -> anyhow::Result<()> { + registry: &ObjectStoreRegistry, + ) -> error_stack::Result<(), Error> { let metadata_files = data_context .tables_for_grouping(primary_grouping) .flat_map(|table| table.metadata_for_files()); + let mut streams = Vec::new(); for file in metadata_files { - let file = if s3::is_s3_path(&file) { - let s3_object = S3Object::try_from_uri(&file)?; - let downloaded_file = NamedTempFile::new()?; - let download_file_path = downloaded_file.into_temp_path(); - s3.download_s3(s3_object, &download_file_path).await?; - file_from_path(&download_file_path)? - } else { - file_from_path(&PathBuf::from(file))? - }; - - let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?; - let parquet_reader = parquet_reader.build()?; - for batch in parquet_reader { - let batch = batch?; - let hash_col = batch.column(0); - let hash_col: &UInt64Array = downcast_primitive_array(hash_col.as_ref())?; - let entity_key_col = batch.column(1); - self.add(entity_key_col.to_owned(), hash_col)?; - } + let url = + ObjectStoreUrl::from_str(&file).change_context(Error::InvalidMetadataUrl(file))?; + let file = ParquetFile::try_new(registry, url) + .await + .change_context(Error::OpeningMetadata)?; + let stream = file + .read_stream(None) + .await + .change_context(Error::OpeningMetadata)?; + streams.push(stream); + } + + let mut stream = futures::stream::select_all(streams) + .map_err(|e| e.change_context(Error::ReadingMetadata)); + while let Some(batch) = stream.try_next().await? { + let hash_col = batch.column(0); + let hash_col: &UInt64Array = downcast_primitive_array(hash_col.as_ref()) + .into_report() + .change_context(Error::ReadingMetadata)?; + let entity_key_col = batch.column(1); + self.add(entity_key_col.to_owned(), hash_col) + .into_report() + .change_context(Error::ReadingMetadata)?; } Ok(()) @@ -241,11 +259,6 @@ impl ThreadSafeKeyHashInverse { } } -/// Return the file at a given path. 
-fn file_from_path(path: &std::path::Path) -> anyhow::Result { - std::fs::File::open(path).with_context(|| format!("unable to open {path:?}")) -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/crates/sparrow-runtime/src/metadata/prepared_metadata.rs b/crates/sparrow-runtime/src/metadata/prepared_metadata.rs index 9ec3dcea8..11dfd0906 100644 --- a/crates/sparrow-runtime/src/metadata/prepared_metadata.rs +++ b/crates/sparrow-runtime/src/metadata/prepared_metadata.rs @@ -96,7 +96,7 @@ impl PreparedMetadata { .unwrap_or(i64::MIN); let path = format!("file://{}", parquet_path.display()); - let metadata_path = format!("{}", metadata_parquet_path.display()); + let metadata_path = format!("file://{}", metadata_parquet_path.display()); Self::try_from_prepared_schema( path, prepared_schema.clone(), diff --git a/wren/compute/helpers.go b/wren/compute/helpers.go index 3b69659d8..00c86484a 100644 --- a/wren/compute/helpers.go +++ b/wren/compute/helpers.go @@ -88,7 +88,7 @@ func getComputePreparedFiles(prepareJobs []*ent.PrepareJob) []*v1alpha.PreparedF MaxEventTime: timestamppb.New(time.Unix(0, preparedFile.MaxEventTime)), MinEventTime: timestamppb.New(time.Unix(0, preparedFile.MinEventTime)), NumRows: preparedFile.RowCount, - MetadataPath: ConvertURIForCompute(metadataPath), + MetadataPath: metadataPath, }) } } From c66c91e92dd0ae22f31bf3c18dad835beb5d2602 Mon Sep 17 00:00:00 2001 From: Ben Chambers <35960+bjchambers@users.noreply.github.com> Date: Wed, 5 Jul 2023 23:09:01 -0700 Subject: [PATCH 2/2] fix --- crates/sparrow-runtime/src/prepare.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/sparrow-runtime/src/prepare.rs b/crates/sparrow-runtime/src/prepare.rs index 5b2b95f49..0dbb2b130 100644 --- a/crates/sparrow-runtime/src/prepare.rs +++ b/crates/sparrow-runtime/src/prepare.rs @@ -296,9 +296,12 @@ async fn upload_prepared_files( || Error::InvalidUrl(metadata_prepared_url.as_str().to_string()), )?; + let local_path = prepared_metadata + .path + .strip_prefix("file://") + .expect("starts with file://"); parquet_uploads.push( - object_store_registry - .upload(prepare_object_store_url, Path::new(&prepared_metadata.path)), + object_store_registry.upload(prepare_object_store_url, Path::new(local_path)), ); parquet_uploads.push(object_store_registry.upload(