feat: add stats to convert-to-delta operation #2491
```diff
@@ -4,6 +4,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
 use std::{collections::HashMap, ops::AddAssign};
 
 use indexmap::IndexMap;
+use parquet::file::metadata::ParquetMetaData;
 use parquet::format::FileMetaData;
 use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
 use parquet::{basic::LogicalType, errors::ParquetError};
```
```diff
@@ -66,6 +67,32 @@ pub fn create_add(
     })
 }
 
+// As opposed to `stats_from_file_metadata` which operates on `parquet::format::FileMetaData`,
+// this function produces the stats by reading the metadata from already written out files.
+//
+// Note that the file metadata used here is actually `parquet::file::metadata::FileMetaData`,
+// which is a thrift decoding of the `parquet::format::FileMetaData` that is typically obtained
+// when flushing the write.
+pub(crate) fn stats_from_parquet_metadata(
+    partition_values: &IndexMap<String, Scalar>,
+    parquet_metadata: &ParquetMetaData,
+    num_indexed_cols: i32,
+    stats_columns: &Option<Vec<String>>,
+) -> Result<Stats, DeltaWriterError> {
+    let num_rows = parquet_metadata.file_metadata().num_rows();
+    let schema_descriptor = parquet_metadata.file_metadata().schema_descr_ptr();
+    let row_group_metadata = parquet_metadata.row_groups().to_vec();
+
+    stats_from_metadata(
+        partition_values,
+        schema_descriptor,
+        row_group_metadata,
+        num_rows,
+        num_indexed_cols,
+        stats_columns,
+    )
+}
+
 fn stats_from_file_metadata(
     partition_values: &IndexMap<String, Scalar>,
     file_metadata: &FileMetaData,
```
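For orientation, here is a minimal sketch of how a caller inside this module might exercise the new helper on an already written file. This is not code from the PR: the file path, the empty partition values, and the `-1` "index all columns" sentinel are illustrative assumptions, and it presumes `Stats`, `Scalar`, and `stats_from_parquet_metadata` are in scope (the function is `pub(crate)`).

```rust
use std::fs::File;

use indexmap::IndexMap;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn example_stats() -> Result<Stats, Box<dyn std::error::Error>> {
    // Hypothetical path; convert-to-delta would discover real files in the table directory.
    let file = File::open("table/part-00000.parquet")?;
    // Decodes the footer into parquet::file::metadata::ParquetMetaData.
    let reader = SerializedFileReader::new(file)?;

    // Empty here; a partitioned table would map partition column -> Scalar.
    let partition_values: IndexMap<String, Scalar> = IndexMap::new();

    // -1 is shown only as the "collect stats for all columns" sentinel; see the
    // review note at the end about deriving this from the table configuration.
    let stats = stats_from_parquet_metadata(&partition_values, reader.metadata(), -1, &None)?;
    Ok(stats)
}
```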
```diff
@@ -75,27 +102,46 @@ fn stats_from_file_metadata(
     let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice());
     let schema_descriptor = type_ptr.map(|type_| Arc::new(SchemaDescriptor::new(type_)))?;
 
-    let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
-    let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
-    let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();
-
-    let row_group_metadata: Result<Vec<RowGroupMetaData>, ParquetError> = file_metadata
+    let row_group_metadata: Vec<RowGroupMetaData> = file_metadata
         .row_groups
         .iter()
        .map(|rg| RowGroupMetaData::from_thrift(schema_descriptor.clone(), rg.clone()))
-        .collect();
-    let row_group_metadata = row_group_metadata?;
-    let schema_cols = file_metadata
-        .schema
-        .iter()
-        .map(|v| &v.name)
-        .collect::<Vec<_>>();
+        .collect::<Result<Vec<RowGroupMetaData>, ParquetError>>()?;
+
+    stats_from_metadata(
+        partition_values,
+        schema_descriptor,
+        row_group_metadata,
+        file_metadata.num_rows,
+        num_indexed_cols,
+        stats_columns,
+    )
+}
+
+fn stats_from_metadata(
+    partition_values: &IndexMap<String, Scalar>,
+    schema_descriptor: Arc<SchemaDescriptor>,
+    row_group_metadata: Vec<RowGroupMetaData>,
+    num_rows: i64,
+    num_indexed_cols: i32,
+    stats_columns: &Option<Vec<String>>,
+) -> Result<Stats, DeltaWriterError> {
+    let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
+    let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
+    let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();
 
     let idx_to_iterate = if let Some(stats_cols) = stats_columns {
-        stats_cols
-            .iter()
-            .map(|col| schema_cols[1..].iter().position(|value| *value == col))
-            .flatten()
+        schema_descriptor
+            .columns()
+            .into_iter()
+            .enumerate()
+            .filter_map(|(index, col)| {
+                if stats_cols.contains(&col.name().to_string()) {
+                    Some(index)
+                } else {
+                    None
+                }
+            })
             .collect()
     } else if num_indexed_cols == -1 {
         (0..schema_descriptor.num_columns()).collect::<Vec<_>>()
```
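Worth noting about the rewritten `stats_columns` lookup: the old code searched the raw thrift `schema` list, whose first element is the root group node (hence the `schema_cols[1..]` slice), while the new code enumerates the descriptor's leaf columns, whose indices line up with the column chunks in each row group. A standalone illustration of that leaf-column enumeration (the message type here is made up for the example):

```rust
use std::sync::Arc;

use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() -> Result<(), parquet::errors::ParquetError> {
    // Made-up schema purely for illustration.
    let message = "
        message spark_schema {
            required int32 id;
            optional binary name (UTF8);
            optional double price;
        }";
    let schema = Arc::new(parse_message_type(message)?);
    let descriptor = SchemaDescriptor::new(schema);

    // columns() yields only the leaf columns, so enumerate() produces the same
    // indices that stats_from_metadata uses to pick column chunk statistics.
    for (index, column) in descriptor.columns().iter().enumerate() {
        println!("{index}: {}", column.name());
    }
    // Prints: 0: id, 1: name, 2: price
    Ok(())
}
```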
```diff
@@ -149,7 +195,7 @@ fn stats_from_file_metadata(
     Ok(Stats {
         min_values,
         max_values,
-        num_records: file_metadata.num_rows,
+        num_records: num_rows,
         null_count,
     })
 }
```
```diff
@@ -262,18 +308,8 @@ impl StatsScalar {
             v.max_bytes()
         };
 
-        let val = if val.len() <= 4 {
-            let mut bytes = [0; 4];
-            bytes[..val.len()].copy_from_slice(val);
-            i32::from_be_bytes(bytes) as f64
-        } else if val.len() <= 8 {
-            let mut bytes = [0; 8];
-            bytes[..val.len()].copy_from_slice(val);
-            i64::from_be_bytes(bytes) as f64
-        } else if val.len() <= 16 {
-            let mut bytes = [0; 16];
-            bytes[..val.len()].copy_from_slice(val);
-            i128::from_be_bytes(bytes) as f64
+        let val = if val.len() <= 16 {
+            i128::from_be_bytes(sign_extend_be(val)) as f64
         } else {
            return Err(DeltaWriterError::StatsParsingFailed {
                debug_value: format!("{val:?}"),
```

Review comment on lines -265 to +312: Looks like this was causing those test failures. In particular, note that the above would wrongly extend slices whose length is not a power of two. This is the case that occurred for the failing tests, since `delta-rs/python/tests/conftest.py` line 218 (at 81593e9) would result in min `[0, 39, 16]` and max `[0, 54, 176]` fixed-length byte arrays, and these would in turn be coerced to `[0, 39, 16, 0]` and `[0, 54, 176, 0]` instead of `[0, 0, 39, 16]` and `[0, 0, 54, 176]` respectively. Contrast with arrow-rs/datafusion, where the extension happens at the beginning of the array. Granted, I'm not sure what/how encodes the min value "10.000" to `[0, 39, 16]` in the first place, aside from that it occurs in pyarrow.

Reply: Hi @ion-elgreco, can you approve the rest of the workflows?
```diff
@@ -315,6 +351,19 @@ impl StatsScalar {
     }
 }
 
+/// Performs big endian sign extension
+/// Copied from arrow-rs repo/parquet crate:
+/// https://github.com/apache/arrow-rs/blob/b25c441745602c9967b1e3cc4a28bc469cfb1311/parquet/src/arrow/buffer/bit_util.rs#L54
+pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
+    assert!(b.len() <= N, "Array too large, expected less than {N}");
+    let is_negative = (b[0] & 128u8) == 128u8;
+    let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
+    for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
+        *d = *s;
+    }
+    result
+}
+
 impl From<StatsScalar> for serde_json::Value {
     fn from(scalar: StatsScalar) -> Self {
         match scalar {
```
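To make the reviewer's point concrete, here is a small self-contained check (illustrative only, not part of the PR) contrasting front-padding via `sign_extend_be` with the old end-padding, using the 3-byte decimal min `[0, 39, 16]` from the failing test:

```rust
/// Copy of the helper above, repeated so this snippet runs standalone.
fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
    assert!(b.len() <= N, "Array too large, expected less than {N}");
    let is_negative = (b[0] & 128u8) == 128u8;
    let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
    for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
        *d = *s;
    }
    result
}

fn main() {
    let raw: &[u8] = &[0, 39, 16];

    // Front-padded to [0, 0, 39, 16]: the unscaled decimal 10_000,
    // i.e. 10.000 once the Decimal scale of 3 is applied.
    assert_eq!(i32::from_be_bytes(sign_extend_be(raw)), 10_000);

    // The old code zero-padded at the end, yielding [0, 39, 16, 0],
    // which decodes to 2_560_000 (2560.000 at scale 3), not 10.000.
    let mut bytes = [0u8; 4];
    bytes[..raw.len()].copy_from_slice(raw);
    assert_eq!(i32::from_be_bytes(bytes), 2_560_000);
}
```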
```diff
@@ -653,6 +702,17 @@ mod tests {
             }),
             Value::from(1243124142314.423),
         ),
+        (
+            simple_parquet_stat!(
+                Statistics::FixedLenByteArray,
+                FixedLenByteArray::from(vec![0, 39, 16])
+            ),
+            Some(LogicalType::Decimal {
+                scale: 3,
+                precision: 5,
+            }),
+            Value::from(10.0),
+        ),
         (
             simple_parquet_stat!(
                 Statistics::FixedLenByteArray,
```
Review comment on a hardcoded `-1` argument: This shouldn't be -1. You should use the same function I recently added in the write path to get the configuration values: `get_num_idx_cols_and_stats_columns`.

Reply: Good catch, revised now.
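For context, a hedged sketch of what that revision might look like. The wrapper name `add_stats_for_file` is hypothetical, and the `(Option<TableConfig>, HashMap<String, Option<String>>) -> (i32, Option<Vec<String>>)` shape of `get_num_idx_cols_and_stats_columns` is assumed from the write path rather than verified against this PR:

```rust
use std::collections::HashMap;

// Hypothetical wrapper; not code from this PR.
fn add_stats_for_file(
    partition_values: &IndexMap<String, Scalar>,
    parquet_metadata: &ParquetMetaData,
    configuration: HashMap<String, Option<String>>,
) -> Result<Stats, DeltaWriterError> {
    // Derive both knobs from the table configuration instead of hardcoding -1.
    // Assumed signature of the helper referenced in the review comment above.
    let (num_indexed_cols, stats_columns) =
        get_num_idx_cols_and_stats_columns(None, configuration);

    stats_from_parquet_metadata(
        partition_values,
        parquet_metadata,
        num_indexed_cols,
        &stats_columns,
    )
}
```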