Fix writing of invalid Parquet ColumnIndex when row group contains null pages #6319

Merged
merged 5 commits into from
Aug 31, 2024
81 changes: 71 additions & 10 deletions parquet/src/file/metadata/writer.rs
@@ -450,19 +450,57 @@ mod tests {
true,
)]));

let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
// build row groups / pages that exercise different combinations of nulls and values
Contributor

I like these test additions, particularly that they build off of the tests added in #6197. They also seem much more thorough than what I proposed 😄 .

// note that below we set the row group and page sizes to 4 and 2 respectively
// so that these "groupings" make sense
let a: ArrayRef = Arc::new(Int32Array::from(vec![
// a row group that has all values
Some(i32::MIN),
Some(-1),
Some(1),
Some(i32::MAX),
// a row group with a page of all nulls and a page of all values
None,
None,
Some(2),
Some(3),
// a row group that has all null pages
None,
None,
None,
None,
// a row group having 1 page with all values and 1 page with some nulls
Some(4),
Some(5),
None,
Some(6),
// a row group having 1 page with all nulls and 1 page with some nulls
None,
None,
Some(7),
None,
// a row group having all pages with some nulls
None,
Some(8),
Some(9),
None,
]));

let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();

let writer_props = match write_page_index {
true => WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.build(),
false => WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Chunk)
.build(),
let writer_props_builder = match write_page_index {
true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page),
false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk),
};

// tune the size or pages to the data above
// to make sure we exercise code paths where all items in a page are null, etc.
let writer_props = writer_props_builder
.set_max_row_group_size(4)
.set_data_page_row_count_limit(2)
.set_write_batch_size(2)
Comment on lines +498 to +501
Contributor Author

I recognize that this is forcing things a bit. I leaned towards this approach because (1) it felt like a small step from where we are now and (2) I'm not that familiar with the lower level APIs that would be required if we wanted to build a ParquetMetadata by hand and use that for tests instead of relying on file writing to build one for us. I think a reasonable piece of feedback to this PR would be that now is the time to refactor the tests to build the ParquetMetadata by hand since we now care about getting pages of a certain size, etc.

Contributor

I think this is reasonable
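
To make the intended grouping concrete, here is a minimal standalone sketch that chunks the test values into row groups of 4 rows and pages of 2 rows and classifies each page. It only models the layout the test is aiming for; it does not call the parquet crate, and the real writer decides page boundaries itself. `PageKind`, `classify_pages`, and `test_values` are hypothetical names introduced for illustration.

```rust
/// How the values of one page relate to nulls (hypothetical helper type).
#[derive(Debug, PartialEq, Clone, Copy)]
enum PageKind {
    AllValues,
    SomeNulls,
    AllNulls,
}

/// Split `values` into row groups of `rows_per_group` rows, split each row
/// group into pages of `rows_per_page` rows, and classify every page.
/// This is a model of the intended layout, not what the writer executes.
fn classify_pages(
    values: &[Option<i32>],
    rows_per_group: usize,
    rows_per_page: usize,
) -> Vec<Vec<PageKind>> {
    values
        .chunks(rows_per_group)
        .map(|group| {
            group
                .chunks(rows_per_page)
                .map(|page| {
                    let nulls = page.iter().filter(|v| v.is_none()).count();
                    if nulls == 0 {
                        PageKind::AllValues
                    } else if nulls == page.len() {
                        PageKind::AllNulls
                    } else {
                        PageKind::SomeNulls
                    }
                })
                .collect()
        })
        .collect()
}

/// The 24 values from the test in this PR, in the same order.
fn test_values() -> Vec<Option<i32>> {
    vec![
        Some(i32::MIN), Some(-1), Some(1), Some(i32::MAX),
        None, None, Some(2), Some(3),
        None, None, None, None,
        Some(4), Some(5), None, Some(6),
        None, None, Some(7), None,
        None, Some(8), Some(9), None,
    ]
}
```

Calling `classify_pages(&test_values(), 4, 2)` yields six row groups of two pages each, matching the six cases described in the test's comments.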

.build();

let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
@@ -554,8 +592,8 @@ mod tests {
assert_eq!(left.compressed_size(), right.compressed_size());
assert_eq!(left.data_page_offset(), right.data_page_offset());
assert_eq!(left.statistics(), right.statistics());
assert_eq!(left.offset_index_length(), right.offset_index_length());
assert_eq!(left.column_index_length(), right.column_index_length());
// assert_eq!(left.offset_index_length(), right.offset_index_length());
// assert_eq!(left.column_index_length(), right.column_index_length());
Contributor Author

@adriangb Aug 28, 2024

With just the changes to this file (that is, the new test data), this fails on master because of #6316, so I commented it out for now. Then either:

  1. If #6316 (Pass empty vectors as min/max for all null pages when building ColumnIndex) gets merged first, I can rebase and re-enable the code.
  2. If this gets merged first, then #6316 can uncomment and re-enable this code.

Contributor

re-enabled in 8eea1e5

assert_eq!(
left.unencoded_byte_array_data_bytes(),
right.unencoded_byte_array_data_bytes()
@@ -610,6 +648,29 @@ mod tests {
decoded_metadata.num_row_groups()
);

// check that the mins and maxes are what we expect for each page
// also indirectly checking that the pages were written out as we expected them to be laid out
// (if they're not, or something gets refactored in the future that breaks that assumption,
// this test may have to drop down to a lower level and create metadata directly instead of relying on
// writing an entire file)
let column_indexes = metadata.metadata.column_index().unwrap();
assert_eq!(column_indexes.len(), 6);
// make sure each row group has 2 pages by checking the first column
// page counts for each column for each row group, should all be the same and there should be
// 12 pages in total across 6 row groups / 1 column
let mut page_counts = vec![];
for row_group in column_indexes {
for column in row_group {
match column {
Index::INT32(column_index) => {
page_counts.push(column_index.indexes.len());
}
_ => panic!("unexpected column index type"),
}
}
}
assert_eq!(page_counts, vec![2; 6]);
Comment on lines +658 to +672
Contributor Author

Unless we refactor this test to create the metadata directly, I think this is a good idea. Previously there was no assertion that we were creating more than 1 page or anything of the sort. Since this is testing code that evidently diverges in code paths depending on how many pages there are, etc., it's good to at least have assertions that the data is laid out as we expect it to be.
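
As a rough cross-check of what those assertions expect, the sketch below computes a per-page min, max, and null count directly from the test values, assuming pages really do fall on 2-row boundaries inside 4-row row groups. `PageSummary` and `summarize_pages` are hypothetical names for illustration and are not types from the parquet crate; an all-null page is modeled as having no min or max.

```rust
/// Hypothetical per-page summary; not a type from the parquet crate.
#[derive(Debug, PartialEq)]
struct PageSummary {
    min: Option<i32>,
    max: Option<i32>,
    null_count: usize,
}

/// Model the expected layout: row groups of `rows_per_group` rows, each
/// split into pages of `rows_per_page` rows, with min/max over non-null values.
fn summarize_pages(
    values: &[Option<i32>],
    rows_per_group: usize,
    rows_per_page: usize,
) -> Vec<Vec<PageSummary>> {
    values
        .chunks(rows_per_group)
        .map(|group| {
            group
                .chunks(rows_per_page)
                .map(|page| PageSummary {
                    // `flatten` skips the nulls, so an all-null page yields None
                    min: page.iter().flatten().copied().min(),
                    max: page.iter().flatten().copied().max(),
                    null_count: page.iter().filter(|v| v.is_none()).count(),
                })
                .collect()
        })
        .collect()
}

/// The 24 values from the test in this PR, in the same order.
fn test_values() -> Vec<Option<i32>> {
    vec![
        Some(i32::MIN), Some(-1), Some(1), Some(i32::MAX),
        None, None, Some(2), Some(3),
        None, None, None, None,
        Some(4), Some(5), None, Some(6),
        None, None, Some(7), None,
        None, Some(8), Some(9), None,
    ]
}
```

Under that assumption, collecting `group.len()` for each row group of `summarize_pages(&test_values(), 4, 2)` gives `vec![2; 6]`, the same shape the test asserts for `page_counts`.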


metadata
.metadata
.row_groups()
23 changes: 10 additions & 13 deletions parquet/src/file/page_index/index.rs
@@ -227,19 +227,16 @@ impl<T: ParquetValueType> NativeIndex<T> {
}

pub(crate) fn to_thrift(&self) -> ColumnIndex {
let min_values = self
.indexes
.iter()
.map(|x| x.min_bytes().map(|x| x.to_vec()))
.collect::<Option<Vec<_>>>()
.unwrap_or_else(|| vec![vec![]; self.indexes.len()]);

let max_values = self
.indexes
.iter()
.map(|x| x.max_bytes().map(|x| x.to_vec()))
.collect::<Option<Vec<_>>>()
.unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
let mut min_values = vec![vec![]; self.indexes.len()];
let mut max_values = vec![vec![]; self.indexes.len()];
for (i, index) in self.indexes.iter().enumerate() {
if let Some(min) = index.min_bytes() {
min_values[i].extend_from_slice(min);
}
if let Some(max) = index.max_bytes() {
max_values[i].extend_from_slice(max);
}
}
Contributor

I know I'm responsible for this, but now I wonder if simply changing the above to something like:

        let min_values = self
            .indexes
            .iter()
            .map(|x| x.min_bytes().unwrap_or(&[]).to_vec())
            .collect::<Vec<_>>();

would be more rusty?

Contributor Author

yup i think it would be, changed
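
For readers following the thread, here is a small standalone sketch of why the two shapes behave differently, as I read the diff. Collecting into `Option<Vec<_>>` is all-or-nothing: a single page without a min turns the whole result into `None`, so the fallback blanks every page. Mapping each page with `unwrap_or(&[])` only blanks the pages that lack a value. `PageStats` is a hypothetical stand-in for the crate's page index type; only the `min_bytes` accessor name is taken from the diff.

```rust
/// Hypothetical stand-in for one page's index entry.
struct PageStats {
    min: Option<Vec<u8>>,
}

impl PageStats {
    /// Mirrors the accessor name used in the diff; `None` for an all-null page.
    fn min_bytes(&self) -> Option<&[u8]> {
        self.min.as_deref()
    }
}

/// Old shape: if any page has no min, every page gets an empty vector.
fn min_values_all_or_nothing(pages: &[PageStats]) -> Vec<Vec<u8>> {
    pages
        .iter()
        .map(|p| p.min_bytes().map(|b| b.to_vec()))
        .collect::<Option<Vec<_>>>()
        .unwrap_or_else(|| vec![vec![]; pages.len()])
}

/// Suggested shape: only pages without a min get an empty vector.
fn min_values_per_page(pages: &[PageStats]) -> Vec<Vec<u8>> {
    pages
        .iter()
        .map(|p| p.min_bytes().unwrap_or(&[]).to_vec())
        .collect()
}

/// A row group with one all-null page followed by one page whose min is 2.
fn example_pages() -> Vec<PageStats> {
    vec![
        PageStats { min: None },
        PageStats { min: Some(2i32.to_le_bytes().to_vec()) },
    ]
}
```

With `example_pages()`, the all-or-nothing version returns two empty vectors, losing the min of the second page, while the per-page version keeps the four little-endian bytes of `2` for it.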


let null_counts = self
.indexes