Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Partition metadata for parquet statistic loading #18343

Merged
merged 2 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion crates/polars-io/src/parquet/read/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::cloud::{
build_object_store, object_path_from_str, CloudLocation, CloudOptions, PolarsObjectStore,
};
use crate::parquet::metadata::FileMetaDataRef;
use crate::parquet::read::metadata::PartitionedColumnChunkMD;
use crate::pl_async::get_runtime;
use crate::predicates::PhysicalIoExpr;

Expand Down Expand Up @@ -277,8 +278,19 @@ impl FetchRowGroupsFromObjectStore {
row_group_range
.filter_map(|i| {
let rg = &row_groups[i];

// TODO!
// Optimize this. Now we partition the predicate columns twice. (later on reading as well)
// I think we must add metadata context where we can cache and amortize the partitioning.
let mut part_md = PartitionedColumnChunkMD::new(rg);
let live = pred.live_variables();
part_md.set_partitions(
live.as_ref()
.map(|vars| vars.iter().map(|s| s.as_ref()).collect::<PlHashSet<_>>())
.as_ref(),
);
let should_be_read =
matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));
matches!(read_this_row_group(Some(pred), &part_md, &schema), Ok(true));

// Already add the row groups that will be skipped to the prefetched data.
if !should_be_read {
Expand Down
31 changes: 18 additions & 13 deletions crates/polars-io/src/parquet/read/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,34 @@ use polars_utils::aliases::{PlHashMap, PlHashSet};
use polars_utils::idx_vec::UnitVec;
use polars_utils::unitvec;

pub(super) struct ColumnToColumnChunkMD<'a> {
partitions: PlHashMap<String, UnitVec<usize>>,
pub metadata: &'a RowGroupMetaData,
/// A utility struct that partitions the `ColumnChunkMetaData` by `field.name == descriptor.path_in_schema[0]`.
/// This is required to fix quadratic behavior in wide parquet files. See #18319.
pub struct PartitionedColumnChunkMD<'a> {
partitions: Option<PlHashMap<String, UnitVec<usize>>>,
metadata: &'a RowGroupMetaData,
}

impl<'a> ColumnToColumnChunkMD<'a> {
pub(super) fn new(metadata: &'a RowGroupMetaData) -> Self {
impl<'a> PartitionedColumnChunkMD<'a> {
pub fn new(metadata: &'a RowGroupMetaData) -> Self {
Self {
partitions: Default::default(),
metadata,
}
}

pub(super) fn set_partitions(&mut self, field_names: Option<&PlHashSet<&str>>) {
pub(super) fn num_rows(&self) -> usize {
self.metadata.num_rows()
}

pub fn set_partitions(&mut self, field_names: Option<&PlHashSet<&str>>) {
let mut partitions = PlHashMap::default();
for (i, ccmd) in self.metadata.columns().iter().enumerate() {
let name = &ccmd.descriptor().path_in_schema[0];
if field_names
.map(|field_names| field_names.contains(name.as_str()))
.unwrap_or(true)
{
let entry = self.partitions.raw_entry_mut().from_key(name.as_str());
let entry = partitions.raw_entry_mut().from_key(name.as_str());

match entry {
RawEntryMut::Vacant(slot) => {
Expand All @@ -36,17 +43,15 @@ impl<'a> ColumnToColumnChunkMD<'a> {
};
}
}
self.partitions = Some(partitions)
}

pub(super) fn get_partitions(&self, name: &str) -> UnitVec<&ColumnChunkMetaData> {
debug_assert!(
!self.partitions.is_empty(),
"fields should be partitioned first"
);
pub fn get_partitions(&self, name: &str) -> Option<UnitVec<&ColumnChunkMetaData>> {
let columns = self.metadata.columns();
self.partitions
.as_ref()
.expect("fields should be partitioned first")
.get(name)
.map(|idx| idx.iter().map(|i| &columns[*i]).collect::<UnitVec<_>>())
.unwrap_or_default()
}
}
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub use reader::{BatchedParquetReader, ParquetReader};
pub use utils::materialize_empty_df;

pub mod _internal {
pub use super::metadata::PartitionedColumnChunkMD;
pub use super::mmap::to_deserializer;
pub use super::predicates::read_this_row_group;
}
25 changes: 14 additions & 11 deletions crates/polars-io/src/parquet/read/predicates.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_parquet::read::statistics::{deserialize, Statistics};
use polars_parquet::read::RowGroupMetaData;

use crate::parquet::read::metadata::PartitionedColumnChunkMD;
use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr};

impl ColumnStats {
Expand All @@ -16,17 +15,21 @@ impl ColumnStats {
}
}

/// Collect the statistics in a column chunk.
/// Collect the statistics of all columns in a row group.
pub(crate) fn collect_statistics(
md: &RowGroupMetaData,
part_md: &PartitionedColumnChunkMD,
schema: &ArrowSchema,
) -> PolarsResult<Option<BatchStats>> {
// TODO! fix this performance. This is a full sequential scan.
let stats = schema
.fields
.iter()
.map(|field| {
let st = deserialize(field, md)?;
Ok(ColumnStats::from_arrow_stats(st, field))
.map(|field| match part_md.get_partitions(&field.name) {
Some(md) => {
let st = deserialize(field, &md)?;
Ok(ColumnStats::from_arrow_stats(st, field))
},
None => Ok(ColumnStats::new(field.into(), None, None, None)),
})
.collect::<PolarsResult<Vec<_>>>()?;

Expand All @@ -37,18 +40,18 @@ pub(crate) fn collect_statistics(
Ok(Some(BatchStats::new(
Arc::new(schema.into()),
stats,
Some(md.num_rows()),
Some(part_md.num_rows()),
)))
}

pub fn read_this_row_group(
predicate: Option<&dyn PhysicalIoExpr>,
md: &RowGroupMetaData,
schema: &ArrowSchemaRef,
part_md: &PartitionedColumnChunkMD,
schema: &ArrowSchema,
) -> PolarsResult<bool> {
if let Some(pred) = predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(md, schema)? {
if let Some(stats) = collect_statistics(part_md, schema)? {
let should_read = pred.should_read(&stats);
// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
Expand Down
75 changes: 35 additions & 40 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use super::{mmap, ParallelStrategy};
use crate::hive::materialize_hive_partitions;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::metadata::FileMetaDataRef;
use crate::parquet::read::metadata::ColumnToColumnChunkMD;
use crate::parquet::read::metadata::PartitionedColumnChunkMD;
use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
use crate::predicates::{apply_predicate, PhysicalIoExpr};
use crate::utils::get_reader_bytes;
Expand Down Expand Up @@ -248,18 +248,32 @@ fn rg_to_dfs_prefiltered(
polars_bail!(ComputeError: "Parquet file contains too many row groups (> {})", u32::MAX);
}

let projected_columns = projected_columns_set(schema, projection);

let part_mds = POOL.install(|| {
file_metadata
.row_groups
.par_iter()
.map(|rg| {
let mut part_md = PartitionedColumnChunkMD::new(rg);
part_md.set_partitions(projected_columns.as_ref());
part_md
})
.collect::<Vec<_>>()
});

let mut row_offset = *previous_row_count;
let mut row_groups: Vec<RowGroupInfo> = (row_group_start..row_group_end)
.filter_map(|index| {
let part_md = &part_mds[index];
let md = &file_metadata.row_groups[index];

let current_offset = row_offset;
let current_row_count = md.num_rows() as IdxSize;
row_offset += current_row_count;

if use_statistics {
match read_this_row_group(Some(predicate), &file_metadata.row_groups[index], schema)
{
match read_this_row_group(Some(predicate), part_md, schema) {
Ok(false) => return None,
Ok(true) => {},
Err(e) => return Some(Err(e)),
Expand Down Expand Up @@ -302,21 +316,6 @@ fn rg_to_dfs_prefiltered(
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.

// Hashset, because row-groups will be modified in place later, so we cannot trust the indexes.
let part_md = {
let projected_columns = projected_columns_set(schema, projection);

row_groups
.par_iter()
.map(|rg_info| {
let md = &file_metadata.row_groups[rg_info.index as usize];
let mut part_md = ColumnToColumnChunkMD::new(md);
part_md.set_partitions(projected_columns.as_ref());
(rg_info.index, part_md)
})
.collect::<PlHashMap<_, _>>()
};

// Collect the data for the live columns
let mut live_columns = (0..row_groups.len() * num_live_columns)
.into_par_iter()
Expand All @@ -325,7 +324,7 @@ fn rg_to_dfs_prefiltered(

let name = &schema.fields[col_idx].name;
let rg_idx = row_groups[i / num_live_columns].index;
let field_md = part_md.get(&rg_idx).unwrap().get_partitions(name);
let field_md = part_mds[rg_idx as usize].get_partitions(name).unwrap();

column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store)
})
Expand Down Expand Up @@ -442,7 +441,7 @@ fn rg_to_dfs_prefiltered(
let md = &file_metadata.row_groups[rg_idx as usize];
debug_assert_eq!(md.num_rows(), mask.len());
}
let field_md = part_md.get(&rg_idx).unwrap().get_partitions(name);
let field_md = part_mds[rg_idx as usize].get_partitions(name).unwrap();

column_idx_to_series(
col_idx,
Expand Down Expand Up @@ -502,13 +501,17 @@ fn rg_to_dfs_optionally_par_over_columns(

for rg_idx in row_group_start..row_group_end {
let md = &file_metadata.row_groups[rg_idx];

// Set partitioned fields to prevent quadratic behavior.
let projected_columns = projected_columns_set(schema, projection);
let mut part_md = PartitionedColumnChunkMD::new(md);
part_md.set_partitions(projected_columns.as_ref());

let rg_slice =
split_slice_at_file(&mut n_rows_processed, md.num_rows(), slice.0, slice_end);
let current_row_count = md.num_rows() as IdxSize;

if use_statistics
&& !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
{
if use_statistics && !read_this_row_group(predicate, &part_md, schema)? {
*previous_row_count += rg_slice.1 as IdxSize;
continue;
}
Expand All @@ -518,18 +521,13 @@ fn rg_to_dfs_optionally_par_over_columns(
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}

// Set partitioned fields to prevent quadratic behavior.
let projected_columns = projected_columns_set(schema, projection);
let mut part_md = ColumnToColumnChunkMD::new(md);
part_md.set_partitions(projected_columns.as_ref());

let columns = if let ParallelStrategy::Columns = parallel {
POOL.install(|| {
projection
.par_iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let part = part_md.get_partitions(name);
let part = part_md.get_partitions(name).unwrap();

column_idx_to_series(
*column_i,
Expand All @@ -546,7 +544,7 @@ fn rg_to_dfs_optionally_par_over_columns(
.iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let part = part_md.get_partitions(name);
let part = part_md.get_partitions(name).unwrap();

column_idx_to_series(
*column_i,
Expand Down Expand Up @@ -627,12 +625,12 @@ fn rg_to_dfs_par_over_rg(
let dfs = POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.
let part_md = {
let part_mds = {
let projected_columns = projected_columns_set(schema, projection);
row_groups
.par_iter()
.map(|(_, rg, _, _)| {
let mut ccmd = ColumnToColumnChunkMD::new(rg);
let mut ccmd = PartitionedColumnChunkMD::new(rg);
ccmd.set_partitions(projected_columns.as_ref());
ccmd
})
Expand All @@ -642,14 +640,11 @@ fn rg_to_dfs_par_over_rg(
row_groups
.into_par_iter()
.enumerate()
.map(|(iter_idx, (rg_idx, _md, slice, row_count_start))| {
.map(|(iter_idx, (_rg_idx, _md, slice, row_count_start))| {
let part_md = &part_mds[iter_idx];

if slice.1 == 0
|| use_statistics
&& !read_this_row_group(
predicate,
&file_metadata.row_groups[rg_idx],
schema,
)?
|| use_statistics && !read_this_row_group(predicate, part_md, schema)?
{
return Ok(None);
}
Expand All @@ -663,7 +658,7 @@ fn rg_to_dfs_par_over_rg(
.iter()
.map(|column_i| {
let name = &schema.fields[*column_i].name;
let field_md = part_md[iter_idx].get_partitions(name);
let field_md = part_md.get_partitions(name).unwrap();

column_idx_to_series(
*column_i,
Expand Down
12 changes: 0 additions & 12 deletions crates/polars-parquet/src/arrow/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,6 @@ pub use crate::parquet::{
FallibleStreamingIterator,
};

/// Returns all [`ColumnChunkMetaData`] associated to `field_name`, i.e. every
/// column chunk whose `descriptor.path_in_schema[0]` equals `field_name`.
///
/// For non-nested parquet types, this returns a single column; a nested field
/// spans multiple leaf column chunks, all of which share the same path root.
///
/// NOTE(review): this performs a full linear scan over `columns` on every
/// call, so calling it once per field is O(fields × columns) — quadratic for
/// wide files (see #18319, which this PR addresses by partitioning up front).
pub fn get_field_columns<'a>(
    columns: &'a [ColumnChunkMetaData],
    field_name: &str,
) -> Vec<&'a ColumnChunkMetaData> {
    columns
        .iter()
        // Match on the root of the column path so that all leaf chunks
        // belonging to the (possibly nested) field are returned.
        .filter(|x| x.descriptor().path_in_schema[0] == field_name)
        .collect()
}

/// Returns all [`ColumnChunkMetaData`] associated to `field_name`.
/// For non-nested parquet types, this returns a single column
pub fn get_field_pages<'a, T>(
Expand Down
10 changes: 4 additions & 6 deletions crates/polars-parquet/src/arrow/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use arrow::with_match_primitive_type_full;
use ethnum::I256;
use polars_error::{polars_bail, PolarsResult};

use crate::parquet::metadata::RowGroupMetaData;
use crate::parquet::schema::types::{
PhysicalType as ParquetPhysicalType, PrimitiveType as ParquetPrimitiveType,
};
use crate::parquet::statistics::{PrimitiveStatistics, Statistics as ParquetStatistics};
use crate::parquet::types::int96_to_i64_ns;
use crate::read::ColumnChunkMetaData;

mod binary;
mod binview;
Expand All @@ -28,7 +28,6 @@ mod struct_;
mod utf8;

use self::list::DynMutableListArray;
use super::get_field_columns;

/// Arrow-deserialized parquet Statistics of a file
#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -543,12 +542,11 @@ fn push(
///
/// # Errors
/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8)
pub fn deserialize(field: &Field, row_group: &RowGroupMetaData) -> PolarsResult<Statistics> {
pub fn deserialize(field: &Field, field_md: &[&ColumnChunkMetaData]) -> PolarsResult<Statistics> {
let mut statistics = MutableStatistics::try_new(field)?;

let columns = get_field_columns(row_group.columns(), field.name.as_ref());
let mut stats = columns
.into_iter()
let mut stats = field_md
.iter()
.map(|column| {
Ok((
column.statistics().transpose()?,
Expand Down
Loading