-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-35855][SQL] Unify reuse map data structures in non-AQE and AQE rules #33021
[SPARK-35855][SQL] Unify reuse map data structures in non-AQE and AQE rules #33021
Conversation
… rules (`ReuseExchangeAndSubquery`, `ReuseAdaptiveSubquery`) to a simple `Map[<canonicalized plan>, <plan>]`
cc @cloud-fan |
Kubernetes integration test unable to build dist. exiting with code: 1 |
Test build #140137 has finished for PR 33021 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, @peter-toth . Could you fix a UT failure? It seems to fail in both Jenkins and GitHub Action.
BroadcastJoinSuite.broadcast join where streamed side's output partitioning is PartitioningCollection
…to matching `LocalTableScanExec`s. This is necessary as with this PR reuse depends only on canonicalized plans.
Thanks @dongjoon-hyun. I've fixed it in 5e19a49 |
@@ -479,9 +479,9 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils | |||
test("broadcast join where streamed side's output partitioning is PartitioningCollection") { | |||
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") { | |||
val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1") | |||
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These slight changes are needed to avoid ReuseExchange
nodes in the test plans and keep the original test logic as this PR changes the ReuseExchangeAndSubquery
rule to identify reuses based on canonicalized plans only (the schema of the exchanges are no longer need to match).
Without these changes the canonicalized plans of LocalTableScanExec
in t1, t2 and t4 are the same as they have the same content.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth are you saying that the new logic may reuse an exchange even if the schema doesn't match? The canonicalized form of the plan doesn't consider schema?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Schema contains the name of the output attributes as well, but in canonicalized plans we always replace names to "none" and only the structure of plan matters.
Please also note that ReusedExchangeExec
captures the output of the original exchange so that can we restore the schema in a ReusedExchangeExec
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, so the new logic is more effective (can trigger reuse in more cases), and we need to update the test here to not trigger reuse.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is, but I don't think this comes up frequently in real world applications.
Changing t2
and t4
seemed easier than preparing the test for reused exchanged.
I also looked into why the issue didn't come up with the AQE version of the suite (BroadcastJoinSuiteAE
) before this PR and that's because the test doesn't run the queries before inspecting the plans and the initial AQE plans don't contain reuses (but final plans do).
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #140162 has finished for PR 33021 at commit
|
thanks, merging to master! |
Thanks for the review! |
What changes were proposed in this pull request?
This PR unifies reuse map data structures in non-AQE and AQE rules to a simple
Map[<canonicalized plan>, <plan>]
based on the discussion here: #28885 (comment)Why are the changes needed?
The proposed
Map[<canonicalized plan>, <plan>]
is simpler than the currently usedMap[<schema>, ArrayBuffer[<plan>]]
inReuseMap
/ReuseExchangeAndSubquery
(non-AQE) and consistent with theReuseAdaptiveSubquery
(AQE) subquery reuse rule.Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.