Skip to content

Commit

Permalink
feat: Prepare directly to object stores (#475)
Browse files Browse the repository at this point in the history
This is part of #465 and serves as the first example of using
`object_store` and the async Parquet writer to write directly to object
stores.
  • Loading branch information
bjchambers authored Jul 6, 2023
2 parents 74e6299 + 327bb30 commit 8dbc359
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 260 deletions.
7 changes: 4 additions & 3 deletions crates/sparrow-catalog/src/update/execute_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,10 @@ impl ExampleInputPreparer {
metadata_output.write(metadata).unwrap();
metadata_output.close().unwrap();

let prepared_metadata = PreparedMetadata::try_from_local_parquet_path(
prepared_file.path(),
metadata_file.path(),
let prepared_metadata = PreparedMetadata::try_from_data(
format!("file://{}", prepared_file.path().display()),
prepared_batch,
format!("file://{}", metadata_file.path().display()),
)
.into_report()
.change_context(Error::PrepareInput)?;
Expand Down
8 changes: 5 additions & 3 deletions crates/sparrow-main/src/serve/compute_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,11 +689,13 @@ mod tests {
output.close().unwrap();
}

let prepared_metadata = PreparedMetadata::try_from_local_parquet_path(
prepared_file.path(),
metadata_file.path(),
let prepared_metadata = PreparedMetadata::try_from_data(
format!("file://{}", prepared_file.path().display()),
record_batch,
format!("file://{}", metadata_file.path().display()),
)
.unwrap();

let file_set = compute_table::FileSet {
slice_plan: Some(slice_plan),
prepared_files: vec![prepared_metadata.try_into().unwrap()],
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-main/src/serve/preparation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub async fn prepare_data(
temp_file.path(),
)
.await?;
let (_prepared_metadata, prepared_files) = prepare_file(
let prepared_files = prepare_file(
&object_store_registry,
&source_data,
&prepare_request.output_path_prefix,
Expand Down
7 changes: 4 additions & 3 deletions crates/sparrow-main/tests/e2e/fixture/local_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ impl LocalTestTable {
output.write(&metadata).unwrap();
output.close().unwrap();

let prepared_metadata = PreparedMetadata::try_from_local_parquet_path(
prepared_file.path(),
metadata_output_file.path(),
let prepared_metadata = PreparedMetadata::try_from_data(
format!("file://{}", prepared_file.path().display()),
&prepared_batch,
format!("file://{}", metadata_output_file.path().display()),
)?;

prepared_batches_and_metadata.push((
Expand Down
86 changes: 34 additions & 52 deletions crates/sparrow-runtime/src/metadata/prepared_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::path::Path;
use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::ValueStatistics;
use arrow::array::TimestampNanosecondArray;
use arrow::datatypes::{ArrowPrimitiveType, Schema, SchemaRef, TimestampNanosecondType};
use arrow::record_batch::RecordBatch;

use sparrow_api::kaskada::v1alpha::PreparedFile;
use sparrow_arrow::downcast::downcast_primitive_array;
use sparrow_core::TableSchema;

use crate::metadata::file_from_path;

#[non_exhaustive]
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq)]
pub struct PreparedMetadata {
pub path: String,

Expand Down Expand Up @@ -42,64 +40,48 @@ pub struct PreparedMetadata {
pub metadata_path: String,
}

/// Return the min/max statistics for the time column (column 0) of the
/// given `row_group`, or `None` when the file contains no rows.
///
/// The time column of a prepared file is expected to hold `Int64`
/// (nanosecond timestamp) values, so its parquet statistics must be
/// `Statistics::Int64` with min/max populated.
///
/// # Errors
/// Returns an error when the first column's statistics are missing, are
/// not `Int64`, or do not have a min/max set for `row_group`.
///
/// NOTE(review): this commit removes this helper in favor of reading the
/// time range directly from the in-memory `RecordBatch` in `try_from_data`.
fn get_time_statistics(
metadata: &ParquetMetaData,
row_group: usize,
) -> anyhow::Result<Option<&ValueStatistics<i64>>> {
// An empty file has no statistics to read; report `None` rather than
// treating the absence as an error.
if metadata.file_metadata().num_rows() == 0 {
Ok(None)
} else {
use parquet::file::statistics::Statistics;
// Column 0 is the time column (prepared files put `_time` first).
match metadata.row_group(row_group).column(0).statistics() {
Some(Statistics::Int64(stats)) => {
// Statistics may exist without min/max; require both so callers
// can rely on the returned bounds.
anyhow::ensure!(
stats.has_min_max_set(),
"Time column statistics missing min/max for row_group {row_group}",
);

Ok(Some(stats))
}
// Missing statistics or a non-Int64 type means the file is not a
// valid prepared file.
stats => Err(anyhow::anyhow!(
"Time column missing or invalid for row_group {row_group}: {stats:?}"
)),
}
}
}

impl PreparedMetadata {
/// Create a `PreparedMetadata` from the path to a parquet file.
pub fn try_from_local_parquet_path(
parquet_path: &Path,
metadata_parquet_path: &Path,
pub fn try_from_data(
data_path: String,
data: &RecordBatch,
metadata_path: String,
) -> anyhow::Result<Self> {
let file = file_from_path(parquet_path)?;

let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
let metadata = parquet_reader.metadata();
let num_rows = metadata.file_metadata().num_rows();

let prepared_schema = parquet_reader.schema();
let prepared_schema = data.schema();

anyhow::ensure!(
prepared_schema.field(0).name() == "_time",
"First column of prepared files must be '_time'"
);
anyhow::ensure!(
prepared_schema.field(0).data_type() == &TimestampNanosecondType::DATA_TYPE,
"First column of prepared files must be TimestmapNanosecondType"
);

let min_time = get_time_statistics(metadata.as_ref(), 0)?
.map(|stats| *stats.min())
// Compute the time statistics directly from the data.
//
// TODO: We could instead just get this from the parquet metadata.
let time = data.column(0);
let time: &TimestampNanosecondArray = downcast_primitive_array(time.as_ref())?;

let num_rows = data.num_rows() as i64;
// Time column is sorted (since the file is already prepared).
let min_time = time
.values()
.first()
.copied()
// Empty files contain no stats. We default to assuming the min time.
.unwrap_or(i64::MIN);

let max_time = get_time_statistics(metadata.as_ref(), metadata.num_row_groups() - 1)?
.map(|stats| *stats.max())
let max_time = time
.values()
.last()
.copied()
// Empty files contain no stats. We default to assuming the min time.
.unwrap_or(i64::MIN);
tracing::info!("Determined times {min_time} to {max_time} for file '{data_path}'");

let path = format!("file://{}", parquet_path.display());
let metadata_path = format!("file://{}", metadata_parquet_path.display());
Self::try_from_prepared_schema(
path,
prepared_schema.clone(),
data_path,
prepared_schema,
min_time,
max_time,
num_rows,
Expand Down
Loading

0 comments on commit 8dbc359

Please sign in to comment.