fix: Properly project unordered column in parquet prefiltered (#20189)
Co-authored-by: Simon Lin <simonlin.rqmmw@slmail.me>
coastalwhite and nameexhaustion authored Dec 7, 2024
1 parent fdaf517 commit 430bb4d
Showing 9 changed files with 161 additions and 58 deletions.
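For context, the scenario this commit addresses can be sketched with the public Python API. This is an illustrative reproduction only; the dataset layout, column names, and filter are assumptions, not taken from the linked issue: a hive-partitioned Parquet scan with parallel="prefiltered" where the projected column order differs from the column order in the files.

# Illustrative sketch; paths, column names, and values are assumptions.
from pathlib import Path

import polars as pl

root = Path("hive_dataset")
for part in ("a=1", "a=2"):
    (root / part).mkdir(parents=True, exist_ok=True)
    pl.DataFrame({"b": [1, 2, 3], "c": ["x", "y", "z"]}).write_parquet(root / part / "0.parquet")

# Project columns in an order that differs from the file order and filter on one of them,
# so the prefiltered reader must merge predicate ("live") and remaining ("dead") columns.
out = (
    pl.scan_parquet(root, parallel="prefiltered", hive_partitioning=True)
    .select("c", "b", "a")
    .filter(pl.col("b") >= 2)
    .collect()
)
print(out)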
6 changes: 2 additions & 4 deletions crates/polars-core/src/frame/mod.rs
@@ -3,7 +3,7 @@ use std::sync::OnceLock;
 use std::{mem, ops};
 
 use polars_row::ArrayRef;
-use polars_schema::schema::ensure_matching_schema_names;
+use polars_schema::schema::debug_ensure_matching_schema_names;
 use polars_utils::itertools::Itertools;
 use rayon::prelude::*;
 
@@ -1754,9 +1754,7 @@
         cols: &[PlSmallStr],
         schema: &Schema,
     ) -> PolarsResult<Vec<Column>> {
-        if cfg!(debug_assertions) {
-            ensure_matching_schema_names(schema, &self.schema())?;
-        }
+        debug_ensure_matching_schema_names(schema, &self.schema())?;
 
         cols.iter()
             .map(|name| {
31 changes: 24 additions & 7 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -366,6 +366,7 @@ fn rg_to_dfs_prefiltered(
     }
 
     let mask_setting = PrefilterMaskSetting::init_from_env();
+    let projected_schema = schema.try_project_indices(projection).unwrap();
 
     let dfs: Vec<Option<DataFrame>> = POOL.install(move || {
         // Set partitioned fields to prevent quadratic behavior.
@@ -415,7 +416,8 @@
 
             // Apply the predicate to the live columns and save the dataframe and the bitmask
             let md = &file_metadata.row_groups[rg_idx];
-            let mut df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };
+            let mut df =
+                unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns.clone()) };
 
             materialize_hive_partitions(
                 &mut df,
@@ -426,6 +428,10 @@
             let s = predicate.evaluate_io(&df)?;
             let mask = s.bool().expect("filter predicates was not of type boolean");
 
+            // Create without hive columns - the first merge phase does not handle hive partitions. This also saves
+            // some unnecessary filtering.
+            let mut df = unsafe { DataFrame::new_no_checks(md.num_rows(), live_columns) };
+
             if let Some(rc) = &row_index {
                 df.with_row_index_mut(rc.name.clone(), Some(rg_offsets[rg_idx] + rc.offset));
             }
@@ -458,6 +464,13 @@
 
             // We don't need to do any further work if there are no dead columns
             if dead_idx_to_col_idx.is_empty() {
+                materialize_hive_partitions(
+                    &mut df,
+                    schema.as_ref(),
+                    hive_partition_columns,
+                    md.num_rows(),
+                );
+
                 return Ok(Some(df));
             }
 
@@ -541,10 +554,7 @@
             let height = df.height();
             let live_columns = df.take_columns();
 
-            assert_eq!(
-                live_columns.len() + dead_columns.len(),
-                projection.len() + hive_partition_columns.map_or(0, |x| x.len())
-            );
+            assert_eq!(live_columns.len() + dead_columns.len(), projection.len());
 
             let mut merged = Vec::with_capacity(live_columns.len() + dead_columns.len());
 
@@ -561,13 +571,20 @@
             hive::merge_sorted_to_schema_order(
                 &mut dead_columns.into_iter(), // df_columns
                 &mut live_columns.into_iter().skip(row_index.is_some() as usize), // hive_columns
-                schema,
+                &projected_schema,
                 &mut merged,
             );
 
             // SAFETY: This is completely based on the schema so all column names are unique
             // and the length is given by the parquet file which should always be the same.
-            let df = unsafe { DataFrame::new_no_checks(height, merged) };
+            let mut df = unsafe { DataFrame::new_no_checks(height, merged) };
+
+            materialize_hive_partitions(
+                &mut df,
+                schema.as_ref(),
+                hive_partition_columns,
+                md.num_rows(),
+            );
 
             PolarsResult::Ok(Some(df))
         })
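In rg_to_dfs_prefiltered above, the key change is that the live (predicate) columns and the lazily decoded dead columns are merged back in the order of the projected schema rather than the full file schema, and hive partition columns are materialized only afterwards. A toy sketch of that schema-order merge, using plain Python values instead of Polars columns (names and data are hypothetical):

# Hypothetical stand-in for hive::merge_sorted_to_schema_order: both inputs are already
# ordered relative to the projected schema, so walking the schema and picking from
# whichever side owns each name yields the projection order.
projected_schema = ["c", "b", "a"]          # order requested by the projection
live = {"b": [2, 3]}                        # predicate columns, already filtered
dead = {"c": ["y", "z"], "a": [10, 10]}     # remaining columns, filtered with the mask

both = {**live, **dead}
merged = [(name, both[name]) for name in projected_schema]
print(merged)  # columns come out in projection order: c, b, a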
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/file_scan.rs
@@ -168,7 +168,7 @@ impl FileScan {
             #[cfg(feature = "ipc")]
             Self::Ipc { .. } => _file_options.row_index.is_some(),
             #[cfg(feature = "parquet")]
-            Self::Parquet { .. } => _file_options.row_index.is_some(),
+            Self::Parquet { .. } => false,
             #[allow(unreachable_patterns)]
             _ => false,
         }
24 changes: 22 additions & 2 deletions crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
@@ -436,7 +436,7 @@ impl ProjectionPushDown {
                     &acc_projections,
                     expr_arena,
                     &file_info.schema,
-                    scan_type.sort_projection(&file_options) || hive_parts.is_some(),
+                    scan_type.sort_projection(&file_options),
                 )?;
 
                 hive_parts = if let Some(hive_parts) = hive_parts {
@@ -480,10 +480,30 @@
                 // based on its position in the file. This is extremely important for the
                 // new-streaming engine.
 
+                // row_index is separate
+                let opt_row_index_col_name = file_options
+                    .row_index
+                    .as_ref()
+                    .map(|v| &v.name)
+                    .filter(|v| schema.contains(v))
+                    .cloned();
+
+                if let Some(name) = &opt_row_index_col_name {
+                    out.insert_at_index(
+                        0,
+                        name.clone(),
+                        schema.get(name).unwrap().clone(),
+                    )
+                    .unwrap();
+                }
+
                 {
                     let df_fields_iter = &mut schema
                         .iter()
-                        .filter(|fld| !partition_schema.contains(fld.0))
+                        .filter(|fld| {
+                            !partition_schema.contains(fld.0)
+                                && Some(fld.0) != opt_row_index_col_name.as_ref()
+                        })
                         .map(|(a, b)| (a.clone(), b.clone()));
 
                     let hive_fields_iter = &mut partition_schema
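The projection pushdown change above splits the projected schema into three groups: an optional row-index column placed first, the file columns in file order (hive partition columns excluded), and the hive partition columns after them. A rough sketch of that ordering; the column names are made up and this is only an approximation of the visible logic:

# Hypothetical illustration of the output column order assembled above.
row_index_name = "index"                   # assumed row-index name; only used if requested
file_schema = ["index", "b", "c", "a"]     # "a" plays the role of a hive partition column here
partition_schema = ["a"]

out = []
if row_index_name in file_schema:
    out.append(row_index_name)
out += [n for n in file_schema if n not in partition_schema and n != row_index_name]
out += partition_schema
print(out)  # ['index', 'b', 'c', 'a']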
33 changes: 7 additions & 26 deletions crates/polars-schema/src/schema.rs
@@ -401,39 +401,20 @@
     }
 }
 
-pub fn ensure_matching_schema_names<D>(lhs: &Schema<D>, rhs: &Schema<D>) -> PolarsResult<()> {
-    let mut iter_lhs = lhs.iter_names();
-    let mut iter_rhs = rhs.iter_names();
+pub fn debug_ensure_matching_schema_names<D>(lhs: &Schema<D>, rhs: &Schema<D>) -> PolarsResult<()> {
+    if cfg!(debug_assertions) {
+        let lhs = lhs.iter_names().collect::<Vec<_>>();
+        let rhs = rhs.iter_names().collect::<Vec<_>>();
 
-    for i in 0..iter_lhs.len().min(iter_rhs.len()) {
-        let l = iter_lhs.next().unwrap();
-        let r = iter_rhs.next().unwrap();
-
-        if l != r {
+        if lhs != rhs {
             polars_bail!(
                 SchemaMismatch:
-                "schema names differ at position {}: {} != {}",
-                1 + i, l, r
+                "lhs: {:?} rhs: {:?}",
+                lhs, rhs
             )
         }
     }
 
-    if let Some(v) = iter_lhs.next() {
-        polars_bail!(
-            SchemaMismatch:
-            "schema contained extra column: {}",
-            v
-        )
-    }
-
-    if let Some(v) = iter_rhs.next() {
-        polars_bail!(
-            SchemaMismatch:
-            "schema didn't contain column: {}",
-            v
-        )
-    }
-
     Ok(())
 }
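debug_ensure_matching_schema_names now compares the complete name lists, and only in debug builds, so release builds skip the check entirely. A rough Python analogue of the same debug-only guard, for illustration only (the real code uses Rust's cfg!(debug_assertions) and a SchemaMismatch error):

# Illustrative analogue; not the actual Polars API.
def debug_ensure_matching_schema_names(lhs: list[str], rhs: list[str]) -> None:
    if __debug__:  # dropped under "python -O", similar to a release build
        if lhs != rhs:
            raise ValueError(f"schema names differ: lhs: {lhs} rhs: {rhs}")

debug_ensure_matching_schema_names(["a", "b"], ["a", "b"])  # passes silently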
@@ -78,7 +78,7 @@ impl RowGroupDecoder {
         }
 
         let mut decoded_cols = Vec::with_capacity(row_group_data.row_group_metadata.n_columns());
-        self.decode_all_columns(
+        self.decode_projected_columns(
             &mut decoded_cols,
             &row_group_data,
             Some(polars_parquet::read::Filter::Range(slice_range.clone())),
@@ -217,7 +217,7 @@
 
     /// Potentially parallelizes based on number of rows & columns. Decoded columns are appended to
     /// `out_vec`.
-    async fn decode_all_columns(
+    async fn decode_projected_columns(
         &self,
         out_vec: &mut Vec<Column>,
         row_group_data: &Arc<RowGroupData>,
@@ -497,7 +497,7 @@
         // for `hive::merge_sorted_to_schema_order`.
         let mut opt_decode_err = None;
 
-        let mut decoded_live_cols_iter = self
+        let decoded_live_cols_iter = self
            .predicate_arrow_field_indices
            .iter()
            .map(|&i| self.projected_arrow_schema.get_at_index(i).unwrap())
@@ -512,26 +512,21 @@
                 },
             }
         });
-        let mut hive_cols_iter = shared_file_state.hive_series.iter().map(|s| {
+        let hive_cols_iter = shared_file_state.hive_series.iter().map(|s| {
             debug_assert!(s.len() >= projection_height);
             s.slice(0, projection_height)
         });
 
-        hive::merge_sorted_to_schema_order(
-            &mut decoded_live_cols_iter,
-            &mut hive_cols_iter,
-            &self.reader_schema,
-            &mut live_columns,
-        );
-
+        live_columns.extend(decoded_live_cols_iter);
+        live_columns.extend(hive_cols_iter);
         opt_decode_err.transpose()?;
 
         if let Some(file_path_series) = &shared_file_state.file_path_series {
             debug_assert!(file_path_series.len() >= projection_height);
             live_columns.push(file_path_series.slice(0, projection_height));
         }
 
-        let live_df = unsafe {
+        let mut live_df = unsafe {
             DataFrame::new_no_checks(row_group_data.row_group_metadata.num_rows(), live_columns)
         };
 
@@ -542,20 +537,52 @@
             .evaluate_io(&live_df)?;
         let mask = mask.bool().unwrap();
 
+        unsafe {
+            live_df.get_columns_mut().truncate(
+                self.row_index.is_some() as usize + self.predicate_arrow_field_indices.len(),
+            )
+        }
+
         let filtered =
             unsafe { filter_cols(live_df.take_columns(), mask, self.min_values_per_thread) }
                 .await?;
 
-        let height = if let Some(fst) = filtered.first() {
+        let filtered_height = if let Some(fst) = filtered.first() {
             fst.len()
         } else {
             mask.num_trues()
         };
 
-        let live_df_filtered = unsafe { DataFrame::new_no_checks(height, filtered) };
+        let mut live_df_filtered = unsafe { DataFrame::new_no_checks(filtered_height, filtered) };
 
         if self.non_predicate_arrow_field_indices.is_empty() {
             // User or test may have explicitly requested prefiltering
+
+            hive::merge_sorted_to_schema_order(
+                unsafe {
+                    &mut live_df_filtered
+                        .get_columns_mut()
+                        .drain(..)
+                        .collect::<Vec<_>>()
+                        .into_iter()
+                },
+                &mut shared_file_state
+                    .hive_series
+                    .iter()
+                    .map(|s| s.slice(0, filtered_height)),
+                &self.reader_schema,
+                unsafe { live_df_filtered.get_columns_mut() },
+            );
+
+            unsafe {
+                live_df_filtered.get_columns_mut().extend(
+                    shared_file_state
+                        .file_path_series
+                        .as_ref()
+                        .map(|c| c.slice(0, filtered_height)),
+                )
+            }
+
             return Ok(live_df_filtered);
         }
 
@@ -621,13 +648,36 @@
             &mut live_columns
                 .into_iter()
                 .skip(self.row_index.is_some() as usize), // hive_columns
-            &self.reader_schema,
+            &self.projected_arrow_schema,
            &mut merged,
        );
 
         opt_decode_err.transpose()?;
 
-        let df = unsafe { DataFrame::new_no_checks(expected_num_rows, merged) };
+        let mut out = Vec::with_capacity(
+            merged.len()
+                + shared_file_state.hive_series.len()
+                + shared_file_state.file_path_series.is_some() as usize,
+        );
+
+        hive::merge_sorted_to_schema_order(
+            &mut merged.into_iter(),
+            &mut shared_file_state
+                .hive_series
+                .iter()
+                .map(|s| s.slice(0, filtered_height)),
+            &self.reader_schema,
+            &mut out,
+        );
+
+        out.extend(
+            shared_file_state
+                .file_path_series
+                .as_ref()
+                .map(|c| c.slice(0, filtered_height)),
+        );
+
+        let df = unsafe { DataFrame::new_no_checks(expected_num_rows, out) };
         Ok(df)
     }
 }
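In the streaming row-group decoder above, the hive partition columns are merged back in schema order after filtering, with the optional file-path column appended last, instead of being interleaved with the decoded columns up front. A small usage example with the public API that exercises both together with prefiltering (the dataset layout and names are assumptions, reusing the toy dataset sketched near the top):

import polars as pl

lf = pl.scan_parquet(
    "hive_dataset",
    parallel="prefiltered",
    hive_partitioning=True,
    include_file_paths="path",
)
# A reordered projection that mixes the file-path column, a file column, and a hive column.
print(lf.select("path", "c", "a").filter(pl.col("c") != "x").collect())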
3 changes: 2 additions & 1 deletion crates/polars-stream/src/physical_plan/lower_expr.rs
@@ -689,7 +689,8 @@ fn build_select_node_with_ctx(
 
     if let Some(columns) = all_simple_columns {
         let input_schema = ctx.phys_sm[input].output_schema.clone();
-        if input_schema.len() == columns.len()
+        if !cfg!(debug_assertions)
+            && input_schema.len() == columns.len()
             && input_schema.iter_names().zip(&columns).all(|(l, r)| l == r)
         {
             // Input node already has the correct schema, just pass through.
1 change: 1 addition & 0 deletions py-polars/tests/unit/io/test_hive.py
@@ -503,6 +503,7 @@ def test_hive_partition_force_async_17155(tmp_path: Path, monkeypatch: Any) -> N
     [
         (partial(pl.scan_parquet, parallel="row_groups"), pl.DataFrame.write_parquet),
         (partial(pl.scan_parquet, parallel="columns"), pl.DataFrame.write_parquet),
+        (partial(pl.scan_parquet, parallel="prefiltered"), pl.DataFrame.write_parquet),
         (
             lambda *a, **kw: pl.scan_parquet(*a, parallel="prefiltered", **kw).filter(
                 pl.col("b") == pl.col("b")