From 0e5e55495aaa4b4fb8063c5dd68bd66e39f46241 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Thu, 12 Sep 2024 12:45:38 +0200 Subject: [PATCH] fix: Parquet prefiltered with projection pushdown (#18714) --- crates/polars-io/src/parquet/read/read_impl.rs | 10 ++++++++++ py-polars/tests/unit/io/test_parquet.py | 13 +++++++++++++ 2 files changed, 23 insertions(+) diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index c621b698cebc..15be3f2a8e09 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -266,11 +266,21 @@ fn rg_to_dfs_prefiltered( let num_live_columns = live_variables.len(); let num_dead_columns = projection.len() - num_live_columns; + // @NOTE: This is probably already sorted, but just to be sure. + let mut projection_sorted = projection.to_vec(); + projection_sorted.sort(); + // We create two look-up tables that map indexes offsets into the live- and dead-set onto // column indexes of the schema. let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns); let mut dead_idx_to_col_idx = Vec::with_capacity(num_dead_columns); + let mut offset = 0; for (i, field) in schema.iter_values().enumerate() { + if projection_sorted.get(offset).copied() != Some(i) { + continue; + } + + offset += 1; if live_variables.contains(&field.name[..]) { live_idx_to_col_idx.push(i); } else { diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 170cc2a015be..e035a44686f0 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -1904,3 +1904,16 @@ def test_write_binary_open_file(tmp_path: Path) -> None: out = pl.read_parquet(path) assert_frame_equal(out, df) + + +def test_prefilter_with_projection() -> None: + f = io.BytesIO() + pl.DataFrame({"a": [1], "b": [2]}).write_parquet(f) + + f.seek(0) + ( + pl.scan_parquet(f, parallel="prefiltered") + .filter(pl.col.a == 1) + .select(pl.col.a) + .collect() + )