perf: Fix accidental quadratic parquet metadata (#18327)
ritchie46 committed Aug 23, 2024
1 parent ffb66aa commit d6703c4
Showing 9 changed files with 152 additions and 112 deletions.
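Why this was quadratic: for every projected field, the reader called `get_field_columns`, which linearly scans all column-chunk metadata in the row group for matching names. Resolving F fields over C column chunks therefore costs O(F·C) per row group, which is quadratic on wide tables where F ≈ C. The new `ColumnToColumnChunkMD` (added in `metadata.rs` below) partitions the metadata by field name once and answers each lookup from a hash map. A minimal sketch of the two access patterns, using stand-in types rather than the crate's actual signatures:

```rust
use std::collections::HashMap;

/// Stand-in for a column chunk's metadata; only the field name matters here.
struct ColumnChunk {
    field: String,
}

/// Old pattern: each lookup scans every chunk, O(C) per field and O(F * C)
/// for F projected fields -- quadratic on wide tables where F ~ C.
fn get_field_columns<'a>(columns: &'a [ColumnChunk], field: &str) -> Vec<&'a ColumnChunk> {
    columns.iter().filter(|c| c.field == field).collect()
}

/// New pattern: one O(C) pass builds a field -> chunk-indices map; every
/// subsequent per-field lookup is an O(1) hash probe.
fn partition_by_field(columns: &[ColumnChunk]) -> HashMap<&str, Vec<usize>> {
    let mut parts: HashMap<&str, Vec<usize>> = HashMap::new();
    for (i, chunk) in columns.iter().enumerate() {
        parts.entry(chunk.field.as_str()).or_default().push(i);
    }
    parts
}
```

The rest of the commit wires this partition-once pattern through each of the row-group readers in `read_impl.rs`.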
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions crates/polars-io/Cargo.toml
@@ -28,6 +28,7 @@ fast-float = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
glob = { version = "0.3" }
hashbrown = { workspace = true }
itoa = { workspace = true, optional = true }
memchr = { workspace = true }
memmap = { workspace = true }
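The `hashbrown` dependency added above is pulled in for its raw entry API, used by the new `metadata.rs` later in this diff: the map can be probed with a borrowed `&str`, and an owned `String` key is allocated only when the slot is vacant. A small sketch of that pattern, assuming a hashbrown version that still ships `raw_entry_mut` (as was current when this commit landed):

```rust
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;

/// Append an index under `name`. The lookup borrows `name`; an owned String
/// key is allocated only on a miss.
fn push_index(map: &mut HashMap<String, Vec<usize>>, name: &str, i: usize) {
    match map.raw_entry_mut().from_key(name) {
        RawEntryMut::Vacant(slot) => {
            slot.insert(name.to_string(), vec![i]);
        },
        RawEntryMut::Occupied(mut slot) => {
            slot.get_mut().push(i);
        },
    }
}
```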
1 change: 1 addition & 0 deletions crates/polars-io/src/lib.rs
@@ -1,6 +1,7 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![cfg_attr(feature = "simd", feature(portable_simd))]
#![allow(ambiguous_glob_reexports)]
extern crate core;

#[cfg(feature = "avro")]
pub mod avro;
52 changes: 52 additions & 0 deletions crates/polars-io/src/parquet/read/metadata.rs
@@ -0,0 +1,52 @@
use hashbrown::hash_map::RawEntryMut;
use polars_parquet::read::{ColumnChunkMetaData, RowGroupMetaData};
use polars_utils::aliases::{PlHashMap, PlHashSet};
use polars_utils::idx_vec::UnitVec;
use polars_utils::unitvec;

pub(super) struct ColumnToColumnChunkMD<'a> {
partitions: PlHashMap<String, UnitVec<usize>>,
pub metadata: &'a RowGroupMetaData,
}

impl<'a> ColumnToColumnChunkMD<'a> {
pub(super) fn new(metadata: &'a RowGroupMetaData) -> Self {
Self {
partitions: Default::default(),
metadata,
}
}

pub(super) fn set_partitions(&mut self, field_names: Option<&PlHashSet<&str>>) {
for (i, ccmd) in self.metadata.columns().iter().enumerate() {
let name = &ccmd.descriptor().path_in_schema[0];
if field_names
.map(|field_names| field_names.contains(name.as_str()))
.unwrap_or(true)
{
let entry = self.partitions.raw_entry_mut().from_key(name.as_str());

match entry {
RawEntryMut::Vacant(slot) => {
slot.insert(name.to_string(), unitvec![i]);
},
RawEntryMut::Occupied(mut slot) => {
slot.get_mut().push(i);
},
};
}
}
}

pub(super) fn get_partitions(&self, name: &str) -> UnitVec<&ColumnChunkMetaData> {
debug_assert!(
!self.partitions.is_empty(),
"fields should be partitioned first"
);
let columns = self.metadata.columns();
self.partitions
.get(name)
.map(|idx| idx.iter().map(|i| &columns[*i]).collect::<UnitVec<_>>())
.unwrap_or_default()
}
}
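Taken together, the intended call pattern (as the `read_impl.rs` hunks below use it) is: build one `ColumnToColumnChunkMD` per row group, partition once under the projection, then resolve each projected field in constant time. A consolidated sketch; the driver function and any imports are illustrative, not part of the commit:

```rust
// `ArrowSchema`, `projected_columns_set`, and `ColumnToColumnChunkMD` are the
// items from this diff; `resolve_projection` itself is a hypothetical driver.
fn resolve_projection(md: &RowGroupMetaData, schema: &ArrowSchema, projection: &[usize]) {
    let projected = projected_columns_set(schema, projection);
    let mut part_md = ColumnToColumnChunkMD::new(md);
    part_md.set_partitions(projected.as_ref()); // one O(C) pass over the chunks

    for column_i in projection {
        let name = schema.fields[*column_i].name.as_str();
        let field_md = part_md.get_partitions(name); // O(1) hash lookup
        // ... hand `field_md.as_slice()` on to column_idx_to_series ...
    }
}
```

Storing the partitions as `UnitVec<usize>` presumably keeps the common case allocation-free, since a non-nested field maps to exactly one column chunk.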
10 changes: 4 additions & 6 deletions crates/polars-io/src/parquet/read/mmap.rs
@@ -6,8 +6,7 @@ use bytes::Bytes;
use polars_core::datatypes::PlHashMap;
use polars_error::PolarsResult;
use polars_parquet::read::{
column_iter_to_arrays, get_field_columns, BasicDecompressor, ColumnChunkMetaData, Filter,
PageReader,
column_iter_to_arrays, BasicDecompressor, ColumnChunkMetaData, Filter, PageReader,
};
use polars_utils::mmap::{MemReader, MemSlice};

@@ -32,11 +31,10 @@ pub enum ColumnStore {
/// For cloud files the relevant memory regions should have been prefetched.
pub(super) fn mmap_columns<'a>(
store: &'a ColumnStore,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
field_columns: &'a [&ColumnChunkMetaData],
) -> Vec<(&'a ColumnChunkMetaData, MemSlice)> {
get_field_columns(columns, field_name)
.into_iter()
field_columns
.iter()
.map(|meta| _mmap_single_column(store, meta))
.collect()
}
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
@@ -16,6 +16,7 @@

#[cfg(feature = "cloud")]
mod async_impl;
mod metadata;
mod mmap;
mod options;
mod predicates;
107 changes: 92 additions & 15 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -9,7 +9,9 @@ use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_parquet::parquet::error::ParquetResult;
use polars_parquet::parquet::statistics::Statistics;
use polars_parquet::read::{self, FileMetaData, Filter, PhysicalType, RowGroupMetaData};
use polars_parquet::read::{
self, ColumnChunkMetaData, FileMetaData, Filter, PhysicalType, RowGroupMetaData,
};
use polars_utils::mmap::MemSlice;
use polars_utils::vec::inplace_zip_filtermap;
use rayon::prelude::*;
@@ -24,6 +26,7 @@ use super::{mmap, ParallelStrategy};
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetaDataRef;
use crate::parquet::read::metadata::ColumnToColumnChunkMD;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::predicates::{apply_predicate, PhysicalIoExpr};
use crate::utils::get_reader_bytes;
@@ -58,7 +61,8 @@ fn assert_dtypes(data_type: &ArrowDataType) {

fn column_idx_to_series(
column_i: usize,
md: &RowGroupMetaData,
// The metadata belonging to this column
field_md: &[&ColumnChunkMetaData],
filter: Option<Filter>,
file_schema: &ArrowSchema,
store: &mmap::ColumnStore,
@@ -69,8 +73,7 @@
{
assert_dtypes(field.data_type())
}

let columns = mmap_columns(store, md.columns(), &field.name);
let columns = mmap_columns(store, field_md);
let stats = columns
.iter()
.map(|(col_md, _)| col_md.statistics().transpose())
@@ -203,6 +206,24 @@ fn rg_to_dfs(
}
}

/// Collect a HashSet of the projected columns.
/// Returns `None` if all columns are projected.
fn projected_columns_set<'a>(
schema: &'a ArrowSchema,
projection: &[usize],
) -> Option<PlHashSet<&'a str>> {
if projection.len() == schema.len() {
None
} else {
Some(
projection
.iter()
.map(|i| schema.fields[*i].name.as_str())
.collect::<PlHashSet<_>>(),
)
}
}

#[allow(clippy::too_many_arguments)]
fn rg_to_dfs_prefiltered(
store: &mmap::ColumnStore,
@@ -278,15 +299,35 @@
debug_assert_eq!(dead_idx_to_col_idx.len(), num_dead_columns);

POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.

// Hashset, because row-groups will be modified in place later, so we cannot trust the indexes.
let part_md = {
let projected_columns = projected_columns_set(schema, projection);

row_groups
.par_iter()
.map(|rg_info| {
let md = &file_metadata.row_groups[rg_info.index as usize];
let mut part_md = ColumnToColumnChunkMD::new(md);
part_md.set_partitions(projected_columns.as_ref());
(rg_info.index, part_md)
})
.collect::<PlHashMap<_, _>>()
};

// Collect the data for the live columns
let mut live_columns = (0..row_groups.len() * num_live_columns)
.into_par_iter()
.map(|i| {
let col_idx = live_idx_to_col_idx[i % num_live_columns];
let rg_idx = row_groups[i / num_live_columns].index as usize;

let md = &file_metadata.row_groups[rg_idx];
column_idx_to_series(col_idx, md, None, schema, store)
let name = &schema.fields[col_idx].name;
let rg_idx = row_groups[i / num_live_columns].index;
let field_md = part_md.get(&rg_idx).unwrap().get_partitions(name);

column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store)
})
.collect::<PolarsResult<Vec<_>>>()?;

@@ -390,15 +431,22 @@
.into_par_iter()
.map(|i| {
let col_idx = dead_idx_to_col_idx[i % num_dead_columns];
let rg_idx = row_groups[i / num_dead_columns].index as usize;
let name = &schema.fields[col_idx].name;

let (mask, _) = &dfs[i / num_dead_columns];

let md = &file_metadata.row_groups[rg_idx];
debug_assert_eq!(md.num_rows(), mask.len());
let rg_idx = row_groups[i / num_dead_columns].index;

#[cfg(debug_assertions)]
{
let md = &file_metadata.row_groups[rg_idx as usize];
debug_assert_eq!(md.num_rows(), mask.len());
}
let field_md = part_md.get(&rg_idx).unwrap().get_partitions(name);

column_idx_to_series(
col_idx,
md,
field_md.as_slice(),
Some(Filter::new_masked(mask.clone())),
schema,
store,
@@ -470,14 +518,22 @@ fn rg_to_dfs_optionally_par_over_columns(
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}

// Set partitioned fields to prevent quadratic behavior.
let projected_columns = projected_columns_set(schema, projection);
let mut part_md = ColumnToColumnChunkMD::new(md);
part_md.set_partitions(projected_columns.as_ref());

let columns = if let ParallelStrategy::Columns = parallel {
POOL.install(|| {
projection
.par_iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let part = part_md.get_partitions(name);

column_idx_to_series(
*column_i,
md,
part.as_slice(),
Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
schema,
store,
@@ -489,9 +545,12 @@
projection
.iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let part = part_md.get_partitions(name);

column_idx_to_series(
*column_i,
md,
part.as_slice(),
Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)),
schema,
store,
@@ -566,9 +625,24 @@ fn rg_to_dfs_par_over_rg(
}

let dfs = POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.
let part_md = {
let projected_columns = projected_columns_set(schema, projection);
row_groups
.par_iter()
.map(|(_, rg, _, _)| {
let mut ccmd = ColumnToColumnChunkMD::new(rg);
ccmd.set_partitions(projected_columns.as_ref());
ccmd
})
.collect::<Vec<_>>()
};

row_groups
.into_par_iter()
.map(|(rg_idx, md, slice, row_count_start)| {
.enumerate()
.map(|(iter_idx, (rg_idx, _md, slice, row_count_start))| {
if slice.1 == 0
|| use_statistics
&& !read_this_row_group(
@@ -588,9 +662,12 @@
let columns = projection
.iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let field_md = part_md[iter_idx].get_partitions(name);

column_idx_to_series(
*column_i,
md,
field_md.as_slice(),
Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
schema,
store,
40 changes: 0 additions & 40 deletions crates/polars-parquet/src/parquet/read/column/mod.rs
@@ -1,4 +1,3 @@
use std::io::{Read, Seek};
use std::vec::IntoIter;

use super::{get_field_columns, get_page_iterator, MemReader, PageReader};
@@ -7,10 +6,6 @@ use crate::parquet::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use crate::parquet::page::CompressedPage;
use crate::parquet::schema::types::ParquetType;

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
mod stream;

/// Returns a [`ColumnIterator`] of column chunks corresponding to `field`.
///
/// Contrarily to [`get_page_iterator`] that returns a single iterator of pages, this iterator
@@ -149,38 +144,3 @@ impl MutStreamingIterator for ReadColumnIterator {
self.current.as_mut()
}
}

/// Reads all columns that are part of the parquet field `field_name`
/// # Implementation
/// This operation is IO-bounded `O(C)` where C is the number of columns associated to
/// the field (one for non-nested types)
/// It reads the columns sequentially. Use [`read_column`] to fork this operation to multiple
/// readers.
pub fn read_columns<'a, R: Read + Seek>(
reader: &mut R,
columns: &'a [ColumnChunkMetaData],
field_name: &'a str,
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>, ParquetError> {
get_field_columns(columns, field_name)
.map(|column| read_column(reader, column).map(|c| (column, c)))
.collect()
}

/// Reads a column chunk into memory
/// This operation is IO-bounded and allocates the column's `compressed_size`.
pub fn read_column<R>(reader: &mut R, column: &ColumnChunkMetaData) -> Result<Vec<u8>, ParquetError>
where
R: Read + Seek,
{
let (start, length) = column.byte_range();
reader.seek(std::io::SeekFrom::Start(start))?;

let mut chunk = vec![];
chunk.try_reserve(length as usize)?;
reader.by_ref().take(length).read_to_end(&mut chunk)?;
Ok(chunk)
}

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub use stream::{read_column_async, read_columns_async};
(Diff for the remaining changed file not rendered.)
