Skip to content

Commit

Permalink
Revert "[SPARK-49179][SQL] Fix v2 multi bucketed inner joins throw As…
Browse files Browse the repository at this point in the history
…sertionError"

This reverts commit 1dd558c.
  • Loading branch information
dongjoon-hyun committed Aug 13, 2024
1 parent 1dd558c commit 9852e33
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ case class EnsureRequirements(
private def createKeyGroupedShuffleSpec(
partitioning: Partitioning,
distribution: ClusteredDistribution): Option[KeyGroupedShuffleSpec] = {
def tryCreate(partitioning: KeyGroupedPartitioning): Option[KeyGroupedShuffleSpec] = {
def check(partitioning: KeyGroupedPartitioning): Option[KeyGroupedShuffleSpec] = {
val attributes = partitioning.expressions.flatMap(_.collectLeaves())
val clustering = distribution.clustering

Expand All @@ -567,10 +567,11 @@ case class EnsureRequirements(
}

partitioning match {
case p: KeyGroupedPartitioning => tryCreate(p)
case p: KeyGroupedPartitioning => check(p)
case PartitioningCollection(partitionings) =>
val specs = partitionings.map(p => createKeyGroupedShuffleSpec(p, distribution))
specs.filter(_.isDefined).map(_.get).headOption
assert(specs.forall(_.isEmpty) || specs.forall(_.isDefined))
specs.head
case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,28 +328,6 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
.add("price", FloatType)
.add("time", TimestampType)

test("SPARK-49179: Fix v2 multi bucketed inner joins throw AssertionError") {
val cols = Array(
Column.create("id", LongType),
Column.create("name", StringType))
val buckets = Array(bucket(8, "id"))

withTable("t1", "t2", "t3") {
Seq("t1", "t2", "t3").foreach { t =>
createTable(t, cols, buckets)
sql(s"INSERT INTO testcat.ns.$t VALUES (1, 'aa'), (2, 'bb'), (3, 'cc')")
}
val df = sql(
"""
|SELECT t1.id, t2.id, t3.name FROM testcat.ns.t1
|JOIN testcat.ns.t2 ON t1.id = t2.id
|JOIN testcat.ns.t3 ON t1.id = t3.id
|""".stripMargin)
checkAnswer(df, Seq(Row(1, 1, "aa"), Row(2, 2, "bb"), Row(3, 3, "cc")))
assert(collectShuffles(df.queryExecution.executedPlan).isEmpty)
}
}

test("partitioned join: join with two partition keys and matching & sorted partitions") {
val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
createTable(items, items_schema, items_partitions)
Expand Down

0 comments on commit 9852e33

Please sign in to comment.