From 5a127c6040778cbd547d51af51ff7813e8773394 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 28 Aug 2024 12:42:07 -0500 Subject: [PATCH 1/4] Fix writing of invalid Parquet ColumnIndex when row group contains null pages Co-authored-by: Ed Seidl --- parquet/src/file/metadata/writer.rs | 82 ++++++++++++++++++++++++---- parquet/src/file/page_index/index.rs | 23 ++++---- 2 files changed, 82 insertions(+), 23 deletions(-) diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index 3573e3020e5c..857b5997c8e1 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -390,6 +390,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> { #[cfg(feature = "arrow")] #[cfg(feature = "async")] mod tests { + use std::i32; use std::sync::Arc; use crate::file::footer::parse_metadata; @@ -450,19 +451,57 @@ mod tests { true, )])); - let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)])); + // build row groups / pages that exercise different combinations of nulls and values + // note that below we set the row group and page sizes to 4 and 2 respectively + // so that these "groupings" make sense + let a: ArrayRef = Arc::new(Int32Array::from(vec![ + // a row group that has all values + Some(i32::MIN), + Some(-1), + Some(1), + Some(i32::MAX), + // a row group with a page of all nulls and a page of all values + None, + None, + Some(2), + Some(3), + // a row group that has all null pages + None, + None, + None, + None, + // a row group having 1 page with all values and 1 page with some nulls + Some(4), + Some(5), + None, + Some(6), + // a row group having 1 page with all nulls and 1 page with some nulls + None, + None, + Some(7), + None, + // a row group having all pages with some nulls + None, + Some(8), + Some(9), + None, + ])); let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); - let writer_props = match write_page_index { - true => WriterProperties::builder() - .set_statistics_enabled(EnabledStatistics::Page) - .build(), - false => WriterProperties::builder() - .set_statistics_enabled(EnabledStatistics::Chunk) - .build(), + let writer_props_builder = match write_page_index { + true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page), + false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk), }; + // tune the size or pages to the data above + // to make sure we exercise code paths where all items in a page are null, etc. + let writer_props = writer_props_builder + .set_max_row_group_size(4) + .set_data_page_row_count_limit(2) + .set_write_batch_size(2) + .build(); + let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap(); writer.write(&batch).unwrap(); writer.close().unwrap(); @@ -554,8 +593,8 @@ mod tests { assert_eq!(left.compressed_size(), right.compressed_size()); assert_eq!(left.data_page_offset(), right.data_page_offset()); assert_eq!(left.statistics(), right.statistics()); - assert_eq!(left.offset_index_length(), right.offset_index_length()); - assert_eq!(left.column_index_length(), right.column_index_length()); + // assert_eq!(left.offset_index_length(), right.offset_index_length()); + // assert_eq!(left.column_index_length(), right.column_index_length()); assert_eq!( left.unencoded_byte_array_data_bytes(), right.unencoded_byte_array_data_bytes() @@ -610,6 +649,29 @@ mod tests { decoded_metadata.num_row_groups() ); + // check that the mins and maxes are what we expect for each page + // also indirectly checking that the pages were written out as we expected them to be laid out + // (if they're not, or something gets refactored in the future that breaks that assumption, + // this test may have to drop down to a lower level and create metadata directly instead of relying on + // writing an entire file) + let column_indexes = metadata.metadata.column_index().unwrap(); + assert_eq!(column_indexes.len(), 6); + // make sure each row group has 2 pages by checking the first column + // page counts for each column for each row group, should all be the same and there should be + // 12 pages in total across 6 row groups / 1 column + let mut page_counts = vec![]; + for row_group in column_indexes { + for column in row_group { + match column { + Index::INT32(column_index) => { + page_counts.push(column_index.indexes.len()); + } + _ => panic!("unexpected column index type"), + } + } + } + assert_eq!(page_counts, vec![2; 6]); + metadata .metadata .row_groups() diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs index 662ba45621ab..e0d794764ce9 100644 --- a/parquet/src/file/page_index/index.rs +++ b/parquet/src/file/page_index/index.rs @@ -227,19 +227,16 @@ impl NativeIndex { } pub(crate) fn to_thrift(&self) -> ColumnIndex { - let min_values = self - .indexes - .iter() - .map(|x| x.min_bytes().map(|x| x.to_vec())) - .collect::>>() - .unwrap_or_else(|| vec![vec![]; self.indexes.len()]); - - let max_values = self - .indexes - .iter() - .map(|x| x.max_bytes().map(|x| x.to_vec())) - .collect::>>() - .unwrap_or_else(|| vec![vec![]; self.indexes.len()]); + let mut min_values = vec![vec![]; self.indexes.len()]; + let mut max_values = vec![vec![]; self.indexes.len()]; + for (i, index) in self.indexes.iter().enumerate() { + if let Some(min) = index.min_bytes() { + min_values[i].extend_from_slice(min); + } + if let Some(max) = index.max_bytes() { + max_values[i].extend_from_slice(max); + } + } let null_counts = self .indexes From 60f61103c8d90217d3967e06285689485d3473d5 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 28 Aug 2024 12:59:37 -0500 Subject: [PATCH 2/4] fix lint --- parquet/src/file/metadata/writer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index 857b5997c8e1..b608e40e5505 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -390,7 +390,6 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> { #[cfg(feature = "arrow")] #[cfg(feature = "async")] mod tests { - use std::i32; use std::sync::Arc; use crate::file::footer::parse_metadata; From d60d918529cedf88907910003f9ff7448ff88465 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 28 Aug 2024 13:12:02 -0500 Subject: [PATCH 3/4] more rusty Co-authored-by: Ed Seidl --- parquet/src/file/page_index/index.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs index e0d794764ce9..2f30abead25c 100644 --- a/parquet/src/file/page_index/index.rs +++ b/parquet/src/file/page_index/index.rs @@ -227,16 +227,17 @@ impl NativeIndex { } pub(crate) fn to_thrift(&self) -> ColumnIndex { - let mut min_values = vec![vec![]; self.indexes.len()]; - let mut max_values = vec![vec![]; self.indexes.len()]; - for (i, index) in self.indexes.iter().enumerate() { - if let Some(min) = index.min_bytes() { - min_values[i].extend_from_slice(min); - } - if let Some(max) = index.max_bytes() { - max_values[i].extend_from_slice(max); - } - } + let min_values = self + .indexes + .iter() + .map(|x| x.min_bytes().unwrap_or(&[]).to_vec()) + .collect::>(); + + let max_values = self + .indexes + .iter() + .map(|x| x.max_bytes().unwrap_or(&[]).to_vec()) + .collect::>(); let null_counts = self .indexes From 8eea1e5899b46d5236a4d9fe9e0f56f46f168b65 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 31 Aug 2024 08:31:17 -0400 Subject: [PATCH 4/4] re-enable tests --- parquet/src/file/metadata/writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index b608e40e5505..dad960790e6c 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -592,8 +592,8 @@ mod tests { assert_eq!(left.compressed_size(), right.compressed_size()); assert_eq!(left.data_page_offset(), right.data_page_offset()); assert_eq!(left.statistics(), right.statistics()); - // assert_eq!(left.offset_index_length(), right.offset_index_length()); - // assert_eq!(left.column_index_length(), right.column_index_length()); + assert_eq!(left.offset_index_length(), right.offset_index_length()); + assert_eq!(left.column_index_length(), right.column_index_length()); assert_eq!( left.unencoded_byte_array_data_bytes(), right.unencoded_byte_array_data_bytes()