feat: add stats to convert-to-delta operation (#2491)
# Description
Collect file-level stats while converting a Parquet directory to a Delta table in place and add them to the resulting `Add` actions.
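
For context, each file's stats are serialized with `serde_json::to_string` and stored on the `stats` field of the corresponding `Add` action, following the Delta protocol's per-file statistics shape. Below is a minimal sketch of what such a payload looks like and how it can be read back; the column names and numbers are illustrative only, not taken from this change.

```rust
use serde_json::Value;

fn main() {
    // Illustrative per-file stats payload as it would appear on an Add action.
    // The Delta protocol uses numRecords, minValues, maxValues and nullCount.
    let stats = r#"{
        "numRecords": 5,
        "minValues": {"date": "2021-01-01", "id": 1},
        "maxValues": {"date": "2021-01-05", "id": 5},
        "nullCount": {"date": 0, "id": 0}
    }"#;

    let parsed: Value = serde_json::from_str(stats).expect("valid stats JSON");
    assert_eq!(parsed["numRecords"], 5);
    assert_eq!(parsed["minValues"]["id"], 1);
    assert_eq!(parsed["maxValues"]["id"], 5);
}
```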

# Related Issue(s)
Closes #2490 

# Documentation
gruuya authored May 15, 2024
1 parent f11f9bb commit c86d29f
Showing 2 changed files with 125 additions and 37 deletions.
44 changes: 36 additions & 8 deletions crates/core/src/operations/convert_to_delta.rs
@@ -1,20 +1,23 @@
//! Command for converting a Parquet table to a Delta table in place
// https://github.com/delta-io/delta/blob/1d5dd774111395b0c4dc1a69c94abc169b1c83b6/spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala

use crate::operations::write::get_num_idx_cols_and_stats_columns;
use crate::{
kernel::{Add, DataType, Schema, StructField},
logstore::{LogStore, LogStoreRef},
operations::create::CreateBuilder,
protocol::SaveMode,
table::builder::ensure_table_uri,
table::config::DeltaConfigKey,
writer::stats::stats_from_parquet_metadata,
DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, NULL_PARTITION_VALUE_DATA_PATH,
};
use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError};
use futures::{
future::{self, BoxFuture},
TryStreamExt,
};
use indexmap::IndexMap;
use parquet::{
arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder},
errors::ParquetError,
@@ -284,6 +287,10 @@ impl ConvertToDeltaBuilder {
// A vector of StructField of all unique partition columns in a Parquet table
let mut partition_schema_fields = HashMap::new();

// Obtain settings on which columns to skip collecting stats on if any
let (num_indexed_cols, stats_columns) =
get_num_idx_cols_and_stats_columns(None, self.configuration.clone());

for file in files {
// A HashMap from partition column to value for this parquet file only
let mut partition_values = HashMap::new();
@@ -328,6 +335,24 @@ impl ConvertToDeltaBuilder {
subpath = iter.next();
}

let batch_builder = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
object_store.clone(),
file.clone(),
))
.await?;

// Fetch the stats
let parquet_metadata = batch_builder.metadata();
let stats = stats_from_parquet_metadata(
&IndexMap::from_iter(partition_values.clone().into_iter()),
parquet_metadata.as_ref(),
num_indexed_cols,
&stats_columns,
)
.map_err(|e| Error::DeltaTable(e.into()))?;
let stats_string =
serde_json::to_string(&stats).map_err(|e| Error::DeltaTable(e.into()))?;

actions.push(
Add {
path: percent_decode_str(file.location.as_ref())
@@ -349,19 +374,13 @@ impl ConvertToDeltaBuilder {
.collect(),
modification_time: file.last_modified.timestamp_millis(),
data_change: true,
stats: Some(stats_string),
..Default::default()
}
.into(),
);

let mut arrow_schema = ParquetRecordBatchStreamBuilder::new(ParquetObjectReader::new(
object_store.clone(),
file,
))
.await?
.schema()
.as_ref()
.clone();
let mut arrow_schema = batch_builder.schema().as_ref().clone();

// Arrow schema of Parquet files may have conflicting metadata
// Since Arrow schema metadata is not used to generate Delta table schema, we set the metadata field to an empty HashMap
@@ -584,6 +603,15 @@ mod tests {
"part-00000-d22c627d-9655-4153-9527-f8995620fa42-c000.snappy.parquet"
);

let Some(Scalar::Struct(min_values, _)) = action.min_values() else {
panic!("Missing min values");
};
assert_eq!(min_values, vec![Scalar::Date(18628), Scalar::Integer(1)]);
let Some(Scalar::Struct(max_values, _)) = action.max_values() else {
panic!("Missing max values");
};
assert_eq!(max_values, vec![Scalar::Date(18632), Scalar::Integer(5)]);

assert_delta_table(
table,
path,
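
An aside on the new test assertions above: `Scalar::Date` values are stored as days since the Unix epoch, so `Date(18628)` and `Date(18632)` correspond to 2021-01-01 and 2021-01-05. A quick check, assuming the `chrono` crate is available (it is not part of this change):

```rust
use chrono::{Days, NaiveDate};

fn main() {
    // Delta/Parquet date scalars count days since the Unix epoch (1970-01-01).
    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();

    // The min/max date values asserted in the convert_to_delta test above.
    let min = epoch.checked_add_days(Days::new(18628)).unwrap();
    let max = epoch.checked_add_days(Days::new(18632)).unwrap();

    assert_eq!(min.to_string(), "2021-01-01");
    assert_eq!(max.to_string(), "2021-01-05");
}
```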
118 changes: 89 additions & 29 deletions crates/core/src/writer/stats.rs
@@ -4,6 +4,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, ops::AddAssign};

use indexmap::IndexMap;
use parquet::file::metadata::ParquetMetaData;
use parquet::format::FileMetaData;
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
use parquet::{basic::LogicalType, errors::ParquetError};
@@ -66,6 +67,32 @@ pub fn create_add(
})
}

// As opposed to `stats_from_file_metadata` which operates on `parquet::format::FileMetaData`,
// this function produces the stats by reading the metadata from already written out files.
//
// Note that the file metadata used here is actually `parquet::file::metadata::FileMetaData`
// which is a thrift decoding of the `parquet::format::FileMetaData` which is typically obtained
// when flushing the write.
pub(crate) fn stats_from_parquet_metadata(
partition_values: &IndexMap<String, Scalar>,
parquet_metadata: &ParquetMetaData,
num_indexed_cols: i32,
stats_columns: &Option<Vec<String>>,
) -> Result<Stats, DeltaWriterError> {
let num_rows = parquet_metadata.file_metadata().num_rows();
let schema_descriptor = parquet_metadata.file_metadata().schema_descr_ptr();
let row_group_metadata = parquet_metadata.row_groups().to_vec();

stats_from_metadata(
partition_values,
schema_descriptor,
row_group_metadata,
num_rows,
num_indexed_cols,
stats_columns,
)
}

fn stats_from_file_metadata(
partition_values: &IndexMap<String, Scalar>,
file_metadata: &FileMetaData,
@@ -75,27 +102,46 @@ fn stats_from_file_metadata(
let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?;

let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();

let row_group_metadata: Result<Vec<RowGroupMetaData>, ParquetError> = file_metadata
let row_group_metadata: Vec<RowGroupMetaData> = file_metadata
.row_groups
.iter()
.map(|rg| RowGroupMetaData::from_thrift(schema_descriptor.clone(), rg.clone()))
.collect();
let row_group_metadata = row_group_metadata?;
let schema_cols = file_metadata
.schema
.iter()
.map(|v| &v.name)
.collect::<Vec<_>>();
.collect::<Result<Vec<RowGroupMetaData>, ParquetError>>()?;

stats_from_metadata(
partition_values,
schema_descriptor,
row_group_metadata,
file_metadata.num_rows,
num_indexed_cols,
stats_columns,
)
}

fn stats_from_metadata(
partition_values: &IndexMap<String, Scalar>,
schema_descriptor: Arc<SchemaDescriptor>,
row_group_metadata: Vec<RowGroupMetaData>,
num_rows: i64,
num_indexed_cols: i32,
stats_columns: &Option<Vec<String>>,
) -> Result<Stats, DeltaWriterError> {
let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();

let idx_to_iterate = if let Some(stats_cols) = stats_columns {
stats_cols
.iter()
.map(|col| schema_cols[1..].iter().position(|value| *value == col))
.flatten()
schema_descriptor
.columns()
.into_iter()
.enumerate()
.filter_map(|(index, col)| {
if stats_cols.contains(&col.name().to_string()) {
Some(index)
} else {
None
}
})
.collect()
} else if num_indexed_cols == -1 {
(0..schema_descriptor.num_columns()).collect::<Vec<_>>()
@@ -149,7 +195,7 @@ fn stats_from_file_metadata(
Ok(Stats {
min_values,
max_values,
num_records: file_metadata.num_rows,
num_records: num_rows,
null_count,
})
}
@@ -262,18 +308,8 @@ impl StatsScalar {
v.max_bytes()
};

let val = if val.len() <= 4 {
let mut bytes = [0; 4];
bytes[..val.len()].copy_from_slice(val);
i32::from_be_bytes(bytes) as f64
} else if val.len() <= 8 {
let mut bytes = [0; 8];
bytes[..val.len()].copy_from_slice(val);
i64::from_be_bytes(bytes) as f64
} else if val.len() <= 16 {
let mut bytes = [0; 16];
bytes[..val.len()].copy_from_slice(val);
i128::from_be_bytes(bytes) as f64
let val = if val.len() <= 16 {
i128::from_be_bytes(sign_extend_be(val)) as f64
} else {
return Err(DeltaWriterError::StatsParsingFailed {
debug_value: format!("{val:?}"),
@@ -315,6 +351,19 @@ impl StatsScalar {
}
}

/// Performs big endian sign extension
/// Copied from arrow-rs repo/parquet crate:
/// https://github.com/apache/arrow-rs/blob/b25c441745602c9967b1e3cc4a28bc469cfb1311/parquet/src/arrow/buffer/bit_util.rs#L54
pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
assert!(b.len() <= N, "Array too large, expected less than {N}");
let is_negative = (b[0] & 128u8) == 128u8;
let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
*d = *s;
}
result
}

impl From<StatsScalar> for serde_json::Value {
fn from(scalar: StatsScalar) -> Self {
match scalar {
Expand Down Expand Up @@ -653,6 +702,17 @@ mod tests {
}),
Value::from(1243124142314.423),
),
(
simple_parquet_stat!(
Statistics::FixedLenByteArray,
FixedLenByteArray::from(vec![0, 39, 16])
),
Some(LogicalType::Decimal {
scale: 3,
precision: 5,
}),
Value::from(10.0),
),
(
simple_parquet_stat!(
Statistics::FixedLenByteArray,
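
To make the new `FixedLenByteArray` decimal test case above concrete, here is a standalone sketch that exercises a copy of the `sign_extend_be` helper: the big-endian bytes `[0, 39, 16]` decode to the unscaled value 10000, which with a decimal scale of 3 gives the expected 10.0.

```rust
/// Big-endian sign extension, copied from the diff above (originally from arrow-rs).
fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
    assert!(b.len() <= N, "Array too large, expected less than {N}");
    let is_negative = (b[0] & 128u8) == 128u8;
    let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
    for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
        *d = *s;
    }
    result
}

fn main() {
    // [0, 39, 16] is 0x002710 = 10_000; with scale 3 that is 10.000,
    // matching the Value::from(10.0) asserted in the new test case.
    let unscaled = i128::from_be_bytes(sign_extend_be(&[0u8, 39, 16]));
    assert_eq!(unscaled, 10_000);
    assert_eq!(unscaled as f64 / 10f64.powi(3), 10.0);

    // Negative values work because the upper bytes are filled with 0xFF:
    // 0xFFD8F0 is -10_000 in two's complement.
    let negative = i128::from_be_bytes(sign_extend_be(&[0xFF, 0xD8, 0xF0]));
    assert_eq!(negative, -10_000);
}
```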
