Skip to content

Commit

Permalink
perf: Partition metadata for parquet statistic loading (#18343)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 24, 2024
1 parent 5cfc1f0 commit dd1fc86
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 110 deletions.
14 changes: 13 additions & 1 deletion crates/polars-io/src/parquet/read/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::cloud::{
build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
};
use crate::parquet::metadata::FileMetaDataRef;
use crate::parquet::read::metadata::PartitionedColumnChunkMD;
use crate::pl_async::get_runtime;
use crate::predicates::PhysicalIoExpr;

Expand Down Expand Up @@ -277,8 +278,19 @@ impl FetchRowGroupsFromObjectStore {
row_group_range
.filter_map(|i| {
let rg = &row_groups[i];

// TODO!
// Optimize this. Currently we partition the predicate columns twice (and again later when reading).
// We should add a metadata context where we can cache and amortize the partitioning.
let mut part_md = PartitionedColumnChunkMD::new(rg);
let live = pred.live_variables();
part_md.set_partitions(
live.as_ref()
.map(|vars| vars.iter().map(|s| s.as_ref()).collect::<PlHashSet<_>>())
.as_ref(),
);
let should_be_read =
matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));
matches!(read_this_row_group(Some(pred), &part_md, &schema), Ok(true));

// Already add the row groups that will be skipped to the prefetched data.
if !should_be_read {
Expand Down
31 changes: 18 additions & 13 deletions crates/polars-io/src/parquet/read/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,34 @@ use polars_utils::aliases::{PlHashMap, PlHashSet};
use polars_utils::idx_vec::UnitVec;
use polars_utils::unitvec;

pub(super) struct ColumnToColumnChunkMD<'a> {
partitions: PlHashMap<String, UnitVec<usize>>,
pub metadata: &'a RowGroupMetaData,
/// This is a utility struct that partitions the `ColumnChunkMetaData` by `field.name == descriptor.path_in_schema[0]`.
/// This is required to fix quadratic behavior in wide parquet files. See #18319.
pub struct PartitionedColumnChunkMD<'a> {
partitions: Option<PlHashMap<String, UnitVec<usize>>>,
metadata: &'a RowGroupMetaData,
}

impl<'a> ColumnToColumnChunkMD<'a> {
pub(super) fn new(metadata: &'a RowGroupMetaData) -> Self {
impl<'a> PartitionedColumnChunkMD<'a> {
pub fn new(metadata: &'a RowGroupMetaData) -> Self {
Self {
partitions: Default::default(),
metadata,
}
}

pub(super) fn set_partitions(&mut self, field_names: Option<&PlHashSet<&str>>) {
pub(super) fn num_rows(&self) -> usize {
self.metadata.num_rows()
}

pub fn set_partitions(&mut self, field_names: Option<&PlHashSet<&str>>) {
let mut partitions = PlHashMap::default();
for (i, ccmd) in self.metadata.columns().iter().enumerate() {
let name = &ccmd.descriptor().path_in_schema[0];
if field_names
.map(|field_names| field_names.contains(name.as_str()))
.unwrap_or(true)
{
let entry = self.partitions.raw_entry_mut().from_key(name.as_str());
let entry = partitions.raw_entry_mut().from_key(name.as_str());

match entry {
RawEntryMut::Vacant(slot) => {
Expand All @@ -36,17 +43,15 @@ impl<'a> ColumnToColumnChunkMD<'a> {
};
}
}
self.partitions = Some(partitions)
}

pub(super) fn get_partitions(&self, name: &str) -> UnitVec<&ColumnChunkMetaData> {
debug_assert!(
!self.partitions.is_empty(),
"fields should be partitioned first"
);
pub fn get_partitions(&self, name: &str) -> Option<UnitVec<&ColumnChunkMetaData>> {
let columns = self.metadata.columns();
self.partitions
.as_ref()
.expect("fields should be partitioned first")
.get(name)
.map(|idx| idx.iter().map(|i| &columns[*i]).collect::<UnitVec<_>>())
.unwrap_or_default()
}
}
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use reader::{BatchedParquetReader, ParquetReader};
pub use utils::materialize_empty_df;

pub mod _internal {
pub use super::metadata::PartitionedColumnChunkMD;
pub use super::mmap::to_deserializer;
pub use super::predicates::read_this_row_group;
}
25 changes: 14 additions & 11 deletions crates/polars-io/src/parquet/read/predicates.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_parquet::read::statistics::{deserialize, Statistics};
use polars_parquet::read::RowGroupMetaData;

use crate::parquet::read::metadata::PartitionedColumnChunkMD;
use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr};

impl ColumnStats {
Expand All @@ -16,17 +15,21 @@ impl ColumnStats {
}
}

/// Collect the statistics in a column chunk.
/// Collect the statistics in a row-group
pub(crate) fn collect_statistics(
md: &RowGroupMetaData,
part_md: &PartitionedColumnChunkMD,
schema: &ArrowSchema,
) -> PolarsResult<Option<BatchStats>> {
// TODO! Fix this performance issue: this is a full sequential scan over the schema fields.
let stats = schema
.fields
.iter()
.map(|field| {
let st = deserialize(field, md)?;
Ok(ColumnStats::from_arrow_stats(st, field))
.map(|field| match part_md.get_partitions(&field.name) {
Some(md) => {
let st = deserialize(field, &md)?;
Ok(ColumnStats::from_arrow_stats(st, field))
},
None => Ok(ColumnStats::new(field.into(), None, None, None)),
})
.collect::<PolarsResult<Vec<_>>>()?;

Expand All @@ -37,18 +40,18 @@ pub(crate) fn collect_statistics(
Ok(Some(BatchStats::new(
Arc::new(schema.into()),
stats,
Some(md.num_rows()),
Some(part_md.num_rows()),
)))
}

pub fn read_this_row_group(
predicate: Option<&dyn PhysicalIoExpr>,
md: &RowGroupMetaData,
schema: &ArrowSchemaRef,
part_md: &PartitionedColumnChunkMD,
schema: &ArrowSchema,
) -> PolarsResult<bool> {
if let Some(pred) = predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(md, schema)? {
if let Some(stats) = collect_statistics(part_md, schema)? {
let should_read = pred.should_read(&stats);
// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
Expand Down
75 changes: 35 additions & 40 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use super::{mmap, ParallelStrategy};
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetaDataRef;
use crate::parquet::read::metadata::ColumnToColumnChunkMD;
use crate::parquet::read::metadata::PartitionedColumnChunkMD;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::predicates::{apply_predicate, PhysicalIoExpr};
use crate::utils::get_reader_bytes;
Expand Down Expand Up @@ -248,18 +248,32 @@ fn rg_to_dfs_prefiltered(
polars_bail!(ComputeError: "Parquet file contains too many row groups (> {})", u32::MAX);
}

let projected_columns = projected_columns_set(schema, projection);

let part_mds = POOL.install(|| {
file_metadata
.row_groups
.par_iter()
.map(|rg| {
let mut part_md = PartitionedColumnChunkMD::new(rg);
part_md.set_partitions(projected_columns.as_ref());
part_md
})
.collect::<Vec<_>>()
});

let mut row_offset = *previous_row_count;
let mut row_groups: Vec<RowGroupInfo> = (row_group_start..row_group_end)
.filter_map(|index| {
let part_md = &part_mds[index];
let md = &file_metadata.row_groups[index];

let current_offset = row_offset;
let current_row_count = md.num_rows() as IdxSize;
row_offset += current_row_count;

if use_statistics {
match read_this_row_group(Some(predicate), &file_metadata.row_groups[index], schema)
{
match read_this_row_group(Some(predicate), part_md, schema) {
Ok(false) => return None,
Ok(true) => {},
Err(e) => return Some(Err(e)),
Expand Down Expand Up @@ -302,21 +316,6 @@ fn rg_to_dfs_prefiltered(
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.

// Hashset, because row-groups will be modified in place later, so we cannot trust the indexes.
let part_md = {
let projected_columns = projected_columns_set(schema, projection);

row_groups
.par_iter()
.map(|rg_info| {
let md = &file_metadata.row_groups[rg_info.index as usize];
let mut part_md = ColumnToColumnChunkMD::new(md);
part_md.set_partitions(projected_columns.as_ref());
(rg_info.index, part_md)
})
.collect::<PlHashMap<_, _>>()
};

// Collect the data for the live columns
let mut live_columns = (0..row_groups.len() * num_live_columns)
.into_par_iter()
Expand All @@ -325,7 +324,7 @@ fn rg_to_dfs_prefiltered(

let name = &schema.fields[col_idx].name;
let rg_idx = row_groups[i / num_live_columns].index;
let field_md = part_md.get(&rg_idx).unwrap().get_partitions(name);
let field_md = part_mds[rg_idx as usize].get_partitions(name).unwrap();

column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store)
})
Expand Down Expand Up @@ -442,7 +441,7 @@ fn rg_to_dfs_prefiltered(
let md = &file_metadata.row_groups[rg_idx as usize];
debug_assert_eq!(md.num_rows(), mask.len());
}
let field_md = part_md.get(&rg_idx).unwrap().get_partitions(name);
let field_md = part_mds[rg_idx as usize].get_partitions(name).unwrap();

column_idx_to_series(
col_idx,
Expand Down Expand Up @@ -502,13 +501,17 @@ fn rg_to_dfs_optionally_par_over_columns(

for rg_idx in row_group_start..row_group_end {
let md = &file_metadata.row_groups[rg_idx];

// Set partitioned fields to prevent quadratic behavior.
let projected_columns = projected_columns_set(schema, projection);
let mut part_md = PartitionedColumnChunkMD::new(md);
part_md.set_partitions(projected_columns.as_ref());

let rg_slice =
split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
let current_row_count = md.num_rows() as IdxSize;

if use_statistics
&& !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
{
if use_statistics && !read_this_row_group(predicate, &part_md, schema)? {
*previous_row_count += rg_slice.1 as IdxSize;
continue;
}
Expand All @@ -518,18 +521,13 @@ fn rg_to_dfs_optionally_par_over_columns(
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}

// Set partitioned fields to prevent quadratic behavior.
let projected_columns = projected_columns_set(schema, projection);
let mut part_md = ColumnToColumnChunkMD::new(md);
part_md.set_partitions(projected_columns.as_ref());

let columns = if let ParallelStrategy::Columns = parallel {
POOL.install(|| {
projection
.par_iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let part = part_md.get_partitions(name);
let part = part_md.get_partitions(name).unwrap();

column_idx_to_series(
*column_i,
Expand All @@ -546,7 +544,7 @@ fn rg_to_dfs_optionally_par_over_columns(
.iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let part = part_md.get_partitions(name);
let part = part_md.get_partitions(name).unwrap();

column_idx_to_series(
*column_i,
Expand Down Expand Up @@ -627,12 +625,12 @@ fn rg_to_dfs_par_over_rg(
let dfs = POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.
let part_md = {
let part_mds = {
let projected_columns = projected_columns_set(schema, projection);
row_groups
.par_iter()
.map(|(_, rg, _, _)| {
let mut ccmd = ColumnToColumnChunkMD::new(rg);
let mut ccmd = PartitionedColumnChunkMD::new(rg);
ccmd.set_partitions(projected_columns.as_ref());
ccmd
})
Expand All @@ -642,14 +640,11 @@ fn rg_to_dfs_par_over_rg(
row_groups
.into_par_iter()
.enumerate()
.map(|(iter_idx, (rg_idx, _md, slice, row_count_start))| {
.map(|(iter_idx, (_rg_idx, _md, slice, row_count_start))| {
let part_md = &part_mds[iter_idx];

if slice.1 == 0
|| use_statistics
&& !read_this_row_group(
predicate,
&file_metadata.row_groups[rg_idx],
schema,
)?
|| use_statistics && !read_this_row_group(predicate, part_md, schema)?
{
return Ok(None);
}
Expand All @@ -663,7 +658,7 @@ fn rg_to_dfs_par_over_rg(
.iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let field_md = part_md[iter_idx].get_partitions(name);
let field_md = part_md.get_partitions(name).unwrap();

column_idx_to_series(
*column_i,
Expand Down
12 changes: 0 additions & 12 deletions crates/polars-parquet/src/arrow/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,6 @@ pub use crate::parquet::{
FallibleStreamingIterator,
};

/// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
/// For non-nested parquet types, this returns a single column.
///
/// NOTE(review): this is a linear scan over every column chunk in the row group;
/// when invoked once per field on a wide schema the total cost is quadratic —
/// presumably why this commit removes it in favor of `PartitionedColumnChunkMD`
/// (see #18319).
pub fn get_field_columns<'a>(
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Vec<&'a ColumnChunkMetaData> {
columns
.iter()
// A field's column chunks are identified by the root of their schema path.
.filter(|x| x.descriptor().path_in_schema[0] == field_name)
.collect()
}

/// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
/// For non-nested parquet types, this returns a single column
pub fn get_field_pages<'a, T>(
Expand Down
10 changes: 4 additions & 6 deletions crates/polars-parquet/src/arrow/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use arrow::with_match_primitive_type_full;
use ethnum::I256;
use polars_error::{polars_bail, PolarsResult};

use crate::parquet::metadata::RowGroupMetaData;
use crate::parquet::schema::types::{
PhysicalType as ParquetPhysicalType, PrimitiveType as ParquetPrimitiveType,
};
use crate::parquet::statistics::{PrimitiveStatistics, Statistics as ParquetStatistics};
use crate::parquet::types::int96_to_i64_ns;
use crate::read::ColumnChunkMetaData;

mod binary;
mod binview;
Expand All @@ -28,7 +28,6 @@ mod struct_;
mod utf8;

use self::list::DynMutableListArray;
use super::get_field_columns;

/// Arrow-deserialized parquet Statistics of a file
#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -543,12 +542,11 @@ fn push(
///
/// # Errors
/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)
pub fn deserialize(field: &Field, row_group: &RowGroupMetaData) -> PolarsResult<Statistics> {
pub fn deserialize(field: &Field, field_md: &[&ColumnChunkMetaData]) -> PolarsResult<Statistics> {
let mut statistics = MutableStatistics::try_new(field)?;

let columns = get_field_columns(row_group.columns(), field.name.as_ref());
let mut stats = columns
.into_iter()
let mut stats = field_md
.iter()
.map(|column| {
Ok((
column.statistics().transpose()?,
Expand Down
Loading

0 comments on commit dd1fc86

Please sign in to comment.