feat: Use object_store for metadata (#476)
This is part of #465.

This uses `object_store` to retrieve the metadata files. Additionally,
it uses a `select_all` so that all metadata files are read concurrently.
bjchambers authored Jul 6, 2023
2 parents 7858a62 + c66c91e commit 74e6299
Showing 11 changed files with 73 additions and 85 deletions.
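The core of the change is the concurrent metadata read: each metadata file is opened as a stream of Parquet record batches and the streams are merged with `futures::stream::select_all`, so batches are consumed as soon as any file yields one (see `key_hash_inverse.rs` below). The following is a minimal, self-contained sketch of that merging pattern only; it assumes the `futures` and `tokio` crates and uses string streams as stand-ins for the real per-file record-batch streams.

```rust
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Stand-ins for per-file metadata streams; in the actual change each of
    // these is a stream of Parquet record batches opened through the object store.
    let a = stream::iter(vec!["a/batch-1", "a/batch-2"]);
    let b = stream::iter(vec!["b/batch-1"]);
    let c = stream::iter(vec!["c/batch-1", "c/batch-2", "c/batch-3"]);

    // `select_all` merges the streams and yields items from whichever stream
    // is ready next, so all files are read concurrently rather than one by one.
    let mut merged = stream::select_all(vec![a, b, c]);
    while let Some(batch) = merged.next().await {
        println!("got {batch}");
    }
}
```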
27 changes: 10 additions & 17 deletions crates/sparrow-catalog/src/update.rs
@@ -3,7 +3,6 @@ use std::path::PathBuf;
use error_stack::{IntoReport, ResultExt};
use futures::TryStreamExt;
use itertools::Itertools;
use sparrow_runtime::s3::S3Helper;
use tracing::{error, info, info_span};

use crate::list_doc_files;
@@ -29,12 +28,10 @@ pub(super) async fn update_doc_structs(
options: &UpdateOptions,
doc_root: PathBuf,
) -> error_stack::Result<Vec<PathBuf>, Error> {
let s3_helper = S3Helper::new().await;

if let Some(example) = &options.example {
let doc_path = doc_root.join(format!("{example}.toml"));
error_stack::ensure!(doc_path.is_file(), Error::NonFile(doc_path));
let changed = update_doc_struct(command, doc_path.clone(), s3_helper)
let changed = update_doc_struct(command, doc_path.clone())
.await
.attach_printable_lazy(|| DocFile(doc_path.clone()))?;
if changed {
@@ -50,17 +47,14 @@ pub(super) async fn update_doc_structs(
.map_err(|e| error_stack::report!(e).change_context(Error::ListingFiles));

let changed = file_stream
.map_ok(move |doc_path| {
let s3_helper = s3_helper.clone();
async move {
let changed = update_doc_struct(command, doc_path.clone(), s3_helper)
.await
.change_context(Error::UpdatingDocs)?;
if changed {
Ok(Some(doc_path))
} else {
Ok(None)
}
.map_ok(move |doc_path| async move {
let changed = update_doc_struct(command, doc_path.clone())
.await
.change_context(Error::UpdatingDocs)?;
if changed {
Ok(Some(doc_path))
} else {
Ok(None)
}
})
.try_buffer_unordered(4)
@@ -107,7 +101,6 @@ impl error_stack::Context for Error {}
async fn update_doc_struct(
command: UpdateCommand,
doc_path: PathBuf,
s3_helper: S3Helper,
) -> error_stack::Result<bool, Error> {
let input = tokio::fs::read_to_string(&doc_path)
.await
@@ -139,7 +132,7 @@ async fn update_doc_struct(
let span = info_span!("Execute example", function = ?catalog_entry.name, index);
let _enter = span.enter();

let output_csv = execute_example::execute_example(example, s3_helper.clone())
let output_csv = execute_example::execute_example(example)
.await
.change_context(Error::ExecuteExample(index))?;
example.output_csv = Some(output_csv);
3 changes: 0 additions & 3 deletions crates/sparrow-catalog/src/update/execute_example.rs
@@ -13,7 +13,6 @@ use sparrow_api::kaskada::v1alpha::{
};
use sparrow_compiler::InternalCompileOptions;
use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader;
use sparrow_runtime::s3::S3Helper;
use sparrow_runtime::PreparedMetadata;
use tempfile::NamedTempFile;
use uuid::Uuid;
@@ -45,7 +44,6 @@ impl error_stack::Context for Error {}
/// Execute the example and return the result as a CSV string.
pub(super) async fn execute_example(
example: &FunctionExample,
s3_helper: S3Helper,
) -> error_stack::Result<String, Error> {
// 1. Prepare the file
let mut preparer = ExampleInputPreparer::new();
@@ -106,7 +104,6 @@ pub(super) async fn execute_example(
changed_since: None,
final_result_time: None,
},
s3_helper,
None,
None,
FlightRecordHeader::default(),
4 changes: 0 additions & 4 deletions crates/sparrow-main/src/batch.rs
@@ -7,7 +7,6 @@ use sparrow_api::kaskada::v1alpha::execute_request::Limits;
use sparrow_api::kaskada::v1alpha::{CompileRequest, ExecuteRequest, FenlDiagnostics};
use sparrow_compiler::CompilerOptions;
use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader;
use sparrow_runtime::s3::S3Helper;
use tracing::{info, info_span};

use crate::script::{Schema, Script, ScriptPath};
@@ -112,8 +111,6 @@ impl BatchCommand {
};

if !self.compile_only {
let s3_helper = S3Helper::new().await;

if !self.output_dir.exists() {
tokio::fs::create_dir_all(&self.output_dir)
.await
@@ -133,7 +130,6 @@ impl BatchCommand {
changed_since: None,
final_result_time: None,
},
s3_helper,
None,
self.flight_record_path,
FlightRecordHeader::default(),
4 changes: 0 additions & 4 deletions crates/sparrow-main/src/materialize.rs
@@ -8,7 +8,6 @@ use sparrow_api::kaskada::v1alpha::{destination, CompileRequest, ExecuteRequest,

use sparrow_compiler::CompilerOptions;
use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader;
use sparrow_runtime::s3::S3Helper;
use tracing::{info, info_span};

use crate::script::{Schema, Script, ScriptPath};
@@ -109,8 +108,6 @@ impl MaterializeCommand {
error_stack::bail!(Error::InvalidQuery(diagnostics));
};

let s3_helper = S3Helper::new().await;

// Note: it might be cleaner to create a separate entry point for materialize, but for now it's ok.
let result_stream = sparrow_runtime::execute::execute(
ExecuteRequest {
@@ -122,7 +119,6 @@ impl MaterializeCommand {
changed_since: None,
final_result_time: None,
},
s3_helper,
Some(script.bounded_lateness_ns),
self.flight_record_path,
FlightRecordHeader::default(),
1 change: 0 additions & 1 deletion crates/sparrow-main/src/serve/compute_service.rs
@@ -272,7 +272,6 @@ async fn execute_impl(

let progress_stream = sparrow_runtime::execute::execute(
request,
s3_helper.clone(),
None,
flight_record_local_path,
flight_record_header,
19 changes: 4 additions & 15 deletions crates/sparrow-main/tests/e2e/fixture/query_fixture.rs
@@ -19,7 +19,6 @@ use sparrow_api::kaskada::v1alpha::{
};
use sparrow_compiler::InternalCompileOptions;
use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader;
use sparrow_runtime::s3::S3Helper;

use crate::DataFixture;

@@ -263,11 +262,6 @@ impl QueryFixture {
return Err(compile_result.fenl_diagnostics.unwrap_or_default().into());
};

// TODO: If this is expensive to construct each time in tests (it shouldn't be)
// we could put it in a `[dynamic]` static, and clone it. Or we could have a
// special "for test" version.
let s3_helper = S3Helper::new().await;

let destination = ObjectStoreDestination {
output_prefix_uri: format!("file:///{}", output_dir.display()),
file_type: output_format.into(),
@@ -285,15 +279,10 @@ impl QueryFixture {
..self.execute_request.clone()
};

let mut stream = sparrow_runtime::execute::execute(
request,
s3_helper,
None,
None,
FlightRecordHeader::default(),
)
.await?
.boxed();
let mut stream =
sparrow_runtime::execute::execute(request, None, None, FlightRecordHeader::default())
.await?
.boxed();

let mut output_files = Vec::new();
let mut snapshots = Vec::new();
16 changes: 9 additions & 7 deletions crates/sparrow-runtime/src/execute.rs
@@ -43,7 +43,6 @@ const STORE_PATH_PREFIX: &str = "compute_snapshot_";
/// execute response.
pub async fn execute(
request: ExecuteRequest,
s3_helper: S3Helper,
bounded_lateness_ns: Option<i64>,
_flight_record_local_path: Option<std::path::PathBuf>,
_flight_record_header: FlightRecordHeader,
@@ -87,6 +86,8 @@ pub async fn execute(
.into_report()
.change_context(Error::internal_msg("create data context"))?;

let s3_helper = S3Helper::new().await;

// If the snapshot config exists, sparrow should attempt to resume from state,
// and store new state. Create a new storage path for the local store to
// exist.
@@ -156,6 +157,8 @@ pub async fn execute(
None
};

let object_stores = ObjectStoreRegistry::default();

let primary_grouping_key_type = plan
.primary_grouping_key_type
.to_owned()
@@ -177,9 +180,8 @@ pub async fn execute(
.change_context(Error::internal_msg("get primary grouping ID"))?;

key_hash_inverse
.add_from_data_context(&data_context, primary_group_id, s3_helper.clone())
.add_from_data_context(&data_context, primary_group_id, &object_stores)
.await
.into_report()
.change_context(Error::internal_msg("initialize key hash inverse"))?;
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::new(key_hash_inverse));

@@ -202,7 +204,7 @@ pub async fn execute(
let context = OperationContext {
plan,
plan_hash,
object_stores: ObjectStoreRegistry::default(),
object_stores,
data_context,
compute_store,
key_hash_inverse,
@@ -298,10 +300,10 @@ pub async fn materialize(
.into_report()
.change_context(Error::internal_msg("get primary grouping ID"))?;

let object_stores = ObjectStoreRegistry::default();
key_hash_inverse
.add_from_data_context(&data_context, primary_group_id, s3_helper.clone())
.add_from_data_context(&data_context, primary_group_id, &object_stores)
.await
.into_report()
.change_context(Error::internal_msg("initialize key hash inverse"))?;
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::new(key_hash_inverse));

@@ -315,7 +317,7 @@ pub async fn materialize(
let context = OperationContext {
plan,
plan_hash,
object_stores: ObjectStoreRegistry::default(),
object_stores,
data_context,
compute_store: snapshot_compute_store,
key_hash_inverse,
73 changes: 43 additions & 30 deletions crates/sparrow-runtime/src/execute/key_hash_inverse.rs
@@ -1,18 +1,19 @@
use std::path::PathBuf;
use std::str::FromStr;

use anyhow::Context;
use arrow::array::{Array, ArrayRef, PrimitiveArray, UInt64Array};
use arrow::datatypes::{DataType, UInt64Type};

use error_stack::{IntoReportCompat, ResultExt};
use futures::TryStreamExt;
use hashbrown::HashMap;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use sparrow_arrow::downcast::downcast_primitive_array;
use sparrow_compiler::DataContext;
use sparrow_instructions::{ComputeStore, StoreKey};
use sparrow_plan::GroupId;
use tempfile::NamedTempFile;

use crate::s3::{self, S3Helper, S3Object};
use crate::read::ParquetFile;
use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl};

/// Stores the mapping from key hash u64 to the position in the keys array.
///
@@ -36,6 +37,18 @@ impl std::fmt::Debug for KeyHashInverse {
}
}

#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "invalid metadata url '{_0}'")]
InvalidMetadataUrl(String),
#[display(fmt = "failed to open metadata")]
OpeningMetadata,
#[display(fmt = "failed to read metadata")]
ReadingMetadata,
}

impl error_stack::Context for Error {}

impl KeyHashInverse {
/// Restores the KeyHashInverse from the compute store.
pub fn restore_from(store: &ComputeStore) -> anyhow::Result<Self> {
@@ -68,32 +81,37 @@ impl KeyHashInverse {
&mut self,
data_context: &DataContext,
primary_grouping: GroupId,
s3: S3Helper,
) -> anyhow::Result<()> {
registry: &ObjectStoreRegistry,
) -> error_stack::Result<(), Error> {
let metadata_files = data_context
.tables_for_grouping(primary_grouping)
.flat_map(|table| table.metadata_for_files());

let mut streams = Vec::new();
for file in metadata_files {
let file = if s3::is_s3_path(&file) {
let s3_object = S3Object::try_from_uri(&file)?;
let downloaded_file = NamedTempFile::new()?;
let download_file_path = downloaded_file.into_temp_path();
s3.download_s3(s3_object, &download_file_path).await?;
file_from_path(&download_file_path)?
} else {
file_from_path(&PathBuf::from(file))?
};

let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
let parquet_reader = parquet_reader.build()?;
for batch in parquet_reader {
let batch = batch?;
let hash_col = batch.column(0);
let hash_col: &UInt64Array = downcast_primitive_array(hash_col.as_ref())?;
let entity_key_col = batch.column(1);
self.add(entity_key_col.to_owned(), hash_col)?;
}
let url =
ObjectStoreUrl::from_str(&file).change_context(Error::InvalidMetadataUrl(file))?;
let file = ParquetFile::try_new(registry, url)
.await
.change_context(Error::OpeningMetadata)?;
let stream = file
.read_stream(None)
.await
.change_context(Error::OpeningMetadata)?;
streams.push(stream);
}

let mut stream = futures::stream::select_all(streams)
.map_err(|e| e.change_context(Error::ReadingMetadata));
while let Some(batch) = stream.try_next().await? {
let hash_col = batch.column(0);
let hash_col: &UInt64Array = downcast_primitive_array(hash_col.as_ref())
.into_report()
.change_context(Error::ReadingMetadata)?;
let entity_key_col = batch.column(1);
self.add(entity_key_col.to_owned(), hash_col)
.into_report()
.change_context(Error::ReadingMetadata)?;
}

Ok(())
@@ -241,11 +259,6 @@ impl ThreadSafeKeyHashInverse {
}
}

/// Return the file at a given path.
fn file_from_path(path: &std::path::Path) -> anyhow::Result<std::fs::File> {
std::fs::File::open(path).with_context(|| format!("unable to open {path:?}"))
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/metadata/prepared_metadata.rs
@@ -96,7 +96,7 @@ impl PreparedMetadata {
.unwrap_or(i64::MIN);

let path = format!("file://{}", parquet_path.display());
let metadata_path = format!("{}", metadata_parquet_path.display());
let metadata_path = format!("file://{}", metadata_parquet_path.display());
Self::try_from_prepared_schema(
path,
prepared_schema.clone(),
7 changes: 5 additions & 2 deletions crates/sparrow-runtime/src/prepare.rs
@@ -296,9 +296,12 @@ async fn upload_prepared_files(
|| Error::InvalidUrl(metadata_prepared_url.as_str().to_string()),
)?;

let local_path = prepared_metadata
.path
.strip_prefix("file://")
.expect("starts with file://");
parquet_uploads.push(
object_store_registry
.upload(prepare_object_store_url, Path::new(&prepared_metadata.path)),
object_store_registry.upload(prepare_object_store_url, Path::new(local_path)),
);

parquet_uploads.push(object_store_registry.upload(