Implement predicate pushdown for parquet reader #349

Open
wants to merge 17 commits into base: main

OussamaSaoudi-db
Collaborator

This PR implements predicate pushdown for the parquet reader used in both the sync and default engines.

Closes: #341

codecov bot commented Sep 19, 2024

Codecov Report

Attention: Patch coverage is 83.74384% with 33 lines in your changes missing coverage. Please review.

Project coverage is 74.94%. Comparing base (896accc) to head (11763bd).
Report is 19 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| kernel/src/engine/default/parquet.rs | 75.24% | 17 Missing and 8 partials ⚠️ |
| kernel/src/engine/arrow_expression.rs | 86.00% | 3 Missing and 4 partials ⚠️ |
| kernel/src/engine/sync/parquet.rs | 92.85% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
```diff
@@            Coverage Diff             @@
##             main     #349      +/-   ##
==========================================
+ Coverage   74.03%   74.94%   +0.91%
==========================================
  Files          43       43
  Lines        8137     8542     +405
  Branches     8137     8542     +405
==========================================
+ Hits         6024     6402     +378
- Misses       1733     1738       +5
- Partials      380      402      +22
```

zachschuermann (Collaborator) left a comment

took a quick look

kernel/src/engine/arrow_expression.rs (outdated, resolved)
`@@ -112,7 +121,7 @@ impl FileOpener for ParquetOpener {`
```rust
// let projection = self.projection.clone();
let table_schema = self.table_schema.clone();
let limit = self.limit;

let predicate = self.predicate.clone();
```
Collaborator

Maybe if `to_row_filter` above takes a reference, we don't need to clone?
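A minimal sketch of the trade-off behind this question, using a hypothetical `Expr` type and hypothetical `to_row_filter_by_ref` / `to_row_filter_shared` functions rather than the kernel's real API. It assumes, as with the `parquet` crate's `ArrowPredicateFn`, that the filter closure must be `'static`. Under that assumption, taking the predicate by reference only moves the deep clone inside the function, while sharing it behind an `Arc` turns each clone into a reference-count increment.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the kernel's `Expression` (not the real type).
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column(String),
    Gt(Box<Expr>, i64),
}

// Like the closure given to a row filter, this boxed predicate must be `'static`,
// so it cannot hold a borrow of the caller's expression.
pub type RowPredicate = Box<dyn FnMut(&[i64]) -> Vec<bool> + Send + 'static>;

// Borrowing the predicate only moves the deep clone inside the function:
// the closure still needs an owned copy to satisfy `'static`.
pub fn to_row_filter_by_ref(predicate: &Expr) -> RowPredicate {
    let owned = predicate.clone(); // deep clone of the whole expression tree
    Box::new(move |values: &[i64]| eval(&owned, values))
}

// Sharing the predicate behind an `Arc` makes each clone a refcount increment.
pub fn to_row_filter_shared(predicate: Arc<Expr>) -> RowPredicate {
    Box::new(move |values: &[i64]| eval(&predicate, values))
}

// Evaluate the toy expression against the values of a single column.
fn eval(expr: &Expr, values: &[i64]) -> Vec<bool> {
    match expr {
        Expr::Gt(inner, threshold) if matches!(**inner, Expr::Column(_)) => {
            values.iter().map(|v| v > threshold).collect()
        }
        // Any other shape keeps every row in this toy model.
        _ => vec![true; values.len()],
    }
}
```

Whether an `Arc`-wrapped expression fits the kernel's ownership model is a separate design question; the sketch only shows where the copy ends up happening.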

kernel/src/snapshot.rs (outdated, resolved)
kernel/src/snapshot.rs (outdated, resolved)
`@@ -30,6 +32,18 @@ use crate::schema::{DataType, PrimitiveType, SchemaRef};`
```rust
use crate::{EngineData, ExpressionEvaluator, ExpressionHandler};

// TODO leverage scalars / Datum
//
pub fn expression_to_row_filter(predicate: Expression) -> RowFilter {
    let arrow_predicate = ArrowPredicateFn::new(ProjectionMask::all(), move |batch| {
```
Collaborator Author

Here I'm taking all columns. Is there an opportunity to pass in the projection mask too? Could this bring performance gains?

Collaborator

nicklan, Sep 20, 2024

I think without a projection mask, this provides no benefit. From the docs:

> RowFilter applies predicates in order, after decoding only the columns required. As predicates eliminate rows, fewer rows from subsequent columns may be required, thus potentially reducing IO and decode.

So in the case we expect to use this, it helps because it decodes only the required columns and can then skip rows without decoding the rest, but we'll need to either:

  1. Specify which columns the predicate applies to
  2. Work it out from the expression

Option 2 would be better, but if that proves too tricky we could require having the columns passed in. Possibly some of the code in `arrow_utils::get_requested_indices` could help with building the projection mask, but that's written for schemas, and we don't necessarily have those here. You could probably construct what's needed, though.
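A toy model of the quoted `RowFilter` behavior, assuming columns are plain in-memory `Vec<i64>`s and each predicate reads exactly one column. `ToyPredicate` and `apply_in_order` are hypothetical and ignore pages, row groups, and I/O granularity. Predicates run in order, each decoding its column only for rows that are still selected, and output columns are decoded only for the survivors. That is why the benefit depends on knowing which columns each predicate needs.

```rust
// Toy model of in-order row filtering (not the parquet crate's implementation).
pub struct ToyPredicate {
    pub column: usize,         // index of the single column this predicate reads
    pub keep: fn(i64) -> bool, // row-level test on that column's value
}

// Returns the selected row indices and how many values were decoded in total.
pub fn apply_in_order(
    columns: &[Vec<i64>],
    predicates: &[ToyPredicate],
    output_columns: &[usize],
) -> (Vec<usize>, usize) {
    let num_rows = columns.first().map_or(0, |c| c.len());
    let mut selected: Vec<usize> = (0..num_rows).collect();
    let mut decoded = 0;
    for p in predicates {
        // Only rows that survived earlier predicates are decoded for this column.
        decoded += selected.len();
        selected.retain(|&row| (p.keep)(columns[p.column][row]));
    }
    // Output columns are decoded only for the final survivors (this simplified
    // model does not reuse values already decoded for a predicate).
    decoded += selected.len() * output_columns.len();
    (selected, decoded)
}
```

For example, with predicates `col0 > 50` then `col1 < 25` over four rows and three output columns, the model decodes 4 + 2 + 3 = 9 values instead of the 12 a full decode would need.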

Collaborator Author

Okay, so we'd definitely like projection. I was initially only thinking about the row group skipping optimization.

Re option 2: I could imagine a query like `select * from table where value > 50`. Here I believe the filter would only reference the `value` column, but we want all columns back. If that's the case, I think option 1 is the way to go.

Collaborator

Couple of things:

  1. For the actual "data path", we don't push the predicate down in the "all in one" scan (see here), although an engine could choose to do that since `read_parquet_files` does take a predicate. Filtering like this is not always a win for reading parquet, so we'll need to be a bit more careful about when we actually push things down. Our example code also doesn't push it down yet. Once this is working, we can maybe think about providing some guidance to engines about when to push down.
  2. My option 1 vs. 2 above was just about figuring out which columns to project out to evaluate the predicate. You definitely want to project columns out. So for your example, you could either require the caller to tell you that its expression requires the `value` column, or you could look at the expression itself and notice that `value` is the only column it references. Figuring it out from the expression is much nicer for users, but requires more work because you need to examine the expression and see which columns it references. Either way, you then have to figure out the positions of those columns in the parquet file, which is very non-trivial (see `get_requested_indices`). You might want to limit this to only allow filtering on root columns.

Collaborator Author

Update on this: I'm now filtering columns, but only at the root level, since `StructType::project` only works at the root level. I extract the columns by recursing down the expression structure. I've opened #353 to explore projecting nested columns.
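A minimal sketch of the approach described here: recurse through the expression to collect the referenced column paths, reject anything nested, and map the remaining names to root-field positions. `Expr`, `referenced_columns`, and `root_indices` are hypothetical simplifications, not the kernel's `Expression` or `StructType::project`. In a real reader, indices like these could be handed to the `parquet` crate's `ProjectionMask::roots` together with the file's `SchemaDescriptor`.

```rust
use std::collections::BTreeSet;

// Hypothetical, simplified expression tree (not the kernel's `Expression`).
pub enum Expr {
    Column(String), // dotted path, e.g. "value" or "a.b"
    Literal(i64),
    Compare(Box<Expr>, Box<Expr>), // binary comparison; the operator is omitted here
    And(Vec<Expr>),
}

// Recursively collect every column path the expression references.
pub fn referenced_columns(expr: &Expr, out: &mut BTreeSet<String>) {
    match expr {
        Expr::Column(path) => {
            out.insert(path.clone());
        }
        Expr::Literal(_) => {}
        Expr::Compare(left, right) => {
            referenced_columns(left, out);
            referenced_columns(right, out);
        }
        Expr::And(children) => {
            for child in children {
                referenced_columns(child, out);
            }
        }
    }
}

// Map the referenced columns to positions among the file's root fields.
// Only root-level columns are supported; nested paths are rejected.
pub fn root_indices(expr: &Expr, root_fields: &[&str]) -> Result<Vec<usize>, String> {
    let mut columns = BTreeSet::new();
    referenced_columns(expr, &mut columns);
    let mut indices = Vec::with_capacity(columns.len());
    for col in &columns {
        if col.contains('.') {
            return Err(format!("nested column not supported: {col}"));
        }
        match root_fields.iter().position(|field| *field == col.as_str()) {
            Some(i) => indices.push(i),
            None => return Err(format!("column not found: {col}")),
        }
    }
    indices.sort_unstable();
    Ok(indices)
}
```

For the earlier `select * from table where value > 50` example, only `value` ends up in the predicate's mask, while the scan's output projection still covers every column.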

```rust
fn test_snapshot_read_metadata() {
    let path =
        std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
fn get_snapshot_from_path(path: &str, version: Option<Version>) -> Snapshot {
```
Collaborator Author

@zachschuermann I moved setup code into this helper function. How's it look?

OussamaSaoudi-db changed the title from "[WIP] Implement predicate pushdown for parquet reader" to "Implement predicate pushdown for parquet reader" on Sep 20, 2024
OussamaSaoudi-db marked this pull request as ready for review on September 20, 2024 at 21:29
kernel/src/engine/arrow_expression.rs (outdated, resolved)
```rust
}

#[test]
fn test_replay_protocol_metadata_filtering_predicate() {
```
Collaborator

So currently this just checks that we don't break anything, right?

I think we should probably have a more specific check in the parquet reader that manually creates the expected expression, pushes it into a filter, and ensures it does what it says it will.
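A minimal sketch of the kind of targeted test suggested here: build the expected predicate by hand, run it as a filter, and assert that exactly the matching rows survive. `LogRow`, `Pred`, `eval`, and `apply_filter` are hypothetical stand-ins for the kernel's `Expression` and the parquet reader. The `protocol.minReaderVersion` / `metaData.id` predicate is an assumed example, not necessarily the one the snapshot code uses.

```rust
// Hypothetical, flattened view of one log row (not the kernel's schema).
#[derive(Debug, Clone, PartialEq)]
pub struct LogRow {
    pub protocol_min_reader_version: Option<i32>,
    pub metadata_id: Option<String>,
    pub add_path: Option<String>,
}

// Hypothetical predicate type with just enough shape for this test.
pub enum Pred {
    IsNotNull(&'static str),
    Or(Box<Pred>, Box<Pred>),
}

// Evaluate the predicate on one row; unknown column names evaluate to false.
pub fn eval(pred: &Pred, row: &LogRow) -> bool {
    match pred {
        Pred::IsNotNull("protocol.minReaderVersion") => row.protocol_min_reader_version.is_some(),
        Pred::IsNotNull("metaData.id") => row.metadata_id.is_some(),
        Pred::IsNotNull("add.path") => row.add_path.is_some(),
        Pred::IsNotNull(_) => false,
        Pred::Or(a, b) => eval(a, row) || eval(b, row),
    }
}

// Apply the predicate as a row filter, keeping only rows where it is true.
pub fn apply_filter(pred: &Pred, rows: &[LogRow]) -> Vec<LogRow> {
    rows.iter().filter(|r| eval(pred, r)).cloned().collect()
}
```

The real test would build the kernel `Expression`, pass it to the parquet reader, and compare the returned batches, but the shape of the assertion is the same.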

Collaborator Author

Cool, I'll go do that 👍

Collaborator Author

I put up some new tests in `default/parquet.rs`. I feel like `test_parquet_protocol_metadata_filter` is a little ugly, but I don't see an easy way to simplify it or make it more reusable.

OussamaSaoudi-db changed the title from "Implement predicate pushdown for parquet reader" to "[WIP] Implement predicate pushdown for parquet reader" on Sep 24, 2024
OussamaSaoudi-db changed the title from "[WIP] Implement predicate pushdown for parquet reader" to "Implement predicate pushdown for parquet reader" on Sep 24, 2024
```rust
    parquet_schema,
    parquet_physical_schema,
)?;
builder = builder.with_row_filter(row_filter);
```
Collaborator

I'm not sure this will work as well as we wish it would...

In my experience, this kind of row-level pushdown doesn't consistently help performance, even with the kind of lazy materialization arrow-rust brags about (it won't fetch column chunks until it has proven that at least one row is needed).

The reason is that every row pays the cost of evaluating the filter, while any I/O reduction is partial at best. We still have to fetch the columns the predicate touches, so any I/O savings come from not fetching payload columns. But that only works if the filter eliminates ALL rows from the row group. And if a row group can be skipped, stats-based row group skipping can often do it much more cheaply (no extra I/O at all).

Meanwhile, in cases that don't see any I/O reduction, pushing down the filtering just shifts complexity from the query engine to the file scanner. And that's usually a net loss because the output of the scan is likely consumed in a pipelined single pass either way.
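A toy cost model of the argument above for a single predicate `value > threshold`. `RowGroupStats`, `GroupOutcome`, and `scan_group` are hypothetical, and in a real file the min/max statistics come from the Parquet footer. A row-level filter must decode the predicate column for every row of a group it cannot skip, and it avoids fetching payload columns only when it rejects every row. With exact min/max statistics, those are precisely the groups that stats-based skipping already drops without reading any data. The row filter gets that win on its own only when the statistics are loose or missing.

```rust
// Hypothetical min/max statistics for the filtered column in one row group.
// `min` is unused by `value > threshold` but would drive `value < threshold`.
pub struct RowGroupStats {
    pub min: i64,
    pub max: i64,
}

#[derive(Debug, PartialEq)]
pub enum GroupOutcome {
    // Skipped from footer statistics alone: no column data read.
    SkippedByStats,
    // Predicate column decoded, but no row survived, so payload columns are never fetched.
    PayloadSkipped { predicate_values_decoded: usize },
    // At least one row survived, so payload columns must be fetched anyway.
    PayloadFetched { predicate_values_decoded: usize, rows_kept: usize },
}

// Simulate scanning one row group with the predicate `value > threshold`.
pub fn scan_group(stats: &RowGroupStats, values: &[i64], threshold: i64) -> GroupOutcome {
    // No row can match if even the maximum fails the predicate.
    if stats.max <= threshold {
        return GroupOutcome::SkippedByStats;
    }
    // Row-level filter: every row pays for evaluating the predicate.
    let rows_kept = values.iter().filter(|&&v| v > threshold).count();
    if rows_kept == 0 {
        GroupOutcome::PayloadSkipped { predicate_values_decoded: values.len() }
    } else {
        GroupOutcome::PayloadFetched { predicate_values_decoded: values.len(), rows_kept }
    }
}
```

In the second case of a group like `[1, 2, 3, 100]` with threshold 50, the filter decodes all four predicate values and still fetches the payload for the single surviving row.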

Collaborator

Note: There are absolutely cases where row-level filter pushdown is a performance win... but there are too many cases where it doesn't help or even hurts performance instead. And it's data-dependent, so hard to predict how any one query will be affected.

Collaborator

Thanks Ryan, planning on pausing this work for now :)

zachschuermann added the "merge hold" label (Don't allow the PR to merge) on Oct 9, 2024
Labels
merge hold: Don't allow the PR to merge

Development
Successfully merging this pull request may close these issues:
Implement and test predicate pushdown for parquet in engines

4 participants