[SPARK-31809][SQL] Infer IsNotNull from some special equality join keys #28642
Test build #123119 has finished for PR 28642 at commit
testConstraintsAfterJoin(
  testRelation.subquery('left),
  testRelation.subquery('right),
  testRelation.where(IsNotNull(Coalesce(Seq('a, 'b)))).subquery('left),
hive> EXPLAIN SELECT t1.* FROM t1 JOIN t2 ON coalesce(t1.a, t1.b)=t2.a;
OK
STAGE DEPENDENCIES:
Stage-4 is a root stage
Stage-3 depends on stages: Stage-4
Stage-0 depends on stages: Stage-3
STAGE PLANS:
Stage: Stage-4
Map Reduce Local Work
Alias -> Map Local Tables:
$hdt$_0:t1
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
$hdt$_0:t1
TableScan
alias: t1
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Filter Operator
predicate: COALESCE(a,b) is not null (type: boolean)
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: a (type: string), b (type: string), c (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
HashTable Sink Operator
keys:
0 COALESCE(_col0,_col1) (type: string)
1 _col0 (type: string)
Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
alias: t2
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Filter Operator
predicate: a is not null (type: boolean)
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: a (type: string)
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 COALESCE(_col0,_col1) (type: string)
1 _col0 (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Execution mode: vectorized
Local Work:
Map Reduce Local Work
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
test("Should not infer IsNotNull for non null-intolerant child from same table") {
  comparePlans(Optimize.execute(testRelation.where(Coalesce(Seq('a, 'b)) === 'c).analyze),
    testRelation.where(Coalesce(Seq('a, 'b)) === 'c && IsNotNull('c)).analyze)
hive> EXPLAIN SELECT t1.* FROM t1 WHERE coalesce(t1.a, t1.b)=t1.c;
OK
STAGE DEPENDENCIES:
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
TableScan
alias: t1
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Filter Operator
predicate: (COALESCE(a,b) = c) (type: boolean)
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: a (type: string), b (type: string), c (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
ListSink
Time taken: 4.026 seconds, Fetched: 20 row(s)
retest this please
Test build #123553 has finished for PR 28642 at commit
Test build #123607 has finished for PR 28642 at commit
@@ -1039,7 +1039,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     val pythonEvals = collect(joinNode.get) {
       case p: BatchEvalPythonExec => p
     }
-    assert(pythonEvals.size == 2)
+    assert(pythonEvals.size == 4)
@HyukjinKwon I'm not sure whether this change can optimize Python UDFs?
Yeah, I don't think it's more efficient to have more BatchEvalPythonExec nodes. It will require more Python executions, which aren't trivial.
I quickly checked:
== Physical Plan ==
*(3) Project [a#225, b#226, c#236, d#237]
+- *(3) BroadcastHashJoin [cast(pythonUDF0#256 as int)], [cast(pythonUDF0#257 as int)], Inner, BuildRight
:- BatchEvalPython [udf(cast(a#225 as string))], [pythonUDF0#256]
: +- *(1) Project [_1#220 AS a#225, _2#221 AS b#226]
: +- *(1) Project [_1#220, _2#221]
: +- *(1) Filter isnotnull(cast(pythonUDF0#254 as int))
: +- BatchEvalPython [udf(cast(_1#220 as string))], [pythonUDF0#254]
: +- LocalTableScan [_1#220, _2#221]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[2, string, true] as int) as bigint))), [id=#140]
+- BatchEvalPython [udf(cast(c#236 as string))], [pythonUDF0#257]
+- *(2) Project [_1#231 AS c#236, _2#232 AS d#237]
+- *(2) Project [_1#231, _2#232]
+- *(2) Filter isnotnull(cast(pythonUDF0#255 as int))
+- BatchEvalPython [udf(cast(_1#231 as string))], [pythonUDF0#255]
+- LocalTableScan [_1#231, _2#232]
We should probably avoid inferring the is-not-null filter in this case.
retest this please
Test build #124032 has finished for PR 28642 at commit
retest this please
cc @cloud-fan FYI
Test build #124038 has finished for PR 28642 at commit
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
Test build #142271 has finished for PR 28642 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Test build #142276 has finished for PR 28642 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #142299 has finished for PR 28642 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status failure
private def resultMayBeNull(e: Expression): Boolean = e match {
  case Cast(child, dataType, _, _) => !Cast.canUpCast(child.dataType, dataType)
  case _: Coalesce => true
  case _ => false
}
@cloud-fan @HyukjinKwon It will not infer all equality join keys. For example:
| Infer | Will not infer |
|---|---|
| cast(strCol AS double) = doubleCol | upper(strCol) = upperStrCol |
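As a rough illustration of the rule in the table above, here is a toy model of the `resultMayBeNull` check written in plain Scala with no Spark dependency. Everything in it is hypothetical: the `Expr` and `DataType` types are stand-ins for Catalyst's classes, and `canUpCast` is a deliberately tiny approximation of `Cast.canUpCast` that only treats identity and int-to-double as safe.

```scala
// Toy model only; these types are NOT Spark's Catalyst classes.
object ResultMayBeNullSketch {
  sealed trait DataType
  case object StringType extends DataType
  case object IntType extends DataType
  case object DoubleType extends DataType

  sealed trait Expr
  final case class Attr(name: String, dataType: DataType) extends Expr
  final case class Cast(child: Attr, to: DataType) extends Expr
  final case class Coalesce(children: Seq[Expr]) extends Expr
  final case class Upper(child: Expr) extends Expr

  // Simplified stand-in for Cast.canUpCast (assumption): a cast is "safe"
  // only when the types match or when widening int to double.
  def canUpCast(from: DataType, to: DataType): Boolean =
    from == to || (from == IntType && to == DoubleType)

  // Mirrors the shape of the snippet under review: a lossy cast or a
  // coalesce may produce nulls of its own; other expressions are skipped.
  def resultMayBeNull(e: Expr): Boolean = e match {
    case Cast(child, to) => !canUpCast(child.dataType, to)
    case _: Coalesce     => true
    case _               => false
  }

  def main(args: Array[String]): Unit = {
    println(resultMayBeNull(Cast(Attr("strCol", StringType), DoubleType)))
    println(resultMayBeNull(Upper(Attr("strCol", StringType))))
  }
}
```

Under these assumptions, `cast(strCol AS double)` is flagged for an extra IsNotNull filter, while `upper(strCol)` is not.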
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #144659 has finished for PR 28642 at commit
Test build #144661 has finished for PR 28642 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #144680 has finished for PR 28642 at commit
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
Test build #144703 has finished for PR 28642 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
private def resultMayBeNull(exp: Expression): Boolean = exp match {
  case e if !e.nullable => false
  case Cast(child: Attribute, dataType, _, _) => !Cast.canUpCast(child.dataType, dataType)
  case c: Coalesce if c.children.forall(_.isInstanceOf[Attribute]) => true
Can't we rely on the NullIntolerant interface?
We already infer IsNotNull from NullIntolerant expressions. For example:
spark.sql("create table t1 (id string, value int) using parquet")
spark.sql("create table t2 (id int, value int) using parquet")
spark.sql("select * from t1 join t2 on t1.id = t2.id").explain("extended")
== Optimized Logical Plan ==
Join Inner, (cast(id#0 as int) = id#2)
:- Filter isnotnull(id#0)
: +- Relation default.t1[id#0,value#1] parquet
+- Filter isnotnull(id#2)
+- Relation default.t2[id#2,value#3] parquet
Cast is NullIntolerant, so we can already infer IsNotNull(t1.id). But I also want to infer isnotnull(cast(t1.id as int)), because t1.id may contain many strings that cannot be cast to int.
@@ -1215,6 +1215,15 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan]
     }
   }

+  // Whether the result of this expression may be null. For example: CAST(strCol AS double)
+  // We will infer an IsNotNull expression for this expression to avoid skew join.
Is it better to infer IsNotNull(col) instead of IsNotNull(CAST(col AS other_type))?
We can already infer IsNotNull(col). For example:
spark.sql("create table t1 (id string, value int) using parquet")
spark.sql("create table t2 (id int, value int) using parquet")
spark.sql("select * from t1 join t2 on t1.id = t2.id").explain("extended")
Before this pr:
== Optimized Logical Plan ==
Join Inner, (cast(id#0 as int) = id#2)
:- Filter isnotnull(id#0)
: +- Relation default.t1[id#0,value#1] parquet
+- Filter isnotnull(id#2)
+- Relation default.t2[id#2,value#3] parquet
After this pr:
== Optimized Logical Plan ==
Join Inner, (cast(id#0 as int) = id#2)
:- Filter (isnotnull(id#0) AND isnotnull(cast(id#0 as int)))
: +- Relation default.t1[id#0,value#1] parquet
+- Filter isnotnull(id#2)
+- Relation default.t2[id#2,value#3] parquet
Inferring isnotnull(cast(t1.id as int)) may filter out many strings that cannot be cast to int.
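To make the difference between the two filters concrete, here is a minimal plain-Scala sketch with no Spark dependency. It assumes the non-ANSI cast behavior in which an unparseable string becomes null, modeled here with `Option`. The object name, the helper `castToInt`, and the sample data are hypothetical and chosen only for illustration.

```scala
// Plain-Scala model of the idea; names and data are hypothetical.
object NullKeyFilterSketch {
  // Simplified model of CAST(string AS int) under the assumption that
  // unparseable input yields null (None) instead of raising an error.
  def castToInt(s: String): Option[Int] =
    Option(s).flatMap(_.toIntOption)

  // Values of t1.id, including a null and strings that cannot be cast.
  val t1Ids: Seq[String] = Seq("1", "2", "abc", "x9", null, "3")

  // Filter that is already inferred: isnotnull(id).
  val afterIsNotNullId: Seq[String] = t1Ids.filter(_ != null)

  // Additional filter proposed here: isnotnull(cast(id as int)).
  val afterIsNotNullCast: Seq[String] =
    afterIsNotNullId.filter(id => castToInt(id).isDefined)

  def main(args: Array[String]): Unit = {
    println(s"rows after isnotnull(id): ${afterIsNotNullId.size}")
    println(s"rows after isnotnull(cast(id as int)): ${afterIsNotNullCast.size}")
  }
}
```

In this model the rows "abc" and "x9" survive the first filter but produce a null join key, which can never satisfy an inner equality join. The second filter drops them before the join, so they never reach the shuffle.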
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
We can infer IsNotNull from some special equality join keys. For example:
The expressions coalesce(t1.a, t1.b) or CAST(t1.a AS DOUBLE) may generate a lot of null values, which will lead to a skewed join.
After this PR:
Why are the changes needed?
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test and benchmark test:
Case1:
Case2: