
feat: merge using partition filters #1958

Merged 12 commits into delta-io:main on Dec 20, 2023

Conversation

@emcake (Contributor) commented Dec 11, 2023

Description

This upgrades merge so that it can leverage partitions where specified in the join predicate. There are two ways we can leverage partitions:

  1. Static references, i.e. target.partition = 1.
  2. Inferring from the data, i.e. source.partition = target.partition.

In the first case, this implements the logic described in this comment. Any predicate mentioning the source that is not covered by (2) is pruned, which leaves predicates on just the target columns (and is amenable to file pruning).
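(Illustrative sketch of that pruning step, not the PR's actual code: the helper name is made up, and it assumes datafusion's split_binary_owned and Expr::to_columns utilities.)

```rust
use datafusion::common::Result;
use datafusion::logical_expr::utils::split_binary_owned;
use datafusion::logical_expr::{Expr, Operator};

/// Hypothetical helper: keep only the conjuncts that never reference the
/// source relation, leaving a target-only filter suitable for file pruning.
fn target_only_conjuncts(predicate: Expr, source_alias: &str) -> Result<Vec<Expr>> {
    let mut kept = Vec::new();
    for conjunct in split_binary_owned(predicate, Operator::And) {
        // Expr::to_columns collects every column reference in the expression.
        let mentions_source = conjunct
            .to_columns()?
            .iter()
            .any(|c| {
                c.relation
                    .as_ref()
                    .map_or(false, |r| r.to_string() == source_alias)
            });
        if !mentions_source {
            kept.push(conjunct);
        }
    }
    Ok(kept)
}
```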

In the second case, we first construct a version of the predicate with references to source replaced with placeholders:

target.partition = source.partition and foo > 42

becomes:

target.partition = $1 and foo > 42

We then stream through the source table, gathering the distinct tuples of the mentioned partitions:

| partition |
|-----------|
| 1         |
| 5         |
| 7         |

and then expand the SQL to take these into account:

(target.partition = 1 and foo > 42)
or (target.partition = 5 and foo > 42)
or (target.partition = 7 and foo > 42)

We then insert this filter into the target chain. We also use the same filter to process the file list, meaning we only emit remove actions for files that will be targeted by the scan.
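(A minimal string-level sketch of this expansion, for illustration only; the actual implementation works on datafusion Expr trees rather than SQL strings, and the placeholder syntax is made up.)

```rust
/// Illustrative only: expand the templated predicate over the distinct
/// partition values gathered while streaming the source table.
fn expand_predicate(template: &str, distinct_values: &[i64]) -> String {
    distinct_values
        .iter()
        .map(|v| format!("({})", template.replace("$1", &v.to_string())))
        .collect::<Vec<_>>()
        .join(" or ")
}

fn main() {
    let expanded = expand_predicate("target.partition = $1 and foo > 42", &[1, 5, 7]);
    assert_eq!(
        expanded,
        "(target.partition = 1 and foo > 42) or \
         (target.partition = 5 and foo > 42) or \
         (target.partition = 7 and foo > 42)"
    );
}
```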

I considered whether it would be possible to do this via datafusion sql in a generic manner, for example by first joining against the distinct partitions. I don't think it's possible - because each of the filters on the logical plans is static, there's no opportunity for it to push the distinct partition tuples down into the scan. Another variant would be to make the source and partition tables share the same output_partitioning structure, but as far as I can tell you wouldn't be able to make the partitions line up such that you can do the merge effectively without reading the whole table (plus DeltaScan doesn't guarantee that one datafusion partition is one DeltaTable partition).

I think the static bit is a no-brainer, but the eager read of the source table may cause issues if the source table is of a similar size to the target table. It may be prudent to hide that part behind a feature flag on the merge, but I would love comments on it.

Performance

I created a 16GB table locally with 1.25 billion rows over 1k partitions; when updating 1 partition, a full merge takes roughly 1000 seconds:

merge took 985.0801 seconds
merge metrics: MergeMetrics { num_source_rows: 1250000, num_target_rows_inserted: 468790, num_target_rows_updated: 781210, num_target_rows_deleted: 0, num_target_rows_copied: 1249687667, num_output_rows: 1250937667, num_target_files_added: 1001, num_target_files_removed: 1001, execution_time_ms: 983851, scan_time_ms: 0, rewrite_time_ms: 983322 }

but with partition filtering it takes about 3 seconds:

merge took 2.6337671 seconds
merge metrics: MergeMetrics { num_source_rows: 1250000, num_target_rows_inserted: 468877, num_target_rows_updated: 781123, num_target_rows_deleted: 0, num_target_rows_copied: 468877, num_output_rows: 1718877, num_target_files_added: 2, num_target_files_removed: 2, execution_time_ms: 2622, scan_time_ms: 0, rewrite_time_ms: 2316 }

In practice, the tables I'm wanting to use this for are terabytes in size so using merge is currently impractical. This would be a significant speed boost to them.
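(For reference, the kind of merge this targets might be expressed as follows through delta-rs's MergeBuilder. This is a hedged sketch against the public API of this era; the table layout and column names are made up.)

```rust
use deltalake::datafusion::prelude::{col, DataFrame};
use deltalake::{DeltaOps, DeltaResult, DeltaTable};

/// Hedged sketch: an upsert whose join predicate equates the partition
/// columns, so the optimization in this PR can prune target files down to
/// the partitions actually present in the source.
async fn upsert(table: DeltaTable, source: DataFrame) -> DeltaResult<DeltaTable> {
    let (table, metrics) = DeltaOps(table)
        .merge(
            source,
            // target.partition = source.partition is the inferred case (2).
            col("target.partition")
                .eq(col("source.partition"))
                .and(col("target.id").eq(col("source.id"))),
        )
        .with_source_alias("source")
        .with_target_alias("target")
        .when_matched_update(|update| update.update("value", col("source.value")))?
        .when_not_matched_insert(|insert| {
            insert
                .set("id", col("source.id"))
                .set("partition", col("source.partition"))
                .set("value", col("source.value"))
        })?
        .await?;
    println!("merge metrics: {metrics:?}");
    Ok(table)
}
```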

Related Issue(s)

closes #1846

@github-actions bot added the binding/rust (Issues for the Rust crate) and crate/core labels on Dec 11, 2023

ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

@emcake (Contributor, Author) commented Dec 11, 2023

@Blajda I know you have done a bunch of things around merge, this may be of interest.

@emcake emcake changed the title merge using partition filters feat: merge using partition filters Dec 11, 2023
@Blajda (Collaborator) commented Dec 12, 2023

@emcake this PR looks great. Good job.

> I considered whether it would be possible to do this via datafusion sql in a generic manner, for example by first joining against the distinct partitions. I don't think it's possible - because each of the filters on the logical plans is static, there's no opportunity for it to push the distinct partition tuples down into the scan.

Yeah, with current Datafusion this is not possible; however, there are talks about supporting sideways information passing, which will essentially allow the build side of the hash join to pass information to the probe side and enable this "dynamic" filtering.

> I think the static bit is a no-brainer, but the eager read of the source table may cause issues if the source table is of a similar size to the target table. It may be prudent to hide that part behind a feature flag on the merge, but I would love comments on it.

I don't think this is required. We are already using a hash join, so the entire build side (source) is loaded into memory. For these types of merges we will need to switch to using a sort-merge join.


For your generalize filter method I'd like to see additional unit tests written so it's easier to see how they get transformed.

I will follow up with another optimization, building on changes from this branch, that avoids rewriting unmodified files.

@Blajda (Collaborator) commented Dec 13, 2023

Performance with the TPC-DS benchmark is on par with main. Before is main; after is this branch:

+-------------------------------------------------------------------------------------+---------------------+--------------------+------------------------------------------+
| name                                                                                | before_duration_avg | after_duration_avg | before_duration_avg / after_duration_avg |
+-------------------------------------------------------------------------------------+---------------------+--------------------+------------------------------------------+
| delete_only_fileMatchedFraction_0.05_rowMatchedFraction_0.05                        | 6101.0              | 6070.0             | 1.0051070840197693                       |
| multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.05            | 5351.0              | 5334.0             | 1.0031871016122984                       |
| multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.50            | 5401.0              | 5306.0             | 1.0179042593290615                       |
| multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_1.0             | 5204.0              | 5955.0             | 0.8738874895046179                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.01_rowNotMatchedFraction_0.1   | 5066.0              | 5043.0             | 1.0045607773150902                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.0_rowNotMatchedFraction_0.1    | 4367.0              | 4569.0             | 0.9557890129131101                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.0    | 4655.0              | 4943.0             | 0.9417357879830063                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.01   | 4587.0              | 4796.0             | 0.9564220183486238                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.5_rowNotMatchedFraction_0.001  | 4682.0              | 4611.0             | 1.0153979613966602                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.99_rowNotMatchedFraction_0.001 | 4696.0              | 4862.0             | 0.9658576717400247                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_1.0_rowNotMatchedFraction_0.001  | 4532.0              | 4634.0             | 0.9779887785930081                       |
| upsert_fileMatchedFraction_0.5_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001 | 4545.0              | 4763.0             | 0.9542305269787948                       |
| upsert_fileMatchedFraction_1.0_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001 | 4547.0              | 4742.0             | 0.958878110501898                        |
+-------------------------------------------------------------------------------------+---------------------+--------------------+------------------------------------------+

@emcake (Contributor, Author) commented Dec 14, 2023

@Blajda @wjones127 @rtyler @roeap I think this is now ready for review.

@Blajda (Collaborator) commented Dec 19, 2023

@emcake Thanks for making the changes. Seems like the new unit tests are a bit flaky since the order of the disjunctions is not stable. One suggestion is to split the actual expression on OR into an array and then sort. Then you'd have a consistent ordering on the expressions.

@emcake (Contributor, Author) commented Dec 19, 2023

> @emcake Thanks for making the changes. Seems like the new unit tests are a bit flaky since the order of the disjunctions is not stable. One suggestion is to split the actual expression on OR into an array and then sort. Then you'd have a consistent ordering on the expressions.

I wondered if this would bite me! Will do.

@emcake (Contributor, Author) commented Dec 19, 2023

@Blajda this isn't as pretty as I would like: I basically have to deconstruct the predicate into its component parts. This means it's more fragile with respect to how the predicate is constructed, but it will definitely verify the current behaviour.

Review comment on this snippet:

```rust
})
.reduce(Expr::or)
.unwrap();
let split_pred = {
```
Blajda (Collaborator):

You don't need to implement this yourself. Datafusion provides a utility for this: https://docs.rs/datafusion/latest/datafusion/logical_expr/utils/fn.split_binary_owned.html

emcake (Contributor, Author):

I'll use that for the first layer, unpacking the ORs. But I think I still need to do manual mangling when unwrapping the EQ expressions, because Expr doesn't implement Ord, so I need to extract the strings to sort them?
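(A minimal sketch of that string-extraction approach; the expression is made up, and it assumes the split_binary_owned utility linked above.)

```rust
use datafusion::logical_expr::utils::split_binary_owned;
use datafusion::logical_expr::Operator;
use datafusion::prelude::{col, lit};

fn main() {
    // Made-up stand-in for the expanded merge predicate under test.
    let actual = (col("partition").eq(lit(5)).and(col("foo").gt(lit(42))))
        .or(col("partition").eq(lit(1)).and(col("foo").gt(lit(42))));

    // Split on OR, render each disjunct to a string (Expr implements
    // Display but not Ord), then sort for a stable assertion order.
    let mut disjuncts: Vec<String> = split_binary_owned(actual, Operator::Or)
        .into_iter()
        .map(|e| e.to_string())
        .collect();
    disjuncts.sort();

    assert_eq!(disjuncts.len(), 2);
}
```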

Blajda (Collaborator):

Ah, alright. If that's the case then I think this is good!

emcake (Contributor, Author):
split_binary_owned is new to v34: https://docs.rs/datafusion/33.0.0/datafusion/?search=split_binary_owned

if #1983 makes it first then I can update it. :)

Blajda (Collaborator):

We can merge it now; I just want to give it some time before I press the merge button. If you want to wait until after the other PR, let me know.

@Blajda Blajda self-requested a review December 19, 2023 23:06
@Blajda (Collaborator) left a comment:

@emcake This PR looks good to me. Thanks again for making this contribution.
Seems like the tests that failed also fail on main.

@ion-elgreco ion-elgreco enabled auto-merge (squash) December 20, 2023 10:16
@ion-elgreco ion-elgreco merged commit 11ea2a5 into delta-io:main Dec 20, 2023
21 checks passed
Labels: binding/rust (Issues for the Rust crate), crate/core
Projects: none yet
Development: successfully merging this pull request may close the issue "merge very slow compared to delete + append on larger dataset"
3 participants