From dd1fc86b65ae39b741f46edc6da01d024bed50b6 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Sat, 24 Aug 2024 13:44:57 +0200
Subject: [PATCH] perf: Partition metadata for parquet statistic loading
 (#18343)

---
 .../polars-io/src/parquet/read/async_impl.rs  | 14 +++-
 crates/polars-io/src/parquet/read/metadata.rs | 31 ++++----
 crates/polars-io/src/parquet/read/mod.rs      |  1 +
 .../polars-io/src/parquet/read/predicates.rs  | 25 ++++---
 .../polars-io/src/parquet/read/read_impl.rs   | 75 +++++++++----------
 crates/polars-parquet/src/arrow/read/mod.rs   | 12 ---
 .../src/arrow/read/statistics/mod.rs          | 10 +--
 .../polars-stream/src/nodes/parquet_source.rs | 29 ++++++-
 .../polars/tests/it/io/parquet/arrow/mod.rs   | 16 +---
 .../polars/tests/it/io/parquet/arrow/write.rs | 13 +---
 10 files changed, 116 insertions(+), 110 deletions(-)

diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs
index 97e4829581bc..812011af48bf 100644
--- a/crates/polars-io/src/parquet/read/async_impl.rs
+++ b/crates/polars-io/src/parquet/read/async_impl.rs
@@ -18,6 +18,7 @@ use crate::cloud::{
     build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
 };
 use crate::parquet::metadata::FileMetaDataRef;
+use crate::parquet::read::metadata::PartitionedColumnChunkMD;
 use crate::pl_async::get_runtime;
 use crate::predicates::PhysicalIoExpr;
 
@@ -277,8 +278,19 @@ impl FetchRowGroupsFromObjectStore {
         row_group_range
             .filter_map(|i| {
                 let rg = &row_groups[i];
+
+                // TODO!
+                // Optimize this. Now we partition the predicate columns twice. (later on reading as well)
+                // I think we must add metadata context where we can cache and amortize the partitioning.
+                let mut part_md = PartitionedColumnChunkMD::new(rg);
+                let live = pred.live_variables();
+                part_md.set_partitions(
+                    live.as_ref()
+                        .map(|vars| vars.iter().map(|s| s.as_ref()).collect::<PlHashSet<_>>())
+                        .as_ref(),
+                );
                 let should_be_read =
-                    matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));
+                    matches!(read_this_row_group(Some(pred), &part_md, &schema), Ok(true));
 
                 // Already add the row groups that will be skipped to the prefetched data.
                 if !should_be_read {
diff --git a/crates/polars-io/src/parquet/read/metadata.rs b/crates/polars-io/src/parquet/read/metadata.rs
index 1b9ac9d4c8e3..8f1a2c1642c8 100644
--- a/crates/polars-io/src/parquet/read/metadata.rs
+++ b/crates/polars-io/src/parquet/read/metadata.rs
@@ -4,27 +4,34 @@ use polars_utils::aliases::{PlHashMap, PlHashSet};
 use polars_utils::idx_vec::UnitVec;
 use polars_utils::unitvec;
 
-pub(super) struct ColumnToColumnChunkMD<'a> {
-    partitions: PlHashMap<String, UnitVec<usize>>,
-    pub metadata: &'a RowGroupMetaData,
+/// This is a utility struct that Partitions the `ColumnChunkMetaData` by `field.name == descriptor.path_in_schema[0]`
+/// This is required to fix quadratic behavior in wide parquet files. See #18319.
+pub struct PartitionedColumnChunkMD<'a> {
+    partitions: Option<PlHashMap<String, UnitVec<usize>>>,
+    metadata: &'a RowGroupMetaData,
 }
 
-impl<'a> ColumnToColumnChunkMD<'a> {
-    pub(super) fn new(metadata: &'a RowGroupMetaData) -> Self {
+impl<'a> PartitionedColumnChunkMD<'a> {
+    pub fn new(metadata: &'a RowGroupMetaData) -> Self {
         Self {
             partitions: Default::default(),
             metadata,
         }
     }
 
-    pub(super) fn set_partitions(&mut self, field_names: Option<&PlHashSet<&str>>) {
+    pub(super) fn num_rows(&self) -> usize {
+        self.metadata.num_rows()
+    }
+
+    pub fn set_partitions(&mut self, field_names: Option<&PlHashSet<&str>>) {
+        let mut partitions = PlHashMap::default();
         for (i, ccmd) in self.metadata.columns().iter().enumerate() {
             let name = &ccmd.descriptor().path_in_schema[0];
             if field_names
                 .map(|field_names| field_names.contains(name.as_str()))
                 .unwrap_or(true)
             {
-                let entry = self.partitions.raw_entry_mut().from_key(name.as_str());
+                let entry = partitions.raw_entry_mut().from_key(name.as_str());
 
                 match entry {
                     RawEntryMut::Vacant(slot) => {
@@ -36,17 +43,15 @@ impl<'a> ColumnToColumnChunkMD<'a> {
                 };
             }
         }
+        self.partitions = Some(partitions)
     }
 
-    pub(super) fn get_partitions(&self, name: &str) -> UnitVec<&ColumnChunkMetaData> {
-        debug_assert!(
-            !self.partitions.is_empty(),
-            "fields should be partitioned first"
-        );
+    pub fn get_partitions(&self, name: &str) -> Option<UnitVec<&ColumnChunkMetaData>> {
         let columns = self.metadata.columns();
         self.partitions
+            .as_ref()
+            .expect("fields should be partitioned first")
             .get(name)
             .map(|idx| idx.iter().map(|i| &columns[*i]).collect::<UnitVec<_>>())
-            .unwrap_or_default()
     }
 }
diff --git a/crates/polars-io/src/parquet/read/mod.rs b/crates/polars-io/src/parquet/read/mod.rs
index 826f486efb59..b6b337c3ff6e 100644
--- a/crates/polars-io/src/parquet/read/mod.rs
+++ b/crates/polars-io/src/parquet/read/mod.rs
@@ -40,6 +40,7 @@ pub use reader::{BatchedParquetReader, ParquetReader};
 pub use utils::materialize_empty_df;
 
 pub mod _internal {
+    pub use super::metadata::PartitionedColumnChunkMD;
     pub use super::mmap::to_deserializer;
     pub use super::predicates::read_this_row_group;
 }
diff --git a/crates/polars-io/src/parquet/read/predicates.rs b/crates/polars-io/src/parquet/read/predicates.rs
index e7078641e24d..565ef53f4edd 100644
--- a/crates/polars-io/src/parquet/read/predicates.rs
+++ b/crates/polars-io/src/parquet/read/predicates.rs
@@ -1,8 +1,7 @@
-use arrow::datatypes::ArrowSchemaRef;
 use polars_core::prelude::*;
 use polars_parquet::read::statistics::{deserialize, Statistics};
-use polars_parquet::read::RowGroupMetaData;
 
+use crate::parquet::read::metadata::PartitionedColumnChunkMD;
 use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr};
 
 impl ColumnStats {
@@ -16,17 +15,21 @@ impl ColumnStats {
     }
 }
 
-/// Collect the statistics in a column chunk.
+/// Collect the statistics in a row-group
 pub(crate) fn collect_statistics(
-    md: &RowGroupMetaData,
+    part_md: &PartitionedColumnChunkMD,
     schema: &ArrowSchema,
 ) -> PolarsResult<Option<BatchStats>> {
+    // TODO! fix this performance. This is a full sequential scan.
     let stats = schema
         .fields
         .iter()
-        .map(|field| {
-            let st = deserialize(field, md)?;
-            Ok(ColumnStats::from_arrow_stats(st, field))
+        .map(|field| match part_md.get_partitions(&field.name) {
+            Some(md) => {
+                let st = deserialize(field, &md)?;
+                Ok(ColumnStats::from_arrow_stats(st, field))
+            },
+            None => Ok(ColumnStats::new(field.into(), None, None, None)),
         })
         .collect::<PolarsResult<Vec<_>>>()?;
 
@@ -37,18 +40,18 @@ pub(crate) fn collect_statistics(
     Ok(Some(BatchStats::new(
         Arc::new(schema.into()),
         stats,
-        Some(md.num_rows()),
+        Some(part_md.num_rows()),
     )))
 }
 
 pub fn read_this_row_group(
     predicate: Option<&dyn PhysicalIoExpr>,
-    md: &RowGroupMetaData,
-    schema: &ArrowSchemaRef,
+    part_md: &PartitionedColumnChunkMD,
+    schema: &ArrowSchema,
 ) -> PolarsResult<bool> {
     if let Some(pred) = predicate {
         if let Some(pred) = pred.as_stats_evaluator() {
-            if let Some(stats) = collect_statistics(md, schema)? {
+            if let Some(stats) = collect_statistics(part_md, schema)? {
                 let should_read = pred.should_read(&stats);
                 // a parquet file may not have statistics of all columns
                 if matches!(should_read, Ok(false)) {
diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 9e2c8541813f..d0b1845cc8c5 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -26,7 +26,7 @@ use super::{mmap, ParallelStrategy};
 use crate::hive::materialize_hive_partitions;
 use crate::mmap::{MmapBytesReader, ReaderBytes};
 use crate::parquet::metadata::FileMetaDataRef;
-use crate::parquet::read::metadata::ColumnToColumnChunkMD;
+use crate::parquet::read::metadata::PartitionedColumnChunkMD;
 use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
 use crate::predicates::{apply_predicate, PhysicalIoExpr};
 use crate::utils::get_reader_bytes;
@@ -248,9 +248,24 @@ fn rg_to_dfs_prefiltered(
         polars_bail!(ComputeError: "Parquet file contains too many row groups (> {})", u32::MAX);
     }
 
+    let projected_columns = projected_columns_set(schema, projection);
+
+    let part_mds = POOL.install(|| {
+        file_metadata
+            .row_groups
+            .par_iter()
+            .map(|rg| {
+                let mut part_md = PartitionedColumnChunkMD::new(rg);
+                part_md.set_partitions(projected_columns.as_ref());
+                part_md
+            })
+            .collect::<Vec<_>>()
+    });
+
     let mut row_offset = *previous_row_count;
     let mut row_groups: Vec<RowGroupInfo> = (row_group_start..row_group_end)
         .filter_map(|index| {
+            let part_md = &part_mds[index];
             let md = &file_metadata.row_groups[index];
 
             let current_offset = row_offset;
@@ -258,8 +273,7 @@ fn rg_to_dfs_prefiltered(
             row_offset += current_row_count;
 
            if use_statistics {
-                match read_this_row_group(Some(predicate), &file_metadata.row_groups[index], schema)
-                {
+                match read_this_row_group(Some(predicate), part_md, schema) {
                     Ok(false) => return None,
                     Ok(true) => {},
                     Err(e) => return Some(Err(e)),
@@ -302,21 +316,6 @@ fn rg_to_dfs_prefiltered(
     // Set partitioned fields to prevent quadratic behavior.
     // Ensure all row groups are partitioned.
-    // Hashset, because row-groups will be modified in place later, so we cannot trust the indexes.
-    let part_md = {
-        let projected_columns = projected_columns_set(schema, projection);
-
-        row_groups
-            .par_iter()
-            .map(|rg_info| {
-                let md = &file_metadata.row_groups[rg_info.index as usize];
-                let mut part_md = ColumnToColumnChunkMD::new(md);
-                part_md.set_partitions(projected_columns.as_ref());
-                (rg_info.index, part_md)
-            })
-            .collect::<PlHashMap<_, _>>()
-    };
-
     // Collect the data for the live columns
     let mut live_columns = (0..row_groups.len() * num_live_columns)
         .into_par_iter()
         .map(|i| {
             let name = &schema.fields[col_idx].name;
             let rg_idx = row_groups[i / num_live_columns].index;
 
-            let field_md = part_md.get(&rg_idx).unwrap().get_partitions(name);
+            let field_md = part_mds[rg_idx as usize].get_partitions(name).unwrap();
 
             column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store)
         })
@@ -442,7 +441,7 @@ fn rg_to_dfs_prefiltered(
                     let md = &file_metadata.row_groups[rg_idx as usize];
                     debug_assert_eq!(md.num_rows(), mask.len());
                 }
-                let field_md = part_md.get(&rg_idx).unwrap().get_partitions(name);
+                let field_md = part_mds[rg_idx as usize].get_partitions(name).unwrap();
 
                 column_idx_to_series(
                     col_idx,
@@ -502,13 +501,17 @@ fn rg_to_dfs_optionally_par_over_columns(
 
     for rg_idx in row_group_start..row_group_end {
         let md = &file_metadata.row_groups[rg_idx];
+
+        // Set partitioned fields to prevent quadratic behavior.
+        let projected_columns = projected_columns_set(schema, projection);
+        let mut part_md = PartitionedColumnChunkMD::new(md);
+        part_md.set_partitions(projected_columns.as_ref());
+
         let rg_slice =
             split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
         let current_row_count = md.num_rows() as IdxSize;
 
-        if use_statistics
-            && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
-        {
+        if use_statistics && !read_this_row_group(predicate, &part_md, schema)? {
             *previous_row_count += rg_slice.1 as IdxSize;
             continue;
         }
@@ -518,18 +521,13 @@ fn rg_to_dfs_optionally_par_over_columns(
             assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
         }
 
-        // Set partitioned fields to prevent quadratic behavior.
-        let projected_columns = projected_columns_set(schema, projection);
-        let mut part_md = ColumnToColumnChunkMD::new(md);
-        part_md.set_partitions(projected_columns.as_ref());
-
         let columns = if let ParallelStrategy::Columns = parallel {
            POOL.install(|| {
                 projection
                     .par_iter()
                     .map(|column_i| {
                         let name = &schema.fields[*column_i].name;
-                        let part = part_md.get_partitions(name);
+                        let part = part_md.get_partitions(name).unwrap();
 
                         column_idx_to_series(
                             *column_i,
@@ -546,7 +544,7 @@ fn rg_to_dfs_optionally_par_over_columns(
                 .iter()
                 .map(|column_i| {
                     let name = &schema.fields[*column_i].name;
-                    let part = part_md.get_partitions(name);
+                    let part = part_md.get_partitions(name).unwrap();
 
                     column_idx_to_series(
                         *column_i,
@@ -627,12 +625,12 @@ fn rg_to_dfs_par_over_rg(
     let dfs = POOL.install(|| {
         // Set partitioned fields to prevent quadratic behavior.
        // Ensure all row groups are partitioned.
-        let part_md = {
+        let part_mds = {
            let projected_columns = projected_columns_set(schema, projection);
            row_groups
                .par_iter()
                .map(|(_, rg, _, _)| {
-                    let mut ccmd = ColumnToColumnChunkMD::new(rg);
+                    let mut ccmd = PartitionedColumnChunkMD::new(rg);
                    ccmd.set_partitions(projected_columns.as_ref());
                    ccmd
                })
@@ -642,14 +640,11 @@ fn rg_to_dfs_par_over_rg(
         row_groups
             .into_par_iter()
             .enumerate()
-            .map(|(iter_idx, (rg_idx, _md, slice, row_count_start))| {
+            .map(|(iter_idx, (_rg_idx, _md, slice, row_count_start))| {
+                let part_md = &part_mds[iter_idx];
+
                 if slice.1 == 0
-                    || use_statistics
-                        && !read_this_row_group(
-                            predicate,
-                            &file_metadata.row_groups[rg_idx],
-                            schema,
-                        )?
+                    || use_statistics && !read_this_row_group(predicate, part_md, schema)?
                 {
                     return Ok(None);
                 }
@@ -663,7 +658,7 @@ fn rg_to_dfs_par_over_rg(
                     .iter()
                     .map(|column_i| {
                         let name = &schema.fields[*column_i].name;
-                        let field_md = part_md[iter_idx].get_partitions(name);
+                        let field_md = part_md.get_partitions(name).unwrap();
 
                         column_idx_to_series(
                             *column_i,
diff --git a/crates/polars-parquet/src/arrow/read/mod.rs b/crates/polars-parquet/src/arrow/read/mod.rs
index 102de7845eaf..9c445d7a46ce 100644
--- a/crates/polars-parquet/src/arrow/read/mod.rs
+++ b/crates/polars-parquet/src/arrow/read/mod.rs
@@ -38,18 +38,6 @@ pub use crate::parquet::{
     FallibleStreamingIterator,
 };
 
-/// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
-/// For non-nested parquet types, this returns a single column
-pub fn get_field_columns<'a>(
-    columns: &'a [ColumnChunkMetaData],
-    field_name: &str,
-) -> Vec<&'a ColumnChunkMetaData> {
-    columns
-        .iter()
-        .filter(|x| x.descriptor().path_in_schema[0] == field_name)
-        .collect()
-}
-
 /// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
 /// For non-nested parquet types, this returns a single column
 pub fn get_field_pages<'a, T>(
diff --git a/crates/polars-parquet/src/arrow/read/statistics/mod.rs b/crates/polars-parquet/src/arrow/read/statistics/mod.rs
index ea3b34fb8631..0face3c8b358 100644
--- a/crates/polars-parquet/src/arrow/read/statistics/mod.rs
+++ b/crates/polars-parquet/src/arrow/read/statistics/mod.rs
@@ -8,12 +8,12 @@ use arrow::with_match_primitive_type_full;
 use ethnum::I256;
 use polars_error::{polars_bail, PolarsResult};
 
-use crate::parquet::metadata::RowGroupMetaData;
 use crate::parquet::schema::types::{
     PhysicalType as ParquetPhysicalType, PrimitiveType as ParquetPrimitiveType,
 };
 use crate::parquet::statistics::{PrimitiveStatistics, Statistics as ParquetStatistics};
 use crate::parquet::types::int96_to_i64_ns;
+use crate::read::ColumnChunkMetaData;
 
 mod binary;
 mod binview;
@@ -28,7 +28,6 @@ mod struct_;
 mod utf8;
 
 use self::list::DynMutableListArray;
-use super::get_field_columns;
 
 /// Arrow-deserialized parquet Statistics of a file
 #[derive(Debug, PartialEq)]
@@ -543,12 +542,11 @@ fn push(
 ///
 /// # Errors
 /// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)
-pub fn deserialize(field: &Field, row_group: &RowGroupMetaData) -> PolarsResult<Statistics> {
+pub fn deserialize(field: &Field, field_md: &[&ColumnChunkMetaData]) -> PolarsResult<Statistics> {
     let mut statistics = MutableStatistics::try_new(field)?;
 
-    let columns = get_field_columns(row_group.columns(), field.name.as_ref());
-    let mut stats = columns
-        .into_iter()
+    let mut stats = field_md
+        .iter()
         .map(|column| {
             Ok((
                 column.statistics().transpose()?,
diff --git a/crates/polars-stream/src/nodes/parquet_source.rs b/crates/polars-stream/src/nodes/parquet_source.rs
index 019afcb63ec4..3ae391831702 100644
--- a/crates/polars-stream/src/nodes/parquet_source.rs
+++ b/crates/polars-stream/src/nodes/parquet_source.rs
@@ -27,6 +27,7 @@ use polars_parquet::read::RowGroupMetaData;
 use polars_plan::plans::hive::HivePartitions;
 use polars_plan::plans::FileInfo;
 use polars_plan::prelude::FileScanOptions;
+use polars_utils::aliases::PlHashSet;
 use polars_utils::mmap::MemSlice;
 use polars_utils::slice::GetSaferUnchecked;
 use polars_utils::IdxSize;
@@ -1050,6 +1051,28 @@ struct RowGroupDataFetcher {
     current_shared_file_state: Arc>,
 }
 
+fn read_this_row_group(
+    rg_md: &RowGroupMetaData,
+    predicate: Option<&dyn PhysicalIoExpr>,
+    reader_schema: &ArrowSchema,
+) -> PolarsResult<bool> {
+    let Some(pred) = predicate else {
+        return Ok(true);
+    };
+    use polars_io::prelude::_internal::*;
+    // TODO!
+    // Optimize this. Now we partition the predicate columns twice. (later on reading as well)
+    // I think we must add metadata context where we can cache and amortize the partitioning.
+    let mut part_md = PartitionedColumnChunkMD::new(rg_md);
+    let live = pred.live_variables();
+    part_md.set_partitions(
+        live.as_ref()
+            .map(|vars| vars.iter().map(|s| s.as_ref()).collect::<PlHashSet<_>>())
+            .as_ref(),
+    );
+    read_this_row_group(Some(pred), &part_md, reader_schema)
+}
+
 impl RowGroupDataFetcher {
     fn into_stream(self) -> RowGroupDataStream {
         RowGroupDataStream::new(self)
     }
@@ -1089,10 +1112,10 @@ impl RowGroupDataFetcher {
             self.current_row_group_idx += 1;
 
             if self.use_statistics
-                && !match polars_io::prelude::_internal::read_this_row_group(
-                    self.predicate.as_deref(),
+                && !match read_this_row_group(
                     &row_group_metadata,
-                    &self.reader_schema,
+                    self.predicate.as_deref(),
+                    self.reader_schema.as_ref(),
                 ) {
                     Ok(v) => v,
                     Err(e) => return Some(Err(e)),
diff --git a/crates/polars/tests/it/io/parquet/arrow/mod.rs b/crates/polars/tests/it/io/parquet/arrow/mod.rs
index d12785db1cc2..f5e0b2e39e3d 100644
--- a/crates/polars/tests/it/io/parquet/arrow/mod.rs
+++ b/crates/polars/tests/it/io/parquet/arrow/mod.rs
@@ -17,8 +17,6 @@ use polars_parquet::write::*;
 
 use super::read::file::FileReader;
 
-type ArrayStats = (Box<dyn Array>, Statistics);
-
 fn new_struct(
     arrays: Vec<Box<dyn Array>>,
     names: Vec<String>,
@@ -32,23 +30,17 @@ fn new_struct(
     StructArray::new(ArrowDataType::Struct(fields), arrays, validity)
 }
 
-pub fn read_column<R: Read + Seek>(mut reader: R, column: &str) -> PolarsResult<ArrayStats> {
+pub fn read_column<R: Read + Seek>(mut reader: R, column: &str) -> PolarsResult<Box<dyn Array>> {
     let metadata = p_read::read_metadata(&mut reader)?;
     let schema = p_read::infer_schema(&metadata)?;
 
-    let row_group = &metadata.row_groups[0];
-
     let schema = schema.filter(|_, f| f.name == column);
-    let field = &schema.fields[0];
-
-    let statistics = deserialize(field, row_group)?;
-
     let mut reader = FileReader::new(reader, metadata.row_groups, schema, None);
 
     let array = reader.next().unwrap()?.into_arrays().pop().unwrap();
 
-    Ok((array, statistics))
+    Ok(array)
 }
 
 pub fn pyarrow_nested_edge(column: &str) -> Box<dyn Array> {
@@ -1290,10 +1282,6 @@ fn integration_read(data: &[u8], limit: Option<usize>) -> PolarsResult
diff --git a/crates/polars/tests/it/io/parquet/arrow/write.rs b/crates/polars/tests/it/io/parquet/arrow/write.rs
--- a/crates/polars/tests/it/io/parquet/arrow/write.rs
+++ b/crates/polars/tests/it/io/parquet/arrow/write.rs
 fn round_trip(
     column: &str,
     file: &str,
     version: Version,
     compression: CompressionOptions,
     encodings: Vec<Encoding>,
 ) -> PolarsResult<()> {
-    round_trip_opt_stats(column, file, version, compression, encodings, true)
+    round_trip_opt_stats(column, file, version, compression, encodings)
 }
 
 fn round_trip_opt_stats(
@@ -18,9 +18,8 @@ fn round_trip_opt_stats(
     version: Version,
     compression: CompressionOptions,
     encodings: Vec<Encoding>,
-    check_stats: bool,
 ) -> PolarsResult<()> {
-    let (array, statistics) = match file {
+    let (array, _statistics) = match file {
         "nested" => (
             pyarrow_nested_nullable(column),
             pyarrow_nested_nullable_statistics(column),
@@ -68,12 +67,9 @@ fn round_trip_opt_stats(
 
     std::fs::write("list_struct_list_nullable.parquet", &data).unwrap();
 
-    let (result, stats) = read_column(&mut Cursor::new(data), "a1")?;
+    let result = read_column(&mut Cursor::new(data), "a1")?;
 
     assert_eq!(array.as_ref(), result.as_ref());
-    if check_stats {
-        assert_eq!(statistics, stats);
-    }
 
     Ok(())
 }
@@ -364,7 +360,6 @@ fn list_nested_inner_required_required_i64() -> PolarsResult<()> {
         Version::V1,
         CompressionOptions::Uncompressed,
         vec![Encoding::Plain],
-        false,
     )
 }
 
@@ -376,7 +371,6 @@ fn v1_nested_struct_list_nullable() -> PolarsResult<()> {
         Version::V1,
         CompressionOptions::Uncompressed,
         vec![Encoding::Plain],
-        true,
     )
 }
 
@@ -388,7 +382,6 @@ fn v1_nested_list_struct_list_nullable() -> PolarsResult<()> {
         Version::V1,
         CompressionOptions::Uncompressed,
         vec![Encoding::Plain],
-        true,
    )
 }
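
The sketch below is not part of the commit; it is a minimal illustration of how the API this patch introduces is meant to be used from a call site: partition a row group's column-chunk metadata once, then reuse that partitioning both for statistics-based row-group skipping and for per-field chunk lookup. Everything it calls is taken from the diff above (the `_internal` re-exports in polars-io, `PartitionedColumnChunkMD`, `read_this_row_group`). The helper name `prune_row_group`, the field name "foo", and the exact import paths are assumptions and may differ between polars revisions.

use polars_core::prelude::{ArrowSchema, PolarsResult};
use polars_io::predicates::PhysicalIoExpr;
use polars_io::prelude::_internal::{read_this_row_group, PartitionedColumnChunkMD};
use polars_parquet::read::RowGroupMetaData;
use polars_utils::aliases::PlHashSet;

// Partition a row group's column chunks once, keyed by top-level field name, and reuse
// the partitioning for statistics-based skipping and per-field chunk lookup.
fn prune_row_group(
    rg: &RowGroupMetaData,
    predicate: &dyn PhysicalIoExpr,
    schema: &ArrowSchema,
) -> PolarsResult<bool> {
    // Only partition the columns the predicate actually touches; passing `None`
    // to `set_partitions` would partition every column of the row group.
    let live = predicate.live_variables();
    let live_set: Option<PlHashSet<&str>> = live
        .as_ref()
        .map(|vars| vars.iter().map(|s| s.as_ref()).collect());

    let mut part_md = PartitionedColumnChunkMD::new(rg);
    part_md.set_partitions(live_set.as_ref());

    // All column chunks belonging to one (possibly nested) field are now a hash lookup
    // away instead of a linear scan over every chunk of a wide file.
    // "foo" is a hypothetical field name used purely for illustration.
    if let Some(chunks) = part_md.get_partitions("foo") {
        let _ = chunks.as_slice();
    }

    // Row-group skipping works off the same partitioned metadata.
    read_this_row_group(Some(predicate), &part_md, schema)
}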