diff --git a/Cargo.lock b/Cargo.lock
index 20850f1eacf0..3df9957f8056 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3128,6 +3128,7 @@ dependencies = [
  "fs4",
  "futures",
  "glob",
+ "hashbrown",
  "home",
  "itoa",
  "memchr",
diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml
index ef569e61519f..11f9a36dd7a6 100644
--- a/crates/polars-io/Cargo.toml
+++ b/crates/polars-io/Cargo.toml
@@ -28,6 +28,7 @@ fast-float = { workspace = true, optional = true }
 flate2 = { workspace = true, optional = true }
 futures = { workspace = true, optional = true }
 glob = { version = "0.3" }
+hashbrown = { workspace = true }
 itoa = { workspace = true, optional = true }
 memchr = { workspace = true }
 memmap = { workspace = true }
diff --git a/crates/polars-io/src/lib.rs b/crates/polars-io/src/lib.rs
index 5aa6e7fcebab..f3540f4e13fd 100644
--- a/crates/polars-io/src/lib.rs
+++ b/crates/polars-io/src/lib.rs
@@ -1,6 +1,7 @@
 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
 #![cfg_attr(feature = "simd", feature(portable_simd))]
 #![allow(ambiguous_glob_reexports)]
+extern crate core;
 
 #[cfg(feature = "avro")]
 pub mod avro;
diff --git a/crates/polars-io/src/parquet/read/metadata.rs b/crates/polars-io/src/parquet/read/metadata.rs
new file mode 100644
index 000000000000..1b9ac9d4c8e3
--- /dev/null
+++ b/crates/polars-io/src/parquet/read/metadata.rs
@@ -0,0 +1,52 @@
+use hashbrown::hash_map::RawEntryMut;
+use polars_parquet::read::{ColumnChunkMetaData, RowGroupMetaData};
+use polars_utils::aliases::{PlHashMap, PlHashSet};
+use polars_utils::idx_vec::UnitVec;
+use polars_utils::unitvec;
+
+pub(super) struct ColumnToColumnChunkMD<'a> {
+    partitions: PlHashMap<String, UnitVec<usize>>,
+    pub metadata: &'a RowGroupMetaData,
+}
+
+impl<'a> ColumnToColumnChunkMD<'a> {
+    pub(super) fn new(metadata: &'a RowGroupMetaData) -> Self {
+        Self {
+            partitions: Default::default(),
+            metadata,
+        }
+    }
+
+    pub(super) fn set_partitions(&mut self, field_names: Option<&PlHashSet<&str>>) {
+        for (i, ccmd) in self.metadata.columns().iter().enumerate() {
+            let name = &ccmd.descriptor().path_in_schema[0];
+            if field_names
+                .map(|field_names| field_names.contains(name.as_str()))
+                .unwrap_or(true)
+            {
+                let entry = self.partitions.raw_entry_mut().from_key(name.as_str());
+
+                match entry {
+                    RawEntryMut::Vacant(slot) => {
+                        slot.insert(name.to_string(), unitvec![i]);
+                    },
+                    RawEntryMut::Occupied(mut slot) => {
+                        slot.get_mut().push(i);
+                    },
+                };
+            }
+        }
+    }
+
+    pub(super) fn get_partitions(&self, name: &str) -> UnitVec<&ColumnChunkMetaData> {
+        debug_assert!(
+            !self.partitions.is_empty(),
+            "fields should be partitioned first"
+        );
+        let columns = self.metadata.columns();
+        self.partitions
+            .get(name)
+            .map(|idx| idx.iter().map(|i| &columns[*i]).collect::<UnitVec<_>>())
+            .unwrap_or_default()
+    }
+}
diff --git a/crates/polars-io/src/parquet/read/mmap.rs b/crates/polars-io/src/parquet/read/mmap.rs
index 0aba7a9841d1..84725fd7a2e1 100644
--- a/crates/polars-io/src/parquet/read/mmap.rs
+++ b/crates/polars-io/src/parquet/read/mmap.rs
@@ -6,8 +6,7 @@ use bytes::Bytes;
 use polars_core::datatypes::PlHashMap;
 use polars_error::PolarsResult;
 use polars_parquet::read::{
-    column_iter_to_arrays, get_field_columns, BasicDecompressor, ColumnChunkMetaData, Filter,
-    PageReader,
+    column_iter_to_arrays, BasicDecompressor, ColumnChunkMetaData, Filter, PageReader,
 };
 use polars_utils::mmap::{MemReader, MemSlice};
 
@@ -32,11 +31,10 @@ pub enum ColumnStore {
 /// For cloud files the relevant memory regions should have been prefetched.
 pub(super) fn mmap_columns<'a>(
     store: &'a ColumnStore,
-    columns: &'a [ColumnChunkMetaData],
-    field_name: &str,
+    field_columns: &'a [&ColumnChunkMetaData],
 ) -> Vec<(&'a ColumnChunkMetaData, MemSlice)> {
-    get_field_columns(columns, field_name)
-        .into_iter()
+    field_columns
+        .iter()
         .map(|meta| _mmap_single_column(store, meta))
         .collect()
 }
diff --git a/crates/polars-io/src/parquet/read/mod.rs b/crates/polars-io/src/parquet/read/mod.rs
index 14c24bce12ac..826f486efb59 100644
--- a/crates/polars-io/src/parquet/read/mod.rs
+++ b/crates/polars-io/src/parquet/read/mod.rs
@@ -16,6 +16,7 @@
 
 #[cfg(feature = "cloud")]
 mod async_impl;
+mod metadata;
 mod mmap;
 mod options;
 mod predicates;
diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 833bc3a1e54b..9e2c8541813f 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -9,7 +9,9 @@ use polars_core::utils::{accumulate_dataframes_vertical, split_df};
 use polars_core::POOL;
 use polars_parquet::parquet::error::ParquetResult;
 use polars_parquet::parquet::statistics::Statistics;
-use polars_parquet::read::{self, FileMetaData, Filter, PhysicalType, RowGroupMetaData};
+use polars_parquet::read::{
+    self, ColumnChunkMetaData, FileMetaData, Filter, PhysicalType, RowGroupMetaData,
+};
 use polars_utils::mmap::MemSlice;
 use polars_utils::vec::inplace_zip_filtermap;
 use rayon::prelude::*;
@@ -24,6 +26,7 @@ use super::{mmap, ParallelStrategy};
 use crate::hive::materialize_hive_partitions;
 use crate::mmap::{MmapBytesReader, ReaderBytes};
 use crate::parquet::metadata::FileMetaDataRef;
+use crate::parquet::read::metadata::ColumnToColumnChunkMD;
 use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
 use crate::predicates::{apply_predicate, PhysicalIoExpr};
 use crate::utils::get_reader_bytes;
@@ -58,7 +61,8 @@ fn assert_dtypes(data_type: &ArrowDataType) {
 
 fn column_idx_to_series(
     column_i: usize,
-    md: &RowGroupMetaData,
+    // The metadata belonging to this column
+    field_md: &[&ColumnChunkMetaData],
     filter: Option<Filter>,
     file_schema: &ArrowSchema,
    store: &mmap::ColumnStore,
@@ -69,8 +73,7 @@ fn column_idx_to_series(
     {
         assert_dtypes(field.data_type())
     }
-
-    let columns = mmap_columns(store, md.columns(), &field.name);
+    let columns = mmap_columns(store, field_md);
     let stats = columns
         .iter()
         .map(|(col_md, _)| col_md.statistics().transpose())
@@ -203,6 +206,24 @@ fn rg_to_dfs(
     }
 }
 
+/// Collect a HashSet of the projected columns.
+/// Returns `None` if all columns are projected.
+fn projected_columns_set<'a>(
+    schema: &'a ArrowSchema,
+    projection: &[usize],
+) -> Option<PlHashSet<&'a str>> {
+    if projection.len() == schema.len() {
+        None
+    } else {
+        Some(
+            projection
+                .iter()
+                .map(|i| schema.fields[*i].name.as_str())
+                .collect::<PlHashSet<_>>(),
+        )
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 fn rg_to_dfs_prefiltered(
     store: &mmap::ColumnStore,
@@ -278,15 +299,35 @@ fn rg_to_dfs_prefiltered(
     debug_assert_eq!(dead_idx_to_col_idx.len(), num_dead_columns);
 
     POOL.install(|| {
+        // Set partitioned fields to prevent quadratic behavior.
+        // Ensure all row groups are partitioned.
+
+        // HashMap, because row-groups will be modified in place later, so we
+        // cannot trust the indexes.
+        let part_md = {
+            let projected_columns = projected_columns_set(schema, projection);
+
+            row_groups
+                .par_iter()
+                .map(|rg_info| {
+                    let md = &file_metadata.row_groups[rg_info.index as usize];
+                    let mut part_md = ColumnToColumnChunkMD::new(md);
+                    part_md.set_partitions(projected_columns.as_ref());
+                    (rg_info.index, part_md)
+                })
+                .collect::<PlHashMap<_, _>>()
+        };
+
         // Collect the data for the live columns
         let mut live_columns = (0..row_groups.len() * num_live_columns)
             .into_par_iter()
             .map(|i| {
                 let col_idx = live_idx_to_col_idx[i % num_live_columns];
-                let rg_idx = row_groups[i / num_live_columns].index as usize;
-                let md = &file_metadata.row_groups[rg_idx];
-
-                column_idx_to_series(col_idx, md, None, schema, store)
+                let name = &schema.fields[col_idx].name;
+                let rg_idx = row_groups[i / num_live_columns].index;
+                let field_md = part_md.get(&rg_idx).unwrap().get_partitions(name);
+
+                column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store)
             })
             .collect::<PolarsResult<Vec<_>>>()?;
@@ -390,15 +431,22 @@ fn rg_to_dfs_prefiltered(
             .into_par_iter()
             .map(|i| {
                 let col_idx = dead_idx_to_col_idx[i % num_dead_columns];
-                let rg_idx = row_groups[i / num_dead_columns].index as usize;
+                let name = &schema.fields[col_idx].name;
 
                 let (mask, _) = &dfs[i / num_dead_columns];
 
-                let md = &file_metadata.row_groups[rg_idx];
-                debug_assert_eq!(md.num_rows(), mask.len());
+                let rg_idx = row_groups[i / num_dead_columns].index;
+
+                #[cfg(debug_assertions)]
+                {
+                    let md = &file_metadata.row_groups[rg_idx as usize];
+                    debug_assert_eq!(md.num_rows(), mask.len());
+                }
+                let field_md = part_md.get(&rg_idx).unwrap().get_partitions(name);
+
                 column_idx_to_series(
                     col_idx,
-                    md,
+                    field_md.as_slice(),
                     Some(Filter::new_masked(mask.clone())),
                     schema,
                     store,
@@ -470,14 +518,22 @@ fn rg_to_dfs_optionally_par_over_columns(
         assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
     }
 
+    // Set partitioned fields to prevent quadratic behavior.
+    let projected_columns = projected_columns_set(schema, projection);
+    let mut part_md = ColumnToColumnChunkMD::new(md);
+    part_md.set_partitions(projected_columns.as_ref());
+
     let columns = if let ParallelStrategy::Columns = parallel {
         POOL.install(|| {
             projection
                 .par_iter()
                 .map(|column_i| {
+                    let name = &schema.fields[*column_i].name;
+                    let part = part_md.get_partitions(name);
+
                     column_idx_to_series(
                         *column_i,
-                        md,
+                        part.as_slice(),
                         Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                         schema,
                         store,
@@ -489,9 +545,12 @@ fn rg_to_dfs_optionally_par_over_columns(
         projection
             .iter()
             .map(|column_i| {
+                let name = &schema.fields[*column_i].name;
+                let part = part_md.get_partitions(name);
+
                 column_idx_to_series(
                     *column_i,
-                    md,
+                    part.as_slice(),
                     Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
                     schema,
                     store,
@@ -566,9 +625,24 @@ fn rg_to_dfs_par_over_rg(
     }
 
     let dfs = POOL.install(|| {
+        // Set partitioned fields to prevent quadratic behavior.
+        // Ensure all row groups are partitioned.
+        let part_md = {
+            let projected_columns = projected_columns_set(schema, projection);
+            row_groups
+                .par_iter()
+                .map(|(_, rg, _, _)| {
+                    let mut ccmd = ColumnToColumnChunkMD::new(rg);
+                    ccmd.set_partitions(projected_columns.as_ref());
+                    ccmd
+                })
+                .collect::<Vec<_>>()
+        };
+
         row_groups
             .into_par_iter()
-            .map(|(rg_idx, md, slice, row_count_start)| {
+            .enumerate()
+            .map(|(iter_idx, (rg_idx, _md, slice, row_count_start))| {
                 if slice.1 == 0
                     || use_statistics
                         && !read_this_row_group(
@@ -588,9 +662,12 @@ fn rg_to_dfs_par_over_rg(
                 let columns = projection
                     .iter()
                     .map(|column_i| {
+                        let name = &schema.fields[*column_i].name;
+                        let field_md = part_md[iter_idx].get_partitions(name);
+
                         column_idx_to_series(
                             *column_i,
-                            md,
+                            field_md.as_slice(),
                             Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                             schema,
                             store,
diff --git a/crates/polars-parquet/src/parquet/read/column/mod.rs b/crates/polars-parquet/src/parquet/read/column/mod.rs
index 1a1277637f27..d6bcda08fe2d 100644
--- a/crates/polars-parquet/src/parquet/read/column/mod.rs
+++ b/crates/polars-parquet/src/parquet/read/column/mod.rs
@@ -1,4 +1,3 @@
-use std::io::{Read, Seek};
 use std::vec::IntoIter;
 
 use super::{get_field_columns, get_page_iterator, MemReader, PageReader};
@@ -7,10 +6,6 @@ use crate::parquet::metadata::{ColumnChunkMetaData, RowGroupMetaData};
 use crate::parquet::page::CompressedPage;
 use crate::parquet::schema::types::ParquetType;
 
-#[cfg(feature = "async")]
-#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
-mod stream;
-
 /// Returns a [`ColumnIterator`] of column chunks corresponding to `field`.
 ///
 /// Contrarily to [`get_page_iterator`] that returns a single iterator of pages, this iterator
@@ -149,38 +144,3 @@ impl MutStreamingIterator for ReadColumnIterator {
         self.current.as_mut()
     }
 }
-
-/// Reads all columns that are part of the parquet field `field_name`
-/// # Implementation
-/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
-/// the field (one for non-nested types)
-/// It reads the columns sequentially. Use [`read_column`] to fork this operation to multiple
-/// readers.
-pub fn read_columns<'a, R: Read + Seek>(
-    reader: &mut R,
-    columns: &'a [ColumnChunkMetaData],
-    field_name: &'a str,
-) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>, ParquetError> {
-    get_field_columns(columns, field_name)
-        .map(|column| read_column(reader, column).map(|c| (column, c)))
-        .collect()
-}
-
-/// Reads a column chunk into memory
-/// This operation is IO-bounded and allocates the column's `compressed_size`.
-pub fn read_column<R>(reader: &mut R, column: &ColumnChunkMetaData) -> Result<Vec<u8>, ParquetError>
-where
-    R: Read + Seek,
-{
-    let (start, length) = column.byte_range();
-    reader.seek(std::io::SeekFrom::Start(start))?;
-
-    let mut chunk = vec![];
-    chunk.try_reserve(length as usize)?;
-    reader.by_ref().take(length).read_to_end(&mut chunk)?;
-    Ok(chunk)
-}
-
-#[cfg(feature = "async")]
-#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
-pub use stream::{read_column_async, read_columns_async};
diff --git a/crates/polars-parquet/src/parquet/read/column/stream.rs b/crates/polars-parquet/src/parquet/read/column/stream.rs
deleted file mode 100644
index 63319d2260c6..000000000000
--- a/crates/polars-parquet/src/parquet/read/column/stream.rs
+++ /dev/null
@@ -1,51 +0,0 @@
-use futures::future::{try_join_all, BoxFuture};
-use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
-
-use crate::parquet::error::ParquetError;
-use crate::parquet::metadata::ColumnChunkMetaData;
-use crate::parquet::read::get_field_columns;
-
-/// Reads a single column chunk into memory asynchronously
-pub async fn read_column_async<'b, R, F>(
-    factory: F,
-    meta: &ColumnChunkMetaData,
-) -> Result<Vec<u8>, ParquetError>
-where
-    R: AsyncRead + AsyncSeek + Send + Unpin,
-    F: Fn() -> BoxFuture<'b, std::io::Result<R>>,
-{
-    let mut reader = factory().await?;
-    let (start, length) = meta.byte_range();
-    reader.seek(std::io::SeekFrom::Start(start)).await?;
-
-    let mut chunk = vec![];
-    chunk.try_reserve(length as usize)?;
-    reader.take(length).read_to_end(&mut chunk).await?;
-    Result::Ok(chunk)
-}
-
-/// Reads all columns that are part of the parquet field `field_name`
-/// # Implementation
-/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
-/// the field (one for non-nested types)
-///
-/// It does so asynchronously via a single `join_all` over all the necessary columns for
-/// `field_name`.
-pub async fn read_columns_async<
-    'a,
-    'b,
-    R: AsyncRead + AsyncSeek + Send + Unpin,
-    F: Fn() -> BoxFuture<'b, std::io::Result<R>> + Clone,
->(
-    factory: F,
-    columns: &'a [ColumnChunkMetaData],
-    field_name: &'a str,
-) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>, ParquetError> {
-    let fields = get_field_columns(columns, field_name).collect::<Vec<_>>();
-    let futures = fields
-        .iter()
-        .map(|meta| async { read_column_async(factory.clone(), meta).await });
-
-    let columns = try_join_all(futures).await?;
-    Ok(fields.into_iter().zip(columns).collect())
-}
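The diff replaces the old per-field linear scan (`get_field_columns`, which walked every column chunk in a row group once per projected field, i.e. O(fields × chunks)) with a one-time partition of each row group's column-chunk metadata by field name, after which each field lookup is O(1) amortized. A minimal standalone sketch of that idea, using std `HashMap`/`Vec` and a toy `ColumnChunk` type in place of Polars' `PlHashMap`/`UnitVec`/`ColumnChunkMetaData` (all names here are illustrative, not the Polars API):

```rust
use std::collections::HashMap;

// Stand-in for ColumnChunkMetaData; only the field name matters here.
struct ColumnChunk {
    field_name: String, // first element of `path_in_schema` in real parquet metadata
}

/// Build the field-name -> chunk-indexes index once per row group: O(C).
fn partition_by_field(chunks: &[ColumnChunk]) -> HashMap<&str, Vec<usize>> {
    let mut partitions: HashMap<&str, Vec<usize>> = HashMap::new();
    for (i, chunk) in chunks.iter().enumerate() {
        partitions.entry(chunk.field_name.as_str()).or_default().push(i);
    }
    partitions
}

fn main() {
    // A nested field ("s") can own several column chunks.
    let chunks = vec![
        ColumnChunk { field_name: "a".into() },
        ColumnChunk { field_name: "s".into() },
        ColumnChunk { field_name: "s".into() },
    ];
    let partitions = partition_by_field(&chunks);

    // Old path: every projected field re-scanned all C chunks -> O(F * C).
    // New path: one O(C) partition per row group, then O(1) lookups per field.
    for field in ["a", "s"] {
        let field_md: Vec<&ColumnChunk> = partitions
            .get(field)
            .map(|idxs| idxs.iter().map(|&i| &chunks[i]).collect())
            .unwrap_or_default();
        println!("{field}: {} column chunk(s)", field_md.len());
    }
}
```

The `UnitVec` in the real code serves the same role as the `Vec` above while keeping the common single-chunk-per-field case inline rather than heap-allocated.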
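The `set_partitions` hunk also leans on hashbrown's raw-entry API so that the owned `String` key is allocated only once per distinct field rather than once per column chunk. A condensed sketch of that pattern, assuming a hashbrown version that still exposes `raw_entry_mut` (as the workspace version pinned by Polars did at the time of this change):

```rust
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;

fn main() {
    let names = ["a", "s", "s"]; // field name of each column chunk, in order
    let mut partitions: HashMap<String, Vec<usize>> = HashMap::new();

    for (i, name) in names.iter().enumerate() {
        // Probe by &str; the owned String key is only allocated when the slot
        // is vacant, i.e. once per distinct field rather than once per chunk.
        match partitions.raw_entry_mut().from_key(*name) {
            RawEntryMut::Vacant(slot) => {
                slot.insert(name.to_string(), vec![i]);
            },
            RawEntryMut::Occupied(mut slot) => {
                slot.get_mut().push(i);
            },
        }
    }

    assert_eq!(partitions["s"], vec![1, 2]);
}
```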