Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-44355][SQL] Move WithCTE into command queries #42036

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.TypeUtils._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, LegacyBehaviorPolicy}

/**
* Analyze WITH nodes and substitute child plan with CTE references or CTE definitions depending
* on the conditions below:
* 1. If in legacy mode, or if the query is a SQL command or DML statement, replace with CTE
* definitions, i.e., inline CTEs.
* 1. If in legacy mode, replace with CTE definitions, i.e., inline CTEs.
* 2. Otherwise, replace with CTE references `CTERelationRef`s. The decision to inline or not
* inline will be made later by the rule `InlineCTE` after query analysis.
*
Expand All @@ -46,42 +46,62 @@ import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, Lega
* dependency for any valid CTE query (i.e., given CTE definitions A and B with B referencing A,
* A is guaranteed to appear before B). Otherwise, it must be an invalid user query, and an
* analysis exception will be thrown later by relation resolving rules.
*
* If the query is a SQL command or DML statement (extends `CTEInChildren`),
* place `WithCTE` into their children.
*/
object CTESubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.containsPattern(UNRESOLVED_WITH)) {
return plan
}
val isCommand = plan.exists {
case _: Command | _: ParsedStatement | _: InsertIntoDir => true
case _ => false

val commands = plan.collect {
case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c
}
val forceInline = if (commands.length == 1) {
if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) {
// The legacy behavior always inlines the CTE relations for queries in commands.
true
} else {
// If there is only one command and it's `CTEInChildren`, we can resolve
// CTE normally and don't need to force inline.
!commands.head.isInstanceOf[CTEInChildren]
}
} else if (commands.length > 1) {
// This can happen with the multi-insert statement. We should fall back to
// the legacy behavior.
true
} else {
false
}

val cteDefs = ArrayBuffer.empty[CTERelationDef]
val (substituted, firstSubstituted) =
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
case LegacyBehaviorPolicy.EXCEPTION =>
assertNoNameConflictsInCTE(plan)
traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
case LegacyBehaviorPolicy.LEGACY =>
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
case LegacyBehaviorPolicy.CORRECTED =>
traverseAndSubstituteCTE(plan, isCommand, Seq.empty, cteDefs)
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
}
if (cteDefs.isEmpty) {
substituted
} else if (substituted eq firstSubstituted.get) {
WithCTE(substituted, cteDefs.toSeq)
withCTEDefs(substituted, cteDefs.toSeq)
} else {
var done = false
substituted.resolveOperatorsWithPruning(_ => !done) {
case p if p eq firstSubstituted.get =>
// `firstSubstituted` is the parent of all other CTEs (if any).
done = true
WithCTE(p, cteDefs.toSeq)
withCTEDefs(p, cteDefs.toSeq)
case p if p.children.count(_.containsPattern(CTE)) > 1 =>
// This is the first common parent of all CTEs.
done = true
WithCTE(p, cteDefs.toSeq)
withCTEDefs(p, cteDefs.toSeq)
}
}
}
Expand Down Expand Up @@ -131,7 +151,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
plan.resolveOperatorsUp {
case UnresolvedWith(child, relations) =>
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = true, isCommand = false, Seq.empty, cteDefs)
resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
substituteCTE(child, alwaysInline = true, resolvedCTERelations)
}
}
Expand Down Expand Up @@ -168,27 +188,27 @@ object CTESubstitution extends Rule[LogicalPlan] {
* SELECT * FROM t
* )
* @param plan the plan to be traversed
* @param isCommand if this is a command
* @param forceInline always inline the CTE relations if this is true
* @param outerCTEDefs already resolved outer CTE definitions with names
* @param cteDefs all accumulated CTE definitions
* @return the plan where CTE substitution is applied and optionally the last substituted `With`
* where CTE definitions will be gathered to
*/
private def traverseAndSubstituteCTE(
plan: LogicalPlan,
isCommand: Boolean,
forceInline: Boolean,
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
var firstSubstituted: Option[LogicalPlan] = None
val newPlan = plan.resolveOperatorsDownWithPruning(
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
case UnresolvedWith(child: LogicalPlan, relations) =>
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) ++
resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++
outerCTEDefs
val substituted = substituteCTE(
traverseAndSubstituteCTE(child, isCommand, resolvedCTERelations, cteDefs)._1,
isCommand,
traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1,
forceInline,
resolvedCTERelations)
if (firstSubstituted.isEmpty) {
firstSubstituted = Some(substituted)
Expand All @@ -206,10 +226,11 @@ object CTESubstitution extends Rule[LogicalPlan] {
private def resolveCTERelations(
relations: Seq[(String, SubqueryAlias)],
isLegacy: Boolean,
isCommand: Boolean,
forceInline: Boolean,
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
var resolvedCTERelations = if (isLegacy || isCommand) {
val alwaysInline = isLegacy || forceInline
var resolvedCTERelations = if (alwaysInline) {
Seq.empty
} else {
outerCTEDefs
Expand All @@ -232,12 +253,12 @@ object CTESubstitution extends Rule[LogicalPlan] {
// WITH t3 AS (SELECT * FROM t1)
// )
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
traverseAndSubstituteCTE(relation, isCommand, resolvedCTERelations, cteDefs)._1
traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1
}
// CTE definition can reference a previous one
val substituted = substituteCTE(innerCTEResolved, isLegacy || isCommand, resolvedCTERelations)
val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations)
val cteRelation = CTERelationDef(substituted)
if (!(isLegacy || isCommand)) {
if (!alwaysInline) {
cteDefs += cteRelation
}
// Prepending new CTEs makes sure that those have higher priority over outer ones.
Expand All @@ -249,7 +270,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
private def substituteCTE(
plan: LogicalPlan,
alwaysInline: Boolean,
cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan =
cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = {
plan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, PLAN_EXPRESSION)) {
case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _)
Expand All @@ -273,4 +294,22 @@ object CTESubstitution extends Rule[LogicalPlan] {
e.withNewPlan(apply(substituteCTE(e.plan, alwaysInline, cteRelations)))
}
}
}

/**
* For commands which extend `CTEInChildren`, we should place the `WithCTE` node on its
* children. There are two reasons:
* 1. Some rules will pattern match the root command nodes, and we should keep command
* as the root node to not break them.
* 2. `Dataset` eagerly executes the commands inside a query plan. For example,
* sql("WITH v ... CREATE TABLE t AS SELECT * FROM v") will create the table instead of just
* analyzing the command. However, the CTE references inside commands will be invalid if we
* execute the command alone, as the CTE definitions are outside of the command.
*/
private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = {
p match {
case c: CTEInChildren => c.withCTEDefs(cteDefs)
case _ => WithCTE(p, cteDefs)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ case class InsertIntoDir(
provider: Option[String],
child: LogicalPlan,
overwrite: Boolean = true)
extends UnaryNode {
extends UnaryNode with CTEInChildren {

override def output: Seq[Attribute] = Seq.empty
override def metadataOutput: Seq[Attribute] = Nil
Expand Down Expand Up @@ -887,6 +887,16 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi
}
}

/**
* The logical node which is able to place the `WithCTE` node on its children.
*/
trait CTEInChildren extends LogicalPlan {
def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
withNewChildren(children.map(WithCTE(_, cteDefs)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it makes sense to assert that we have only 1 child.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems fine to have multiple children, we just duplicate the CTE relations. The current code does not allow it though, and go back to inline CTE.

Copy link
Contributor

@peter-toth peter-toth Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that it is always fine to duplicate CTE relations into multiple childrens.
For example, if we have a non-deterministic relation definition and 1-1 reference to it in 2 childrens of CTEInChildren and then here we duplicate the relations into the 2 childrens then I think the InlineCTE rule will decide to inline the relation 2 times, which is not correct.
But I agree with you, I don't see that this could happen now.

}
}


case class WithWindowDefinition(
windowDefinitions: Map[String, WindowSpecDefinition],
child: LogicalPlan) extends UnaryNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.sql.types.DataType
* Parsed logical plans are located in Catalyst so that as much SQL parsing logic as possible is be
* kept in a [[org.apache.spark.sql.catalyst.parser.AbstractSqlParser]].
*/
abstract class ParsedStatement extends LogicalPlan {
abstract class ParsedStatement extends LogicalPlan with CTEInChildren {
// Redact properties and options when parsed nodes are used by generic methods like toString
override def productIterator: Iterator[Any] = super.productIterator.map {
case mapArg: Map[_, _] => conf.redactOptions(mapArg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ trait KeepAnalyzedQuery extends Command {
/**
* Base trait for DataSourceV2 write commands
*/
trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery {
trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery with CTEInChildren {
def table: NamedRelation
def query: LogicalPlan
def isByName: Boolean
Expand Down Expand Up @@ -392,9 +392,16 @@ case class WriteDelta(
}
}

trait V2CreateTableAsSelectPlan extends V2CreateTablePlan with AnalysisOnlyCommand {
trait V2CreateTableAsSelectPlan
extends V2CreateTablePlan
with AnalysisOnlyCommand
with CTEInChildren {
def query: LogicalPlan

override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
withNameAndQuery(newName = name, newQuery = WithCTE(query, cteDefs))
}

override lazy val resolved: Boolean = childrenResolved && {
// the table schema is created from the query schema, so the only resolution needed is to check
// that the columns referenced by the table's partitioning exist in the query schema
Expand Down Expand Up @@ -1234,12 +1241,16 @@ case class RepairTable(
case class AlterViewAs(
child: LogicalPlan,
originalText: String,
query: LogicalPlan) extends BinaryCommand {
query: LogicalPlan) extends BinaryCommand with CTEInChildren {
override def left: LogicalPlan = child
override def right: LogicalPlan = query
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
copy(child = newLeft, query = newRight)

override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
withNewChildren(Seq(child, WithCTE(query, cteDefs)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just copy(query = WithCTE(... like at other places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withNewChildren can copy over the tree node tags.

}
}

/**
Expand All @@ -1253,12 +1264,16 @@ case class CreateView(
originalText: Option[String],
query: LogicalPlan,
allowExisting: Boolean,
replace: Boolean) extends BinaryCommand {
replace: Boolean) extends BinaryCommand with CTEInChildren {
override def left: LogicalPlan = child
override def right: LogicalPlan = query
override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
copy(child = newLeft, query = newRight)

override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
withNewChildren(Seq(child, WithCTE(query, cteDefs)))
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3759,6 +3759,14 @@ object SQLConf {
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)

val LEGACY_INLINE_CTE_IN_COMMANDS = buildConf("spark.sql.legacy.inlineCTEInCommands")
.internal()
.doc("If true, always inline the CTE relations for the queries in commands. This is the " +
"legacy behavior which may produce incorrect results because Spark may evaluate a CTE " +
"relation more than once, even if it's nondeterministic.")
.booleanConf
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
.createWithDefault(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need version here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!


val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy")
.internal()
.doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand}
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, UnaryCommand}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
Expand All @@ -35,7 +35,7 @@ import org.apache.spark.util.SerializableConfiguration
/**
* A special `Command` which writes data out and updates metrics.
*/
trait DataWritingCommand extends UnaryCommand {
trait DataWritingCommand extends UnaryCommand with CTEInChildren {
/**
* The input query plan that produces the data to be written.
* IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources._

Expand All @@ -42,7 +42,7 @@ case class InsertIntoDataSourceDirCommand(
storage: CatalogStorageFormat,
provider: String,
query: LogicalPlan,
overwrite: Boolean) extends LeafRunnableCommand {
overwrite: Boolean) extends LeafRunnableCommand with CTEInChildren {

override def innerChildren: Seq[LogicalPlan] = query :: Nil

Expand Down Expand Up @@ -76,4 +76,8 @@ case class InsertIntoDataSourceDirCommand(

Seq.empty[Row]
}

override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
copy(query = WithCTE(query, cteDefs))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.net.URI

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.CommandExecutionMode
Expand Down Expand Up @@ -141,7 +141,7 @@ case class CreateDataSourceTableAsSelectCommand(
mode: SaveMode,
query: LogicalPlan,
outputColumnNames: Seq[String])
extends LeafRunnableCommand {
extends LeafRunnableCommand with CTEInChildren {
assert(query.resolved)
override def innerChildren: Seq[LogicalPlan] = query :: Nil

Expand Down Expand Up @@ -233,4 +233,8 @@ case class CreateDataSourceTableAsSelectCommand(
throw ex
}
}

override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
copy(query = WithCTE(query, cteDefs))
}
}
Loading