
[SPARK-34637][SQL] Improve the performance of AQE and DPP through logical optimization. #31941

Closed

Conversation

@weixiuli (Contributor) commented Mar 23, 2021

What changes were proposed in this pull request?

Since https://issues.apache.org/jira/browse/SPARK-34168, AQE and DPP can be applied at the same time, but only when the join is a broadcast hash join at the beginning.

This PR supports applying Dynamic Partition Pruning together with Adaptive Query Execution in more scenarios. The processing idea is as follows (a sketch follows the list):

  1. First, before the adaptive dynamic pruning optimizer rule runs, check whether the input plan contains a BroadcastHashJoinExec with a matching build side.
  2. If such a BroadcastHashJoinExec exists but the broadcast query stage for its build side has not been created yet, create one for the DPP optimizer and cache it so that AQE can reuse it.
  3. If such a BroadcastHashJoinExec exists and the broadcast query stage for its build side has already been created, reuse it for the DPP optimizer.
  4. If no such BroadcastHashJoinExec exists, fall back and skip the DPP optimization.
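
A minimal, self-contained sketch of this three-way decision (the types are simplified stand-ins, not Spark's real classes; only the control flow mirrors the proposal):

sealed trait Plan
case class BroadcastHashJoin(buildSide: Plan) extends Plan
case class Other(children: Seq[Plan]) extends Plan
case class Scan(table: String) extends Plan
case class QueryStage(build: Plan)

class DppPlannerSketch(stageCache: scala.collection.mutable.Map[Plan, QueryStage]) {

  // Step 1: look for a broadcast hash join anywhere in the input plan.
  private def findBroadcastJoin(plan: Plan): Option[BroadcastHashJoin] = plan match {
    case j: BroadcastHashJoin => Some(j)
    case Other(children) =>
      children.iterator.map(findBroadcastJoin).collectFirst { case Some(j) => j }
    case _ => None
  }

  def planDppFilter(inputPlan: Plan): Option[QueryStage] =
    findBroadcastJoin(inputPlan) match {
      case Some(join) =>
        stageCache.get(join.buildSide) match {
          case Some(stage) => Some(stage)            // step 3: reuse AQE's stage
          case None =>
            val stage = QueryStage(join.buildSide)   // step 2: create it for DPP
            stageCache.put(join.buildSide, stage)    // ...and cache it for AQE reuse
            Some(stage)
        }
      case None => None                              // step 4: no join, fall back (no DPP)
    }
}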

Why are the changes needed?

To support applying Dynamic Partition Pruning together with Adaptive Query Execution in more scenarios.

Does this PR introduce any user-facing change?

NO

How was this patch tested?

Added unit tests.

@weixiuli (Contributor, Author)

@cloud-fan @maryannxue @JkSelf PTAL.

github-actions bot added the SQL label Mar 23, 2021
@cloud-fan (Contributor)

ok to test

@SparkQA commented Mar 23, 2021

Test build #136407 has started for PR 31941 at commit 0208465.

val reuseQueryStage = adaptivePlan.reuseQueryStage(existingStage.get, exchange)
logDebug(s"PlanAdaptiveDynamicPruningFilters: reuseQueryStage => $reuseQueryStage")
Option(reuseQueryStage)
} else if (conf.dynamicPartitionPruningCreateBroadcastEnabled) {
Contributor

Seems we don't need this config. It's always beneficial to do so. The AQE + DPP integration is only in master and not released yet, so we don't need to worry about regressions.
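
For reference, a flag like the one under discussion would typically be declared in SQLConf along these lines; the key name, doc text, and default below are assumptions inferred from conf.dynamicPartitionPruningCreateBroadcastEnabled in the diff, not the actual entry:

// Hypothetical SQLConf entry; the key and default are assumptions.
val DYNAMIC_PARTITION_PRUNING_CREATE_BROADCAST_ENABLED =
  buildConf("spark.sql.optimizer.dynamicPartitionPruning.createBroadcast.enabled")
    .doc("When true, the adaptive DPP rule may create (and cache) a broadcast " +
      "query stage for the build side so that AQE can reuse it later.")
    .booleanConf
    .createWithDefault(true)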

Contributor Author

Thank you for your time. Yes, we can keep it for now during review and remove it in the end.

@cloud-fan (Contributor)

This looks similar to #31756. @JkSelf, can you take a look?

@SparkQA commented Mar 23, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40991/

@SparkQA commented Mar 23, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40991/

@JkSelf (Contributor) commented Mar 24, 2021

@cloud-fan @weixiuli
Yes, PR #31756 is addressing the DPP + AQE use-case limitations.

@JkSelf (Contributor) commented Mar 24, 2021

When we find that the buildPlan in SubqueryAdaptiveBroadcastExec can be reused, we can apply the DPP filter first (the same as in the PlanDynamicPruningFilters rule) and then go back to the AQE framework to reuse the broadcast exchange on the build side. And if the build-side exchange is running but not finished, we can use the wait-or-cancel-stage mechanism to avoid the broadcast exchange being executed twice.

@cloud-fan @weixiuli Please correct me if my understanding is wrong. Thanks.
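
One self-contained way to picture the "execute the broadcast at most once" part of this idea (plain Scala, not Spark code; the wait-or-cancel mechanics are reduced to waiting on a shared future):

import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BroadcastOnceSketch {
  // One shared future per build-side plan, keyed by a canonical string.
  private val running = new ConcurrentHashMap[String, Future[Array[Byte]]]()

  // computeIfAbsent submits the body at most once per key, even if the DPP
  // planner and the AQE framework ask for the same broadcast concurrently;
  // the later caller simply waits on the already-running future.
  def getOrSubmit(key: String)(body: => Array[Byte]): Array[Byte] = {
    val future = running.computeIfAbsent(key, _ => Future(body))
    Await.result(future, 10.minutes)
  }
}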

@weixiuli (Contributor, Author) commented Mar 25, 2021

When we find that the buildPlan in SubqueryAdaptiveBroadcastExec can be reused, we can apply the DPP filter first (the same as in the PlanDynamicPruningFilters rule) and then go back to the AQE framework to reuse the broadcast exchange on the build side. And if the build-side exchange is running but not finished, we can use the wait-or-cancel-stage mechanism to avoid the broadcast exchange being executed twice.

@cloud-fan @weixiuli Please correct me if my understanding is wrong. Thanks.

Yes, when the input plan has a build-side BroadcastHashJoinExec before the adaptive dynamic pruning optimizer rule runs,
we should ensure that the build plan can be created by DPP when AQE has not created it, so that AQE can reuse it later, or that DPP can reuse it when AQE has already created it. Not only does this avoid the broadcast exchange being executed twice, it also guarantees that the build plan can be created and used by DPP even when AQE has not created it, which benefits both DPP and AQE.

@JkSelf (Contributor) commented Mar 25, 2021

I think it is better to make use of the AQE framework to reuse the broadcast exchange or newQueryStage.
@cloud-fan What is your point of view?

@cloud-fan (Contributor)

I think it is better to make use of the AQE framework to reuse the broadcast exchange or newQueryStage.

I agree, and I think this PR does it?

When planning the DPP filter, the broadcast plan may be in 2 different states:

  1. It's already submitted as a query stage, which means it's available in the stage cache. Whether it's running or completed, we will create a ReusedQueryStage for the DPP filter.
  2. It's not submitted yet and not available in the stage cache. We should create a fresh QueryStage for the DPP filter and put it in the stage cache, so that the AQE framework can reuse it later.

Case 2 is a bit tricky due to race conditions: the DPP filter planning and the AQE framework may be creating a fresh query stage at the same time. We should double-check it.
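
A minimal sketch of the double-check this calls for (a simplified stand-in for the stage cache, not Spark's implementation): the lookup and the insertion must happen atomically, otherwise both sides can miss the cache and each create a stage.

import scala.collection.mutable

// Simplified stand-in for the AQE stage cache, not Spark's implementation.
class StageCacheSketch[K, V] {
  private val cache = mutable.Map.empty[K, V]

  // Checking and inserting under one lock closes the race window in which
  // the DPP planner and the AQE framework both see a miss and then create
  // duplicate query stages for the same exchange.
  def getOrCreate(key: K)(create: => V): V = cache.synchronized {
    cache.getOrElseUpdate(key, create)
  }
}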

@JkSelf (Contributor) commented Mar 26, 2021

I think it is better to make use of the AQE framework to reuse the broadcast exchange or newQueryStage.

@cloud-fan
I may need to explain a little bit more about this.

  1. In my understanding, the PlanDynamicPruningFilters rule simply judges whether there is an exchange that can be reused, in order to decide whether to insert the DPP filter or not. The actual reuse happens in the ReuseExchange rule. I think this way of thinking is clearer.
  2. When AQE is enabled, we implement the ReuseExchange logic in the AQE framework: when an exchange is created, we look in the stageCache for an exchange that can be reused, and reuse it if one exists (sketched after this list).
  3. In the PlanAdaptiveDynamicPruningFilters rule, I am more inclined to the idea of the PlanDynamicPruningFilters rule: just add the DPP filter by judging whether there is an exchange that can be reused. The actual reuse is left to the AQE framework, instead of looking in the stageCache to create the reused exchange or calling the newQueryStage method to create a new query stage inside the PlanAdaptiveDynamicPruningFilters rule. Of course, we did this in PR #31258, but I think we may need to make some improvements in subsequent implementations.
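
To make the separation in point 2 concrete, here is a self-contained sketch of a ReuseExchange-style pass (simplified stand-in types, not Spark's): duplicate exchanges are rewritten into references to the first occurrence, and the DPP rule itself never has to touch the cache:

sealed trait Node
case class Exchange(canonicalChild: String) extends Node
case class ReusedExchange(original: Exchange) extends Node
case class Leaf(name: String) extends Node

def reuseExchanges(nodes: Seq[Node]): Seq[Node] = {
  val seen = scala.collection.mutable.Map.empty[String, Exchange]
  nodes.map {
    case e: Exchange =>
      seen.get(e.canonicalChild) match {
        case Some(first) => ReusedExchange(first)         // duplicate: point at the first
        case None        => seen += e.canonicalChild -> e; e
      }
    case other => other
  }
}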

@cloud-fan (Contributor)

@JkSelf are you saying that PlanAdaptiveDynamicPruningFilters should simply create a BroadcastExchange and let the AQE framework create/reuse the query stage?

@weixiuli (Contributor, Author)

3. In the PlanAdaptiveDynamicPruningFilters rule, I am more inclined to the idea of the PlanDynamicPruningFilters rule: just add the DPP filter by judging whether there is an exchange that can be reused. The actual reuse is left to the AQE framework, instead of looking in the stageCache to create the reused exchange or calling the newQueryStage method to create a new query stage inside the PlanAdaptiveDynamicPruningFilters rule.

@JkSelf I don't think so; the 'case 2' described by @cloud-fan would be ignored if we followed your approach. This PR solves 'case 2'. In addition, I think this modification is relatively concise.

@JkSelf (Contributor) commented Mar 26, 2021

are you saying that PlanAdaptiveDynamicPruningFilters should simply create a BroadcastExchange and let the AQE framework create/reuse the query stage?

@cloud-fan
Yes. I think this implementation is clearer.

@cloud-fan (Contributor) commented Mar 26, 2021

@JkSelf your idea looks concise, but unfortunately PlanAdaptiveDynamicPruningFilters is a stage optimization rule, and I'm not sure the AQE framework can still kick in at that point to reuse the stage. @JkSelf can you try this idea and see if we can make it work?

@JkSelf (Contributor) commented Mar 26, 2021

the 'case 2' described by @cloud-fan would be ignored if we followed your approach.

@weixiuli
Case 2 will be handled in the createQueryStages method of AdaptiveSparkPlanExec after control returns to the AQE framework; it will call the newQueryStage method if there is no exchange to reuse.
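
Sketched in the same spirit (simplified stand-ins, not the real AdaptiveSparkPlanExec), the framework-side path described here is just reuse-if-cached, otherwise newQueryStage:

import scala.collection.mutable

sealed trait Stage
case class NewStage(exchangeKey: String) extends Stage
case class ReusedStage(original: Stage) extends Stage

// createQueryStages-style handling of an exchange: reuse the cached stage
// when one exists, otherwise create a new stage and record it for reuse.
def createQueryStage(exchangeKey: String,
                     stageCache: mutable.Map[String, Stage]): Stage =
  stageCache.get(exchangeKey) match {
    case Some(existing) => ReusedStage(existing)
    case None =>
      val stage = NewStage(exchangeKey)
      stageCache.put(exchangeKey, stage)
      stage
  }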

@JkSelf (Contributor) commented Mar 26, 2021

@cloud-fan I will try this idea in PR #31756 later.

@SparkQA commented Apr 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41383/

@SparkQA commented Apr 1, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41383/

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label Jul 11, 2021
github-actions bot closed this Jul 12, 2021