From 60fcf6527aa993e51923684c59d645157729cbb7 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 28 Jun 2024 21:36:00 +0800 Subject: [PATCH] [SPARK-48307][SQL][FOLLOWUP] not-inlined CTE references sibling should not fail ### What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/46617 to fix a bug. When we re-construct the `WithCTE` node, we should use the new CTE definitions to which `inlineCTE` has already been applied. Previously the original, un-rewritten definitions were retained, so a not-inlined CTE could still reference a sibling CTE that had been inlined and dropped from the `WithCTE` node. ### Why are the changes needed? This is a bug fix. Without it, we may hit errors such as ``` java.util.NoSuchElementException: key not found: 0 at scala.collection.MapOps.default(Map.scala:289) at scala.collection.MapOps.default$(Map.scala:288) at scala.collection.AbstractMap.default(Map.scala:420) at scala.collection.mutable.HashMap.apply(HashMap.scala:440) at org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.gatherPredicatesAndAttributes(PushdownPredicatesAndPruneColumnsForCTEDef.scala:74) at org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.$anonfun$gatherPredicatesAndAttributes$1(PushdownPredicatesAndPruneColumnsForCTEDef.scala:68) at scala.collection.immutable.Vector.foreach(Vector.scala:2124) at org.apache.spark.sql.catalyst.optimizer.PushdownPredicatesAndPruneColumnsForCTEDef$.gatherPredicatesAndAttributes(PushdownPredicatesAndPruneColumnsForCTEDef.scala:67) ``` ### Does this PR introduce _any_ user-facing change? No, the bug has not been released yet. ### How was this patch tested? A new test was added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47141 from cloud-fan/fix. 
Lead-authored-by: Wenchen Fan Co-authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/optimizer/InlineCTE.scala | 15 ++++++++------- .../org/apache/spark/sql/CTEInlineSuite.scala | 11 +++++++++++ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala index 50828b945bb40..8cc25328ce70b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InlineCTE.scala @@ -140,7 +140,8 @@ case class InlineCTE( cteMap: mutable.Map[Long, CTEReferenceInfo]): LogicalPlan = { plan match { case WithCTE(child, cteDefs) => - val remainingDefs = cteDefs.filter { cteDef => + val notInlined = mutable.ArrayBuffer.empty[CTERelationDef] + cteDefs.foreach { cteDef => val refInfo = cteMap(cteDef.id) if (refInfo.refCount > 0) { val newDef = refInfo.cteDef.copy(child = inlineCTE(refInfo.cteDef.child, cteMap)) @@ -148,17 +149,17 @@ case class InlineCTE( cteMap(cteDef.id) = cteMap(cteDef.id).copy( cteDef = newDef, shouldInline = inlineDecision ) - // Retain the not-inlined CTE relations in place. - !inlineDecision - } else { - keepDanglingRelations + if (!inlineDecision) notInlined += newDef + } else if (keepDanglingRelations) { + notInlined += refInfo.cteDef } } val inlined = inlineCTE(child, cteMap) - if (remainingDefs.isEmpty) { + if (notInlined.isEmpty) { inlined } else { - WithCTE(inlined, remainingDefs) + // Retain the not-inlined CTE relations in place. 
+ WithCTE(inlined, notInlined.toSeq) } case ref: CTERelationRef => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala index a06b50d175f90..7b608b7438c29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala @@ -703,6 +703,17 @@ abstract class CTEInlineSuiteBase checkErrorTableNotFound(e, "`tab_non_exists`", ExpectedContext("tab_non_exists", 83, 96)) } } + + test("SPARK-48307: not-inlined CTE references sibling") { + val df = sql( + """ + |WITH + |v1 AS (SELECT 1 col), + |v2 AS (SELECT col, rand() FROM v1) + |SELECT l.col FROM v2 l JOIN v2 r ON l.col = r.col + |""".stripMargin) + checkAnswer(df, Row(1)) + } } class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with DisableAdaptiveExecutionSuite