-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable datafusion.optimizer.filter_null_join_keys by default #12369
Conversation
…datafusion into filter_null_join_keys
@@ -281,11 +278,9 @@ fn test_same_name_but_not_ambiguous() { | |||
let expected = "LeftSemi Join: t1.col_int32 = t2.col_int32\ | |||
\n Aggregate: groupBy=[[t1.col_int32]], aggr=[[]]\ | |||
\n SubqueryAlias: t1\ | |||
\n Filter: test.col_int32 IS NOT NULL\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was actually a regression in https://github.com/apache/datafusion/pull/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the link seems to be incomplete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This #12348 , I think I move the fix out of this PR, so it can be reviewed separately
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have any benchmarks that pushing these into Filters is faster than evaluating them in joins?
I am surprised that the overhead of filtering (and this copying) all non-null rows outweighs the benefits
If the filter was pushed all the way to the scan, I could see it potentially helping. However, I don't see any plans where the filter is actually pushed into a scan (perhaps because all the tests operate on CSV / MemTable which don't support filters)
@@ -281,11 +278,9 @@ fn test_same_name_but_not_ambiguous() { | |||
let expected = "LeftSemi Join: t1.col_int32 = t2.col_int32\ | |||
\n Aggregate: groupBy=[[t1.col_int32]], aggr=[[]]\ | |||
\n SubqueryAlias: t1\ | |||
\n Filter: test.col_int32 IS NOT NULL\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the link seems to be incomplete
07)------------TableScan: sales_global projection=[zip_code, country, sn, ts, currency] | ||
08)----------SubqueryAlias: e | ||
09)------------TableScan: sales_global projection=[sn, ts, currency, amount] | ||
07)------------Filter: sales_global.currency IS NOT NULL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Benefits: Input to join is smaller, so smaller input, faster build, no nulls need to be hashed
But in order to skip hashing nulls, the input array would have to be "filtered" (aka copy the matching rows)
lower chance for data skew, other join can be planned, downstream kernels are faster, can possibly be pushed down into scan, etc. In distributed setting, might save a lot of IO as well.
The argument in the distributed setting makes sense to me, but the other ones seem like they are all of the class "faster in some cases but slower in others"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But in order to skip hashing nulls, the input array would have to be "filtered" (aka copy the matching rows)
Correct, but you save some copying in RepartitionExec
/ build side concatenate as well, and copying / checking columns of keys in probe side.
In case there aren't any nulls (even if column is nullable), there is no copying happening.
Even with CSV / MemTable in many cases null filter can be combined with existing filter expressions, so no extra copying is happening (less copying in fact as fewer rows need to be copied).
Thanks, I'll try and see if there are any benchmarks. I tried with TPCH, but there aren't any nulls in there so benchmarks aren't changed (as expected). |
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days. |
Which issue does this PR close?
Closes #12375
Rationale for this change
It is better to have it enabled by default, as pushing a filter down below a join is generally faster than the overhead saved by not having to execute the filter.
Besides that, in many cases the filter can be pushed down further to the scan or might enable other optimizations (now or future ones):
Benefits: Input to join is smaller, so smaller input, faster build, no nulls need to be hashed, lower chance for data skew, other join can be planned, downstream kernels are faster, can possibly be pushed down into scan, etc. In distributed setting, might save a lot of IO as well.
Downside: evaluation of expression + copying overhead if it doesn't filter out much rows. This might be slower than executing join without prefiltering on nulls.
What changes are included in this PR?
enable
datafusion.optimizer.filter_null_join_keys
, fix tests / expectationsAre these changes tested?
Existing tests
Are there any user-facing changes?
Only slightly different plans.