[SPARK-49558][SQL] Add SQL pipe syntax for LIMIT/OFFSET and ORDER/SORT/CLUSTER/DISTRIBUTE BY #48413

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -4926,6 +4926,11 @@
"Catalog <catalogName> does not support <operation>."
]
},
"CLAUSE_WITH_PIPE_OPERATORS" : {
Contributor:
Maybe @srielau can also comment here, but it looks to me that we have two new errors:

  1. WINDOW in the SQL pipe operator. We don't support window definition in SQL pipe yet.
  2. More than one QUERY_RESULT_CLAUSES in SQL pipe. SQL pipe is designed to specify one operator at a time, so we don't allow any combination, such as ORDER BY col LIMIT 1, or LIMIT 1 OFFSET 1

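A hypothetical illustration of the second restriction (assuming a table `t` with a column `col`, and the `FROM ... |>` entry point this PR's grammar uses):

```sql
-- Not allowed: two result clauses combined in a single pipe operator.
FROM t
|> ORDER BY col LIMIT 1;

-- Allowed: one result clause per pipe operator.
FROM t
|> ORDER BY col
|> LIMIT 1;
```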
Contributor:
  1. appears to be a 0A000 (unsupported feature)
  2. Isn't that a syntax error?

Contributor:
Yes, 2 is a syntax error for the pipe statement.

Contributor (Author):
Thanks, split this into two new errors per suggestion.

"message" : [
"The SQL pipe operator syntax using |> does not support <clauses>."
]
},
"COMBINATION_QUERY_RESULT_CLAUSES" : {
"message" : [
"Combination of ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY."
cloud-fan marked this conversation as resolved.
@@ -5006,6 +5011,11 @@
"Multiple bucket TRANSFORMs."
]
},
"MULTIPLE_QUERY_RESULT_CLAUSES_WITH_PIPE_OPERATORS" : {
Contributor:
shall we turn it into a top-level error class? This is a syntax error, not an unsupported feature, and should have a different SQL state.

"message" : [
"Syntax error: the SQL pipe operator syntax using |> does not support <clauses>. Please separate the multiple result clauses into separate pipe operators and then retry the query again."
Contributor (@cloud-fan, Oct 17, 2024):
how about

<clause1> and <clause2> can not coexist in SQL pipe operator using '|>'. Please separate ...

Contributor:
The idea is to make the error parameters simple, so that people can consume them programmatically.

]
},
"MULTI_ACTION_ALTER" : {
"message" : [
"The target JDBC server hosting table <tableName> does not support ALTER TABLE with multiple actions. Split the ALTER TABLE up into individual actions to avoid this error."
@@ -1512,6 +1512,7 @@ operatorPipeRightSide
| sample
| joinRelation
| operator=(UNION | EXCEPT | SETMINUS | INTERSECT) setQuantifier? right=queryTerm
| queryOrganization
;

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
@@ -82,6 +82,24 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
ctx)
}

def clausesWithPipeOperatorsUnsupportedError(
ctx: QueryOrganizationContext,
clauses: String): Throwable = {
new ParseException(
errorClass = "UNSUPPORTED_FEATURE.CLAUSE_WITH_PIPE_OPERATORS",
messageParameters = Map("clauses" -> clauses),
ctx)
}

def multipleQueryResultClausesWithPipeOperatorsUnsupportedError(
ctx: QueryOrganizationContext,
clauses: String): Throwable = {
new ParseException(
errorClass = "UNSUPPORTED_FEATURE.MULTIPLE_QUERY_RESULT_CLAUSES_WITH_PIPE_OPERATORS",
messageParameters = Map("clauses" -> clauses),
ctx)
}

def combinationQueryResultClausesUnsupportedError(ctx: QueryOrganizationContext): Throwable = {
new ParseException(errorClass = "UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES", ctx)
}
@@ -428,7 +428,8 @@ class AstBuilder extends DataTypeAstBuilder
* Create a top-level plan with Common Table Expressions.
*/
override def visitQuery(ctx: QueryContext): LogicalPlan = withOrigin(ctx) {
val query = plan(ctx.queryTerm).optionalMap(ctx.queryOrganization)(withQueryResultClauses)
val query = plan(ctx.queryTerm).optionalMap(ctx.queryOrganization)(
withQueryResultClauses(_, _, forPipeOperators = false))

// Apply CTEs
query.optionalMap(ctx.ctes)(withCTE)
@@ -491,7 +492,7 @@ class AstBuilder extends DataTypeAstBuilder
val selects = ctx.fromStatementBody.asScala.map { body =>
withFromStatementBody(body, from).
// Add organization statements.
optionalMap(body.queryOrganization)(withQueryResultClauses)
optionalMap(body.queryOrganization)(withQueryResultClauses(_, _, forPipeOperators = false))
}
// If there are multiple SELECT just UNION them together into one query.
if (selects.length == 1) {
@@ -537,7 +538,8 @@ class AstBuilder extends DataTypeAstBuilder
val inserts = ctx.multiInsertQueryBody.asScala.map { body =>
withInsertInto(body.insertInto,
withFromStatementBody(body.fromStatementBody, from).
optionalMap(body.fromStatementBody.queryOrganization)(withQueryResultClauses))
optionalMap(body.fromStatementBody.queryOrganization)(
withQueryResultClauses(_, _, forPipeOperators = false)))
}

// If there are multiple INSERTS just UNION them together into one query.
@@ -976,31 +978,37 @@ class AstBuilder extends DataTypeAstBuilder
/**
* Add ORDER BY/SORT BY/CLUSTER BY/DISTRIBUTE BY/LIMIT/WINDOWS clauses to the logical plan. These
* clauses determine the shape (ordering/partitioning/rows) of the query result.
*
If 'forPipeOperators' is true, throws an error if the WINDOW clause is present (not yet
supported with pipe operators) or if more than one of these clauses is present, since each
pipe operator accepts at most one such clause.
*/
private def withQueryResultClauses(
ctx: QueryOrganizationContext,
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
query: LogicalPlan,
forPipeOperators: Boolean): LogicalPlan = withOrigin(ctx) {
import ctx._
var clause = ""

// Handle ORDER BY, SORT BY, DISTRIBUTE BY, and CLUSTER BY clause.
val withOrder = if (
!order.isEmpty && sort.isEmpty && distributeBy.isEmpty && clusterBy.isEmpty) {
// ORDER BY ...
clause = "ORDER BY"
Sort(order.asScala.map(visitSortItem).toSeq, global = true, query)
} else if (order.isEmpty && !sort.isEmpty && distributeBy.isEmpty && clusterBy.isEmpty) {
// SORT BY ...
clause = "SORT BY"
Sort(sort.asScala.map(visitSortItem).toSeq, global = false, query)
} else if (order.isEmpty && sort.isEmpty && !distributeBy.isEmpty && clusterBy.isEmpty) {
// DISTRIBUTE BY ...
clause = "DISTRIBUTE BY"
withRepartitionByExpression(ctx, expressionList(distributeBy), query)
} else if (order.isEmpty && !sort.isEmpty && !distributeBy.isEmpty && clusterBy.isEmpty) {
// SORT BY ... DISTRIBUTE BY ...
clause = "SORT BY ... DISTRIBUTE BY ..."
Sort(
sort.asScala.map(visitSortItem).toSeq,
global = false,
withRepartitionByExpression(ctx, expressionList(distributeBy), query))
} else if (order.isEmpty && sort.isEmpty && distributeBy.isEmpty && !clusterBy.isEmpty) {
// CLUSTER BY ...
clause = "CLUSTER BY"
val expressions = expressionList(clusterBy)
Sort(
expressions.map(SortOrder(_, Ascending)),
@@ -1014,17 +1022,35 @@ class AstBuilder extends DataTypeAstBuilder
}

// WINDOWS
val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)
val withWindow = withOrder.optionalMap(windowClause)(withWindowClause)
if (forPipeOperators && windowClause != null) {
throw QueryParsingErrors.clausesWithPipeOperatorsUnsupportedError(
ctx, s"the WINDOW clause")
}

// OFFSET
// - OFFSET 0 is the same as omitting the OFFSET clause
val offsetClause = "OFFSET"
Member:

nit: shall we put all the constant clauses into an object?

Contributor (Author):
Sure, it is good to keep magic strings out of the code.

val withOffset = withWindow.optional(offset) {
if (forPipeOperators && clause.nonEmpty) {
throw QueryParsingErrors.multipleQueryResultClausesWithPipeOperatorsUnsupportedError(
ctx, s"the co-existence of the $clause and $offsetClause clauses")
}
clause = offsetClause
Offset(typedVisit(offset), withWindow)
}

// LIMIT
// - LIMIT ALL is the same as omitting the LIMIT clause
withOffset.optional(limit) {
val limitClause = "LIMIT"
if (forPipeOperators && clause.nonEmpty && clause != offsetClause) {
throw QueryParsingErrors.multipleQueryResultClausesWithPipeOperatorsUnsupportedError(
ctx, s"the co-existence of the $clause and $limitClause clauses")
}
clause = limitClause
Limit(typedVisit(limit), withOffset)
}
}
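A sketch of the behavior the OFFSET/LIMIT checks above produce (hypothetical example, assuming a table `t` and the operator-pipe syntax conf `SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED` enabled):

```sql
-- Each result clause lives in its own pipe operator:
FROM t
|> OFFSET 1
|> LIMIT 2;

-- Combining them in one operator, e.g. |> LIMIT 2 OFFSET 1, would hit the
-- MULTIPLE_QUERY_RESULT_CLAUSES_WITH_PIPE_OPERATORS error raised above,
-- because `clause` is already set when the second clause is parsed.
```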
@@ -5883,6 +5909,18 @@ class AstBuilder extends DataTypeAstBuilder
if (!SQLConf.get.getConf(SQLConf.OPERATOR_PIPE_SYNTAX_ENABLED)) {
operationNotAllowed("Operator pipe SQL syntax using |>", ctx)
}
// This helper function adds a table subquery boundary between the new operator to be added
// (such as a filter or sort) and the input plan if one does not already exist. This helps the
// analyzer behave as if we had added the corresponding SQL clause after a table subquery
// containing the input plan.
def withSubqueryAlias(): LogicalPlan = left match {
case s: SubqueryAlias =>
s
case u: UnresolvedRelation =>
u
case _ =>
SubqueryAlias(SubqueryAlias.generateSubqueryName(), left)
}
Option(ctx.selectClause).map { c =>
withSelectQuerySpecification(
ctx = ctx,
@@ -5895,18 +5933,7 @@ class AstBuilder extends DataTypeAstBuilder
relation = left,
isPipeOperatorSelect = true)
}.getOrElse(Option(ctx.whereClause).map { c =>
// Add a table subquery boundary between the new filter and the input plan if one does not
// already exist. This helps the analyzer behave as if we had added the WHERE clause after a
// table subquery containing the input plan.
val withSubqueryAlias = left match {
case s: SubqueryAlias =>
s
case u: UnresolvedRelation =>
u
case _ =>
SubqueryAlias(SubqueryAlias.generateSubqueryName(), left)
}
withWhereClause(c, withSubqueryAlias)
withWhereClause(c, withSubqueryAlias())
}.getOrElse(Option(ctx.pivotClause()).map { c =>
if (ctx.unpivotClause() != null) {
throw QueryParsingErrors.unpivotWithPivotInFromClauseNotAllowedError(ctx)
@@ -5924,7 +5951,9 @@ class AstBuilder extends DataTypeAstBuilder
}.getOrElse(Option(ctx.operator).map { c =>
val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
visitSetOperationImpl(left, plan(ctx.right), all, c.getType)
}.get))))))
}.getOrElse(Option(ctx.queryOrganization).map { c =>
withQueryResultClauses(c, withSubqueryAlias(), forPipeOperators = true)
}.get)))))))
}

/**