[SPARK-34168] [SQL] Support DPP in AQE when the join is Broadcast hash join at the beginning #31258
Conversation
@cloud-fan Please help me review. Thanks.
Test build #134257 has finished for PR 31258 at commit
Hi, @JkSelf. Could you fix the Scala style?
…nd insert the DPP filter after the build side executed
@cloud-fan Updated based on the offline discussions. Please help review again. Thanks.
Kubernetes integration test starting
Kubernetes integration test status failure
cc @maryannxue
Test build #134321 has finished for PR 31258 at commit
SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)

// Update the inputPlan and the currentPhysicalPlan of the adaptivePlan.
adaptivePlan.inputPlan = broadcastValues
Can we wrap the adaptivePlan with a subquery broadcast? Then we don't need to mutate adaptivePlan.inputPlan here and can keep inputPlan immutable.
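A hedged sketch of what the suggested wrapping might look like (purely illustrative, not the merged implementation; the names name, broadcastKeyIndex, buildKeys, and adaptivePlan are taken from the snippets above):

```scala
// Illustrative sketch only: instead of mutating adaptivePlan.inputPlan,
// pass the adaptive plan itself as the child of SubqueryBroadcastExec.
// All names below come from the surrounding snippets.
val wrapped = SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, adaptivePlan)
// adaptivePlan.inputPlan stays immutable; the broadcast values are
// produced by executing the wrapped adaptive plan when the subquery runs.
```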
@@ -133,7 +133,7 @@ case class AdaptiveSparkPlanExec(
     inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
   }

-  @volatile private var currentPhysicalPlan = initialPlan
+  @volatile var currentPhysicalPlan = initialPlan
Exposing a mutable variable doesn't seem like a good idea.
Yes. Fixed.
@@ -101,7 +102,6 @@ case class InsertAdaptiveSparkPlan(
-  // TODO migrate dynamic-partition-pruning onto adaptive execution.
So DPP is now supported and this comment looks out of date?
Done.
Hi, @JkSelf.
Test build #134426 has finished for PR 31258 at commit
Test build #134634 has finished for PR 31258 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Kubernetes integration test status success
Test build #134636 has finished for PR 31258 at commit
Test build #134859 has finished for PR 31258 at commit
Kubernetes integration test starting
 */
 case class ShuffleQueryStageExec(
     override val id: Int,
-    override val plan: SparkPlan) extends QueryStageExec {
+    override val plan: SparkPlan,
+    _canonicalized: SparkPlan) extends QueryStageExec {
We missed adding override def doCanonicalize(): SparkPlan = _canonicalized.
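A sketch of where the missing override would sit, assuming the constructor shown in the diff above (fragment only, not the full class body):

```scala
case class ShuffleQueryStageExec(
    override val id: Int,
    override val plan: SparkPlan,
    _canonicalized: SparkPlan) extends QueryStageExec {

  // Without this override, canonicalization falls back to the default
  // computed from `plan`, so a reused stage (whose plan is a
  // ReusedExchangeExec) would not compare equal to the original exchange.
  override def doCanonicalize(): SparkPlan = _canonicalized

  // ... rest of the class body unchanged ...
}
```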
Kubernetes integration test status failure
Test build #134871 has finished for PR 31258 at commit
Test build #134870 has finished for PR 31258 at commit
@JkSelf @cloud-fan This implementation cannot reuse:

SELECT count(*)
FROM (SELECT c.c_customer_sk, s.*
      FROM customer c
      JOIN store_sales s ON c.c_customer_sk = ss_customer_sk) t1
JOIN date_dim ON ss_sold_date_sk = d_date_sk AND d_year = 2002
@wangyum Yes. This is only the first PR, supporting the case where the join is already a broadcast hash join before the AQE rules are applied. We will support the case where the join starts as a sort merge join and is then converted to a broadcast hash join in follow-up PRs.
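For context (not part of the original discussion): both features must be enabled for this code path to apply at all. A minimal sketch using the standard Spark SQL configuration keys:

```scala
// Both AQE and DPP must be on for DPP to be planned inside an adaptive plan.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
```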
Kubernetes integration test starting
Kubernetes integration test status success
@@ -1345,7 +1371,9 @@ abstract class DynamicPartitionPruningSuiteBase
     }
   }

-  test("SPARK-32817: DPP throws error when the broadcast side is empty") {
+  test("SPARK-32817: DPP throws error when the broadcast side is empty",
+    DisableAdaptiveExecution("EliminateJoinToEmptyRelation " +
We can disable this rule by setting ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.
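A hedged sketch of the suggestion, assuming ADAPTIVE_OPTIMIZER_EXCLUDED_RULES maps to the spark.sql.adaptive.optimizer.excludedRules SQL conf; the fully qualified rule name below is an assumption, not taken from this thread:

```scala
// Sketch: exclude only this AQE optimizer rule for the test instead of
// disabling adaptive execution entirely. The fully qualified rule name
// is assumed for illustration.
withSQLConf(
  SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key ->
    "org.apache.spark.sql.execution.adaptive.EliminateJoinToEmptyRelation") {
  // run the SPARK-32817 assertions with AQE still enabled
}
```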
Updated.
This is a good start!
Test build #134926 has finished for PR 31258 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #134971 has finished for PR 31258 at commit
@@ -165,7 +175,8 @@ case class ShuffleQueryStageExec(
   override def newReuseInstance(newStageId: Int, newOutput: Seq[Attribute]): QueryStageExec = {
     val reuse = ShuffleQueryStageExec(
       newStageId,
-      ReusedExchangeExec(newOutput, shuffle))
+      ReusedExchangeExec(newOutput, shuffle),
+      shuffle.canonicalized)
nit: this should be _canonicalized
@@ -229,7 +245,8 @@ case class BroadcastQueryStageExec(
   override def newReuseInstance(newStageId: Int, newOutput: Seq[Attribute]): QueryStageExec = {
     val reuse = BroadcastQueryStageExec(
       newStageId,
-      ReusedExchangeExec(newOutput, broadcast))
+      ReusedExchangeExec(newOutput, broadcast),
+      broadcast.canonicalized)
ditto
Thanks, merging to master!
Test build #135033 has finished for PR 31258 at commit
… join at the beginning

This PR is to enable AQE and DPP when the join is a broadcast hash join at the beginning, which can benefit from the performance improvements of DPP and AQE at the same time. This PR makes use of the result of the build side and then inserts the DPP filter into the probe side.

Improve performance.

No user-facing change.

Tested by adding a new unit test.

Closes apache#31258 from JkSelf/supportDPP1.

Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This PR enables AQE and DPP when the join is a broadcast hash join at the beginning, which lets a query benefit from the performance improvements of DPP and AQE at the same time. It makes use of the result of the build side and then inserts the DPP filter into the probe side.
Why are the changes needed?
Improve performance
Does this PR introduce any user-facing change?
No
How was this patch tested?
Adding a new unit test.