Skip to content

Commit

Permalink
[SPARK-49179][SQL][3.4] Fix v2 multi bucketed inner joins throw AssertionError

Browse files Browse the repository at this point in the history

backport apache#47683 to branch-3.4

### What changes were proposed in this pull request?

For sort-merge join (SMJ) with an inner join, the left and right output partitionings are simply wrapped into a `PartitioningCollection`, so the result may not satisfy the target required clustering.

### Why are the changes needed?

Fix the `AssertionError` thrown when a query contains multiple bucketed inner joins, e.g.:

```sql
SELECT * FROM testcat.ns.t1
JOIN testcat.ns.t2 ON t1.id = t2.id
JOIN testcat.ns.t3 ON t1.id = t3.id
```

```
Cause: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:264)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.createKeyGroupedShuffleSpec(EnsureRequirements.scala:642)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$checkKeyGroupCompatible$1(EnsureRequirements.scala:385)
at scala.collection.immutable.List.map(List.scala:247)
at scala.collection.immutable.List.map(List.scala:79)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:382)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:364)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:166)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:714)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:689)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:528)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:528)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:497)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:689)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:51)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$2(AdaptiveSparkPlanExec.scala:882)
```

### Does this PR introduce _any_ user-facing change?

yes, it's a bug fix

### How was this patch tested?

add test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#47736 from ulysses-you/SPARK-49179-3.4.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
  • Loading branch information
ulysses-you committed Aug 13, 2024
1 parent 9852e33 commit 758d18e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ case class EnsureRequirements(
private def createKeyGroupedShuffleSpec(
partitioning: Partitioning,
distribution: ClusteredDistribution): Option[KeyGroupedShuffleSpec] = {
def check(partitioning: KeyGroupedPartitioning): Option[KeyGroupedShuffleSpec] = {
def tryCreate(partitioning: KeyGroupedPartitioning): Option[KeyGroupedShuffleSpec] = {
val attributes = partitioning.expressions.flatMap(_.collectLeaves())
val clustering = distribution.clustering

Expand All @@ -567,11 +567,10 @@ case class EnsureRequirements(
}

partitioning match {
case p: KeyGroupedPartitioning => check(p)
case p: KeyGroupedPartitioning => tryCreate(p)
case PartitioningCollection(partitionings) =>
val specs = partitionings.map(p => createKeyGroupedShuffleSpec(p, distribution))
assert(specs.forall(_.isEmpty) || specs.forall(_.isDefined))
specs.head
specs.filter(_.isDefined).map(_.get).headOption
case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,28 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
.add("price", FloatType)
.add("time", TimestampType)

// Regression test for SPARK-49179: joining three v2 tables that share the same
// bucket transform previously failed with an AssertionError raised in
// EnsureRequirements.createKeyGroupedShuffleSpec (see the stack trace in the
// commit description). Verifies the query now executes, returns the expected
// rows, and introduces no shuffle.
test("SPARK-49179: Fix v2 multi bucketed inner joins throw AssertionError") {
val cols = new StructType()
.add("id", LongType)
.add("name", StringType)
// All three tables use the identical bucketing: bucket(8, "id").
val buckets = Array(bucket(8, "id"))

withTable("t1", "t2", "t3") {
// Create the tables with the same schema/bucketing and identical rows.
Seq("t1", "t2", "t3").foreach { t =>
createTable(t, cols, buckets)
sql(s"INSERT INTO testcat.ns.$t VALUES (1, 'aa'), (2, 'bb'), (3, 'cc')")
}
// Two chained inner joins on the bucket key: the second join's input is a
// PartitioningCollection, which is the case that tripped the assertion.
val df = sql(
"""
|SELECT t1.id, t2.id, t3.name FROM testcat.ns.t1
|JOIN testcat.ns.t2 ON t1.id = t2.id
|JOIN testcat.ns.t3 ON t1.id = t3.id
|""".stripMargin)
checkAnswer(df, Seq(Row(1, 1, "aa"), Row(2, 2, "bb"), Row(3, 3, "cc")))
// Since every table is bucketed on the join key, the planner should reuse
// the storage partitioning and add no exchange at all.
assert(collectShuffles(df.queryExecution.executedPlan).isEmpty)
}
}

test("partitioned join: join with two partition keys and matching & sorted partitions") {
val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
createTable(items, items_schema, items_partitions)
Expand Down

0 comments on commit 758d18e

Please sign in to comment.