Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-35093] [SQL] AQE now uses newQueryStage plan as key for looking up cached exchanges for re-use #32195

Closed
wants to merge 1 commit into from

Conversation

andygrove
Copy link
Member

@andygrove andygrove commented Apr 15, 2021

What changes were proposed in this pull request?

AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.

This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange.

Why are the changes needed?

When using the RAPIDS Accelerator for Apache Spark we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them.

@github-actions github-actions bot added the SQL label Apr 15, 2021
@github-actions
Copy link

Test build #753174945 for PR 32195 at commit e735c1b.

@andygrove
Copy link
Member Author

@tgravescs fyi

@tgravescs
Copy link
Contributor

ok to test

@SparkQA
Copy link

SparkQA commented Apr 15, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42020/

@SparkQA
Copy link

SparkQA commented Apr 15, 2021

Test build #137445 has finished for PR 32195 at commit e735c1b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions
Copy link

Test build #753637969 for PR 32195 at commit 074f091.

@SparkQA
Copy link

SparkQA commented Apr 15, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42022/

@SparkQA
Copy link

SparkQA commented Apr 15, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42022/

@HyukjinKwon
Copy link
Member

cc @maryannxue FYI

@dongjoon-hyun
Copy link
Member

Hi, @andygrove . Could you clarify that this is not a correctness issue in the PR description? It only causes job failures, doesn't it?

resulting in incorrect reuse under some circumstances.

@SparkQA
Copy link

SparkQA commented Apr 16, 2021

Test build #137447 has finished for PR 32195 at commit 074f091.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean, if Spark converts reused query stage from columnar to row-based, it will be compatible? Just out of curiosity.

@andygrove
Copy link
Member Author

Hi, @andygrove . Could you clarify that this is not a correctness issue in the PR description? It only causes job failures, doesn't it?

Yes @dongjoon-hyun that is correct. It would result in an invalid plan that would fail to execute.

@andygrove
Copy link
Member Author

Does it mean, if Spark converts reused query stage from columnar to row-based, it will be compatible? Just out of curiosity.

@viirya Spark plugins can create either columnar or row-based exchanges as required for compatibility with other parts of the query plan.

For example. a columnar BroadcastHashJoin would expect both of its child plans to also be columnar, so if Spark replaces one of the child plans with a row-based plan it would not be compatible at execution time.

@viirya
Copy link
Member

viirya commented Apr 16, 2021

Does ReuseExchange rule also possibly suffer from the issue? In the rule, Spark also only checks canonicalized plans of exchanges to choose reused exchange.

@andygrove
Copy link
Member Author

Does ReuseExchange rule also possibly suffer from the issue? In the rule, Spark also only checks canonicalized plans of exchanges to choose reused exchange.

This hasn't been a problem for us. We only ran into issues with the AQE exchange reuse logic because of the adaptive nature where query stages are being created during execution, and with Spark making changes after we provide the new query stage it is too late for our plugin to correct the issue.

Without AQE, where the ReuseExchange rule would be applied (as far as I can see), our plugin would operate on the physical plan after Spark has applied the exchange reuse logic and we would be able to modify the plan as required after that, and before execution, to insert any necessary transitions.

@andygrove
Copy link
Member Author

I'll take another look at ReuseExchange tomorrow. It might be worth considering adding a similar check here, perhaps as a separate PR.

@tgravescs
Copy link
Contributor

If I'm understanding this properly it seems like the root cause is actually that AQE looks at the plan before the columnar changes are applied and physical plan gets executed so it thinks they are canonically the same when really after preparations (specifically here ApplyColumnarRulesAndInsertTransitions) and execution they aren't. I wonder if there is a better way to do that part. Might be nice to investigate that a bit further to see if that is possible.

@dongjoon-hyun
Copy link
Member

cc @cloud-fan

@cloud-fan
Copy link
Contributor

it seems like the root cause is actually that AQE looks at the plan before the columnar changes are applied

This is a good point. Without AQE, ApplyColumnarRulesAndInsertTransitions is executed before ReuseExchange.

@tgravescs
Copy link
Contributor

so I was looking at this today and I think its fairly easy fix to just to change it to store the plan after its been through all the rules.

val queryStage = context.stageCache.getOrElseUpdate(newStage.plan.canonicalized, newStage)
instead of the

val queryStage = context.stageCache.getOrElseUpdate(e.canonicalized, newStage)

The only downside I can think of is if there is something that is matching on the original plan but after running the rules makes it not canonically the same but really is, it would take a bit longer to have to tranverse down the plan and apply all the rules. But that kind of seems like a bug to me anyway.
If anyone sees an issue with that let me know, otherwise either @andygrove can update this or I can put up a PR.

@andygrove
Copy link
Member Author

Thanks, @tgravescs. That is a much simpler change! I have updated the PR.

Copy link
Contributor

@tgravescs tgravescs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change looks good to me, but would be great for others to take a look as well.

@SparkQA
Copy link

SparkQA commented Apr 21, 2021

Test build #137742 has finished for PR 32195 at commit 1035842.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42269/

@SparkQA
Copy link

SparkQA commented Apr 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42269/

@SparkQA
Copy link

SparkQA commented Apr 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42270/

@SparkQA
Copy link

SparkQA commented Apr 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42270/

@cloud-fan
Copy link
Contributor

I'm OK with that. The fix is simple and supportColumns API is in 3.0/3.1

@tgravescs
Copy link
Contributor

@andygrove could update the description and then test on the 3.1.2 and 3.0.3 branches when you get a chance?

@andygrove andygrove changed the title [SPARK-35093] [SQL] AQE now respects supportsColumnar when attempting to reuse exchanges [SPARK-35093] [SQL] AQE now uses newQueryStage plan as key for looking up cached exchanges for reiuse May 18, 2021
@andygrove andygrove changed the title [SPARK-35093] [SQL] AQE now uses newQueryStage plan as key for looking up cached exchanges for reiuse [SPARK-35093] [SQL] AQE now uses newQueryStage plan as key for looking up cached exchanges for re-use May 18, 2021
@andygrove
Copy link
Member Author

@cloud-fan @tgravescs I updated the title and description. Let me know if this is still not clear.

@dongjoon-hyun
Copy link
Member

Yes, please backport this to the old branches.

@andygrove
Copy link
Member Author

I applied this patch to the latest in branch-3.0 and branch-3.1 and ran manual tests to confirm that this fixes the issue for us in those releases.

@tgravescs
Copy link
Contributor

I'm not sure why the label pull requests check isn't running here. Anyone know?

@viirya
Copy link
Member

viirya commented May 18, 2021

Not only here, I also saw other PRs their label pull requests are queued too.

@asfgit asfgit closed this in 52e3cf9 May 19, 2021
asfgit pushed a commit that referenced this pull request May 19, 2021
… up cached exchanges for re-use

### What changes were proposed in this pull request?
AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.

This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange.

### Why are the changes needed?
When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them.

Closes #32195 from andygrove/SPARK-35093.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 52e3cf9)
Signed-off-by: Thomas Graves <tgraves@apache.org>
asfgit pushed a commit that referenced this pull request May 19, 2021
… up cached exchanges for re-use

### What changes were proposed in this pull request?
AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.

This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange.

### Why are the changes needed?
When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them.

Closes #32195 from andygrove/SPARK-35093.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 52e3cf9)
Signed-off-by: Thomas Graves <tgraves@apache.org>
@tgravescs
Copy link
Contributor

thanks @andygrove, thanks everyone for the reviews.
I've merged to master, branch-3.1 and branch-3.0

@dongjoon-hyun
Copy link
Member

Hi, All. This seems to break branch-3.0.

@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented May 19, 2021

Could you check if that is a flakiness or not?

[info] AdaptiveQueryExecSuite:
[info] - Change merge join to broadcast join (521 milliseconds)
[info] - Reuse the parallelism of CoalescedShuffleReaderExec in LocalShuffleReaderExec (265 milliseconds)
[info] - Reuse the default parallelism in LocalShuffleReaderExec (260 milliseconds)
[info] - Scalar subquery (422 milliseconds)
[info] - Scalar subquery in later stages (567 milliseconds)
[info] - multiple joins *** FAILED *** (1 second, 25 milliseconds)
[info]   ArrayBuffer(BroadcastHashJoin [b#147752], [a#147761], Inner, BuildLeft

@andygrove
Copy link
Member Author

I'm looking now

@andygrove
Copy link
Member Author

The tests worked fine for me locally at commit 706f91e. AQE is non-deterministic because query stages can change depending on the order of other stages completing, so this is likely a flakey test. I will investigate further and see how we can make it more robust.

@andygrove
Copy link
Member Author

It looks like this is not a new issue, although potentially the recent change made it more likely to happen. We should probably re-open https://issues.apache.org/jira/browse/SPARK-32304

@HyukjinKwon
Copy link
Member

Late LGTM2

flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
… up cached exchanges for re-use

### What changes were proposed in this pull request?
AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.

This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange.

### Why are the changes needed?
When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them.

Closes apache#32195 from andygrove/SPARK-35093.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 52e3cf9)
Signed-off-by: Thomas Graves <tgraves@apache.org>
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
… up cached exchanges for re-use

### What changes were proposed in this pull request?
AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.

This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange.

### Why are the changes needed?
When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them.

Closes apache#32195 from andygrove/SPARK-35093.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 52e3cf9)
Signed-off-by: Thomas Graves <tgraves@apache.org>
domybest11 pushed a commit to domybest11/spark that referenced this pull request Jun 15, 2022
… up cached exchanges for re-use

### What changes were proposed in this pull request?
AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.

This PR simply changes the key used to lookup cached stages. It now uses the canonicalized form of the new query stage (potentially created by a plugin) rather than using the canonicalized form of the original exchange.

### Why are the changes needed?
When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this but it would involve implementing a mock columnar exchange in the tests so would be quite a bit of work. If anyone has ideas on other ways to test this I am happy to hear them.

Closes apache#32195 from andygrove/SPARK-35093.

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants