diff --git a/crates/sparrow-catalog/src/update/execute_example.rs b/crates/sparrow-catalog/src/update/execute_example.rs
index ff39e314e..c5568829c 100644
--- a/crates/sparrow-catalog/src/update/execute_example.rs
+++ b/crates/sparrow-catalog/src/update/execute_example.rs
@@ -249,9 +249,10 @@ impl ExampleInputPreparer {
         metadata_output.write(metadata).unwrap();
         metadata_output.close().unwrap();

-        let prepared_metadata = PreparedMetadata::try_from_local_parquet_path(
-            prepared_file.path(),
-            metadata_file.path(),
+        let prepared_metadata = PreparedMetadata::try_from_data(
+            format!("file://{}", prepared_file.path().display()),
+            prepared_batch,
+            format!("file://{}", metadata_file.path().display()),
         )
         .into_report()
         .change_context(Error::PrepareInput)?;
diff --git a/crates/sparrow-main/src/serve/compute_service.rs b/crates/sparrow-main/src/serve/compute_service.rs
index 6455a6be9..970f0d64a 100644
--- a/crates/sparrow-main/src/serve/compute_service.rs
+++ b/crates/sparrow-main/src/serve/compute_service.rs
@@ -689,11 +689,13 @@ mod tests {
             output.close().unwrap();
         }

-        let prepared_metadata = PreparedMetadata::try_from_local_parquet_path(
-            prepared_file.path(),
-            metadata_file.path(),
+        let prepared_metadata = PreparedMetadata::try_from_data(
+            format!("file://{}", prepared_file.path().display()),
+            record_batch,
+            format!("file://{}", metadata_file.path().display()),
         )
         .unwrap();
+
         let file_set = compute_table::FileSet {
             slice_plan: Some(slice_plan),
             prepared_files: vec![prepared_metadata.try_into().unwrap()],
diff --git a/crates/sparrow-main/src/serve/preparation_service.rs b/crates/sparrow-main/src/serve/preparation_service.rs
index e720e942d..8ccb07af6 100644
--- a/crates/sparrow-main/src/serve/preparation_service.rs
+++ b/crates/sparrow-main/src/serve/preparation_service.rs
@@ -95,7 +95,7 @@ pub async fn prepare_data(
         temp_file.path(),
     )
     .await?;
-    let (_prepared_metadata, prepared_files) = prepare_file(
+    let prepared_files = prepare_file(
         &object_store_registry,
         &source_data,
         &prepare_request.output_path_prefix,
diff --git a/crates/sparrow-main/tests/e2e/fixture/local_table.rs b/crates/sparrow-main/tests/e2e/fixture/local_table.rs
index 3817f9ea1..d90a509f7 100644
--- a/crates/sparrow-main/tests/e2e/fixture/local_table.rs
+++ b/crates/sparrow-main/tests/e2e/fixture/local_table.rs
@@ -127,9 +127,10 @@ impl LocalTestTable {
         output.write(&metadata).unwrap();
         output.close().unwrap();

-        let prepared_metadata = PreparedMetadata::try_from_local_parquet_path(
-            prepared_file.path(),
-            metadata_output_file.path(),
+        let prepared_metadata = PreparedMetadata::try_from_data(
+            format!("file://{}", prepared_file.path().display()),
+            &prepared_batch,
+            format!("file://{}", metadata_output_file.path().display()),
         )?;

         prepared_batches_and_metadata.push((
diff --git a/crates/sparrow-runtime/src/metadata/prepared_metadata.rs b/crates/sparrow-runtime/src/metadata/prepared_metadata.rs
index 11dfd0906..b714ed878 100644
--- a/crates/sparrow-runtime/src/metadata/prepared_metadata.rs
+++ b/crates/sparrow-runtime/src/metadata/prepared_metadata.rs
@@ -1,17 +1,15 @@
-use std::path::Path;
 use std::sync::Arc;

-use arrow::datatypes::{Schema, SchemaRef};
-use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-use parquet::file::metadata::ParquetMetaData;
-use parquet::file::statistics::ValueStatistics;
+use arrow::array::TimestampNanosecondArray;
+use arrow::datatypes::{ArrowPrimitiveType, Schema, SchemaRef, TimestampNanosecondType};
+use arrow::record_batch::RecordBatch;
+
 use sparrow_api::kaskada::v1alpha::PreparedFile;
+use sparrow_arrow::downcast::downcast_primitive_array;
 use sparrow_core::TableSchema;

-use crate::metadata::file_from_path;
-
 #[non_exhaustive]
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct PreparedMetadata {
     pub path: String,

@@ -42,64 +40,48 @@ pub struct PreparedMetadata {
     pub metadata_path: String,
 }

-fn get_time_statistics(
-    metadata: &ParquetMetaData,
-    row_group: usize,
-) -> anyhow::Result<Option<&ValueStatistics<i64>>> {
-    if metadata.file_metadata().num_rows() == 0 {
-        Ok(None)
-    } else {
-        use parquet::file::statistics::Statistics;
-        match metadata.row_group(row_group).column(0).statistics() {
-            Some(Statistics::Int64(stats)) => {
-                anyhow::ensure!(
-                    stats.has_min_max_set(),
-                    "Time column statistics missing min/max for row_group {row_group}",
-                );
-
-                Ok(Some(stats))
-            }
-            stats => Err(anyhow::anyhow!(
-                "Time column missing or invalid for row_group {row_group}: {stats:?}"
-            )),
-        }
-    }
-}
-
 impl PreparedMetadata {
-    /// Create a `PreparedMetadata` from the path to a parquet file.
-    pub fn try_from_local_parquet_path(
-        parquet_path: &Path,
-        metadata_parquet_path: &Path,
+    pub fn try_from_data(
+        data_path: String,
+        data: &RecordBatch,
+        metadata_path: String,
     ) -> anyhow::Result<Self> {
-        let file = file_from_path(parquet_path)?;
-
-        let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
-        let metadata = parquet_reader.metadata();
-        let num_rows = metadata.file_metadata().num_rows();
-
-        let prepared_schema = parquet_reader.schema();
+        let prepared_schema = data.schema();

         anyhow::ensure!(
             prepared_schema.field(0).name() == "_time",
             "First column of prepared files must be '_time'"
         );
+        anyhow::ensure!(
+            prepared_schema.field(0).data_type() == &TimestampNanosecondType::DATA_TYPE,
+            "First column of prepared files must be TimestampNanosecondType"
+        );

-        let min_time = get_time_statistics(metadata.as_ref(), 0)?
-            .map(|stats| *stats.min())
+        // Compute the time statistics directly from the data.
+        //
+        // TODO: We could instead just get this from the parquet metadata.
+        let time = data.column(0);
+        let time: &TimestampNanosecondArray = downcast_primitive_array(time.as_ref())?;
+
+        let num_rows = data.num_rows() as i64;
+        // Time column is sorted (since the file is already prepared).
+        let min_time = time
+            .values()
+            .first()
+            .copied()
             // Empty files contain no stats. We default to assuming the min time.
             .unwrap_or(i64::MIN);
-
-        let max_time = get_time_statistics(metadata.as_ref(), metadata.num_row_groups() - 1)?
-            .map(|stats| *stats.max())
+        let max_time = time
+            .values()
+            .last()
+            .copied()
             // Empty files contain no stats. We default to assuming the min time.
             .unwrap_or(i64::MIN);
+        tracing::info!("Determined times {min_time} to {max_time} for file '{data_path}'");

-        let path = format!("file://{}", parquet_path.display());
-        let metadata_path = format!("file://{}", metadata_parquet_path.display());
         Self::try_from_prepared_schema(
-            path,
-            prepared_schema.clone(),
+            data_path,
+            prepared_schema,
             min_time,
             max_time,
             num_rows,
diff --git a/crates/sparrow-runtime/src/prepare.rs b/crates/sparrow-runtime/src/prepare.rs
index 0dbb2b130..254fc5109 100644
--- a/crates/sparrow-runtime/src/prepare.rs
+++ b/crates/sparrow-runtime/src/prepare.rs
@@ -1,20 +1,19 @@
 use std::collections::hash_map::DefaultHasher;
+use std::fmt;
 use std::fs::File;
 use std::hash::{Hash, Hasher};
 use std::io::{BufReader, Cursor};
 use std::path::Path;
 use std::str::FromStr;
 use std::sync::Arc;
-use std::{fmt, path};

 use arrow::record_batch::RecordBatch;
 use error_stack::{IntoReport, IntoReportCompat, ResultExt};
-use futures::stream::BoxStream;
+use futures::stream::{BoxStream, FuturesUnordered};
 use futures::{StreamExt, TryStreamExt};
+use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-use parquet::arrow::ArrowWriter;
-use serde_yaml;
 use sha2::Digest;
 use sparrow_api::kaskada::v1alpha::{
     slice_plan, source_data, PreparedFile, PulsarSubscription, SourceData, TableConfig,
 };
@@ -30,14 +29,18 @@ mod slice_preparer;
 pub use error::*;
 pub(crate) use prepare_metadata::*;
 use sparrow_api::kaskada::v1alpha::slice_plan::Slice;
-use tracing::Instrument;

-use crate::stores::object_store_url::ObjectStoreKey;
 use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl};
 use crate::{streams, PreparedMetadata, RawMetadata};

 const GIGABYTE_IN_BYTES: usize = 1_000_000_000;

+/// Initial size of the upload buffer.
+///
+/// This balances size (if we have multiple uploads in parallel) with
+/// number of "parts" required to perform an upload.
+const UPLOAD_BUFFER_SIZE_IN_BYTES: usize = 5_000_000;
+
 /// Prepare batches from the file according to the `config` and `slice`.
 ///
 /// Returns a fallible iterator over pairs containing the data and metadata
@@ -140,195 +143,97 @@ impl<'a> fmt::Display for SourceDataWrapper<'a> {

 /// Prepare the given file and return the list of prepared files.
 pub async fn prepare_file(
-    object_store_registry: &ObjectStoreRegistry,
+    object_stores: &ObjectStoreRegistry,
     source_data: &SourceData,
     output_path_prefix: &str,
     output_file_prefix: &str,
     table_config: &TableConfig,
     slice: &Option<Slice>,
-) -> error_stack::Result<(Vec<PreparedMetadata>, Vec<PreparedFile>), Error> {
+) -> error_stack::Result<Vec<PreparedFile>, Error> {
     let output_url = ObjectStoreUrl::from_str(output_path_prefix)
         .change_context_lazy(|| Error::InvalidUrl(output_path_prefix.to_owned()))?;

-    let output_key = output_url
-        .key()
-        .change_context_lazy(|| Error::InvalidUrl(format!("{}", output_url)))?;
-    let temp_dir = tempfile::tempdir()
-        .into_report()
+    let object_store = object_stores
+        .object_store(output_url.key().change_context(Error::Internal)?)
         .change_context(Error::Internal)?;
-    let temp_dir = temp_dir.path().to_str().ok_or(Error::Internal)?;

-    let path = format!(
-        "/{}",
-        output_url
-            .path()
-            .change_context_lazy(|| Error::InvalidUrl(format!("{}", output_url)))?
-    );
+    let mut prepare_stream = prepared_batches(source_data, table_config, slice)
+        .await?
+        .enumerate();

-    // If the output path is to a local destination, we'll use that. If it's to
-    // a remote destination, we'll first write to a local temp file, then upload
-    // that to the remote destination.
-    let local_output_prefix = match output_key {
-        ObjectStoreKey::Local => path::Path::new(&path),
-        _ => path::Path::new(temp_dir),
-    };
+    let mut prepared_files = Vec::new();
+    let mut uploads = FuturesUnordered::new();
+    while let Some((n, next)) = prepare_stream.next().await {
+        let (data, metadata) = next?;

-    let prepare_stream = prepared_batches(source_data, table_config, slice).await?;
-    let batch_count = std::sync::atomic::AtomicUsize::new(0);
-
-    let write_results = |output_ordinal: usize, records: RecordBatch, metadata: RecordBatch| {
-        // Defines the local path to the result file
-        let local_result_path =
-            local_output_prefix.join(format!("{output_file_prefix}-{output_ordinal}.parquet"));
-        let local_result_file = create_file(&local_result_path)?;
-
-        // Defines the local path to the metadata file
-        let local_metadata_output = local_output_prefix.join(format!(
-            "{output_file_prefix}-{output_ordinal}-metadata.parquet"
-        ));
-        let local_metadata_file = create_file(&local_metadata_output)?;
-
-        // Defines the local path to the metadata yaml file
-        let metadata_yaml_output = local_output_prefix.join(format!(
-            "{output_file_prefix}-{output_ordinal}-metadata.yaml"
-        ));
-        let metadata_yaml_output_file = create_file(&metadata_yaml_output)?;
-
-        // Write batches to the local output files
-        write_batch(local_result_file, records).change_context(Error::WriteParquetData)?;
-        write_batch(local_metadata_file, metadata).change_context(Error::WriteMetadata)?;
-
-        let prepared_metadata = PreparedMetadata::try_from_local_parquet_path(
-            &local_result_path,
-            &local_metadata_output,
-        )
-        .into_report()
-        .change_context(Error::DetermineMetadata)?;
-
-        let prepared_file: PreparedFile = prepared_metadata
-            .to_owned()
-            .try_into()
+        let data_url = output_url
+            .join(&format!("{output_file_prefix}-{n}.parquet"))
             .change_context(Error::Internal)?;
-
-        tracing::info!("Prepared batch {output_ordinal} with metadata {prepared_metadata:?}");
-
-        serde_yaml::to_writer(metadata_yaml_output_file, &prepared_file)
-            .into_report()
+        let metadata_url = output_url
+            .join(&format!("{output_file_prefix}-{n}-metadata.parquet"))
             .change_context(Error::Internal)?;
-        tracing::info!("Wrote metadata yaml to {:?}", metadata_yaml_output);
-        Ok((prepared_metadata, prepared_file))
-    };
-
-    let results = prepare_stream.filter_map(|r| {
-        let n = batch_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
-        async move {
-            match r {
-                Ok((records, metadata)) => {
-                    tracing::debug!("Prepared batch {n} has {} rows", records.num_rows());
-
-                    if records.num_rows() == 0 {
-                        return None;
-                    }
-                    Some(write_results(n, records, metadata))
-                }
-                Err(e) => Some(Err(e.change_context(Error::Internal))),
-            }
-        }
-    });
-
-    // unzip the stream of (prepared_file, prepared_metadata) into two vectors
-    let (pm, pf): (Vec<_>, Vec<_>) = results.try_collect::<Vec<_>>().await?.into_iter().unzip();
-    match output_key {
-        ObjectStoreKey::Local | ObjectStoreKey::Memory => {
-            // Prepared files are stored locally
-            Ok((pm, pf))
-        }
-        _ => {
-            // Upload the local prepared files to the remote destination
-            let upload_results = upload_prepared_files(
-                object_store_registry,
-                &pm,
-                output_path_prefix,
-                output_file_prefix,
-            )
-            .await?;
-            Ok((pm, upload_results))
-        }
-    }
-}
-
-async fn upload_prepared_files(
-    object_store_registry: &ObjectStoreRegistry,
-    prepared_metadata: &[PreparedMetadata],
-    output_path_prefix: &str,
-    output_file_prefix: &str,
-) -> error_stack::Result<Vec<PreparedFile>, Error> {
-    let span = tracing::info_span!(
-        "Uploading prepared files to object store",
object store", - ?output_path_prefix - ); - let _enter = span.enter(); - - let mut parquet_uploads: Vec<_> = Vec::new(); - let mut prepared_files: Vec<_> = Vec::new(); - { - for (batch_count, prepared_metadata) in prepared_metadata.iter().enumerate() { - tracing::debug!( - "Prepared batch {batch_count} has {} rows", - prepared_metadata.num_rows - ); - - tracing::debug!( - "Prepared batch {batch_count} with metadata {:?}", - prepared_metadata - ); - - let prepared_url = - format!("{output_path_prefix}/{output_file_prefix}-{batch_count}.parquet"); - let metadata_prepared_url = - format!("{output_path_prefix}/{output_file_prefix}-{batch_count}-metadata.parquet"); - - let prepare_object_store_url = ObjectStoreUrl::from_str(prepared_url.as_str()) - .change_context_lazy(|| Error::InvalidUrl(prepared_url.as_str().to_string()))?; - let metadata_object_store_url = - ObjectStoreUrl::from_str(metadata_prepared_url.as_str()).change_context_lazy( - || Error::InvalidUrl(metadata_prepared_url.as_str().to_string()), - )?; - - let local_path = prepared_metadata - .path - .strip_prefix("file://") - .expect("starts with file://"); - parquet_uploads.push( - object_store_registry.upload(prepare_object_store_url, Path::new(local_path)), - ); - - parquet_uploads.push(object_store_registry.upload( - metadata_object_store_url, - Path::new(&prepared_metadata.metadata_path), - )); - - let result_prepared_file: PreparedFile = prepared_metadata - .to_owned() - .with_path(prepared_url) - .with_metadata_path(metadata_prepared_url) + // Create the prepared file via PreparedMetadata. + // TODO: We could probably do this directly, eliminating the PreparedMetadata struct. + let prepared_file: PreparedFile = + PreparedMetadata::try_from_data(data_url.to_string(), &data, metadata_url.to_string()) + .into_report() + .change_context(Error::Internal)? .try_into() .change_context(Error::Internal)?; - prepared_files.push(result_prepared_file) - } + prepared_files.push(prepared_file); + + uploads.push(write_parquet(data, data_url, object_store.clone())); + uploads.push(write_parquet(metadata, metadata_url, object_store.clone())); } - // TODO: We could (instead) use a loop and select, which would allow us - // to fail early if anything failed. But it is more book-keeping. - let parquet_uploads = futures::future::join_all(parquet_uploads).in_current_span(); - for result in parquet_uploads.await { - result.change_context(Error::UploadResult)?; + // Wait for the uploads. + while let Some(upload) = uploads.try_next().await? 
+        tracing::info!("Finished uploading {upload}");
     }

     Ok(prepared_files)
 }

+async fn write_parquet(
+    batch: RecordBatch,
+    url: ObjectStoreUrl,
+    object_store: Arc<dyn ObjectStore>,
+) -> error_stack::Result<ObjectStoreUrl, Error> {
+    let path = url
+        .path()
+        .change_context_lazy(|| Error::Write(url.url().clone()))?;
+    let (upload_id, writer) = object_store
+        .put_multipart(&path)
+        .await
+        .into_report()
+        .change_context_lazy(|| Error::Write(url.url().clone()))?;
+    tracing::info!("Multipart upload to {url} started with ID {upload_id}");
+
+    let mut writer = parquet::arrow::AsyncArrowWriter::try_new(
+        writer,
+        batch.schema(),
+        UPLOAD_BUFFER_SIZE_IN_BYTES,
+        None,
+    )
+    .into_report()
+    .change_context_lazy(|| Error::Write(url.url().clone()))?;
+
+    writer
+        .write(&batch)
+        .await
+        .into_report()
+        .change_context_lazy(|| Error::Write(url.url().clone()))?;
+
+    writer
+        .close()
+        .await
+        .into_report()
+        .change_context_lazy(|| Error::Write(url.url().clone()))?;
+
+    Ok(url)
+}
+
 fn open_file(path: impl AsRef<Path>) -> error_stack::Result<File, Error> {
     fn inner(path: &Path) -> error_stack::Result<File, Error> {
         File::open(path)
@@ -340,24 +245,6 @@ fn open_file(path: impl AsRef<Path>) -> error_stack::Result<File, Error> {
     inner(path.as_ref())
 }

-fn create_file(path: &Path) -> error_stack::Result<File, Error> {
-    File::create(path)
-        .into_report()
-        .change_context_lazy(|| Error::OpenFile {
-            path: path.to_owned(),
-        })
-}
-
-fn write_batch(
-    file: File,
-    record_batch: RecordBatch,
-) -> error_stack::Result<(), parquet::errors::ParquetError> {
-    let mut writer = ArrowWriter::try_new(file, record_batch.schema(), None)?;
-    writer.write(&record_batch)?;
-    writer.close()?;
-    Ok(())
-}
-
 fn get_prepare_hash(source_data: &SourceData) -> error_stack::Result<u64, Error> {
     let source = source_data.source.as_ref().ok_or(Error::Internal)?;
     let hex_encoding = match source {
diff --git a/crates/sparrow-runtime/src/prepare/error.rs b/crates/sparrow-runtime/src/prepare/error.rs
index c48ab169e..b82979b52 100644
--- a/crates/sparrow-runtime/src/prepare/error.rs
+++ b/crates/sparrow-runtime/src/prepare/error.rs
@@ -1,5 +1,7 @@
 use std::path::PathBuf;

+use url::Url;
+
 #[derive(derive_more::Display, Debug)]
 pub enum Error {
     #[display(fmt = "unable to open '{path:?}'")]
@@ -28,10 +30,8 @@ pub enum Error {
     DetermineMetadata,
     #[display(fmt = "invalid schema provided")]
     ReadSchema,
-    #[display(fmt = "failed to write Parquet file")]
-    WriteParquetData,
-    #[display(fmt = "failed to write metadata file")]
-    WriteMetadata,
+    #[display(fmt = "failed to write to '{_0}'")]
+    Write(Url),
     #[display(fmt = "prepare request missing '{_0}'")]
     MissingField(&'static str),
     #[display(
diff --git a/crates/sparrow-runtime/src/read/table_reader.rs b/crates/sparrow-runtime/src/read/table_reader.rs
index 56c5eb42e..37cb07961 100644
--- a/crates/sparrow-runtime/src/read/table_reader.rs
+++ b/crates/sparrow-runtime/src/read/table_reader.rs
@@ -96,14 +96,14 @@ pub async fn table_reader(
             .change_context(Error::Internal)?
             .timestamp_nanos();

-        gatherer
-            .skip_to(index, min_event_time)
-            .into_report()
-            .change_context(Error::SkippingToMinEvent)?;
         info!(
             "Skipping to time {} for data file {:?} for index {}",
             min_event_time, prepared_file, index
         );
+        gatherer
+            .skip_to(index, min_event_time)
+            .into_report()
+            .change_context(Error::SkippingToMinEvent)?;

         let stream = new_parquet_stream(object_stores, &prepared_file.path, &projected_schema)
             .await
diff --git a/crates/sparrow-runtime/src/stores/object_store_url.rs b/crates/sparrow-runtime/src/stores/object_store_url.rs
index 53530a3df..72d24940f 100644
--- a/crates/sparrow-runtime/src/stores/object_store_url.rs
+++ b/crates/sparrow-runtime/src/stores/object_store_url.rs
@@ -16,12 +16,36 @@ pub struct ObjectStoreUrl {
 }

 impl ObjectStoreUrl {
+    /// Return the [object_store::path::Path] corresponding to this URL.
     pub fn path(&self) -> error_stack::Result<object_store::path::Path, Error> {
         object_store::path::Path::parse(self.url.path())
             .into_report()
             .change_context_lazy(|| Error::UrlInvalidPath(self.url.clone()))
     }

+    pub fn url(&self) -> &Url {
+        &self.url
+    }
+
+    /// Parse a string as a URL, with this URL as the base URL.
+    ///
+    /// Note: a trailing slash is significant.
+    /// Without it, the last path component is considered to be a “file” name
+    /// to be removed to get at the “directory” that is used as the base.
+    ///
+    /// # Errors
+    ///
+    /// If the function can not parse a URL from the given string
+    /// with this URL as the base URL, an [Error] variant will be returned.
+    pub fn join(&self, input: &str) -> error_stack::Result<Self, Error> {
+        let url = self
+            .url
+            .join(input)
+            .into_report()
+            .change_context(Error::InvalidUrl(input.to_owned()))?;
+        Ok(Self { url })
+    }
+
     pub fn key(&self) -> error_stack::Result<ObjectStoreKey, Error> {
         match self.url.scheme() {
             "file" => Ok(ObjectStoreKey::Local),
diff --git a/wren/compute/prepare_manager.go b/wren/compute/prepare_manager.go
index 50ddd747f..b9b8edcf2 100644
--- a/wren/compute/prepare_manager.go
+++ b/wren/compute/prepare_manager.go
@@ -267,7 +267,7 @@ func (m *prepareManager) executePrepare(ctx context.Context, owner *ent.Owner, p
	prepareReq := &v1alpha.PrepareDataRequest{
		SourceData:       sourceData,
		Config:           computeTable.Config,
-		OutputPathPrefix: prepareOutputURI,
+		OutputPathPrefix: prepareOutputURI + "/",
		FilePrefix:       prepareFilePrefix,
		SlicePlan:        prepareJob.SlicePlan,
	}
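A note on the wren change above: `ObjectStoreUrl::join` delegates to `url::Url::join`, where a trailing slash on the base URL is significant. Without it, the last path component is treated as a "file" name and replaced, which is why `prepareOutputURI` now gets a `/` appended before it is sent as `OutputPathPrefix`. A minimal sketch of that behavior using only the `url` crate; the bucket and prefix names here are made-up examples:

```rust
use url::Url;

fn main() {
    // Hypothetical output prefixes; only the trailing slash differs.
    let without_slash = Url::parse("s3://bucket/prepared/table").unwrap();
    let with_slash = Url::parse("s3://bucket/prepared/table/").unwrap();

    // Without the slash, "table" is treated as a file name and replaced.
    assert_eq!(
        without_slash.join("part-0.parquet").unwrap().as_str(),
        "s3://bucket/prepared/part-0.parquet"
    );

    // With the slash, the joined path lands under the prefix, which is what
    // prepare_file expects when it builds "{output_file_prefix}-{n}.parquet".
    assert_eq!(
        with_slash.join("part-0.parquet").unwrap().as_str(),
        "s3://bucket/prepared/table/part-0.parquet"
    );
}
```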
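For the time bounds that `PreparedMetadata::try_from_data` computes, the key assumption is that prepared batches are sorted by `_time`: the first and last values of column 0 are the min and max, and an empty batch falls back to `i64::MIN`. Below is a self-contained sketch of that logic using plain `arrow` APIs rather than the internal `sparrow_arrow` downcast helper; the schema and values are illustrative only:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, TimestampNanosecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;

/// Min/max event time of a prepared batch whose first column is a sorted `_time`.
fn min_max_time(batch: &RecordBatch) -> (i64, i64) {
    let time = batch
        .column(0)
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .expect("first column must be timestamp_ns");
    // Sorted column: first/last are min/max; empty batches default to i64::MIN.
    let min_time = time.values().first().copied().unwrap_or(i64::MIN);
    let max_time = time.values().last().copied().unwrap_or(i64::MIN);
    (min_time, max_time)
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "_time",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        false,
    )]));
    let time: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![1_000, 2_000, 3_000]));
    let batch = RecordBatch::try_new(schema, vec![time]).unwrap();
    assert_eq!(min_max_time(&batch), (1_000, 3_000));
}
```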