
[SPARK-32012][SQL] Incrementally create and materialize query stage to avoid unnecessary local shuffle #28846

Closed
wants to merge 2 commits into from

Conversation

@viirya (Member) commented Jun 17, 2020

What changes were proposed in this pull request?

This patch changes how AQE creates query stages. Instead of creating query stages in a batch, it creates and materializes them incrementally, which brings AQE's optimizations in earlier and can avoid unnecessary local shuffles.

Why are the changes needed?

Currently AQE creates query stages in a batch. For example, the children of a sort merge join are materialized as query stages in one batch; AQE then re-optimizes and may rewrite the sort merge join as a broadcast join. At that point the exchange on the non-broadcast side of the join is no longer needed, but it has already been materialized. AQE currently wraps that materialized exchange with a local reader, which still incurs unnecessary I/O. Creating query stages incrementally avoids this unnecessary local shuffle.
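
For reference, a minimal spark-shell sketch of the settings involved (config names as of Spark 3.0; the 10MB threshold is only an illustration):

```scala
// Spark 3.0 AQE settings relevant to the description above (illustrative values).
spark.conf.set("spark.sql.adaptive.enabled", "true")                    // turn on adaptive query execution
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true") // read an already-materialized exchange with a local reader
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")          // size under which a join side can be broadcast
```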

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests.

@SparkQA commented Jun 17, 2020

Test build #124149 has finished for PR 28846 at commit e171a6c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 17, 2020

Test build #124152 has finished for PR 28846 at commit 523e1d5.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Jun 17, 2020

retest this please

@SparkQA commented Jun 17, 2020

Test build #124159 has finished for PR 28846 at commit 523e1d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented Jun 17, 2020

Can you elaborate on this a bit more? How does this optimization help plan the broadcast join?

For example, the children of a sort merge join are materialized as query stages in one batch; AQE then re-optimizes and may rewrite the sort merge join as a broadcast join.

AQE needs to wait for the stage to finish so that it knows the size and can change the SMJ to a BHJ. How can we avoid unnecessary I/O after the stage has finished?

@viirya (Member, Author) commented Jun 17, 2020

Let me elaborate with an example. The query SELECT * FROM testData join testData2 ON key = a where value = '1' is one of the test cases in AdaptiveQueryExecSuite.

The adaptivePlan on current master:

 *(3) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft
:- BroadcastQueryStage 2
:  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#144]
:     +- CustomShuffleReader local
:        +- ShuffleQueryStage 0
:           +- Exchange hashpartitioning(key#13, 5), true, [id=#110]
:              +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
:                 +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]                                                                                                  
:                    +- Scan[obj#12]
+- CustomShuffleReader local
   +- ShuffleQueryStage 1
      +- Exchange hashpartitioning(a#23, 5), true, [id=#121]
         +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
            +- Scan[obj#22]

In the adaptivePlan above, AQE produces two ShuffleQueryStages because both exchanges were materialized in a batch before AQE re-optimized the query. Although AQE can still rewrite the SortMergeJoin as a BroadcastHashJoin, the exchanges have already been materialized, so all AQE can do is read them with a local reader.

The adaptivePlan with this change:

*(2) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft                                  
:- BroadcastQueryStage 1
:  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#137]
:     +- CustomShuffleReader local
:        +- ShuffleQueryStage 0
:           +- Exchange hashpartitioning(key#13, 5), true, [id=#110]
:              +- *(1) Filter (isnotnull(value#14) AND (value#14 = 1))
:                 +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
:                    +- Scan[obj#12]
+- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
   +- Scan[obj#22]

With this change, AQE materializes only one exchange and then rewrites the SortMergeJoin as a BroadcastHashJoin. After that, there is no need to produce the other exchange at all.
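
For anyone who wants to reproduce a similar comparison outside the suite, here is a rough standalone sketch (the tiny DataFrames below are improvised stand-ins for the suite's testData/testData2, not the suite's fixtures):

```scala
import org.apache.spark.sql.SparkSession

// Rough standalone sketch, not the suite code: build small stand-ins for
// testData/testData2 and inspect the final adaptive plan for the query above.
object AqeJoinRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("aqe-join-repro")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.autoBroadcastJoinThreshold", "10MB")
      .getOrCreate()
    import spark.implicits._

    Seq((1, "1"), (2, "2"), (3, "3")).toDF("key", "value")
      .createOrReplaceTempView("testData")
    Seq((1, 10), (2, 20), (3, 30)).toDF("a", "b")
      .createOrReplaceTempView("testData2")

    val df = spark.sql(
      "SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'")
    df.collect()  // run the query so AQE can finalize the plan
    df.explain()  // the final plan shows whether a BroadcastHashJoin was chosen
    spark.stop()
  }
}
```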

@cloud-fan (Contributor):

How do you achieve this? Do you hold off the execution of one query stage and wait until the other query stage completes?

@viirya (Member, Author) commented Jun 18, 2020

This change does not create and materialize all query stages of the join in one batch. It creates and materializes one stage first and then re-optimizes the join. So once the join becomes a broadcast join, the unnecessary exchange is never created.
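
To make the intended control flow concrete, here is a toy sketch (all types and names are invented for illustration and are not Spark internals): materialize one child exchange, learn its actual size, re-optimize, and only then decide whether the other exchange has to be materialized at all.

```scala
// Toy model of incremental stage creation (hypothetical types, not Spark code).
object IncrementalStageSketch {
  sealed trait Plan
  case class Exchange(estimatedBytes: Long) extends Plan
  case class SortMergeJoin(left: Plan, right: Plan) extends Plan
  case class BroadcastHashJoin(build: Plan, stream: Plan) extends Plan

  val broadcastThreshold: Long = 10L * 1024 * 1024

  // Stand-in for running a shuffle map stage and observing its real output size.
  def materialize(e: Exchange): Long = e.estimatedBytes

  // Materialize one side first, then re-optimize; the other side's exchange is
  // only materialized if the join has to stay a sort merge join.
  def planJoin(join: SortMergeJoin): Plan = join match {
    case SortMergeJoin(left: Exchange, right) =>
      val actualSize = materialize(left)                // wait for the first stage to finish
      if (actualSize <= broadcastThreshold) {
        BroadcastHashJoin(build = left, stream = right) // the other exchange is never materialized
      } else {
        right match { case e: Exchange => materialize(e); case _ => () } // now it is needed
        join
      }
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val plan = SortMergeJoin(Exchange(estimatedBytes = 4096), Exchange(estimatedBytes = 1L << 30))
    println(planJoin(plan))  // prints a BroadcastHashJoin because the left side is tiny
  }
}
```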

@cloud-fan (Contributor) commented Jun 18, 2020

It creates one stage first and then re-optimize the join.

This is the confusing part. Creating a stage is not enough; we must wait for it to complete, and only then do we know the size and can optimize the join to a broadcast join.

@viirya (Member, Author) commented Jun 18, 2020

I updated the previous comment: "It creates and materializes one stage first..."

As you can see from the query plan in the earlier comment, it does optimize the join to a broadcast join.

@cloud-fan (Contributor):

It creates and materializes

Do you mean to trigger the materialization or wait for it to complete?

@viirya (Member, Author) commented Jun 18, 2020

It needs to wait for it to complete; that is how AQE works. As you said, we need to know the size of the exchange.

@maryannxue (Contributor):

The question is: this means you hold off the other stage, right? Wouldn't that cause regressions? If the join eventually stays an SMJ instead of becoming a BHJ, one of the stages will have been delayed.
And how do you know whether you are starting the larger stage or the smaller one first?

@viirya (Member, Author) commented Jun 18, 2020

Does triggering all stages of a join actually mean they run at the same time? I think it only means they are submitted to the scheduler; when they actually run depends on resource provisioning. If the first stage uses all the resources, wouldn't the later stage still be held off?

This relates to another question I have: is the speed-up of AQE gained by triggering all stages together (i.e., not holding off the other stage, as you said), or by optimizing the join from SMJ to BHJ (if we only consider the join case)? I may be misunderstanding, but before AQE was added to Spark SQL, we didn't trigger all stages like that either, right?

@cloud-fan (Contributor):

If the first stage uses all the resources, wouldn't the later stage still be held off?

That's true, but it's only an assumption. It's also possible that these two jobs do run together.

is the speed-up of AQE gained by triggering all stages together (i.e., not holding off the other stage, as you said), or by optimizing the join from SMJ to BHJ (if we only consider the join case)

In the benchmark, the default parallelism takes all the CPU cores. I think most of the perf gain should come from shuffle partition coalescing and SMJ -> BHJ. cc @JkSelf

That said, by design AQE triggers all independent stages at the same time to maximize parallelism, and that helps when resources are sufficient (or auto-scaling is enabled). I don't think we should change this design.

@cloud-fan (Contributor):

And as @maryannxue said, you may end up triggering the large side first, in which case holding off the other side doesn't make sense. Ideally we should trigger both sides and cancel the large side if the small side completes very quickly. It would be great if you could explore the cancellation approach.
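
To illustrate that idea with plain Scala futures (purely a toy model of the scheduling, not a sketch of the actual Spark implementation):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Toy model: start both sides, and if the (presumed) small side finishes fast
// and is broadcastable, stop waiting on the large side. Real AQE would have to
// cancel the running Spark job here rather than merely ignore the future.
object CancelLargeSideSketch {
  def runStage(millis: Long): Future[Long] =
    Future { Thread.sleep(millis); millis }  // pretend runtime stands in for output size

  def main(args: Array[String]): Unit = {
    val small = runStage(100)
    val large = runStage(5000)

    val smallSize = Await.result(small, 5.seconds)
    if (smallSize < 1000) {
      println(s"small side done ($smallSize); would cancel the large-side job and broadcast")
    } else {
      println(s"small side too big; wait for the large side: ${Await.result(large, 30.seconds)}")
    }
  }
}
```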

@viirya (Member, Author) commented Jun 22, 2020

@cloud-fan Thanks for clarifying. The cancellation idea sounds worth exploring, since it could avoid the local shuffle while keeping AQE's current design of running independent stages in parallel. I will look into it.

@JkSelf (Contributor) commented Jun 22, 2020

In our previous 3 TB TPC-DS benchmark, the perf improvement mainly came from two features: coalescing shuffle partitions and SMJ -> BHJ. The results are here for reference.

@dongjoon-hyun (Member):

Hi, @viirya. Could you please rebase this PR onto the master branch?

@viirya marked this pull request as draft on August 7, 2020 15:31
@SparkQA commented Aug 7, 2020

Test build #127207 has finished for PR 28846 at commit 6bb0b63.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions (bot):

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label on Nov 16, 2020
github-actions bot closed this on Nov 17, 2020
@viirya deleted the inc-aqe-3 branch on December 27, 2023 18:29