Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-49558][SQL] Add SQL pipe syntax for LIMIT/OFFSET and ORDER/SORT/CLUSTER/DISTRIBUTE BY #48413

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4928,7 +4928,7 @@
},
"COMBINATION_QUERY_RESULT_CLAUSES" : {
"message" : [
"Combination of ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY."
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
"Combination of <clauses>."
]
},
"COMMENT_NAMESPACE" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,7 @@ operatorPipeRightSide
| sample
| joinRelation
| operator=(UNION | EXCEPT | SETMINUS | INTERSECT) setQuantifier? right=queryTerm
| queryOrganization
;

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,13 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx)
}

def combinationQueryResultClausesUnsupportedError(ctx: QueryOrganizationContext): Throwable = {
new ParseException(errorClass = "UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES", ctx)
def combinationQueryResultClausesUnsupportedError(
    ctx: QueryOrganizationContext,
    clauses: String): Throwable = {
  // Describes which result-shaping clauses were illegally combined; the text is
  // substituted into the <clauses> placeholder of the error-condition message.
  val params = Map("clauses" -> clauses)
  new ParseException(
    errorClass = "UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES",
    messageParameters = params,
    ctx)
}

def distributeByUnsupportedError(ctx: QueryOrganizationContext): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,8 @@ class AstBuilder extends DataTypeAstBuilder
* Create a top-level plan with Common Table Expressions.
*/
override def visitQuery(ctx: QueryContext): LogicalPlan = withOrigin(ctx) {
val query = plan(ctx.queryTerm).optionalMap(ctx.queryOrganization)(withQueryResultClauses)
val query = plan(ctx.queryTerm).optionalMap(ctx.queryOrganization)(
withQueryResultClauses(_, _, forPipeOperators = false))

// Apply CTEs
query.optionalMap(ctx.ctes)(withCTE)
Expand Down Expand Up @@ -491,7 +492,7 @@ class AstBuilder extends DataTypeAstBuilder
val selects = ctx.fromStatementBody.asScala.map { body =>
withFromStatementBody(body, from).
// Add organization statements.
optionalMap(body.queryOrganization)(withQueryResultClauses)
optionalMap(body.queryOrganization)(withQueryResultClauses(_, _, forPipeOperators = false))
}
// If there are multiple SELECT just UNION them together into one query.
if (selects.length == 1) {
Expand Down Expand Up @@ -537,7 +538,8 @@ class AstBuilder extends DataTypeAstBuilder
val inserts = ctx.multiInsertQueryBody.asScala.map { body =>
withInsertInto(body.insertInto,
withFromStatementBody(body.fromStatementBody, from).
optionalMap(body.fromStatementBody.queryOrganization)(withQueryResultClauses))
optionalMap(body.fromStatementBody.queryOrganization)(
withQueryResultClauses(_, _, forPipeOperators = false)))
}

// If there are multiple INSERTS just UNION them together into one query.
Expand Down Expand Up @@ -976,31 +978,37 @@ class AstBuilder extends DataTypeAstBuilder
/**
* Add ORDER BY/SORT BY/CLUSTER BY/DISTRIBUTE BY/LIMIT/WINDOWS clauses to the logical plan. These
* clauses determine the shape (ordering/partitioning/rows) of the query result.
*
* If 'forPipeOperators' is true, throws an error if a WINDOW clause is present (it is not
* currently supported with pipe operators) or if more than one of these clauses appears,
* since each SQL pipe operator permits at most one instance of each such clause.
*/
private def withQueryResultClauses(
ctx: QueryOrganizationContext,
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
query: LogicalPlan,
forPipeOperators: Boolean): LogicalPlan = withOrigin(ctx) {
import ctx._
var clause = ""

// Handle ORDER BY, SORT BY, DISTRIBUTE BY, and CLUSTER BY clause.
val withOrder = if (
!order.isEmpty && sort.isEmpty && distributeBy.isEmpty && clusterBy.isEmpty) {
// ORDER BY ...
clause = "ORDER BY"
Sort(order.asScala.map(visitSortItem).toSeq, global = true, query)
} else if (order.isEmpty && !sort.isEmpty && distributeBy.isEmpty && clusterBy.isEmpty) {
// SORT BY ...
clause = "SORT BY"
Sort(sort.asScala.map(visitSortItem).toSeq, global = false, query)
} else if (order.isEmpty && sort.isEmpty && !distributeBy.isEmpty && clusterBy.isEmpty) {
// DISTRIBUTE BY ...
clause = "DISTRIBUTE BY"
withRepartitionByExpression(ctx, expressionList(distributeBy), query)
} else if (order.isEmpty && !sort.isEmpty && !distributeBy.isEmpty && clusterBy.isEmpty) {
// SORT BY ... DISTRIBUTE BY ...
clause = "SORT BY ... DISTRIBUTE BY ..."
Sort(
sort.asScala.map(visitSortItem).toSeq,
global = false,
withRepartitionByExpression(ctx, expressionList(distributeBy), query))
} else if (order.isEmpty && sort.isEmpty && distributeBy.isEmpty && !clusterBy.isEmpty) {
// CLUSTER BY ...
clause = "CLUSTER BY"
val expressions = expressionList(clusterBy)
Sort(
expressions.map(SortOrder(_, Ascending)),
Expand All @@ -1010,21 +1018,40 @@ class AstBuilder extends DataTypeAstBuilder
// [EMPTY]
query
} else {
throw QueryParsingErrors.combinationQueryResultClausesUnsupportedError(ctx)
throw QueryParsingErrors.combinationQueryResultClausesUnsupportedError(
ctx, "ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY")
}

// WINDOWS
val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)
val withWindow = withOrder.optionalMap(windowClause) {
withWindowClause
}
if (forPipeOperators && windowClause != null) {
throw QueryParsingErrors.combinationQueryResultClausesUnsupportedError(
ctx, s"WINDOW clauses within SQL pipe operators")
dtenedor marked this conversation as resolved.
Show resolved Hide resolved
}

// OFFSET
// - OFFSET 0 is the same as omitting the OFFSET clause
val offsetClause = "OFFSET"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shall we put all the constant clauses into an object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it is good to keep magic strings out of the code.

val withOffset = withWindow.optional(offset) {
if (forPipeOperators && clause.nonEmpty) {
throw QueryParsingErrors.combinationQueryResultClausesUnsupportedError(
ctx, s"the $clause and $offsetClause clauses")
cloud-fan marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ctx, s"the $clause and $offsetClause clauses")
ctx, s"the co-existence of $clause and $offsetClause clauses")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, this is done.

}
clause = offsetClause
Offset(typedVisit(offset), withWindow)
}

// LIMIT
// - LIMIT ALL is the same as omitting the LIMIT clause
withOffset.optional(limit) {
val limitClause = "LIMIT"
if (forPipeOperators && clause.nonEmpty && clause != offsetClause) {
throw QueryParsingErrors.combinationQueryResultClausesUnsupportedError(
ctx, s"the $clause and $limitClause clauses")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ctx, s"the $clause and $limitClause clauses")
ctx, s"the co-existence of $clause and $limitClause clauses")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, this is done.

}
clause = limitClause
Limit(typedVisit(limit), withOffset)
}
}
Expand Down Expand Up @@ -5883,6 +5910,18 @@ class AstBuilder extends DataTypeAstBuilder
if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) {
operationNotAllowed("Operator pipe SQL syntax using |>", ctx)
}
// This helper function adds a table subquery boundary between the new operator to be added
// (such as a filter or sort) and the input plan if one does not already exist. This helps the
// analyzer behave as if we had added the corresponding SQL clause after a table subquery
// containing the input plan.
def withSubqueryAlias(): LogicalPlan = left match {
  // Already a subquery boundary or a bare relation: use the plan as-is.
  case alias: SubqueryAlias => alias
  case relation: UnresolvedRelation => relation
  // Otherwise wrap the input plan in a freshly named table subquery.
  case other => SubqueryAlias(SubqueryAlias.generateSubqueryName(), other)
}
Option(ctx.selectClause).map { c =>
withSelectQuerySpecification(
ctx = ctx,
Expand All @@ -5895,18 +5934,7 @@ class AstBuilder extends DataTypeAstBuilder
relation = left,
isPipeOperatorSelect = true)
}.getOrElse(Option(ctx.whereClause).map { c =>
// Add a table subquery boundary between the new filter and the input plan if one does not
// already exist. This helps the analyzer behave as if we had added the WHERE clause after a
// table subquery containing the input plan.
val withSubqueryAlias = left match {
case s: SubqueryAlias =>
s
case u: UnresolvedRelation =>
u
case _ =>
SubqueryAlias(SubqueryAlias.generateSubqueryName(), left)
}
withWhereClause(c, withSubqueryAlias)
withWhereClause(c, withSubqueryAlias())
}.getOrElse(Option(ctx.pivotClause()).map { c =>
if (ctx.unpivotClause() != null) {
throw QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx)
Expand All @@ -5924,7 +5952,9 @@ class AstBuilder extends DataTypeAstBuilder
}.getOrElse(Option(ctx.operator).map { c =>
val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
visitSetOperationImpl(left, plan(ctx.right), all, c.getType)
}.get))))))
}.getOrElse(Option(ctx.queryOrganization).map { c =>
withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true)
}.get)))))))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class ErrorParserSuite extends AnalysisTest {
checkError(
exception = parseException("select *\nfrom r\norder by q\ncluster by q"),
condition = "UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES",
parameters = Map.empty,
parameters = Map("clauses" -> "ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY"),
context = ExpectedContext(fragment = "order by q\ncluster by q", start = 16, stop = 38))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,11 @@ class PlanParserSuite extends AnalysisTest {
}

val sql1 = s"$baseSql order by a sort by a"
val parameters = Map("clauses" -> "ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY")
checkError(
exception = parseException(sql1),
condition = "UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES",
parameters = Map.empty,
parameters = parameters,
context = ExpectedContext(
fragment = "order by a sort by a",
start = 16,
Expand All @@ -404,7 +405,7 @@ class PlanParserSuite extends AnalysisTest {
checkError(
exception = parseException(sql2),
condition = "UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES",
parameters = Map.empty,
parameters = parameters,
context = ExpectedContext(
fragment = "cluster by a distribute by a",
start = 16,
Expand All @@ -414,7 +415,7 @@ class PlanParserSuite extends AnalysisTest {
checkError(
exception = parseException(sql3),
condition = "UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES",
parameters = Map.empty,
parameters = parameters,
context = ExpectedContext(
fragment = "order by a cluster by a",
start = 16,
Expand All @@ -424,7 +425,7 @@ class PlanParserSuite extends AnalysisTest {
checkError(
exception = parseException(sql4),
condition = "UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES",
parameters = Map.empty,
parameters = parameters,
context = ExpectedContext(
fragment = "order by a distribute by a",
start = 16,
Expand Down
Loading