Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys #42306

Closed
wants to merge 5 commits into from

Conversation

szehon-ho
Copy link
Contributor

@szehon-ho szehon-ho commented Aug 2, 2023

What changes were proposed in this pull request?

  • Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
  • Change key compatibility checks in EnsureRequirements. Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled)
  • Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values). Do same for all auxiliary data structure, like commonPartValues.
  • Implement partiallyClustered skew-handling.
    • Group only the replicate side (now by join key as well), replicate by the total size of other-side partitions that share the join key.
    • add an additional sort for partitions based on join key, as when we group the replicate side, partition ordering becomes out of order from the non-replicate side.

Why are the changes needed?

  • Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

Does this PR introduce any user-facing change?

No

How was this patch tested?

-Added tests in KeyGroupedPartitioningSuite
-Found two existing problems, will address in separate PR:

@github-actions github-actions bot added the SQL label Aug 2, 2023
@szehon-ho szehon-ho changed the title [SQL][SPARK-44647] Support SPJ where join keys are less than cluster keys [SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys Aug 2, 2023
// Support only when all cluster key have an associated partition expression key
requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) &&
// and if all partition expression contain only a single partition key.
expressions.forall(_.collectLeaves().size == 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm why this condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was to fix a test, I couldn't find it back to be honest. There was a test somewhere that was trying this case (which isnt actually supported in the code currently), and I think asserting the right exception, which I think would break if SPJ is activated. I could revert this and see again to find the test, if you want.

groupSplits = true).get
// In the case where we replicate partitions, we have grouped
// the partitions by the join key if they differ
val groupByExpressions =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we override KeyGroupedPartitioning method in this class, and wrap the logic of handling join keys in the method? We can return a new KeyGroupedPartitioning instance whose expressions, partitionValues are "projected" on the join keys.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, changed outputPartitioning to return KeyGroupedPartitoning to reflect that.

node.mapChildren(child => populatePartitionValues(
child, values, applyPartialClustering, replicatePartitions))
node.mapChildren(child => populateStoragePartitionJoinParams(
child, values, partitionGroupByPositions, applyPartialClustering, replicatePartitions))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of populating partitionGroupByPositions, can we populate StoragePartitionJoinParams.keyGroupedPartitioning instead? which can be the subset of expressions that participate in the join.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will need some more guidance on this one.

@szehon-ho szehon-ho force-pushed the spj_attempt_master branch 3 times, most recently from 059824b to 62fa5dd Compare August 24, 2023 22:34

object KeyGroupedShuffleSpec {

def isExpressionCompatible(left: Expression, right: Expression): Boolean =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just grouping the new static into companion object, so the diff looks a bit bigger, let me know if I should revert

@dongjoon-hyun
Copy link
Member

Could you re-trigger the failed pipeline or rebase this PR to the master branch once more, @szehon-ho ?

@szehon-ho
Copy link
Contributor Author

Hi @dongjoon-hyun , I think @sunchao had another idea he is thinking about, was going to wait a bit for that to update the pr

@dongjoon-hyun
Copy link
Member

Oh, got it!

@szehon-ho szehon-ho force-pushed the spj_attempt_master branch 2 times, most recently from 048701b to dac34ec Compare September 5, 2023 12:52
Copy link
Member

@sunchao sunchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @szehon-ho ! Looks great with a few comments.

if (SQLConf.get.getConf(
SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) {
requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) &&
expressions.forall(_.collectLeaves().size == 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this deserves some comments since otherwise it's a bit confusing why we need it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comment, please check if it makes sense

if (SQLConf.get.getConf(
SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) {
requiredClustering.forall(x => attributes.exists(_.semanticEquals(x))) &&
expressions.forall(_.collectLeaves().size == 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be guaranteed currently - it might be better to have this invariant check somewhere else like when constructing a KeyGroupedPartitioning, but OK to leave it here for now

@@ -674,7 +711,8 @@ case class HashShuffleSpec(

case class KeyGroupedShuffleSpec(
partitioning: KeyGroupedPartitioning,
distribution: ClusteredDistribution) extends ShuffleSpec {
distribution: ClusteredDistribution,
joinKeyPositions: Option[Seq[Int]] = None) extends ShuffleSpec {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add some comments for KeyGroupedShuffleSpec to explain what is this for, otherwise it's a bit hard to understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments, please check and suggest if it can be improved.

@szehon-ho szehon-ho force-pushed the spj_attempt_master branch 2 times, most recently from fe4920d to 3fd3a7b Compare September 8, 2023 06:18
@szehon-ho
Copy link
Contributor Author

@sunchao thanks! addressed review comments

…keys

 ### What changes were proposed in this pull request?
- Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
- Change key compatibility checks in EnsureRequirements.  Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled)
- "Project" partitions by join keys in KeyGroupedPartitioning/KeyGroupedShuffleSpec
- Add join key grouping to the partition grouping in BatchScanExec

   ### Why are the changes needed?
- Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

    ### Does this PR introduce _any_ user-facing change?
No

    ### How was this patch tested?
-Added tests in KeyGroupedPartitioningSuite
-Because of apache#37886   we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered.  Need to see how to relax this in separate PR.
Copy link
Member

@sunchao sunchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM.

@dongjoon-hyun
Copy link
Member

Merged to mater for Apache Spark 4.0.0. Thank you so much, @szehon-ho and @sunchao !

@sunchao
Copy link
Member

sunchao commented Sep 11, 2023

Thanks @szehon-ho @dongjoon-hyun !

@irsath
Copy link

irsath commented Nov 19, 2023

Hi @dongjoon-hyun @sunchao,
Do you see any blocker to backport this to spark 3.5 ?
I think it would be useful for many use case (including mine) that partition by date for GDPR purpose but still need SPJ on the other partitioning column.

@dongjoon-hyun
Copy link
Member

Apache Spark has a back-porting policy which allows only bug fixes, @irsath . Given that this PR is an improvement, we are unable to touch the release branches like branch-3.5 for this improvement.

@irsath
Copy link

irsath commented Nov 20, 2023

Right, sorry for the typo but I meant: what if we make a 3.6 with this PR ?

I never contributed to OSS spark but if your ok with the idea I can try to do a PR in that regard.

szehon-ho added a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…keys

- Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
- Change key compatibility checks in EnsureRequirements.  Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled)
- Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values).  Do same for all auxiliary data structure, like commonPartValues.
- Implement partiallyClustered skew-handling.
  - Group only the replicate side (now by join key as well), replicate by the total size of other-side partitions that share the join key.
  - add an additional sort for partitions based on join key, as when we group the replicate side, partition ordering becomes out of order from the non-replicate side.

- Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

No

-Added tests in KeyGroupedPartitioningSuite
-Found two existing problems, will address in separate PR:
- Because of apache#37886   we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered.  Need to see how to relax this.
- https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change.  This pr refactors some of those code to add group-by-join-key, but doesnt change the underlying logic, so issue continues to exist.  Hopefully this will also get fixed in another way.

Closes apache#42306 from szehon-ho/spj_attempt_master.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Mar 26, 2024
…keys

- Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled
- Change key compatibility checks in EnsureRequirements.  Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled)
- Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values).  Do same for all auxiliary data structure, like commonPartValues.
- Implement partiallyClustered skew-handling.
  - Group only the replicate side (now by join key as well), replicate by the total size of other-side partitions that share the join key.
  - add an additional sort for partitions based on join key, as when we group the replicate side, partition ordering becomes out of order from the non-replicate side.

- Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them

No

-Added tests in KeyGroupedPartitioningSuite
-Found two existing problems, will address in separate PR:
- Because of apache#37886   we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered.  Need to see how to relax this.
- https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change.  This pr refactors some of those code to add group-by-join-key, but doesnt change the underlying logic, so issue continues to exist.  Hopefully this will also get fixed in another way.

Closes apache#42306 from szehon-ho/spj_attempt_master.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants