feat: Use object_store for metadata #476

Merged 2 commits on Jul 6, 2023
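This PR removes the hand-rolled `S3Helper` plumbing from the metadata path: callers of `sparrow_runtime::execute::execute` no longer construct and thread an S3 client through every entry point, and prepared-metadata files are instead opened through the shared `ObjectStoreRegistry`. A minimal sketch of the caller-visible change, with stub types standing in for sparrow's real ones (`ExecuteRequest`, `FlightRecordHeader`):

```rust
// Stub types; the real definitions live in sparrow_api / sparrow_qfr.
struct ExecuteRequest;

#[derive(Default)]
struct FlightRecordHeader;

// After this PR the entry point no longer takes an S3 handle; the runtime
// resolves whatever object-store clients it needs internally.
async fn execute(
    _request: ExecuteRequest,
    _bounded_lateness_ns: Option<i64>,
    _flight_record_path: Option<std::path::PathBuf>,
    _header: FlightRecordHeader,
) {
    // ... build an ObjectStoreRegistry here instead of receiving an S3Helper
}

#[tokio::main]
async fn main() {
    // Call sites shrink from
    //   execute(request, s3_helper, None, None, FlightRecordHeader::default())
    // to:
    execute(ExecuteRequest, None, None, FlightRecordHeader::default()).await;
}
```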
27 changes: 10 additions & 17 deletions crates/sparrow-catalog/src/update.rs
@@ -3,7 +3,6 @@ use std::path::PathBuf;
use error_stack::{IntoReport, ResultExt};
use futures::TryStreamExt;
use itertools::Itertools;
use sparrow_runtime::s3::S3Helper;
use tracing::{error, info, info_span};

use crate::list_doc_files;
@@ -29,12 +28,10 @@ pub(super) async fn update_doc_structs(
options: &UpdateOptions,
doc_root: PathBuf,
) -> error_stack::Result<Vec<PathBuf>, Error> {
let s3_helper = S3Helper::new().await;

if let Some(example) = &options.example {
let doc_path = doc_root.join(format!("{example}.toml"));
error_stack::ensure!(doc_path.is_file(), Error::NonFile(doc_path));
let changed = update_doc_struct(command, doc_path.clone(), s3_helper)
let changed = update_doc_struct(command, doc_path.clone())
.await
.attach_printable_lazy(|| DocFile(doc_path.clone()))?;
if changed {
@@ -50,17 +47,14 @@ pub(super) async fn update_doc_structs(
.map_err(|e| error_stack::report!(e).change_context(Error::ListingFiles));

let changed = file_stream
.map_ok(move |doc_path| {
let s3_helper = s3_helper.clone();
async move {
let changed = update_doc_struct(command, doc_path.clone(), s3_helper)
.await
.change_context(Error::UpdatingDocs)?;
if changed {
Ok(Some(doc_path))
} else {
Ok(None)
}
.map_ok(move |doc_path| async move {
let changed = update_doc_struct(command, doc_path.clone())
.await
.change_context(Error::UpdatingDocs)?;
if changed {
Ok(Some(doc_path))
} else {
Ok(None)
}
})
.try_buffer_unordered(4)
@@ -107,7 +101,6 @@ impl error_stack::Context for Error {}
async fn update_doc_struct(
command: UpdateCommand,
doc_path: PathBuf,
s3_helper: S3Helper,
) -> error_stack::Result<bool, Error> {
let input = tokio::fs::read_to_string(&doc_path)
.await
@@ -139,7 +132,7 @@ async fn update_doc_struct(
let span = info_span!("Execute example", function = ?catalog_entry.name, index);
let _enter = span.enter();

let output_csv = execute_example::execute_example(example, s3_helper.clone())
let output_csv = execute_example::execute_example(example)
.await
.change_context(Error::ExecuteExample(index))?;
example.output_csv = Some(output_csv);
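With the `s3_helper` parameter gone, the per-file closure above also simplifies: there is no handle left to clone into each spawned future, so the nested `async move` block collapses into a single async closure. A runnable sketch of the resulting pattern, with a hypothetical `process` standing in for `update_doc_struct`:

```rust
use std::path::PathBuf;

use futures::{stream, TryStreamExt};

// Hypothetical stand-in for update_doc_struct once the S3 handle is gone:
// each future owns only its own PathBuf, so nothing needs cloning per item.
async fn process(path: PathBuf) -> std::io::Result<bool> {
    Ok(path.extension().is_some())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let files = stream::iter(vec![
        Ok::<_, std::io::Error>(PathBuf::from("a.toml")),
        Ok(PathBuf::from("b.toml")),
    ]);
    let changed: Vec<PathBuf> = files
        .map_ok(|path| async move {
            if process(path.clone()).await? {
                Ok(Some(path))
            } else {
                Ok(None)
            }
        })
        .try_buffer_unordered(4) // same bounded fan-out as the diff
        .try_filter_map(|opt| async move { Ok(opt) })
        .try_collect()
        .await?;
    println!("changed: {changed:?}");
    Ok(())
}
```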
3 changes: 0 additions & 3 deletions crates/sparrow-catalog/src/update/execute_example.rs
@@ -13,7 +13,6 @@ use sparrow_api::kaskada::v1alpha::{
};
use sparrow_compiler::InternalCompileOptions;
use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader;
use sparrow_runtime::s3::S3Helper;
use sparrow_runtime::PreparedMetadata;
use tempfile::NamedTempFile;
use uuid::Uuid;
@@ -45,7 +44,6 @@ impl error_stack::Context for Error {}
/// Execute the example and return the result as a CSV string.
pub(super) async fn execute_example(
example: &FunctionExample,
s3_helper: S3Helper,
) -> error_stack::Result<String, Error> {
// 1. Prepare the file
let mut preparer = ExampleInputPreparer::new();
@@ -106,7 +104,6 @@ pub(super) async fn execute_example(
changed_since: None,
final_result_time: None,
},
s3_helper,
None,
None,
FlightRecordHeader::default(),
4 changes: 0 additions & 4 deletions crates/sparrow-main/src/batch.rs
@@ -7,7 +7,6 @@ use sparrow_api::kaskada::v1alpha::execute_request::Limits;
use sparrow_api::kaskada::v1alpha::{CompileRequest, ExecuteRequest, FenlDiagnostics};
use sparrow_compiler::CompilerOptions;
use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader;
use sparrow_runtime::s3::S3Helper;
use tracing::{info, info_span};

use crate::script::{Schema, Script, ScriptPath};
@@ -112,8 +111,6 @@ impl BatchCommand {
};

if !self.compile_only {
let s3_helper = S3Helper::new().await;

if !self.output_dir.exists() {
tokio::fs::create_dir_all(&self.output_dir)
.await
@@ -133,7 +130,6 @@ impl BatchCommand {
changed_since: None,
final_result_time: None,
},
s3_helper,
None,
self.flight_record_path,
FlightRecordHeader::default(),
4 changes: 0 additions & 4 deletions crates/sparrow-main/src/materialize.rs
@@ -8,7 +8,6 @@ use sparrow_api::kaskada::v1alpha::{destination, CompileRequest, ExecuteRequest,

use sparrow_compiler::CompilerOptions;
use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader;
use sparrow_runtime::s3::S3Helper;
use tracing::{info, info_span};

use crate::script::{Schema, Script, ScriptPath};
@@ -109,8 +108,6 @@ impl MaterializeCommand {
error_stack::bail!(Error::InvalidQuery(diagnostics));
};

let s3_helper = S3Helper::new().await;

// Note: it might be cleaner to create a separate entry point for materialize, but for now it's ok.
let result_stream = sparrow_runtime::execute::execute(
ExecuteRequest {
@@ -122,7 +119,6 @@
changed_since: None,
final_result_time: None,
},
s3_helper,
Some(script.bounded_lateness_ns),
self.flight_record_path,
FlightRecordHeader::default(),
1 change: 0 additions & 1 deletion crates/sparrow-main/src/serve/compute_service.rs
@@ -272,7 +272,6 @@ async fn execute_impl(

let progress_stream = sparrow_runtime::execute::execute(
request,
s3_helper.clone(),
None,
flight_record_local_path,
flight_record_header,
19 changes: 4 additions & 15 deletions crates/sparrow-main/tests/e2e/fixture/query_fixture.rs
@@ -19,7 +19,6 @@ use sparrow_api::kaskada::v1alpha::{
};
use sparrow_compiler::InternalCompileOptions;
use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader;
use sparrow_runtime::s3::S3Helper;

use crate::DataFixture;

@@ -263,11 +262,6 @@ impl QueryFixture {
return Err(compile_result.fenl_diagnostics.unwrap_or_default().into());
};

// TODO: If this is expensive to construct each time in tests (it shouldn't be)
// we could put it in a `[dynamic]` static, and clone it. Or we could have a
// special "for test" version.
let s3_helper = S3Helper::new().await;

let destination = ObjectStoreDestination {
output_prefix_uri: format!("file:///{}", output_dir.display()),
file_type: output_format.into(),
@@ -285,15 +279,10 @@
..self.execute_request.clone()
};

let mut stream = sparrow_runtime::execute::execute(
request,
s3_helper,
None,
None,
FlightRecordHeader::default(),
)
.await?
.boxed();
let mut stream =
sparrow_runtime::execute::execute(request, None, None, FlightRecordHeader::default())
.await?
.boxed();

let mut output_files = Vec::new();
let mut snapshots = Vec::new();
16 changes: 9 additions & 7 deletions crates/sparrow-runtime/src/execute.rs
@@ -43,7 +43,6 @@ const STORE_PATH_PREFIX: &str = "compute_snapshot_";
/// execute response.
pub async fn execute(
request: ExecuteRequest,
s3_helper: S3Helper,
bounded_lateness_ns: Option<i64>,
_flight_record_local_path: Option<std::path::PathBuf>,
_flight_record_header: FlightRecordHeader,
@@ -87,6 +86,8 @@ pub async fn execute(
.into_report()
.change_context(Error::internal_msg("create data context"))?;

let s3_helper = S3Helper::new().await;

// If the snapshot config exists, sparrow should attempt to resume from state,
// and store new state. Create a new storage path for the local store to
// exist.
@@ -156,6 +157,8 @@ pub async fn execute(
None
};

let object_stores = ObjectStoreRegistry::default();

let primary_grouping_key_type = plan
.primary_grouping_key_type
.to_owned()
@@ -177,9 +180,8 @@
.change_context(Error::internal_msg("get primary grouping ID"))?;

key_hash_inverse
.add_from_data_context(&data_context, primary_group_id, s3_helper.clone())
.add_from_data_context(&data_context, primary_group_id, &object_stores)
.await
.into_report()
.change_context(Error::internal_msg("initialize key hash inverse"))?;
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::new(key_hash_inverse));

@@ -202,7 +204,7 @@
let context = OperationContext {
plan,
plan_hash,
object_stores: ObjectStoreRegistry::default(),
object_stores,
data_context,
compute_store,
key_hash_inverse,
@@ -298,10 +300,10 @@ pub async fn materialize(
.into_report()
.change_context(Error::internal_msg("get primary grouping ID"))?;

let object_stores = ObjectStoreRegistry::default();
key_hash_inverse
.add_from_data_context(&data_context, primary_group_id, s3_helper.clone())
.add_from_data_context(&data_context, primary_group_id, &object_stores)
.await
.into_report()
.change_context(Error::internal_msg("initialize key hash inverse"))?;
let key_hash_inverse = Arc::new(ThreadSafeKeyHashInverse::new(key_hash_inverse));

@@ -315,7 +317,7 @@
let context = OperationContext {
plan,
plan_hash,
object_stores: ObjectStoreRegistry::default(),
object_stores,
data_context,
compute_store: snapshot_compute_store,
key_hash_inverse,
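In both `execute` and `materialize`, the `ObjectStoreRegistry` is now created once per query, lent by reference to `add_from_data_context`, and then moved into the `OperationContext`, rather than constructing a second `ObjectStoreRegistry::default()` for the context as before. A small sketch of that borrow-then-move ownership pattern (stub types, not sparrow's):

```rust
#[derive(Default)]
struct Registry; // stand-in for ObjectStoreRegistry

struct OperationContext {
    registry: Registry,
}

// Initialization work only needs a borrow...
fn build_key_hash_inverse(_registry: &Registry) {}

fn main() {
    let registry = Registry::default(); // one registry per query
    build_key_hash_inverse(&registry); // borrowed during setup
    let _context = OperationContext { registry }; // ...then moved in for the run
}
```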
73 changes: 43 additions & 30 deletions crates/sparrow-runtime/src/execute/key_hash_inverse.rs
@@ -1,18 +1,19 @@
use std::path::PathBuf;
use std::str::FromStr;

use anyhow::Context;
use arrow::array::{Array, ArrayRef, PrimitiveArray, UInt64Array};
use arrow::datatypes::{DataType, UInt64Type};

use error_stack::{IntoReportCompat, ResultExt};
use futures::TryStreamExt;
use hashbrown::HashMap;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use sparrow_arrow::downcast::downcast_primitive_array;
use sparrow_compiler::DataContext;
use sparrow_instructions::{ComputeStore, StoreKey};
use sparrow_plan::GroupId;
use tempfile::NamedTempFile;

use crate::s3::{self, S3Helper, S3Object};
use crate::read::ParquetFile;
use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl};

/// Stores the mapping from key hash u64 to the position in the keys array.
///
@@ -36,6 +37,18 @@ impl std::fmt::Debug for KeyHashInverse {
}
}

#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "invalid metadata url '{_0}'")]
InvalidMetadataUrl(String),
#[display(fmt = "failed to open metadata")]
OpeningMetadata,
#[display(fmt = "failed to read metadata")]
ReadingMetadata,
}

impl error_stack::Context for Error {}

impl KeyHashInverse {
/// Restores the KeyHashInverse from the compute store.
pub fn restore_from(store: &ComputeStore) -> anyhow::Result<Self> {
@@ -68,32 +81,37 @@ impl KeyHashInverse {
&mut self,
data_context: &DataContext,
primary_grouping: GroupId,
s3: S3Helper,
) -> anyhow::Result<()> {
registry: &ObjectStoreRegistry,
) -> error_stack::Result<(), Error> {
let metadata_files = data_context
.tables_for_grouping(primary_grouping)
.flat_map(|table| table.metadata_for_files());

let mut streams = Vec::new();
for file in metadata_files {
let file = if s3::is_s3_path(&file) {
let s3_object = S3Object::try_from_uri(&file)?;
let downloaded_file = NamedTempFile::new()?;
let download_file_path = downloaded_file.into_temp_path();
s3.download_s3(s3_object, &download_file_path).await?;
file_from_path(&download_file_path)?
} else {
file_from_path(&PathBuf::from(file))?
};

let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
let parquet_reader = parquet_reader.build()?;
for batch in parquet_reader {
let batch = batch?;
let hash_col = batch.column(0);
let hash_col: &UInt64Array = downcast_primitive_array(hash_col.as_ref())?;
let entity_key_col = batch.column(1);
self.add(entity_key_col.to_owned(), hash_col)?;
}
let url =
ObjectStoreUrl::from_str(&file).change_context(Error::InvalidMetadataUrl(file))?;
let file = ParquetFile::try_new(registry, url)
.await
.change_context(Error::OpeningMetadata)?;
let stream = file
.read_stream(None)
.await
.change_context(Error::OpeningMetadata)?;
streams.push(stream);
}

let mut stream = futures::stream::select_all(streams)
.map_err(|e| e.change_context(Error::ReadingMetadata));
while let Some(batch) = stream.try_next().await? {
let hash_col = batch.column(0);
let hash_col: &UInt64Array = downcast_primitive_array(hash_col.as_ref())
.into_report()
.change_context(Error::ReadingMetadata)?;
let entity_key_col = batch.column(1);
self.add(entity_key_col.to_owned(), hash_col)
.into_report()
.change_context(Error::ReadingMetadata)?;
}

Ok(())
@@ -241,11 +259,6 @@ impl ThreadSafeKeyHashInverse {
}
}

/// Return the file at a given path.
fn file_from_path(path: &std::path::Path) -> anyhow::Result<std::fs::File> {
std::fs::File::open(path).with_context(|| format!("unable to open {path:?}"))
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
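The rewritten `add_from_data_context` above is the heart of this PR: rather than downloading each S3 metadata file into a `NamedTempFile` and reading it synchronously with `ParquetRecordBatchReaderBuilder`, it parses every path as an `ObjectStoreUrl`, opens it as a `ParquetFile` through the registry, and drains the merged record-batch streams. A minimal runnable sketch of that merge-and-drain shape, with plain `futures` streams standing in for `ParquetFile::read_stream`:

```rust
use futures::stream::{self, StreamExt, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), String> {
    // One fallible stream of "batches" per metadata file; in the real code
    // each comes from ParquetFile::read_stream(None).
    let file_a = stream::iter(vec![Ok::<u64, String>(1), Ok(2)]).boxed();
    let file_b = stream::iter(vec![Ok::<u64, String>(3)]).boxed();

    // select_all merges the per-file streams; batches arrive in completion
    // order, which is fine when building a hash -> entity-key lookup.
    let mut merged = stream::select_all(vec![file_a, file_b]);
    while let Some(batch) = merged.try_next().await? {
        // Real code: downcast column 0 to a UInt64Array of key hashes, take
        // column 1 as the entity keys, and add the pairs to the inverse map.
        println!("batch {batch}");
    }
    Ok(())
}
```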
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/metadata/prepared_metadata.rs
@@ -96,7 +96,7 @@ impl PreparedMetadata {
.unwrap_or(i64::MIN);

let path = format!("file://{}", parquet_path.display());
let metadata_path = format!("{}", metadata_parquet_path.display());
let metadata_path = format!("file://{}", metadata_parquet_path.display());
Self::try_from_prepared_schema(
path,
prepared_schema.clone(),
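The one-line change above keeps local metadata paths in the same URL-shaped namespace as remote ones, so the `ObjectStoreUrl::from_str` call in `key_hash_inverse.rs` can parse either. A sketch of why the scheme matters, assuming a standard URL parser (the `url` crate here) behaves like the one behind `ObjectStoreUrl`:

```rust
use url::Url;

fn main() {
    // A bare filesystem path has no scheme, so a URL parser rejects it.
    assert!(Url::parse("/tmp/prepared-metadata.parquet").is_err());

    // With the explicit file:// scheme it parses just like s3:// URLs do,
    // letting one code path dispatch on the scheme.
    let url = Url::parse("file:///tmp/prepared-metadata.parquet").unwrap();
    assert_eq!(url.scheme(), "file");
}
```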
7 changes: 5 additions & 2 deletions crates/sparrow-runtime/src/prepare.rs
@@ -296,9 +296,12 @@ async fn upload_prepared_files(
|| Error::InvalidUrl(metadata_prepared_url.as_str().to_string()),
)?;

let local_path = prepared_metadata
.path
.strip_prefix("file://")
.expect("starts with file://");
parquet_uploads.push(
object_store_registry
.upload(prepare_object_store_url, Path::new(&prepared_metadata.path)),
object_store_registry.upload(prepare_object_store_url, Path::new(local_path)),
);

parquet_uploads.push(object_store_registry.upload(
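Because `PreparedMetadata.path` now carries the `file://` prefix (see the prepared_metadata.rs change above), the upload step has to strip the scheme back off before treating it as a local filesystem path. A sketch of that round trip with a hypothetical path:

```rust
use std::path::Path;

fn main() {
    // As stored in PreparedMetadata.path after this PR.
    let stored = "file:///tmp/prepared/part-0.parquet";

    // What the uploader actually opens from disk.
    let local = stored
        .strip_prefix("file://")
        .expect("prepared paths always start with file://");
    assert_eq!(Path::new(local), Path::new("/tmp/prepared/part-0.parquet"));
}
```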