Accept only TABLE() first argument
szehon-ho committed Aug 11, 2023
1 parent cecb085 commit f9a5cf7
Showing 4 changed files with 68 additions and 13 deletions.
10 changes: 10 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -3443,6 +3443,16 @@
"Window function <funcName> requires an OVER clause."
]
},
"WITH_OPTIONS_EXPECTED_TABLE" : {
"message" : [
"`with_options` function requires a table argument, \"TABLE($t)\"."
]
},
"WITH_OPTIONS_EXPECTED_SIMPLE_TABLE" : {
"message" : [
"`with_options` function only handles a simple table argument, \"TABLE($t)\"."
]
},
"WRITE_STREAM_NOT_ALLOWED" : {
"message" : [
"`writeStream` can be called only on streaming Dataset/DataFrame."
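For illustration, a minimal sketch of when each new error class fires (not part of this diff; the session setup and the DSv2 table name cat.db.tbl are assumed):

import org.apache.spark.sql.{AnalysisException, SparkSession}

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Accepted: the first argument is a plain TABLE() reference.
spark.sql("SELECT * FROM with_options(TABLE(cat.db.tbl), map('split-size', '5'))")

// WITH_OPTIONS_EXPECTED_TABLE: the first argument is a string, not TABLE().
try spark.sql("SELECT * FROM with_options('cat.db.tbl', map('split-size', '5'))")
catch { case e: AnalysisException => println(e.getMessage) }

// WITH_OPTIONS_EXPECTED_SIMPLE_TABLE: TABLE() wraps a query rather than a table name.
try spark.sql("SELECT * FROM with_options(TABLE(SELECT * FROM range(0, 1)), map('split-size', '5'))")
catch { case e: AnalysisException => println(e.getMessage) }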
@@ -20,12 +20,11 @@ package org.apache.spark.sql.catalyst.plans.logical
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.{AliasIdentifier, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedRelation, UnresolvedUnaryNode}
import org.apache.spark.sql.catalyst.analysis.{AnsiTypeCoercion, MultiInstanceRelation, Resolver, TypeCoercion, TypeCoercionBase, UnresolvedUnaryNode}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTable.VIEW_STORING_ANALYZED_PLAN
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, TypedImperativeAggregate}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
@@ -34,6 +33,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -1142,10 +1142,11 @@ case class Range(

@ExpressionDescription(
usage = "_FUNC_(identifier: String, options: Map) - " +
"Returns the data source relation with the given configuration.",
"Returns the data source relation with the given options. " +
"The first argument must be a simple TABLE() parameter.",
examples = """
Examples:
> SELECT * FROM _FUNC_('cat.db.table', map('foo','bar'));
> SELECT * FROM _FUNC_(TABLE(cat.db.table), map('split-size','5'));
1,a
""",
since = "4.0.0",
@@ -1154,8 +1155,8 @@ case class RelationWithOptions(child: LogicalPlan)
extends UnaryNode {
override def output: Seq[Attribute] = Nil

def this(identifier: Expression, options: Expression) = {
this(RelationWithOptions.childRelation(identifier, options))
def this(table: Expression, options: Expression) = {
this(RelationWithOptions.withOptions(table, options))
}

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
@@ -1165,11 +1166,29 @@
}

object RelationWithOptions {
def childRelation(identifier: Expression, options: Expression): UnresolvedRelation = {
val tableIdentifier = CatalystSqlParser.parseMultipartIdentifier(
identifier.asInstanceOf[Literal].toString)
def withOptions(tableExpr: Expression, options: Expression): LogicalPlan = {
val relationOptions = ExprUtils.convertToMapData(options)
UnresolvedRelation(tableIdentifier, new CaseInsensitiveStringMap(relationOptions.asJava))

if (!tableExpr.isInstanceOf[FunctionTableSubqueryArgumentExpression]) {
throw QueryCompilationErrors.withOptionsExpectedTableError(tableExpr.sql)
}
val table = tableExpr.asInstanceOf[FunctionTableSubqueryArgumentExpression]

table.plan match {
// Support only a direct call to TABLE(t1).
// Support only DataSourceV2Relation, as it's the only relation that carries options.
case t @ SubqueryAlias(_, r @ DataSourceV2Relation(_, _, _, _, options))
=> t.copy(child = r.copy(options = merge(options, relationOptions)))
case _ => throw QueryCompilationErrors.withOptionsExpectedSimpleTableError(table.toString)
}
}

def merge(original: CaseInsensitiveStringMap, newMap: Map[String, String]):
CaseInsensitiveStringMap = {
val map = new java.util.HashMap[String, String](newMap.asJava)
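// Keys already present in `original` overwrite duplicates from `newMap`, so the
// relation's existing options take precedence over those passed to with_options.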
map.putAll(original)
new CaseInsensitiveStringMap(map)
}
}
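A small sketch of the resulting precedence (assumed usage, not part of this diff): keys already set on the relation win over duplicates passed through with_options.

import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val original = new CaseInsensitiveStringMap(Map("split-size" -> "10").asJava)
val merged = RelationWithOptions.merge(original, Map("split-size" -> "5", "format" -> "parquet"))
assert(merged.get("split-size") == "10") // the relation's existing value wins
assert(merged.get("format") == "parquet") // keys unique to newMap are kept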

@@ -1974,6 +1974,20 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
)
}

def withOptionsExpectedSimpleTableError(actual: String): Throwable = {
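// `actual` is currently unused: the message carries the literal "TABLE($t)" hint
// rather than a substituted parameter.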
new AnalysisException(
errorClass = "WITH_OPTIONS_EXPECTED_SIMPLE_TABLE",
messageParameters = Map.empty
)
}

def withOptionsExpectedTableError(actual: String): Throwable = {
new AnalysisException(
errorClass = "WITH_OPTIONS_EXPECTED_TABLE",
messageParameters = Map.empty
)
}
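Since both helpers pass Map.empty, a test can also assert on the error class rather than the message text; a hedged sketch in the style of the suite below (intercept and sql being the usual ScalaTest/Spark test helpers):

val e = intercept[AnalysisException](
  sql("SELECT * FROM with_options(array('t'), map('k', 'v'))"))
assert(e.getErrorClass == "WITH_OPTIONS_EXPECTED_TABLE")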

def identifierTooManyNamePartsError(originalIdentifier: String): Throwable = {
new AnalysisException(
errorClass = "IDENTIFIER_TOO_MANY_NAME_PARTS",
@@ -3301,7 +3301,7 @@ class DataSourceV2SQLSuiteV1Filter
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b')")

val df = sql(s"SELECT * FROM with_options('$t1', map('split-size', '5'))")
val df = sql(s"SELECT * FROM with_options(TABLE ($t1), map('split-size', '5'))")
val collected = df.queryExecution.optimizedPlan.collect {
case scan: DataSourceV2ScanRelation =>
assert(scan.relation.options.get("split-size") == "5")
@@ -3319,15 +3319,27 @@
"The `with_options` requires 2 parameters but the actual number is 1"))

val wrongFirstArg = intercept[AnalysisException](
sql(s"SELECT * FROM with_options(foo, map('foo','bar'))"))
sql(s"SELECT * FROM with_options(foo, map('split-size','5'))"))
assert(wrongFirstArg.message.contains(
"A column, variable, or function parameter with name `foo` cannot be resolved"
))

val wrongFirstArg2 = intercept[AnalysisException](
sql(s"SELECT * FROM with_options(array('$t1'), map('split-size','5'))"))
assert(wrongFirstArg2.message.contains(
"`with_options` function requires a table argument, \"TABLE($t)\""
))

val wrongSecondArg = intercept[AnalysisException](
sql(s"SELECT * FROM with_options('$t1', array('foo'))"))
sql(s"SELECT * FROM with_options(TABLE ($t1), array('split-size', '5'))"))
assert(wrongSecondArg.message.contains(
"Must use the `map()` function for options"))

val unsupportedTableCall = intercept[AnalysisException](
sql(s"SELECT * FROM with_options(TABLE (SELECT * FROM range(0,1))" +
s", map('split-size','5'))"))
assert(unsupportedTableCall.message.contains(
"`with_options` function only handles a simple table argument, \"TABLE($t)\""))
}
}

