[SPARK-32201][SQL] More general skew join pattern matching #29021

Closed
wants to merge 15 commits into from

LantaoJin
Contributor

@LantaoJin LantaoJin commented Jul 7, 2020

What changes were proposed in this pull request?

Currently, the AQE skew join handling logic is very specific.
It can only handle a pattern like this (2 tables):

  SMJ
     Sort
       Shuffle
     Sort
       Shuffle

We propose a more general skew join pattern matching patch with few code changes.
With this patch, we can handle N-table joins, joins with aggregation, and so on.
PS: Here, an N-table SMJ is optimized to N-1 BCJs + 1 SMJ by AE. This PR does not handle the case where N SMJs remain after AE. I will handle that in another PR.
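
To illustrate how narrow the current pattern is, here is a minimal sketch in plain Scala. The node classes and the `matches` helper are hypothetical stand-ins invented for this example; they are not Spark's operators or the rule's real code.

```scala
object StrictSkewPattern {
  // Toy plan nodes for illustration only; they are not Spark's operator classes.
  sealed trait Plan
  case class Shuffle(table: String) extends Plan
  case class Sort(child: Plan) extends Plan
  case class Project(child: Plan) extends Plan
  case class SortMergeJoin(left: Plan, right: Plan) extends Plan

  // True only for the exact shape SMJ(Sort(Shuffle), Sort(Shuffle)).
  def matches(plan: Plan): Boolean = plan match {
    case SortMergeJoin(Sort(_: Shuffle), Sort(_: Shuffle)) => true
    case _ => false
  }
}
```

In this toy model, a single extra node such as `Project` between `Sort` and `Shuffle` is enough to make the match fail, which is the kind of plan a more general matcher would need to accept.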

Why are the changes needed?

In our production use cases, we found many slow jobs caused by data skew even with AQE skew join enabled. After investigating their patterns, we found that the current skew join handling logic is so specific that it matches few production queries. The production queries are much more complicated than this pattern:

  SMJ
     Sort
       Shuffle
     Sort
       Shuffle

Here is a straightforward case:
Screen_Shot_2020-07-06_at_2_55_34_PM

The plan above is a 5-table join. It is not a simple case that the above pattern could match, but it looks very similar to the pattern once all the red boxes are removed.

In the stage graph, the plan looks much more straightforward:
Screen_Shot_2020-07-06_at_2_54_56_PM

The pattern in the green boxes is what we want to handle, whether or not the red boxes exist.

Does this PR introduce any user-facing change?

No

How was this patch tested?

We added two unit tests.

  1. A 2-table SMJ with aggregation: we can handle the side of the skew join that has no aggregation.

Before:
Screenshot_2020-07-17 test-sql-context - Details for Query 26

After:
Screenshot_2020-07-17 test-sql-context - Details for Query 25

  2. An N-table join with BCJ (AE changed SMJ to BCJ): we can handle both sides of the skew join.

Before:
Screenshot_2020-07-17 test-sql-context - Details for Query 29(1)

After:
Screenshot_2020-07-17 test-sql-context - Details for Query 28(1)

@SparkQA

SparkQA commented Jul 7, 2020

Test build #125187 has finished for PR 29021 at commit 479d56b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Contributor Author

Also, this PR can work together with #28947 to match more patterns.

@cloud-fan
Contributor

cloud-fan commented Jul 7, 2020

SMJ
     Sort
       Shuffle
     Sort
       HashAggregate
         Shuffle

This is an interesting use case. We must be careful when dealing with it. The key of skew join handling is to split the skew partition into smaller parts. For HashAggregate, I'm not sure if this works, as now the values of the same key may exist in different after-split partitions. This makes HashAggregate incorrect, as it requires the values of the same key stay in one partition so that it can group by the key.
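
A minimal sketch of this concern, using plain Scala collections rather than Spark. The helpers `countPerPartition` and `split` are hypothetical names made up for this illustration; they only mimic the idea of aggregating each partition independently and of splitting one skewed partition into smaller parts.

```scala
object SplitAggregationDemo {
  // Group-by count computed independently inside each partition,
  // which is how an aggregate that relies on clustering sees its input.
  def countPerPartition(partitions: Seq[Seq[String]]): Seq[Map[String, Int]] =
    partitions.map(_.groupBy(identity).map { case (k, vs) => k -> vs.size })

  // Split one partition into chunks of at most `chunkSize` rows,
  // mimicking a skewed partition being divided among several tasks.
  def split(partition: Seq[String], chunkSize: Int): Seq[Seq[String]] =
    partition.grouped(chunkSize).toList
}
```

With the input `Seq("a", "a", "a", "a", "b")`, aggregating the unsplit partition gives a single count of 4 for key `a`, while aggregating after `split(_, 2)` gives two separate counts of 2 for `a`, one per chunk. That is the incorrect result described above.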

@LantaoJin
Copy link
Contributor Author

Yes, you are correct. I had recognized this case. I should skip aggregation :(

@LantaoJin LantaoJin changed the title [SPARK-32201][SQL] More general skew join pattern matching [WIP][SPARK-32201][SQL] More general skew join pattern matching Jul 7, 2020
@SparkQA

SparkQA commented Jul 7, 2020

Test build #125189 has finished for PR 29021 at commit 947927a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 7, 2020

Test build #125215 has finished for PR 29021 at commit 607eb08.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 8, 2020

Test build #125293 has finished for PR 29021 at commit 80bef0d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 10, 2020

Test build #125582 has finished for PR 29021 at commit dd09e70.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class CoalescedHashPartitioning(

@SparkQA

SparkQA commented Jul 11, 2020

Test build #125667 has finished for PR 29021 at commit 76dc3ea.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 15, 2020

Test build #125893 has finished for PR 29021 at commit 7465bfa.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@LantaoJin LantaoJin changed the title [WIP][SPARK-32201][SQL] More general skew join pattern matching [SPARK-32201][SQL] More general skew join pattern matching Jul 16, 2020
@LantaoJin LantaoJin changed the title [SPARK-32201][SQL] More general skew join pattern matching [WIP][SPARK-32201][SQL] More general skew join pattern matching Jul 16, 2020
@SparkQA

SparkQA commented Jul 16, 2020

Test build #125950 has finished for PR 29021 at commit 42a52a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 16, 2020

Test build #125961 has finished for PR 29021 at commit 973d87e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 16, 2020

Test build #125968 has finished for PR 29021 at commit 6f7dbc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 16, 2020

Test build #125967 has finished for PR 29021 at commit d65a210.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin LantaoJin changed the title [WIP][SPARK-32201][SQL] More general skew join pattern matching [SPARK-32201][SQL] More general skew join pattern matching Jul 17, 2020
@LantaoJin
Contributor Author

Gentle ping @cloud-fan

@@ -142,6 +142,14 @@ object OptimizeLocalShuffleReader {
  def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
    case s: ShuffleQueryStageExec =>
      s.shuffle.canChangeNumPartitions
    // This CustomShuffleReaderExec used in skew side, its numPartitions increased.
Contributor

Does this mean the OptimizeLocalShuffleReader rule is disabled when the OptimizeSkewedJoin rule is enabled?

Contributor Author

@LantaoJin LantaoJin Jul 17, 2020

Not exactly. With this more general skew join handling, we can match more patterns. For example, we can handle a skew join like https://user-images.githubusercontent.com/1853780/87743215-01e9e780-c81b-11ea-97d9-f274b379912e.png. After OptimizeLocalShuffleReader, the number of partitions of the CustomShuffleReader in the BCJ (changed from SMJ by AE) is not equal to that of the other side. So, to keep it simple, I disable createLocalReader.

@@ -340,3 +340,28 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
case _ => false
}
}

/**
Contributor

Why do we need to add a new Partitioning?

Contributor Author

@LantaoJin LantaoJin Jul 17, 2020

CoalescedHashPartitioning can satisfy ClusteredDistribution. This is needed because a skew join may match a case that contains an aggregation (on the non-skew side). UnknownPartitioning cannot satisfy ClusteredDistribution, so it would add an additional shuffle.
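
The following is a simplified toy model of the "partitioning satisfies distribution" check, written in plain Scala to show why an unknown partitioning forces an extra shuffle under an aggregation. The classes reuse familiar names but are hypothetical stand-ins, not Spark's actual classes, and the sketch deliberately does not model CoalescedHashPartitioning, whose details are not shown in this thread.

```scala
object PartitioningDemo {
  sealed trait Distribution
  case object UnspecifiedDistribution extends Distribution
  case class ClusteredDistribution(keys: Set[String]) extends Distribution

  sealed trait Partitioning {
    def satisfies(required: Distribution): Boolean
  }

  // Nothing is known about row placement, so only "no requirement" is satisfied.
  case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
    def satisfies(required: Distribution): Boolean =
      required == UnspecifiedDistribution
  }

  // Rows with equal hash keys share a partition, so clustering on a
  // superset of those keys is satisfied as well.
  case class HashPartitioning(keys: Set[String], numPartitions: Int) extends Partitioning {
    def satisfies(required: Distribution): Boolean = required match {
      case UnspecifiedDistribution => true
      case ClusteredDistribution(clusterKeys) => keys.nonEmpty && keys.subsetOf(clusterKeys)
    }
  }

  // In this toy model, a shuffle is inserted whenever the child's
  // partitioning does not satisfy what the parent requires.
  def needsShuffle(child: Partitioning, required: Distribution): Boolean =
    !child.satisfies(required)
}
```

Here `needsShuffle(UnknownPartitioning(10), ClusteredDistribution(Set("k")))` is true, while the same call with `HashPartitioning(Set("k"), 10)` is false.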

Contributor Author

Hi @JkSelf, I will provide another approach that removes this CoalescedHashPartitioning and simplifies the code. But the current implementation with CoalescedHashPartitioning might be more general and cover more cases.

@SparkQA

SparkQA commented Jul 20, 2020

Test build #126165 has finished for PR 29021 at commit 3cd411f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 21, 2020

Test build #126238 has finished for PR 29021 at commit 5bed68c.

  • This patch fails PySpark pip packaging tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 22, 2020

Test build #126291 has finished for PR 29021 at commit 5bed68c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 22, 2020

Test build #126315 has finished for PR 29021 at commit 5bed68c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Contributor Author

Hi @cloud-fan @JkSelf, please help review this PR. I am going to file a new PR to handle skew in a three-table SMJ, which is beyond the scope of this PR.

@@ -263,18 +299,31 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
    val shuffleStages = collectShuffleStages(plan)

    if (shuffleStages.length == 2) {
Contributor

Why don't we break this limitation first?

Contributor Author

Because this PR does not address the case with multiple SMJs. We have another PR to change this limitation:

  1. optimizeSingleStageSkewJoin. This covers the case where one table is a bucketed table and the SMJ is a bucketed join in which only one side is shuffled and skewed.
  2. optimizeThreeShuffleStageSkewJoin. This addresses a three-table SMJ (two SMJs in one stage, neither of which can be changed to BCJ in AQE).

val right = rightOpt.get
assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
val numPartitions = left.partitionsWithSizes.length
// We use the median size of the original shuffle partitions to detect skewed partitions.
Contributor

This PR is very hard to reason about. We need to clearly define:

  1. what nodes can appear between the shuffle stage and SMJ. As we discussed before, Agg can't appear at the skew side.
  2. how to estimate the size? Since there are nodes in the middle, the stats of the shuffle stage may not be accurate for the final join child. (e.g. Filter in the middle)

Contributor Author

@LantaoJin LantaoJin Jul 24, 2020

  1. what nodes can appear between the shuffle stage and SMJ. As we discussed before, Agg can't appear at the skew side.

In canSplitLeftSide and canSplitRightSide, I added an allUnspecifiedDistribution(plan) check. Currently we only support nodes with UnspecifiedDistribution.
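
As a rough illustration of what such a check could look like, here is a sketch on toy plan nodes in plain Scala. The node classes and the `allUnspecified` helper are hypothetical and written only for this example; the PR's actual allUnspecifiedDistribution implementation is not shown in this thread.

```scala
object DistributionCheckDemo {
  // Toy nodes that may sit between a join child and its shuffle stage.
  sealed trait Node
  case class ShuffleStage(name: String) extends Node
  case class Project(input: Node) extends Node
  case class Sort(input: Node) extends Node          // local sort, no clustering requirement
  case class HashAggregate(input: Node) extends Node // requires rows clustered by key

  // Walk from the join child down to the shuffle stage and accept the side
  // only if no node on the way requires a specific distribution of its input.
  def allUnspecified(node: Node): Boolean = node match {
    case ShuffleStage(_)  => true
    case Project(in)      => allUnspecified(in)
    case Sort(in)         => allUnspecified(in)
    case HashAggregate(_) => false
  }
}
```

Under these assumptions, `Sort(Project(ShuffleStage("t1")))` can be split, while `Sort(HashAggregate(ShuffleStage("t2")))` cannot, which matches the rule that an aggregation must not appear on the skew side.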

  2. how to estimate the size? Since there are nodes in the middle, the stats of the shuffle stage may not be accurate for the final join child. (e.g. Filter in the middle)

A Filter should be pushed down to the leaf, so I have not seen this use case. A Project may be a common case in the middle? Yes, the input size taken from the shuffle stage may not be accurate. But the only disadvantage is launching more tasks. I think the benefit of handling the skew outweighs that disadvantage.
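
For context, the median-based detection mentioned in the diff comment can be sketched as below. This is a simplified model in plain Scala, not the rule's real code: the `factor` and `thresholdBytes` parameters are assumptions modeled on the settings `spark.sql.adaptive.skewJoin.skewedPartitionFactor` and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`, and the helper names are hypothetical.

```scala
object SkewDetectionDemo {
  // Median of the partition sizes (upper median for even counts, a simplification).
  def median(sizes: Seq[Long]): Long = {
    require(sizes.nonEmpty, "need at least one partition")
    val sorted = sizes.sorted
    sorted(sorted.length / 2)
  }

  // Indices of partitions that are both much larger than the median
  // and larger than an absolute size threshold.
  def skewedPartitions(sizes: Seq[Long], factor: Int, thresholdBytes: Long): Seq[Int] = {
    val m = median(sizes)
    sizes.zipWithIndex.collect {
      case (size, idx) if size > m * factor && size > thresholdBytes => idx
    }
  }
}
```

The sizes here are measured at the shuffle stage. If a node in the middle shrinks the data before the join, a partition may be flagged and split even though the join input is smaller than measured, which costs extra tasks but does not change the result.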

@github-actions

github-actions bot commented Nov 2, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Nov 2, 2020
@github-actions github-actions bot closed this Nov 3, 2020
4 participants