
refactor: combine normal and cdf plan until write for merge #3142

Merged
merged 2 commits into from
Jan 20, 2025

Conversation

ion-elgreco
Collaborator

@ion-elgreco ion-elgreco commented Jan 18, 2025

Description

During some exploration to make merge streamable, the complication is that an ArrowArrayStreamReader can only be consumed once: when we materialize (execute) a physical plan that holds this LazyTableProvider twice, the second run has no data.
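The single-consumption constraint can be illustrated with a minimal sketch. The types below (OneShotSource, execute) are hypothetical stand-ins, not the actual delta-rs or Arrow code; the point is just that a stream-backed source drains on its first full scan, so a second execution of the same plan sees nothing.

```rust
// Minimal sketch of the single-consumption problem. `OneShotSource` and
// `execute` are hypothetical stand-ins for a stream-backed table provider:
// "executing a plan" drains the underlying stream, so a second execution
// observes zero batches, mirroring ArrowArrayStreamReader behavior.
struct OneShotSource {
    batches: std::vec::IntoIter<Vec<i32>>, // stand-in for Arrow record batches
}

impl OneShotSource {
    fn new(batches: Vec<Vec<i32>>) -> Self {
        Self { batches: batches.into_iter() }
    }

    // Drains whatever is left in the stream.
    fn execute(&mut self) -> Vec<Vec<i32>> {
        self.batches.by_ref().collect()
    }
}

fn main() {
    let mut source = OneShotSource::new(vec![vec![1, 2], vec![3]]);
    let first_run = source.execute();
    let second_run = source.execute();
    assert_eq!(first_run.len(), 2);  // first materialization sees all batches
    assert_eq!(second_run.len(), 0); // second materialization sees nothing
}
```

This is why the PR unions the MERGE and MERGE_CDF plans: one plan, one pass over the stream.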

This PR makes the MERGE + MERGE_CDF plan a single combined plan and splits the data out during the write. A side benefit is that we now have just one function that does the writing and returns all actions.

@JonasDev1, your work in MERGE to use min_max pruning from the source for the target scan also complicates things a bit, since we consume the stream as a whole. We could solve this by caching the df beforehand, but then everything stays in memory, defeating the streamed execution. I'm curious if you have any ideas on how we could do this without full materialization. I couldn't find you on Slack, but feel free to ping me there to discuss it a bit more.

Other thoughts

I moved this writer temporarily under the merge module, but the idea is to also use it later for the normal write operation. That requires the logical plan refactor to be merged first.

@github-actions github-actions bot added the binding/rust Issues for the Rust crate label Jan 18, 2025
@ion-elgreco ion-elgreco changed the title refactor: combine normal and cdf plan until write refactor: combine normal and cdf plan until write for merge Jan 18, 2025

codecov bot commented Jan 18, 2025

Codecov Report

Attention: Patch coverage is 79.75709% with 50 lines in your changes missing coverage. Please review.

Project coverage is 71.89%. Comparing base (a73d646) to head (9c01a9e).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
crates/core/src/operations/merge/writer.rs 80.31% 13 Missing and 24 partials ⚠️
crates/core/src/operations/merge/mod.rs 77.96% 2 Missing and 11 partials ⚠️
Additional details and impacted files
@@           Coverage Diff            @@
##             main    #3142    +/-   ##
========================================
  Coverage   71.88%   71.89%            
========================================
  Files         134      135     +1     
  Lines       43479    43629   +150     
  Branches    43479    43629   +150     
========================================
+ Hits        31257    31367   +110     
- Misses      10201    10219    +18     
- Partials     2021     2043    +22     


@ion-elgreco ion-elgreco marked this pull request as draft January 19, 2025 17:18
@ion-elgreco ion-elgreco marked this pull request as ready for review January 19, 2025 17:29
@JonasDev1
Contributor

One possible optimization:
You could also manage the two writes without the union and the data split during the write, if you add the cdc column to the original df with all possible states (copy, delete, insert, update) and then split them in the write plan. This should be faster and save memory.

@JonasDev1
Contributor

Regarding min max: I haven't understood it yet, but each stream should actually only get one part of the data, so only that part would be cached. Or do you mean that the problem occurs because the cache is not cleared between the calls?

@ion-elgreco
Collaborator Author

ion-elgreco commented Jan 20, 2025

> One possible optimization: You could also manage the two writes without the union and the data split during the write, if you add the cdc column to the original df with all possible states (copy, delete, insert, update) and then split them in the write plan. This should be faster and save memory.

What do you mean with split them in the write plan?

@ion-elgreco
Collaborator Author

ion-elgreco commented Jan 20, 2025

> Regarding min max: I haven't understood it yet, but each stream should actually only get one part of the data, so only that part would be cached. Or do you mean that the problem occurs because the cache is not cleared between the calls?

The issue is that the early filter uses min_max stats, which requires an aggregation execution plan. That execution plan consumes the source stream; once consumed, the RecordBatchGenerator returns 0 batches.

The only way to resolve that is to cache the result in a memtable prior to executing, but then all of your data is in memory and no longer streamed.
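The memtable trade-off can be sketched in plain Rust (hypothetical stand-in types, not the actual delta-rs or DataFusion code): buffering the one-shot stream into a Vec lets both the min/max aggregation and the write scan the data, at the cost of fully materializing it.

```rust
// Sketch of the caching trade-off (hypothetical stand-ins for the memtable
// approach). Collecting the one-shot stream into a Vec allows two passes --
// one for min/max pruning stats, one for the write -- but the entire
// dataset now lives in memory, so execution is no longer streamed.
fn min_max(cached: &[Vec<i64>]) -> Option<(i64, i64)> {
    let mut it = cached.iter().flatten().copied();
    let first = it.next()?;
    Some(it.fold((first, first), |(lo, hi), v| (lo.min(v), hi.max(v))))
}

fn main() {
    // One-shot stream of batches; collecting it is the "cache in a memtable" step.
    let stream = vec![vec![4_i64, 1], vec![9, 7]].into_iter();
    let cached: Vec<Vec<i64>> = stream.collect(); // full materialization

    // Pass 1: gather min/max stats for target-scan pruning.
    assert_eq!(min_max(&cached), Some((1, 9)));

    // Pass 2: the cached batches are still available for the write.
    let rows_written: usize = cached.iter().map(|b| b.len()).sum();
    assert_eq!(rows_written, 4);
}
```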

Take a look at the code here; I disabled the min_max stats gathering when streaming is on: #3145

But if you have any ideas on how to do this while streaming, I am open to input :D

@JonasDev1
Contributor

> One possible optimization: You could also manage the two writes without the union and the data split during the write, if you add the cdc column to the original df with all possible states (copy, delete, insert, update) and then split them in the write plan. This should be faster and save memory.

> What do you mean with split them in the write plan?

Instead of splitting into normal_df and cdf_df by null filtering, you can filter by change value. Then you only need to pass the df once, with all the changes.

Something like:

let normal_df = batch_df
    .clone()
    .filter(col(CDC_COLUMN_NAME).in_list(vec![lit("copy"), lit("insert"), lit("update")], false))?
    .drop_columns(&[CDC_COLUMN_NAME])?;
let cdf_df = batch_df
    .filter(col(CDC_COLUMN_NAME).in_list(vec![lit("delete"), lit("insert"), lit("copy")], false))?;

@ion-elgreco
Collaborator Author

> One possible optimization: You could also manage the two writes without the union and the data split during the write, if you add the cdc column to the original df with all possible states (copy, delete, insert, update) and then split them in the write plan. This should be faster and save memory.

> What do you mean with split them in the write plan?

> Instead of splitting into normal_df and cdf_df by null filtering, you can filter by change value. Then you only need to pass the df once, with all the changes.
>
> Something like:
>
>     let normal_df = batch_df
>         .clone()
>         .filter(col(CDC_COLUMN_NAME).in_list(vec![lit("copy"), lit("insert"), lit("update")], false))?
>         .drop_columns(&[CDC_COLUMN_NAME])?;
>     let cdf_df = batch_df
>         .filter(col(CDC_COLUMN_NAME).in_list(vec![lit("delete"), lit("insert"), lit("copy")], false))?;

Ah right, I see what you mean now! Let me check!

@ion-elgreco ion-elgreco marked this pull request as draft January 20, 2025 19:58
@ion-elgreco ion-elgreco force-pushed the refactor--combine_execution_plans branch from 8df3d14 to 027e369 Compare January 20, 2025 21:46
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
@ion-elgreco ion-elgreco force-pushed the refactor--combine_execution_plans branch from 027e369 to 9c01a9e Compare January 20, 2025 21:46
@ion-elgreco ion-elgreco marked this pull request as ready for review January 20, 2025 21:46
@rtyler rtyler enabled auto-merge January 20, 2025 21:58
Member

@rtyler rtyler left a comment


very interesting, let's see what happens 😄

@rtyler rtyler added this pull request to the merge queue Jan 20, 2025
Merged via the queue into delta-io:main with commit 0cf4ff4 Jan 20, 2025
23 checks passed
let normal_df = batch_df
    .clone()
    .filter(col(CDC_COLUMN_NAME).in_list(
        vec![lit("delete"), lit("source_delete"), lit("update_preimage")],
        true,
    ))?;
Contributor


Is the copy missing here?

Collaborator Author


@JonasDev1 this filter does col(CDC_COL) NOT IN [delete, source_delete, update_preimage], so we want to keep copy, insert, update_postimage
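The negated membership semantics can be illustrated with a small plain-Rust sketch (not DataFusion; `filter_not_in` is a hypothetical helper standing in for `in_list(..., true)`): rows whose change value is in the exclusion list are dropped, so copy, insert and update_postimage survive.

```rust
// Plain-Rust illustration of the negated in_list filter (the `true` flag):
// keep only rows whose CDC value is NOT in the excluded list.
// `filter_not_in` is a hypothetical helper, not a DataFusion API.
fn filter_not_in<'a>(rows: &[&'a str], excluded: &[&str]) -> Vec<&'a str> {
    rows.iter()
        .copied()
        .filter(|v| !excluded.contains(v))
        .collect()
}

fn main() {
    let rows = [
        "copy", "delete", "insert",
        "source_delete", "update_preimage", "update_postimage",
    ];
    let excluded = ["delete", "source_delete", "update_preimage"];
    let normal = filter_not_in(&rows, &excluded);
    // The CDC-only states are filtered out of the normal write path.
    assert_eq!(normal, vec!["copy", "insert", "update_postimage"]);
}
```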

Collaborator Author


I could have probably just filtered on that now that I think about it haha

Contributor


Ahhh the true means negation :D
Then it makes sense, thank you
