[SPARK-48012][SQL] SPJ: Support Transform Expressions for One Side Shuffle #46255

Closed
wants to merge 5 commits into from

szehon-ho
Contributor

@szehon-ho szehon-ho commented Apr 27, 2024

Why are the changes needed?

Support SPJ one-side shuffle if other side has partition transform expression

How was this patch tested?

New unit test in KeyGroupedPartitioningSuite

Was this patch authored or co-authored using generative AI tooling?

No.

@szehon-ho
Contributor Author

szehon-ho commented Apr 27, 2024

Some implementation notes. SPARK-41471 works by giving the ShuffleExchangeExec side of the join a KeyGroupedPartitioning, which is created from the other side's KeyGroupedShuffleSpec as a clone of it (with that side's partition expressions and values). That way both sides of the join have a KeyGroupedPartitioning and SPJ can work. Previously, however, only AttributeReference expressions were supported for the other side's partition expressions.

Code changes:

  • Remove check in KeyGroupedShuffleSpec::canCreatePartitioning that allows only AttributeReference, and add support for TransformExpression
  • Implement TransformExpression.eval(), by re-using the code from V2ExpressionUtils. This allows the ShuffleExchangeExec to evaluate the partition key with transform expressions from each row.
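
To make the second point concrete, here is a minimal sketch of what "evaluating the partition key with transform expressions from each row" amounts to. It does not use Spark: `Row`, `KeyExpr`, `Column`, `Transform`, `years` and `partitionKey` are all hypothetical names made up for illustration, and the `years` logic is only an assumed example of a transform, not the code in V2ExpressionUtils.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Hypothetical stand-ins, not Spark classes: a row is a name -> value map.
object TransformEvalSketch {
  type Row = Map[String, Any]

  sealed trait KeyExpr { def eval(row: Row): Any }

  // Plain column reference, the only kind of partition expression supported before.
  final case class Column(name: String) extends KeyExpr {
    def eval(row: Row): Any = row(name)
  }

  // A transform wraps a child expression and applies a function to its value.
  final case class Transform(name: String, child: KeyExpr, fn: Any => Any) extends KeyExpr {
    def eval(row: Row): Any = fn(child.eval(row))
  }

  // Illustrative "years" transform: whole years between 1970-01-01 and the date.
  def years(child: KeyExpr): KeyExpr =
    Transform("years", child, {
      case d: LocalDate => ChronoUnit.YEARS.between(LocalDate.ofEpochDay(0), d)
      case other => throw new IllegalArgumentException(s"unsupported value: $other")
    })

  // The shuffle side evaluates every partition expression against each row.
  def partitionKey(exprs: Seq[KeyExpr], row: Row): Seq[Any] = exprs.map(_.eval(row))
}
```

With this sketch, evaluating `Seq(years(Column("ts")), Column("id"))` against a row with `ts = 2024-04-27` and `id = 7` gives the key `Seq(54, 7)`, which is the kind of value the shuffle side would then map to a partition id.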

Some fixes:

  • Normalize the valueMap key type in KeyGroupedPartitioner to use a specific Seq implementation class. Previously the partitioner's map was initialized with keys as Vector but then looked up with keys as ArraySeq; these seem to have different hash codes, so lookups always created new entries with new partition ids.
  • Change the test YearsTransform to have the same logic as the InMemoryBaseTable. This was pointed out in the SPARK-41471 PR.
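
The normalization in the first fix can be sketched roughly as below. This is a simplified, hypothetical stand-in (`KeyedPartitionerSketch`, `knownKeys` and the local `nonNegativeMod` are made-up names, not the Spark code), and it only shows the pattern of forcing every key into one Seq implementation before it touches the map; it does not try to reproduce the hash mismatch itself.

```scala
import scala.collection.immutable.ArraySeq
import scala.collection.mutable

// Hypothetical stand-in for the partitioner described above, not the Spark class.
final class KeyedPartitionerSketch(
    initial: Map[Seq[Any], Int],
    val numPartitions: Int) {

  // Every key is stored as an ArraySeq, whatever Seq type the caller used.
  private val valueMap: mutable.Map[Seq[Any], Int] =
    mutable.Map.from(initial.map { case (k, v) => (ArraySeq.from(k): Seq[Any]) -> v })

  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def getPartition(key: Any): Int = {
    // Normalize the lookup key the same way as the stored keys.
    val normalizedKeys: Seq[Any] = ArraySeq.from(key.asInstanceOf[Seq[Any]])
    valueMap.getOrElseUpdate(
      normalizedKeys, nonNegativeMod(normalizedKeys.hashCode, numPartitions))
  }

  // Number of distinct keys seen so far, exposed only to make the sketch checkable.
  def knownKeys: Int = valueMap.size
}
```

A key that was registered up front keeps its partition id regardless of which Seq type it arrives as, and only a genuinely unseen key adds a new entry.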

Limitations:

  • This feature is disabled if partiallyClustered is enabled. Partially clustered implies that the partitioned side of the join has multiple partitions with the same value and does not group them. It is not clear at the moment how the KeyGroupedPartitioner on the shuffle side can handle that.
  • This feature is disabled if allowJoinKeysLessThanPartitionKeys is enabled and the partitions are transform expressions. The allowJoinKeysLessThanPartitionKeys feature works by 'grouping' the BatchScanExec's partitions again by join keys. If it is enabled along with this feature, a failure happens when checking that both sides of the join (the ShuffleExchangeExec and the partitioned BatchScanExec side) have an outputPartitioning with the same numPartitions. This actually works in the first optimizer pass, as the ShuffleExchangeExec's KeyGroupedPartitioning is created as a clone of the other side (including partition values). But after that there is a 'grouping' phase triggered here:
        // Now we need to push-down the common partition information to the scan in each child
        newLeft = populateCommonPartitionInfo(left, mergedPartValues, leftSpec.joinKeyPositions,
          leftReducers, applyPartialClustering, replicateLeftSide)
        newRight = populateCommonPartitionInfo(right, mergedPartValues, rightSpec.joinKeyPositions,
          rightReducers, applyPartialClustering, replicateRightSide)

This updates the BatchScanExec's outputPartitioning with a new numPartitions after the re-grouping by join key, but it does not update the numPartitions of the ShuffleExchangeExec's outputPartitioning. Hence the error in the subsequent optimizer pass:

requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
	at scala.Predef$.require(Predef.scala:337)
	at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:550)
	at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning(ShuffledJoin.scala:49)
	at org.apache.spark.sql.execution.joins.ShuffledJoin.outputPartitioning$(ShuffledJoin.scala:47)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:39)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$ensureDistributionAndOrdering$1(EnsureRequirements.scala:66)
	at scala.collection.immutable.Vector1.map(Vector.scala:2140)
	at scala.collection.immutable.Vector1.map(Vector.scala:385)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:65)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:657)
	at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:632)

This can be reproduced by removing this check and running the relevant unit test added in this PR. It needs more investigation before it can be enabled in a follow-up PR.
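
For readers who want the shape of the failure without running the suite, the mismatch can be modelled in a few lines. The classes below (`KeyGrouped`, `PartitioningCollectionSketch`) are hypothetical, heavily simplified models and not the Spark classes; only the `require` message is taken from the stack trace above.

```scala
import scala.util.Try

object NumPartitionsInvariantSketch {
  // Hypothetical, simplified model: a partitioning is just its list of partition values.
  final case class KeyGrouped(partitionValues: Seq[Seq[Any]]) {
    def numPartitions: Int = partitionValues.size

    // Regroup partitions by a subset of key positions, as the join-key grouping does.
    def groupBy(positions: Seq[Int]): KeyGrouped =
      KeyGrouped(partitionValues.map(v => positions.map(i => v(i))).distinct)
  }

  // Mirrors the requirement quoted in the stack trace above.
  final case class PartitioningCollectionSketch(parts: Seq[KeyGrouped]) {
    require(parts.map(_.numPartitions).distinct.size == 1,
      "PartitioningCollection requires all of its partitionings have the same numPartitions.")
  }

  // First pass: the shuffle side is a clone of the scan side, so both have 3 partitions.
  val scanSide: KeyGrouped = KeyGrouped(Seq(Seq(1, "a"), Seq(1, "b"), Seq(2, "a")))
  val shuffleSide: KeyGrouped = scanSide.copy()
  val firstPass: Try[PartitioningCollectionSketch] =
    Try(PartitioningCollectionSketch(Seq(scanSide, shuffleSide)))

  // Later pass: only the scan side is regrouped by the join key (position 0), leaving 2 vs 3.
  val regroupedScan: KeyGrouped = scanSide.groupBy(Seq(0))
  val laterPass: Try[PartitioningCollectionSketch] =
    Try(PartitioningCollectionSketch(Seq(regroupedScan, shuffleSide)))
}
```

In this model `firstPass` succeeds and `laterPass` fails with an IllegalArgumentException, which matches the sequence of events described above: the clone is consistent at first, and the check only breaks once one side has been regrouped.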

@szehon-ho szehon-ho force-pushed the spj_auto_bucket branch 3 times, most recently from 2bb6375 to ca81d6f Compare April 29, 2024 06:55
@szehon-ho
Contributor Author

szehon-ho commented Apr 29, 2024

@sunchao , @Hisoka-X can you guys take a look?

Member

@sunchao sunchao left a comment

Sorry for the late review. This looks pretty good to me. I'll take another pass soon.

@@ -149,7 +150,9 @@ private[spark] class KeyGroupedPartitioner(
     override val numPartitions: Int) extends Partitioner {
   override def getPartition(key: Any): Int = {
     val keys = key.asInstanceOf[Seq[Any]]
-    valueMap.getOrElseUpdate(keys, Utils.nonNegativeMod(keys.hashCode, numPartitions))
+    val normalizedKeys = ArraySeq.from(keys)
Member

Curious: what does this do? Why is it normalized?

Contributor Author

@szehon-ho szehon-ho May 30, 2024

Iirc, I hit a bug due to trying to compare different Seq types (more info in the pr description)

Normalize the valueMap key type in KeyGroupedPartitioner to use a specific Seq implementation class. Previously the partitioner's map was initialized with keys as Vector but then looked up with keys as ArraySeq; these seem to have different hash codes, so lookups always created new entries with new partition ids.

override def canCreatePartitioning: Boolean = {
  // Allow one side shuffle for SPJ for now only if partially-clustered is not enabled
  // and for join keys less than partition keys only if transforms are not enabled.
  val checkExprType = if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) {
Member

Trying to understand the reason behind this. Also, it might be better to add some logging here if it is easy.

Contributor Author

@szehon-ho szehon-ho May 30, 2024

iirc, I hit a pretty hard bug when trying to enable the feature with v2BucketingAllowJoinKeysSubsetOfPartitionKeys (more in the pr description). As we may need to rethink the logic of v2BucketingAllowJoinKeysSubsetOfPartitionKeys to fix, I was going to disable for now, and try to fix in a subsequent PR

Contributor Author

I looked at it; maybe I will do logging in another PR. There's no table name, so I'm not sure if it's valuable to log the decision?

Member

OK, sounds good.

szehon-ho added 5 commits June 7, 2024 18:51
…uffle

### Why are the changes needed?

Support SPJ one-side shuffle if other side has partition transform expression

### How was this patch tested?

New unit test in KeyGroupedPartitioningSuite

### Was this patch authored or co-authored using generative AI tooling?

No.

Member

@sunchao sunchao left a comment

LGTM

@sunchao sunchao closed this in 24bce72 Jun 9, 2024
@sunchao
Member

sunchao commented Jun 9, 2024

Thanks! Merged to master/4.0

szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Aug 7, 2024
…uffle

Support SPJ one-side shuffle if other side has partition transform expression

### How was this patch tested?

New unit test in KeyGroupedPartitioningSuite

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46255 from szehon-ho/spj_auto_bucket.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Chao Sun <chao@openai.com>