Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[SPARK-49269][SQL] Eagerly evaluate VALUES() list in AstBuilder #47791

Original file line number Diff line number Diff line change
Expand Up @@ -468,19 +468,14 @@ class AstBuilder extends DataTypeAstBuilder
val (relationCtx, options, cols, partition, ifPartitionNotExists, byName)
= visitInsertIntoTable(table)
withIdentClause(relationCtx, Seq(query), (ident, otherPlans) => {
val insertIntoStatement = InsertIntoStatement(
InsertIntoStatement(
createUnresolvedRelation(relationCtx, ident, options),
partition,
cols,
otherPlans.head,
overwrite = false,
ifPartitionNotExists,
byName)
if (conf.getConf(SQLConf.OPTIMIZE_INSERT_INTO_VALUES_PARSER)) {
EvaluateUnresolvedInlineTable.evaluate(insertIntoStatement)
} else {
insertIntoStatement
}
})
case table: InsertOverwriteTableContext =>
val (relationCtx, options, cols, partition, ifPartitionNotExists, byName)
Expand Down Expand Up @@ -1897,7 +1892,12 @@ class AstBuilder extends DataTypeAstBuilder
Seq.tabulate(rows.head.size)(i => s"col${i + 1}")
}

val table = UnresolvedInlineTable(aliases, rows.toSeq)
val unresolvedTable = UnresolvedInlineTable(aliases, rows.toSeq)
val table = if (conf.getConf(SQLConf.EAGER_EVAL_OF_UNRESOLVED_INLINE_TABLE_ENABLED)) {
EvaluateUnresolvedInlineTable.evaluate(unresolvedTable)
} else {
unresolvedTable
}
table.optionalMap(ctx.tableAlias.strictIdentifier)(aliasPlan)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,8 @@ import org.apache.spark.sql.types.{StructField, StructType}
object EvaluateUnresolvedInlineTable extends SQLConfHelper
with AliasHelper with EvalHelper with CastSupport {

def evaluate(plan: LogicalPlan): LogicalPlan = {
traversePlanAndEvalUnresolvedInlineTable(plan)
}

def traversePlanAndEvalUnresolvedInlineTable(plan: LogicalPlan): LogicalPlan = {
plan match {
case table: UnresolvedInlineTable if table.expressionsResolved =>
evaluateUnresolvedInlineTable(table)
case _ => plan.mapChildren(traversePlanAndEvalUnresolvedInlineTable)
}
}
def evaluate(plan: UnresolvedInlineTable): LogicalPlan =
if (plan.expressionsResolved) evaluateUnresolvedInlineTable(plan) else plan

def evaluateUnresolvedInlineTable(table: UnresolvedInlineTable): LogicalPlan = {
validateInputDimension(table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -969,11 +969,11 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val OPTIMIZE_INSERT_INTO_VALUES_PARSER =
buildConf("spark.sql.parser.optimizeInsertIntoValuesParser")
val EAGER_EVAL_OF_UNRESOLVED_INLINE_TABLE_ENABLED =
buildConf("spark.sql.parser.eagerEvalOfUnresolvedInlineTable")
.internal()
.doc("Controls whether we optimize the ASTree that gets generated when parsing " +
"`insert into ... values` DML statements.")
"VALUES lists (UnresolvedInlineTable) by eagerly evaluating it in the AST Builder.")
.booleanConf
.createWithDefault(true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2633,7 +2633,7 @@ class DDLParserSuite extends AnalysisTest {

for (optimizeInsertIntoValues <- Seq(true, false)) {
withSQLConf(
SQLConf.OPTIMIZE_INSERT_INTO_VALUES_PARSER.key ->
SQLConf.EAGER_EVAL_OF_UNRESOLVED_INLINE_TABLE_ENABLED.key ->
optimizeInsertIntoValues.toString) {
comparePlans(parsePlan(dateTypeSql), insertPartitionPlan(
"2019-01-02", optimizeInsertIntoValues))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import scala.annotation.nowarn

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{EvaluateUnresolvedInlineTable, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, NamedParameter, PosParameter, RelationTimeTravel, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedStar, UnresolvedSubqueryColumnAliases, UnresolvedTableValuedFunction, UnresolvedTVFAliases}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
Expand Down Expand Up @@ -1000,14 +1000,28 @@ class PlanParserSuite extends AnalysisTest {
}

test("inline table") {
assertEqual("values 1, 2, 3, 4",
UnresolvedInlineTable(Seq("col1"), Seq(1, 2, 3, 4).map(x => Seq(Literal(x)))))
for (optimizeValues <- Seq(true, false)) {
withSQLConf(SQLConf.EAGER_EVAL_OF_UNRESOLVED_INLINE_TABLE_ENABLED.key ->
optimizeValues.toString) {
val unresolvedTable1 =
UnresolvedInlineTable(Seq("col1"), Seq(1, 2, 3, 4).map(x => Seq(Literal(x))))
val table1 = if (optimizeValues) {
EvaluateUnresolvedInlineTable.evaluate(unresolvedTable1)
} else {
unresolvedTable1
}
assertEqual("values 1, 2, 3, 4", table1)

assertEqual(
"values (1, 'a'), (2, 'b') as tbl(a, b)",
UnresolvedInlineTable(
Seq("a", "b"),
Seq(Literal(1), Literal("a")) :: Seq(Literal(2), Literal("b")) :: Nil).as("tbl"))
val unresolvedTable2 = UnresolvedInlineTable(
Seq("a", "b"), Seq(Literal(1), Literal("a")) :: Seq(Literal(2), Literal("b")) :: Nil)
val table2 = if (optimizeValues) {
EvaluateUnresolvedInlineTable.evaluate(unresolvedTable2)
} else {
unresolvedTable2
}
assertEqual("values (1, 'a'), (2, 'b') as tbl(a, b)", table2.as("tbl"))
}
}
}

test("simple select query with !> and !<") {
Expand Down Expand Up @@ -1907,12 +1921,22 @@ class PlanParserSuite extends AnalysisTest {
}

test("SPARK-42553: NonReserved keyword 'interval' can be column name") {
comparePlans(
parsePlan("SELECT interval FROM VALUES ('abc') AS tbl(interval);"),
UnresolvedInlineTable(
Seq("interval"),
Seq(Literal("abc")) :: Nil).as("tbl").select($"interval")
)
for (optimizeValues <- Seq(true, false)) {
withSQLConf(SQLConf.EAGER_EVAL_OF_UNRESOLVED_INLINE_TABLE_ENABLED.key ->
optimizeValues.toString) {
val unresolvedTable =
UnresolvedInlineTable(Seq("interval"), Seq(Literal("abc")) :: Nil)
val table = if (optimizeValues) {
EvaluateUnresolvedInlineTable.evaluate(unresolvedTable)
} else {
unresolvedTable
}
comparePlans(
parsePlan("SELECT interval FROM VALUES ('abc') AS tbl(interval);"),
table.as("tbl").select($"interval")
)
}
}
}

test("SPARK-44066: parsing of positional parameters") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ org.apache.spark.sql.AnalysisException
-- !query
select * from values ("one", 2.0), ("two") as data(a, b)
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down Expand Up @@ -157,7 +157,7 @@ org.apache.spark.sql.AnalysisException
-- !query
select * from values ("one"), ("two") as data(a, b)
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ SELECT a, b,
SUM(b) OVER(ORDER BY A ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
FROM (VALUES(1,1),(2,2),(3,(cast('nan' as int))),(4,3),(5,4)) t(a,b)
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
"sqlState" : "42000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ org.apache.spark.sql.AnalysisException
-- !query
select udf(a), udf(b) from values ("one", 2.0), ("two") as data(a, b)
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down Expand Up @@ -143,7 +143,7 @@ org.apache.spark.sql.AnalysisException
-- !query
select udf(a), udf(b) from values ("one"), ("two") as data(a, b)
-- !query analysis
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ select * from values ("one", 2.0), ("two") as data(a, b)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down Expand Up @@ -177,7 +177,7 @@ select * from values ("one"), ("two") as data(a, b)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ FROM (VALUES(1,1),(2,2),(3,(cast('nan' as int))),(4,3),(5,4)) t(a,b)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.FAILED_SQL_EXPRESSION_EVALUATION",
"sqlState" : "42000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ select udf(a), udf(b) from values ("one", 2.0), ("two") as data(a, b)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down Expand Up @@ -161,7 +161,7 @@ select udf(a), udf(b) from values ("one"), ("two") as data(a, b)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "INVALID_INLINE_TABLE.NUM_COLUMNS_MISMATCH",
"sqlState" : "42000",
Expand Down
Loading