diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 09bf49d393602..6da3e1bb88deb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, CTERelationRef, LogicalPlan, SubqueryAlias, UnresolvedWith, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util.TypeUtils._
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, LegacyBehaviorPolicy}
 
 /**
@@ -54,16 +55,40 @@ object CTESubstitution extends Rule[LogicalPlan] {
     if (!plan.containsPattern(UNRESOLVED_WITH)) {
       return plan
     }
+
+    val forceInline = if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) {
+      // The legacy behavior always inlines the CTE relations for queries in commands.
+      plan.exists {
+        case _: Command | _: ParsedStatement | _: InsertIntoDir => true
+        case _ => false
+      }
+    } else {
+      val commands = plan.collect {
+        case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c
+      }
+      if (commands.length == 1) {
+        // If there is only one command and it's `CTEInChildren`, we can resolve
+        // CTE normally and don't need to force inline.
+        !commands.head.isInstanceOf[CTEInChildren]
+      } else if (commands.length > 1) {
+        // This can happen with the multi-insert statement. We should fall back to
+        // the legacy behavior.
+        true
+      } else {
+        false
+      }
+    }
+
     val cteDefs = ArrayBuffer.empty[CTERelationDef]
     val (substituted, firstSubstituted) =
       LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
         case LegacyBehaviorPolicy.EXCEPTION =>
           assertNoNameConflictsInCTE(plan)
-          traverseAndSubstituteCTE(plan, Seq.empty, cteDefs)
+          traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
         case LegacyBehaviorPolicy.LEGACY =>
           (legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
         case LegacyBehaviorPolicy.CORRECTED =>
-          traverseAndSubstituteCTE(plan, Seq.empty, cteDefs)
+          traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
       }
     if (cteDefs.isEmpty) {
       substituted
@@ -129,7 +154,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
     plan.resolveOperatorsUp {
       case UnresolvedWith(child, relations) =>
         val resolvedCTERelations =
-          resolveCTERelations(relations, isLegacy = true, Seq.empty, cteDefs)
+          resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
         substituteCTE(child, alwaysInline = true, resolvedCTERelations)
     }
   }
@@ -166,6 +191,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
    *         SELECT * FROM t
    *       )
    * @param plan the plan to be traversed
+   * @param forceInline always inline the CTE relations if this is true
    * @param outerCTEDefs already resolved outer CTE definitions with names
    * @param cteDefs all accumulated CTE definitions
    * @return the plan where CTE substitution is applied and optionally the last substituted `With`
@@ -173,6 +199,7 @@
    */
   private def traverseAndSubstituteCTE(
       plan: LogicalPlan,
+      forceInline: Boolean,
       outerCTEDefs: Seq[(String, CTERelationDef)],
       cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
     var firstSubstituted: Option[LogicalPlan] = None
@@ -180,11 +207,11 @@
       _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
       case UnresolvedWith(child: LogicalPlan, relations) =>
         val resolvedCTERelations =
-          resolveCTERelations(relations, isLegacy = false, outerCTEDefs, cteDefs) ++
+          resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++
             outerCTEDefs
         val substituted = substituteCTE(
-          traverseAndSubstituteCTE(child, resolvedCTERelations, cteDefs)._1,
-          false,
+          traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1,
+          forceInline,
           resolvedCTERelations)
         if (firstSubstituted.isEmpty) {
           firstSubstituted = Some(substituted)
@@ -202,9 +229,11 @@
   private def resolveCTERelations(
       relations: Seq[(String, SubqueryAlias)],
       isLegacy: Boolean,
+      forceInline: Boolean,
       outerCTEDefs: Seq[(String, CTERelationDef)],
       cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
-    var resolvedCTERelations = if (isLegacy) {
+    val alwaysInline = isLegacy || forceInline
+    var resolvedCTERelations = if (alwaysInline) {
       Seq.empty
     } else {
       outerCTEDefs
@@ -227,12 +256,12 @@
         //   WITH t3 AS (SELECT * FROM t1)
         // )
         // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
-          traverseAndSubstituteCTE(relation, resolvedCTERelations, cteDefs)._1
+          traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1
         }
         // CTE definition can reference a previous one
-        val substituted = substituteCTE(innerCTEResolved, isLegacy, resolvedCTERelations)
+        val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations)
         val cteRelation = CTERelationDef(substituted)
-        if (!(isLegacy)) {
+        if (!alwaysInline) {
           cteDefs += cteRelation
         }
         // Prepending new CTEs makes sure that those have higher priority over outer ones.
@@ -271,18 +300,18 @@
   }
 
   /**
-   * Finds all logical nodes that should have `WithCTE` in their children like
-   * `InsertIntoStatement`, put `WithCTE` on top of the children and don't place `WithCTE`
-   * on top of the plan. If there are no such nodes, put `WithCTE` on the top.
+   * For commands that extend `CTEInChildren`, we should place the `WithCTE` node on their
+   * children. There are two reasons:
+   * 1. Some rules pattern-match the root command nodes, so we should keep the command as
+   *    the root node to avoid breaking them.
+   * 2. `Dataset` eagerly executes the commands inside a query plan. The CTE references
+   *    inside a command would be invalid if we executed the command alone, because the
+   *    CTE definitions would sit outside of the command.
    */
   private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = {
-    val withCTE = WithCTE(p, cteDefs)
-    var onTop = true
-    val newPlan = p.resolveOperatorsDown {
-      case cteInChildren: CTEInChildren =>
-        onTop = false
-        cteInChildren.withCTE(withCTE)
+    p match {
+      case c: CTEInChildren => c.withCTEDefs(cteDefs)
+      case _ => WithCTE(p, cteDefs)
     }
-    if (onTop) withCTE else WithCTE(newPlan, cteDefs)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 4cf09a9a734aa..7258d985db843 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -887,6 +887,16 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi
   }
 }
 
+/**
+ * A logical node that can place the `WithCTE` node on its children.
+ */
+trait CTEInChildren extends LogicalPlan {
+  def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNewChildren(children.map(WithCTE(_, cteDefs)))
+  }
+}
+
+
 case class WithWindowDefinition(
     windowDefinitions: Map[String, WindowSpecDefinition],
     child: LogicalPlan) extends UnaryNode {
@@ -896,15 +906,6 @@ case class WithWindowDefinition(
     copy(child = newChild)
 }
 
-/**
- * The logical node is able to insert the given `WithCTE` into its children.
- */
-trait CTEInChildren extends LogicalPlan {
-  def withCTE(withCTE: WithCTE): LogicalPlan = {
-    withNewChildren(children.map(withCTE.withNewPlan))
-  }
-}
-
 /**
  * @param order The ordering expressions
  * @param global True means global sorting apply for entire data set,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 0f31f00681959..5c83da1a96aae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -393,15 +393,13 @@ case class WriteDelta(
 }
 
 trait V2CreateTableAsSelectPlan
-  extends V2CreateTablePlan
+  extends V2CreateTablePlan
     with AnalysisOnlyCommand
     with CTEInChildren {
   def query: LogicalPlan
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    withNameAndQuery(
-      newName = this.name,
-      newQuery = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNameAndQuery(newName = name, newQuery = WithCTE(query, cteDefs))
   }
 
   override lazy val resolved: Boolean = childrenResolved && {
@@ -1250,10 +1248,8 @@ case class AlterViewAs(
       newLeft: LogicalPlan,
       newRight: LogicalPlan): LogicalPlan =
     copy(child = newLeft, query = newRight)
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    withNewChildrenInternal(
-      newLeft = this.left,
-      newRight = withCTE.copy(plan = this.right))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNewChildren(Seq(child, WithCTE(query, cteDefs)))
   }
 }
 
@@ -1275,10 +1271,8 @@ case class CreateView(
       newLeft: LogicalPlan,
       newRight: LogicalPlan): LogicalPlan =
     copy(child = newLeft, query = newRight)
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    withNewChildrenInternal(
-      newLeft = this.left,
-      newRight = withCTE.copy(plan = this.right))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNewChildren(Seq(child, WithCTE(query, cteDefs)))
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9093b0745a6c5..e231884033b09 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3759,6 +3759,14 @@ object SQLConf {
     .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
     .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
 
+  val LEGACY_INLINE_CTE_IN_COMMANDS = buildConf("spark.sql.legacy.inlineCTEInCommands")
+    .internal()
+    .doc("If true, always inline the CTE relations for the queries in commands. This is the " +
+      "legacy behavior which may produce incorrect results because Spark may evaluate a CTE " +
+      "relation more than once, even when it is nondeterministic.")
+    .booleanConf
+    .createWithDefault(false)
+
   val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy")
     .internal()
     .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
index 0a9064261c7ad..67d38b28c83ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 
@@ -77,7 +77,7 @@ case class InsertIntoDataSourceDirCommand(
     Seq.empty[Row]
   }
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index b1b2fd53c74a2..54e8181c56cb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -21,7 +21,7 @@ import java.net.URI
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
 
@@ -234,7 +234,7 @@ case class CreateDataSourceTableAsSelectCommand(
     }
   }
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 30fcf6ccdaf5a..88e940ffdc78d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -748,8 +748,8 @@ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan)
     result.toSeq
   }
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(plan = withCTE.copy(plan = this.plan))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(plan = WithCTE(plan, cteDefs))
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 8a12b162f9949..8ac982b9bdd91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, LogicalPlan, Project, View, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, WithCTE}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -216,8 +216,8 @@ case class CreateViewCommand(
     )
   }
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(plan = withCTE.copy(plan = this.plan))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(plan = WithCTE(plan, cteDefs))
   }
 }
 
@@ -312,8 +312,8 @@ case class AlterViewAsCommand(
     session.sessionState.catalog.alterTable(updatedViewMeta)
   }
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 7cffd6efdb703..128f6acdeaa69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation
 
@@ -48,7 +48,7 @@ case class InsertIntoDataSourceCommand(
     Seq.empty[Row]
   }
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 1c98854b81cbd..fe6ec094812e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.SparkPlan
@@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand(
     catalogTable: Option[CatalogTable],
     fileIndex: Option[FileIndex],
     outputColumnNames: Seq[String])
-  extends V1WriteCommand with CTEInChildren {
+  extends V1WriteCommand {
 
   private lazy val parameters = CaseInsensitiveMap(options)
 
@@ -277,8 +277,4 @@ case class InsertIntoHadoopFsRelationCommand(
   override protected def withNewChildInternal(
       newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand =
     copy(query = newChild)
-
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    withNewChildInternal(withCTE.copy(plan = this.query))
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
index 2d76e7c3afa47..5423232db4293 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.sources.CreatableRelationProvider
 
@@ -69,7 +69,7 @@ case class SaveIntoDataSourceCommand(
     SaveIntoDataSourceCommand(query.clone(), dataSource, options, mode)
   }
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out
index 2a6bcce99d195..a155bccd0919c 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-enabled.sql.out
@@ -410,11 +410,15 @@ CREATE TEMPORARY VIEW "myview"("c1") AS
   WITH "v"("a") AS (SELECT 1) SELECT "a" FROM "v"
 -- !query analysis
 CreateViewCommand `myview`, [(c1,None)], WITH "v"("a") AS (SELECT 1) SELECT "a" FROM "v", false, false, LocalTempView, true
-   +- Project [a#x]
-      +- SubqueryAlias v
-         +- Project [1#x AS a#x]
-            +- Project [1 AS 1#x]
-               +- OneRowRelation
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias v
+      :     +- Project [1#x AS a#x]
+      :        +- Project [1 AS 1#x]
+      :           +- OneRowRelation
+      +- Project [a#x]
+         +- SubqueryAlias v
+            +- CTERelationRef xxxx, true, [a#x]
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out
new file mode 100644
index 0000000000000..93dab5aaa6dfc
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-command.sql.out
@@ -0,0 +1,152 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
+-- !query analysis
+CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`cte_tbl`, ErrorIfExists, [col]
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias s
+      :     +- Project [42 AS col#x]
+      :        +- OneRowRelation
+      +- Project [col#x]
+         +- SubqueryAlias s
+            +- CTERelationRef xxxx, true, [col#x]
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query analysis
+Project [col#x]
++- SubqueryAlias spark_catalog.default.cte_tbl
+   +- Relation spark_catalog.default.cte_tbl[col#x] csv
+
+
+-- !query
+CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
+-- !query analysis
+CreateViewCommand `cte_view`, WITH s AS (SELECT 42 AS col) SELECT * FROM s, false, false, LocalTempView, true
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias s
+      :     +- Project [42 AS col#x]
+      :        +- OneRowRelation
+      +- Project [col#x]
+         +- SubqueryAlias s
+            +- CTERelationRef xxxx, true, [col#x]
+
+
+-- !query
+SELECT * FROM cte_view
+-- !query analysis
+Project [col#x]
++- SubqueryAlias cte_view
+   +- View (`cte_view`, [col#x])
+      +- Project [cast(col#x as int) AS col#x]
+         +- WithCTE
+            :- CTERelationDef xxxx, false
+            :  +- SubqueryAlias s
+            :     +- Project [42 AS col#x]
+            :        +- OneRowRelation
+            +- Project [col#x]
+               +- SubqueryAlias s
+                  +- CTERelationRef xxxx, true, [col#x]
+
+
+-- !query
+WITH s AS (SELECT 43 AS col)
+INSERT INTO cte_tbl SELECT * FROM S
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
++- WithCTE
+   :- CTERelationDef xxxx, false
+   :  +- SubqueryAlias s
+   :     +- Project [43 AS col#x]
+   :        +- OneRowRelation
+   +- Project [col#x]
+      +- SubqueryAlias S
+         +- CTERelationRef xxxx, true, [col#x]
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query analysis
+Project [col#x]
++- SubqueryAlias spark_catalog.default.cte_tbl
+   +- Relation spark_catalog.default.cte_tbl[col#x] csv
+
+
+-- !query
+INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
++- WithCTE
+   :- CTERelationDef xxxx, false
+   :  +- SubqueryAlias s
+   :     +- Project [44 AS col#x]
+   :        +- OneRowRelation
+   +- Project [col#x]
+      +- SubqueryAlias s
+         +- CTERelationRef xxxx, true, [col#x]
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query analysis
+Project [col#x]
++- SubqueryAlias spark_catalog.default.cte_tbl
+   +- Relation spark_catalog.default.cte_tbl[col#x] csv
+
+
+-- !query
+CREATE TABLE cte_tbl2 (col INT) USING csv
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`cte_tbl2`, false
+
+
+-- !query
+WITH s AS (SELECT 45 AS col)
+FROM s
+INSERT INTO cte_tbl SELECT col
+INSERT INTO cte_tbl2 SELECT col
+-- !query analysis
+Union false, false
+:- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
+:  +- Project [col#x]
+:     +- SubqueryAlias s
+:        +- Project [45 AS col#x]
+:           +- OneRowRelation
++- InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl2, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl2], Append, `spark_catalog`.`default`.`cte_tbl2`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl2), [col]
+   +- Project [col#x]
+      +- SubqueryAlias s
+         +- Project [45 AS col#x]
+            +- OneRowRelation
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query analysis
+Project [col#x]
++- SubqueryAlias spark_catalog.default.cte_tbl
+   +- Relation spark_catalog.default.cte_tbl[col#x] csv
+
+
+-- !query
+SELECT * FROM cte_tbl2
+-- !query analysis
+Project [col#x]
++- SubqueryAlias spark_catalog.default.cte_tbl2
+   +- Relation spark_catalog.default.cte_tbl2[col#x] csv
+
+
+-- !query
+DROP TABLE cte_tbl
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.cte_tbl
+
+
+-- !query
+DROP TABLE cte_tbl2
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.cte_tbl2
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out
index e53480e96bed8..f58f8faa0be3f 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out
@@ -452,10 +452,14 @@ with test as (select 42) insert into test select * from test
 -- !query analysis
 InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/test, false, Parquet, [path=file:[not included in comparison]/{warehouse_dir}/test], Append, `spark_catalog`.`default`.`test`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/test), [i]
 +- Project [cast(42#x as int) AS i#x]
-   +- Project [42#x]
-      +- SubqueryAlias test
-         +- Project [42 AS 42#x]
-            +- OneRowRelation
+   +- WithCTE
+      :- CTERelationDef xxxx, false
+      :  +- SubqueryAlias test
+      :     +- Project [42 AS 42#x]
+      :        +- OneRowRelation
+      +- Project [42#x]
+         +- SubqueryAlias test
+            +- CTERelationRef xxxx, true, [42#x]
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql
new file mode 100644
index 0000000000000..ee90c2de49ebf
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte-command.sql
@@ -0,0 +1,33 @@
+-- WITH inside CTAS
+CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s;
+
+SELECT * FROM cte_tbl;
+
+-- WITH inside CREATE VIEW
+CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s;
+
+SELECT * FROM cte_view;
+
+-- INSERT inside WITH
+WITH s AS (SELECT 43 AS col)
+INSERT INTO cte_tbl SELECT * FROM S;
+
+SELECT * FROM cte_tbl;
+
+-- WITH inside INSERT
+INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s;
+
+SELECT * FROM cte_tbl;
+
+CREATE TABLE cte_tbl2 (col INT) USING csv;
+-- Multi-INSERT
+WITH s AS (SELECT 45 AS col)
+FROM s
+INSERT INTO cte_tbl SELECT col
+INSERT INTO cte_tbl2 SELECT col;
+
+SELECT * FROM cte_tbl;
+SELECT * FROM cte_tbl2;
+
+DROP TABLE cte_tbl;
+DROP TABLE cte_tbl2;
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out
new file mode 100644
index 0000000000000..67ac321a1954d
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/cte-command.sql.out
@@ -0,0 +1,121 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query schema
+struct<col:int>
+-- !query output
+42
+
+
+-- !query
+CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM cte_view
+-- !query schema
+struct<col:int>
+-- !query output
+42
+
+
+-- !query
+WITH s AS (SELECT 43 AS col)
+INSERT INTO cte_tbl SELECT * FROM S
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query schema
+struct<col:int>
+-- !query output
+42
+43
+
+
+-- !query
+INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query schema
+struct<col:int>
+-- !query output
+42
+43
+44
+
+
+-- !query
+CREATE TABLE cte_tbl2 (col INT) USING csv
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+WITH s AS (SELECT 45 AS col)
+FROM s
+INSERT INTO cte_tbl SELECT col
+INSERT INTO cte_tbl2 SELECT col
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM cte_tbl
+-- !query schema
+struct<col:int>
+-- !query output
+42
+43
+44
+45
+
+
+-- !query
+SELECT * FROM cte_tbl2
+-- !query schema
+struct<col:int>
+-- !query output
+45
+
+
+-- !query
+DROP TABLE cte_tbl
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DROP TABLE cte_tbl2
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 5bf04460f522a..eef2ae1f737dc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand}
 
@@ -112,7 +112,7 @@ case class CreateHiveTableAsSelectCommand(
     s"TableName: ${tableDesc.identifier.table}]"
   }
 
-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
   }
 }
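The recurring override pattern above is worth spelling out: the trait default wraps every child in `WithCTE`, which is only correct when a command's sole child is its query. Commands with additional non-query children (for example `AlterViewAs`, whose first child is the view reference) override `withCTEDefs` to wrap just the query. A minimal sketch with a hypothetical two-child command; `MyReplaceData` is illustrative and not part of this patch:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{BinaryNode, CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}

// Hypothetical command: `table` is a resolved target reference and `query`
// produces the rows to write.
case class MyReplaceData(table: LogicalPlan, query: LogicalPlan)
  extends BinaryNode with CTEInChildren {
  override def left: LogicalPlan = table
  override def right: LogicalPlan = query
  override def output: Seq[Attribute] = Nil
  override protected def withNewChildrenInternal(
      newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
    copy(table = newLeft, query = newRight)

  // The trait default would wrap `table` in WithCTE as well; only the query
  // side can contain CTE references, so wrap that child alone.
  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan =
    copy(query = WithCTE(query, cteDefs))
}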