Commit fedc9a6

fix

cloud-fan committed Jul 17, 2023
1 parent 58bc261
Showing 17 changed files with 422 additions and 80 deletions.
CTESubstitution.scala
@@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, CTERelationRef, LogicalPlan, SubqueryAlias, UnresolvedWith, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, SubqueryAlias, UnresolvedWith, WithCTE}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.TypeUtils._
import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, LegacyBehaviorPolicy}

/**
@@ -54,16 +55,40 @@ object CTESubstitution extends Rule[LogicalPlan] {
if (!plan.containsPattern(UNRESOLVED_WITH)) {
return plan
}

+    val forceInline = if (conf.getConf(SQLConf.LEGACY_INLINE_CTE_IN_COMMANDS)) {
+      // The legacy behavior always inlines the CTE relations for queries in commands.
+      plan.exists {
+        case _: Command | _: ParsedStatement | _: InsertIntoDir => true
+        case _ => false
+      }
+    } else {
+      val commands = plan.collect {
+        case c @ (_: Command | _: ParsedStatement | _: InsertIntoDir) => c
+      }
+      if (commands.length == 1) {
+        // If there is only one command and it's `CTEInChildren`, we can resolve
+        // CTE normally and don't need to force inline.
+        !commands.head.isInstanceOf[CTEInChildren]
+      } else if (commands.length > 1) {
+        // This can happen with the multi-insert statement. We should fall back to
+        // the legacy behavior.
+        true
+      } else {
+        false
+      }
+    }

val cteDefs = ArrayBuffer.empty[CTERelationDef]
val (substituted, firstSubstituted) =
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match {
case LegacyBehaviorPolicy.EXCEPTION =>
assertNoNameConflictsInCTE(plan)
-          traverseAndSubstituteCTE(plan, Seq.empty, cteDefs)
+          traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
case LegacyBehaviorPolicy.LEGACY =>
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None)
case LegacyBehaviorPolicy.CORRECTED =>
-          traverseAndSubstituteCTE(plan, Seq.empty, cteDefs)
+          traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs)
}
if (cteDefs.isEmpty) {
substituted
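The multi-command fallback above is easiest to see with a Hive-style multi-insert, where one WITH clause feeds several INSERT targets, so the parsed plan contains more than one command node. A minimal sketch, assuming a running `spark` session and existing tables `t1` and `t2` (both hypothetical):

// Hive-style multi-insert: a single CTE feeding two INSERT commands.
// Because the plan contains two command nodes, CTESubstitution cannot
// pick a single `CTEInChildren` root and falls back to inlining `s`.
spark.sql("""
  WITH s AS (SELECT id FROM range(10))
  FROM s
  INSERT INTO t1 SELECT id WHERE id < 5
  INSERT INTO t2 SELECT id WHERE id >= 5
""")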
@@ -129,7 +154,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
plan.resolveOperatorsUp {
case UnresolvedWith(child, relations) =>
val resolvedCTERelations =
-          resolveCTERelations(relations, isLegacy = true, Seq.empty, cteDefs)
+          resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
substituteCTE(child, alwaysInline = true, resolvedCTERelations)
}
}
@@ -166,25 +191,27 @@ object CTESubstitution extends Rule[LogicalPlan] {
* SELECT * FROM t
* )
* @param plan the plan to be traversed
+ * @param forceInline always inline the CTE relations if this is true
* @param outerCTEDefs already resolved outer CTE definitions with names
* @param cteDefs all accumulated CTE definitions
* @return the plan where CTE substitution is applied and optionally the last substituted `With`
* where CTE definitions will be gathered to
*/
private def traverseAndSubstituteCTE(
plan: LogicalPlan,
+      forceInline: Boolean,
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = {
var firstSubstituted: Option[LogicalPlan] = None
val newPlan = plan.resolveOperatorsDownWithPruning(
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
case UnresolvedWith(child: LogicalPlan, relations) =>
val resolvedCTERelations =
-          resolveCTERelations(relations, isLegacy = false, outerCTEDefs, cteDefs) ++
+          resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++
outerCTEDefs
val substituted = substituteCTE(
-          traverseAndSubstituteCTE(child, resolvedCTERelations, cteDefs)._1,
-          false,
+          traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1,
+          forceInline,
resolvedCTERelations)
if (firstSubstituted.isEmpty) {
firstSubstituted = Some(substituted)
@@ -202,9 +229,11 @@ object CTESubstitution extends Rule[LogicalPlan] {
private def resolveCTERelations(
relations: Seq[(String, SubqueryAlias)],
isLegacy: Boolean,
+      forceInline: Boolean,
outerCTEDefs: Seq[(String, CTERelationDef)],
cteDefs: ArrayBuffer[CTERelationDef]): Seq[(String, CTERelationDef)] = {
-    var resolvedCTERelations = if (isLegacy) {
+    val alwaysInline = isLegacy || forceInline
+    var resolvedCTERelations = if (alwaysInline) {
Seq.empty
} else {
outerCTEDefs
@@ -227,12 +256,12 @@ object CTESubstitution extends Rule[LogicalPlan] {
// WITH t3 AS (SELECT * FROM t1)
// )
// t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
-        traverseAndSubstituteCTE(relation, resolvedCTERelations, cteDefs)._1
+        traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, cteDefs)._1
}
// CTE definition can reference a previous one
-      val substituted = substituteCTE(innerCTEResolved, isLegacy, resolvedCTERelations)
+      val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations)
val cteRelation = CTERelationDef(substituted)
-      if (!(isLegacy)) {
+      if (!alwaysInline) {
cteDefs += cteRelation
}
// Prepending new CTEs makes sure that those have higher priority over outer ones.
@@ -271,18 +300,18 @@ object CTESubstitution extends Rule[LogicalPlan] {
}

/**
- * Finds all logical nodes that should have `WithCTE` in their children like
- * `InsertIntoStatement`, put `WithCTE` on top of the children and don't place `WithCTE`
- * on top of the plan. If there are no such nodes, put `WithCTE` on the top.
+ * For commands which extend `CTEInChildren`, we should place the `WithCTE` node on its
+ * children. There are two reasons:
+ * 1. Some rules will pattern match the root command nodes, and we should keep command
+ *    as the root node to not break them.
+ * 2. `Dataset` eagerly executes the commands inside a query plan. However, the CTE
+ *    references inside commands will be invalid if we execute the command alone, as
+ *    the CTE definitions are outside of the command.
*/
private def withCTEDefs(p: LogicalPlan, cteDefs: Seq[CTERelationDef]): LogicalPlan = {
-    val withCTE = WithCTE(p, cteDefs)
-    var onTop = true
-    val newPlan = p.resolveOperatorsDown {
-      case cteInChildren: CTEInChildren =>
-        onTop = false
-        cteInChildren.withCTE(withCTE)
-    }
-    if (onTop) withCTE else WithCTE(newPlan, cteDefs)
+    p match {
+      case c: CTEInChildren => c.withCTEDefs(cteDefs)
+      case _ => WithCTE(p, cteDefs)
+    }
}
}
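To make the new placement concrete, here is a rough sketch (operator names abbreviated, illustrative rather than verbatim analyzer output) of where `WithCTE` now lands for a CTE inside a single `CTEInChildren` command:

// For: WITH t AS (SELECT 1 AS c) INSERT INTO tbl SELECT c FROM t
//
// The insert command implements CTEInChildren, so it stays at the root and
// WithCTE wraps only its query child, keeping command-matching rules intact:
//
//   InsertIntoStatement tbl       <- command remains the root node
//   +- WithCTE
//      :- CTERelationDef t
//      +- Project [c]
//         +- CTERelationRef t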
basicLogicalOperators.scala
@@ -887,6 +887,16 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi
}
}

+/**
+ * The logical node which is able to place the `WithCTE` node on its children.
+ */
+trait CTEInChildren extends LogicalPlan {
+  def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNewChildren(children.map(WithCTE(_, cteDefs)))
+  }
+}
+
+
case class WithWindowDefinition(
windowDefinitions: Map[String, WindowSpecDefinition],
child: LogicalPlan) extends UnaryNode {
@@ -896,15 +906,6 @@ case class WithWindowDefinition(
copy(child = newChild)
}

-/**
- * The logical node is able to insert the given `WithCTE` into its children.
- */
-trait CTEInChildren extends LogicalPlan {
-  def withCTE(withCTE: WithCTE): LogicalPlan = {
-    withNewChildren(children.map(withCTE.withNewPlan))
-  }
-}
-
/**
* @param order The ordering expressions
* @param global True means global sorting apply for entire data set,
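The default `withCTEDefs` in the new trait above wraps every child in `WithCTE`; commands with a non-query child (a target table, for instance) override it to wrap only the query side, as the view and CTAS commands later in this commit do. A hypothetical implementor, not part of this commit:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical._

// Hypothetical command with a table child and a query child. Only the query
// side can reference CTE definitions, so we wrap just that child instead of
// relying on the default, which would wrap both children in WithCTE.
case class MyWriteCommand(table: LogicalPlan, query: LogicalPlan)
  extends BinaryNode with CTEInChildren {
  override def left: LogicalPlan = table
  override def right: LogicalPlan = query
  override def output: Seq[Attribute] = Nil
  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan =
    copy(query = WithCTE(query, cteDefs))
  override protected def withNewChildrenInternal(
      newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
    copy(table = newLeft, query = newRight)
}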
v2Commands.scala
@@ -393,15 +393,13 @@ case class WriteDelta(
}

trait V2CreateTableAsSelectPlan
-    extends V2CreateTablePlan
+  extends V2CreateTablePlan
with AnalysisOnlyCommand
with CTEInChildren {
def query: LogicalPlan

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    withNameAndQuery(
-      newName = this.name,
-      newQuery = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNameAndQuery(newName = name, newQuery = WithCTE(query, cteDefs))
}

override lazy val resolved: Boolean = childrenResolved && {
@@ -1250,10 +1248,8 @@ case class AlterViewAs(
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
copy(child = newLeft, query = newRight)

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    withNewChildrenInternal(
-      newLeft = this.left,
-      newRight = withCTE.copy(plan = this.right))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNewChildren(Seq(child, WithCTE(query, cteDefs)))
}
}

@@ -1275,10 +1271,8 @@ case class CreateView(
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
copy(child = newLeft, query = newRight)

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    withNewChildrenInternal(
-      newLeft = this.left,
-      newRight = withCTE.copy(plan = this.right))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    withNewChildren(Seq(child, WithCTE(query, cteDefs)))
}
}

SQLConf.scala
@@ -3759,6 +3759,14 @@ object SQLConf {
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)

+  val LEGACY_INLINE_CTE_IN_COMMANDS = buildConf("spark.sql.legacy.inlineCTEInCommands")
+    .internal()
+    .doc("If true, always inline the CTE relations for the queries in commands. This is the " +
+      "legacy behavior which may produce incorrect results because Spark may evaluate a CTE " +
+      "relation more than once, even if it's nondeterministic.")
+    .booleanConf
+    .createWithDefault(false)

val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy")
.internal()
.doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " +
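A sketch of what this flag governs, assuming a running `spark` session and an existing table `t` (hypothetical): with the default of false, the CTE below is kept as a single definition and evaluated once; re-enabling the legacy inlining duplicates it into both references, so the nondeterministic `rand()` can differ between them.

// Opt back into the legacy (potentially incorrect) behavior:
spark.conf.set("spark.sql.legacy.inlineCTEInCommands", "true")

// A nondeterministic CTE referenced twice inside a command. With legacy
// inlining each reference re-evaluates rand(), so the self-join below can
// silently produce wrong (e.g. empty) results; with the default (false),
// `r` becomes one CTERelationDef and is evaluated only once.
spark.sql("""
  WITH r AS (SELECT rand() AS v)
  INSERT INTO t
  SELECT a.v FROM r a JOIN r b ON a.v = b.v
""")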
InsertIntoDataSourceDirCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources._

@@ -77,7 +77,7 @@ case class InsertIntoDataSourceDirCommand(
Seq.empty[Row]
}

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
}
}
createDataSourceTables.scala
@@ -21,7 +21,7 @@ import java.net.URI

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.CommandExecutionMode
@@ -234,7 +234,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
}

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
}
}
tables.scala
@@ -748,8 +748,8 @@ case class DescribeQueryCommand(queryText: String, plan: LogicalPlan)
result.toSeq
}

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(plan = withCTE.copy(plan = this.plan))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(plan = WithCTE(plan, cteDefs))
}
}

views.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, ViewType}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, LogicalPlan, Project, View, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, WithCTE}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -216,8 +216,8 @@ case class CreateViewCommand(
)
}

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(plan = withCTE.copy(plan = this.plan))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(plan = WithCTE(plan, cteDefs))
}
}

@@ -312,8 +312,8 @@ case class AlterViewAsCommand(
session.sessionState.catalog.alterTable(updatedViewMeta)
}

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
}
}

InsertIntoDataSourceCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.sources.InsertableRelation

@@ -48,7 +48,7 @@ case class InsertIntoDataSourceCommand(
Seq.empty[Row]
}

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
}
}
InsertIntoHadoopFsRelationCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogT
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.SparkPlan
@@ -57,7 +57,7 @@ case class InsertIntoHadoopFsRelationCommand(
catalogTable: Option[CatalogTable],
fileIndex: Option[FileIndex],
outputColumnNames: Seq[String])
-  extends V1WriteCommand with CTEInChildren {
+  extends V1WriteCommand {

private lazy val parameters = CaseInsensitiveMap(options)

@@ -277,8 +277,4 @@ case class InsertIntoHadoopFsRelationCommand(

override protected def withNewChildInternal(
newChild: LogicalPlan): InsertIntoHadoopFsRelationCommand = copy(query = newChild)

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    withNewChildInternal(withCTE.copy(plan = this.query))
-  }
}
SaveIntoDataSourceCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.sources.CreatableRelationProvider
@@ -69,7 +69,7 @@ case class SaveIntoDataSourceCommand(
SaveIntoDataSourceCommand(query.clone(), dataSource, options, mode)
}

-  override def withCTE(withCTE: WithCTE): LogicalPlan = {
-    copy(query = withCTE.copy(plan = this.query))
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(query = WithCTE(query, cteDefs))
}
}