[SPARK-48307][SQL][FOLLOWUP] not-inlined CTE references sibling should not fail

### What changes were proposed in this pull request?

This is a follow-up of apache#46617 to fix a bug. When we reconstruct the `WithCTE` node, we should use the new CTE definitions, to which `inlineCTE` has already been applied, rather than the original ones.
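
To illustrate why the retained definitions must be the rewritten ones, here is a minimal toy sketch. It is not Spark code: `Plan`, `Leaf`, `Ref`, `Project`, `Join`, `With`, `Def`, `RefInfo`, and the "inline when referenced exactly once" policy are all hypothetical stand-ins, and the `useNewDefs` flag exists only to compare the two behaviours side by side.

```scala
import scala.collection.mutable

// Hypothetical toy plan nodes; they only mimic the shape of Spark's classes.
sealed trait Plan
case class Leaf(sql: String) extends Plan
case class Ref(id: Long) extends Plan
case class Project(child: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan
case class With(child: Plan, defs: Seq[Def]) extends Plan
case class Def(id: Long, child: Plan)
case class RefInfo(cteDef: Def, refCount: Int, shouldInline: Boolean = false)

object CteSketch {
  // useNewDefs = false keeps the original definitions (the behaviour being fixed);
  // useNewDefs = true keeps the definitions whose children were already rewritten.
  def inline(plan: Plan, cteMap: mutable.Map[Long, RefInfo], useNewDefs: Boolean): Plan =
    plan match {
      case With(child, defs) =>
        val notInlined = mutable.ArrayBuffer.empty[Def]
        defs.foreach { d =>
          val info = cteMap(d.id)
          val newDef = info.cteDef.copy(child = inline(info.cteDef.child, cteMap, useNewDefs))
          val decision = info.refCount == 1 // assumed toy policy: inline single-use CTEs
          cteMap(d.id) = info.copy(cteDef = newDef, shouldInline = decision)
          if (!decision) notInlined += (if (useNewDefs) newDef else d)
        }
        val inlined = inline(child, cteMap, useNewDefs)
        if (notInlined.isEmpty) inlined else With(inlined, notInlined.toSeq)
      case Ref(id) if cteMap(id).shouldInline => cteMap(id).cteDef.child
      case Project(c) => Project(inline(c, cteMap, useNewDefs))
      case Join(l, r) => Join(inline(l, cteMap, useNewDefs), inline(r, cteMap, useNewDefs))
      case other => other
    }

  // Ids referenced anywhere in the plan, including inside retained definitions.
  def refs(plan: Plan): Set[Long] = plan match {
    case Ref(id) => Set(id)
    case Project(c) => refs(c)
    case Join(l, r) => refs(l) ++ refs(r)
    case With(c, defs) => refs(c) ++ defs.flatMap(d => refs(d.child))
    case _: Leaf => Set.empty
  }

  // References whose definition is no longer present in the top-level With node.
  def dangling(plan: Plan): Set[Long] = plan match {
    case w: With => refs(w) -- w.defs.map(_.id)
    case other => refs(other)
  }

  // v1 is referenced once (by v2); v2 is referenced twice (by the self-join).
  val v1 = Def(0L, Leaf("SELECT 1 col"))
  val v2 = Def(1L, Project(Ref(0L)))
  val query = With(Join(Ref(1L), Ref(1L)), Seq(v1, v2))

  def freshMap(): mutable.Map[Long, RefInfo] =
    mutable.Map(0L -> RefInfo(v1, refCount = 1), 1L -> RefInfo(v2, refCount = 2))

  val buggy = inline(query, freshMap(), useNewDefs = false)
  val fixed = inline(query, freshMap(), useNewDefs = true)
}
```

In this toy model, `CteSketch.dangling(CteSketch.buggy)` contains id `0`, because the retained original `v2` still points at the inlined and dropped `v1`, while `CteSketch.dangling(CteSketch.fixed)` is empty.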

### Why are the changes needed?

Bug fix. Otherwise we may hit errors such as:
```
java.util.NoSuchElementException: key not found: 0
	at scala.collection.MapOps.default(Map.scala:289)
	at scala.collection.MapOps.default$(Map.scala:288)
	at scala.collection.AbstractMap.default(Map.scala:420)
	at scala.collection.mutable.HashMap.apply(HashMap.scala:440)
	at org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.gatherPredicatesAndAttributes(PushdownPredicatesAndPruneColumnsForCTEDef.scala:74)
	at org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.$anonfun$gatherPredicatesAndAttributes$1(PushdownPredicatesAndPruneColumnsForCTEDef.scala:68)
	at scala.collection.immutable.Vector.foreach(Vector.scala:2124)
	at org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.gatherPredicatesAndAttributes(PushdownPredicatesAndPruneColumnsForCTEDef.scala:67)
```
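
The top frames of the trace come from `scala.collection.mutable.HashMap.apply`, which throws when the key is absent. A small standalone sketch of that behaviour follows; the map contents and the names `MissingKeySketch`, `cteMap`, `viaApply`, and `viaGet` are hypothetical and only stand in for a lookup by a CTE id that is no longer tracked.

```scala
import scala.collection.mutable
import scala.util.Try

object MissingKeySketch {
  // Hypothetical map keyed by CTE id; id 0 is deliberately absent.
  val cteMap: mutable.HashMap[Long, String] = mutable.HashMap(1L -> "v2")

  // apply() on a missing key throws java.util.NoSuchElementException.
  val viaApply: Try[String] = Try(cteMap(0L))

  // get() returns an Option instead of throwing.
  val viaGet: Option[String] = cteMap.get(0L)
}
```

Here `viaApply` is a `Failure` wrapping a `NoSuchElementException`, and `viaGet` is `None`.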

### Does this PR introduce _any_ user-facing change?

No, the bug has not been released yet.

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#47141 from cloud-fan/fix.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2 people authored and asl3 committed Jul 1, 2024
1 parent e064584 commit 60fcf65
Showing 2 changed files with 19 additions and 7 deletions.
@@ -140,25 +140,26 @@ case class InlineCTE(
       cteMap: mutable.Map[Long, CTEReferenceInfo]): LogicalPlan = {
     plan match {
       case WithCTE(child, cteDefs) =>
-        val remainingDefs = cteDefs.filter { cteDef =>
+        val notInlined = mutable.ArrayBuffer.empty[CTERelationDef]
+        cteDefs.foreach { cteDef =>
           val refInfo = cteMap(cteDef.id)
           if (refInfo.refCount > 0) {
             val newDef = refInfo.cteDef.copy(child = inlineCTE(refInfo.cteDef.child, cteMap))
             val inlineDecision = shouldInline(newDef, refInfo.refCount)
             cteMap(cteDef.id) = cteMap(cteDef.id).copy(
               cteDef = newDef, shouldInline = inlineDecision
             )
-            // Retain the not-inlined CTE relations in place.
-            !inlineDecision
-          } else {
-            keepDanglingRelations
+            if (!inlineDecision) notInlined += newDef
+          } else if (keepDanglingRelations) {
+            notInlined += refInfo.cteDef
           }
         }
         val inlined = inlineCTE(child, cteMap)
-        if (remainingDefs.isEmpty) {
+        if (notInlined.isEmpty) {
           inlined
         } else {
-          WithCTE(inlined, remainingDefs)
+          // Retain the not-inlined CTE relations in place.
+          WithCTE(inlined, notInlined.toSeq)
         }

       case ref: CTERelationRef =>
11 changes: 11 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -703,6 +703,17 @@ abstract class CTEInlineSuiteBase
       checkErrorTableNotFound(e, "`tab_non_exists`", ExpectedContext("tab_non_exists", 83, 96))
     }
   }
+
+  test("SPARK-48307: not-inlined CTE references sibling") {
+    val df = sql(
+      """
+        |WITH
+        |v1 AS (SELECT 1 col),
+        |v2 AS (SELECT col, rand() FROM v1)
+        |SELECT l.col FROM v2 l JOIN v2 r ON l.col = r.col
+        |""".stripMargin)
+    checkAnswer(df, Row(1))
+  }
 }

 class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with DisableAdaptiveExecutionSuite