[SPARK-48343][SQL] Introduction of SQL Scripting interpreter
### What changes were proposed in this pull request?
Previous [PR](apache#46665) introduced parser changes for SQL scripting. This PR is a follow-up that introduces the interpreter for the SQL scripting language and proposes the following changes:
- `SqlScriptingExecutionNode` - introduces execution nodes for SQL scripting, used during the interpretation phase:
  - `SingleStatementExec` - executable node for the `SingleStatement` logical node; wraps the logical plan of the single statement.
  - `CompoundNestedStatementIteratorExec` - implements the base recursive iterator logic for all nesting statements.
  - `CompoundBodyExec` - concrete implementation of `CompoundNestedStatementIteratorExec` for the `CompoundBody` logical node.
- `SqlScriptingInterpreter` - introduces the interpreter for SQL scripts. The product of interpretation is an iterator over the statements that should be executed, as the sketch after this list shows.
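For orientation, a minimal usage sketch (not part of this diff; it assumes the `parseScript` entry point added by the parser changes in apache#46665):

```scala
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, CompoundBody}
import org.apache.spark.sql.scripting.{SingleStatementExec, SqlScriptingInterpreter}

// Parse a script into a CompoundBody, then interpret it into a flat
// iterator over executable leaf statements.
val compound: CompoundBody = CatalystSqlParser.parseScript(
  """BEGIN
    |  SELECT 1;
    |  SELECT 2;
    |END""".stripMargin)

SqlScriptingInterpreter().buildExecutionPlan(compound).foreach {
  case stmt: SingleStatementExec if !stmt.isInternal =>
    println(stmt.getText) // "SELECT 1;", then "SELECT 2;"
  case _ => // internal statements (e.g. automatic DropVariable) carry no script text
}
```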

Follow-up PRs will introduce further statements, support for exceptions thrown from the parser/interpreter, exception handling in SQL, etc.
More details can be found in the [Jira item](https://issues.apache.org/jira/browse/SPARK-48343) for this task and in its parent (where the design doc is uploaded as well).

### Why are the changes needed?
The intent is to add support for SQL scripting (and stored procedures down the line). It gives users the ability to develop complex logic and ETL entirely in SQL.

Until now, users had to write verbose SQL statements or combine SQL and Python to express such logic efficiently. This is an effort to bridge that gap and enable complex logic to be written entirely in SQL.

### Does this PR introduce _any_ user-facing change?
No.
This PR is the second in a series of PRs that will introduce changes to the sql() API to add support for SQL scripting, but for now, the API remains unchanged.
In the future, the API will remain the same as well, but it will gain the ability to execute SQL scripts.

### How was this patch tested?
There are tests for the newly introduced changes:
- `SqlScriptingExecutionNodeSuite` - unit tests for execution nodes.
- `SqlScriptingInterpreterSuite` - tests for the interpreter (with parser integration); see the sketch below.
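For illustration, a hedged sketch of what such a test can look like (the actual suite bodies are not part of this excerpt; `parseScript` is assumed from the parser PR):

```scala
test("multiple statements") {
  val sqlScriptText =
    """BEGIN
      |  SELECT 1;
      |  SELECT 2;
      |END""".stripMargin
  // Parse, interpret, and collect the text of non-internal statements.
  val executedTexts = SqlScriptingInterpreter()
    .buildExecutionPlan(CatalystSqlParser.parseScript(sqlScriptText))
    .collect { case s: SingleStatementExec if !s.isInternal => s.getText }
    .toSeq
  assert(executedTexts == Seq("SELECT 1;", "SELECT 2;"))
}
```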

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#47026 from davidm-db/sql_scripting_interpreter.

Authored-by: David Milicevic <david.milicevic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
davidm-db authored and cloud-fan committed Jul 8, 2024
1 parent cd1d687 commit 2c54aa5
Showing 8 changed files with 535 additions and 13 deletions.
@@ -164,6 +164,7 @@ object CheckConnectJvmClientCompatibility {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.streaming.ui.*"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.test.*"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.util.*"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.scripting.*"),

// Skip private[sql] constructors
ProblemFilters.exclude[Problem]("org.apache.spark.sql.*.this"),
3 changes: 2 additions & 1 deletion project/SparkBuild.scala
@@ -332,7 +332,8 @@ object SparkBuild extends PomBuild {
"org.apache.spark.api.python",
"org.apache.spark.network",
"org.apache.spark.deploy",
"org.apache.spark.util.collection"
"org.apache.spark.util.collection",
"org.apache.spark.sql.scripting"
).mkString(":"),
"-doc-title", "Spark " + version.value.replaceAll("-SNAPSHOT", "") + " ScalaDoc"
),
@@ -37,11 +37,14 @@ case class SingleStatement(parsedPlan: LogicalPlan)

override val origin: Origin = CurrentOrigin.get

-  def getText(sqlScriptText: String): String = {
-    if (origin.startIndex.isEmpty || origin.stopIndex.isEmpty) {
-      return null
-    }
-    sqlScriptText.substring(origin.startIndex.get, origin.stopIndex.get + 1)
+  /**
+   * Get the SQL query text corresponding to this statement.
+   * @return
+   *   SQL query text.
+   */
+  def getText: String = {
+    assert(origin.sqlText.isDefined && origin.startIndex.isDefined && origin.stopIndex.isDefined)
+    origin.sqlText.get.substring(origin.startIndex.get, origin.stopIndex.get + 1)
  }
}
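For reference, a minimal sketch (not part of the diff) of what the `Origin`-based slicing does, with made-up span values:

```scala
import org.apache.spark.sql.catalyst.trees.Origin

// The parser records the full script text plus each statement's character
// span, so getText reduces to a substring over that span.
val origin = Origin(
  sqlText = Some("SELECT 1;\nSELECT 2;"),
  startIndex = Some(10),
  stopIndex = Some(18))
assert(origin.sqlText.get.substring(origin.startIndex.get, origin.stopIndex.get + 1)
  == "SELECT 2;")
```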

@@ -29,7 +29,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
assert(tree.collection.length == 1)
assert(tree.collection.head.isInstanceOf[SingleStatement])
val sparkStatement = tree.collection.head.asInstanceOf[SingleStatement]
-    assert(sparkStatement.getText(sqlScriptText) == "SELECT 1;")
+    assert(sparkStatement.getText == "SELECT 1;")
}

test("single select without ;") {
@@ -38,7 +38,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
assert(tree.collection.length == 1)
assert(tree.collection.head.isInstanceOf[SingleStatement])
val sparkStatement = tree.collection.head.asInstanceOf[SingleStatement]
-    assert(sparkStatement.getText(sqlScriptText) == "SELECT 1")
+    assert(sparkStatement.getText == "SELECT 1")
}

test("multi select without ; - should fail") {
@@ -62,7 +62,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
.zip(tree.collection)
.foreach { case (expected, statement) =>
val sparkStatement = statement.asInstanceOf[SingleStatement]
-        val statementText = sparkStatement.getText(sqlScriptText)
+        val statementText = sparkStatement.getText
assert(statementText == expected)
}
}
@@ -124,7 +124,7 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
.zip(tree.collection)
.foreach { case (expected, statement) =>
val sparkStatement = statement.asInstanceOf[SingleStatement]
-        val statementText = sparkStatement.getText(sqlScriptText)
+        val statementText = sparkStatement.getText
assert(statementText == expected)
}
}
@@ -148,16 +148,16 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
assert(tree.collection.head.isInstanceOf[CompoundBody])
val body1 = tree.collection.head.asInstanceOf[CompoundBody]
assert(body1.collection.length == 1)
-    assert(body1.collection.head.asInstanceOf[SingleStatement].getText(sqlScriptText)
+    assert(body1.collection.head.asInstanceOf[SingleStatement].getText
== "SELECT 1")

val body2 = tree.collection(1).asInstanceOf[CompoundBody]
assert(body2.collection.length == 1)
assert(body2.collection.head.isInstanceOf[CompoundBody])
val nestedBody = body2.collection.head.asInstanceOf[CompoundBody]
-    assert(nestedBody.collection.head.asInstanceOf[SingleStatement].getText(sqlScriptText)
+    assert(nestedBody.collection.head.asInstanceOf[SingleStatement].getText
== "SELECT 2")
-    assert(nestedBody.collection(1).asInstanceOf[SingleStatement].getText(sqlScriptText)
+    assert(nestedBody.collection(1).asInstanceOf[SingleStatement].getText
== "SELECT 3")
}

@@ -0,0 +1,157 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.scripting

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.{Origin, WithOrigin}

/**
 * Trait for all SQL scripting execution nodes used during interpretation phase.
 */
sealed trait CompoundStatementExec extends Logging {

  /**
   * Whether the statement originates from the SQL script or is created during the interpretation.
   * Example: DropVariable statements are automatically created at the end of each compound.
   */
  val isInternal: Boolean = false

  /**
   * Reset execution of the current node.
   */
  def reset(): Unit
}

/**
 * Leaf node in the execution tree.
 */
trait LeafStatementExec extends CompoundStatementExec

/**
 * Non-leaf node in the execution tree. It is an iterator over executable child nodes.
 */
trait NonLeafStatementExec extends CompoundStatementExec {

  /**
   * Construct the iterator to traverse the tree rooted at this node in an in-order traversal.
   * @return
   *   Tree iterator.
   */
  def getTreeIterator: Iterator[CompoundStatementExec]
}

/**
 * Executable node for SingleStatement.
 * @param parsedPlan
 *   Logical plan of the parsed statement.
 * @param origin
 *   Origin descriptor for the statement.
 * @param isInternal
 *   Whether the statement originates from the SQL script or it is created during the
 *   interpretation. Example: DropVariable statements are automatically created at the end of each
 *   compound.
 */
class SingleStatementExec(
    var parsedPlan: LogicalPlan,
    override val origin: Origin,
    override val isInternal: Boolean)
  extends LeafStatementExec with WithOrigin {

  /**
   * Whether this statement has been executed during the interpretation phase.
   * Example: Statements in conditions of If/Else, While, etc.
   */
  var isExecuted = false

  /**
   * Get the SQL query text corresponding to this statement.
   * @return
   *   SQL query text.
   */
  def getText: String = {
    assert(origin.sqlText.isDefined && origin.startIndex.isDefined && origin.stopIndex.isDefined)
    origin.sqlText.get.substring(origin.startIndex.get, origin.stopIndex.get + 1)
  }

  override def reset(): Unit = isExecuted = false
}

/**
 * Abstract class for all statements that contain nested statements.
 * Implements recursive iterator logic over all child execution nodes.
 * @param collection
 *   Collection of child execution nodes.
 */
abstract class CompoundNestedStatementIteratorExec(collection: Seq[CompoundStatementExec])
  extends NonLeafStatementExec {

  private var localIterator = collection.iterator
  private var curr = if (localIterator.hasNext) Some(localIterator.next()) else None

  private lazy val treeIterator: Iterator[CompoundStatementExec] =
    new Iterator[CompoundStatementExec] {
      override def hasNext: Boolean = {
        val childHasNext = curr match {
          case Some(body: NonLeafStatementExec) => body.getTreeIterator.hasNext
          case Some(_: LeafStatementExec) => true
          case None => false
          case _ => throw SparkException.internalError(
            "Unknown statement type encountered during SQL script interpretation.")
        }
        localIterator.hasNext || childHasNext
      }

      @scala.annotation.tailrec
      override def next(): CompoundStatementExec = {
        curr match {
          case None => throw SparkException.internalError(
            "No more elements to iterate through in the current SQL compound statement.")
          case Some(statement: LeafStatementExec) =>
            curr = if (localIterator.hasNext) Some(localIterator.next()) else None
            statement
          case Some(body: NonLeafStatementExec) =>
            if (body.getTreeIterator.hasNext) {
              body.getTreeIterator.next()
            } else {
              curr = if (localIterator.hasNext) Some(localIterator.next()) else None
              next()
            }
          case _ => throw SparkException.internalError(
            "Unknown statement type encountered during SQL script interpretation.")
        }
      }
    }

  override def getTreeIterator: Iterator[CompoundStatementExec] = treeIterator

  override def reset(): Unit = {
    collection.foreach(_.reset())
    localIterator = collection.iterator
    curr = if (localIterator.hasNext) Some(localIterator.next()) else None
  }
}

/**
 * Executable node for CompoundBody.
 * @param statements
 *   Executable nodes for nested statements within the CompoundBody.
 */
class CompoundBodyExec(statements: Seq[CompoundStatementExec])
  extends CompoundNestedStatementIteratorExec(statements)
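Below is a small illustration (not part of the diff) of the tree iterator's flattening and reset behavior, using `OneRowRelation()` as a stand-in for a parsed plan:

```scala
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.scripting.{CompoundBodyExec, SingleStatementExec}

def leaf(): SingleStatementExec =
  new SingleStatementExec(OneRowRelation(), Origin(), isInternal = false)

// Shape: BEGIN s1; BEGIN s2; s3; END END
val body = new CompoundBodyExec(Seq(leaf(), new CompoundBodyExec(Seq(leaf(), leaf()))))

// The in-order traversal yields only the three leaf statements;
// compound nodes themselves are never returned.
assert(body.getTreeIterator.size == 3)

// reset() rewinds the whole subtree so the script can be traversed again.
body.reset()
assert(body.getTreeIterator.hasNext)
```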
@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.scripting

import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
import org.apache.spark.sql.catalyst.parser.{CompoundBody, CompoundPlanStatement, SingleStatement}
import org.apache.spark.sql.catalyst.plans.logical.{CreateVariable, DropVariable, LogicalPlan}
import org.apache.spark.sql.catalyst.trees.Origin

/**
 * SQL scripting interpreter - builds SQL script execution plan.
 */
case class SqlScriptingInterpreter() {

  /**
   * Build execution plan and return statements that need to be executed,
   * wrapped in the execution node.
   *
   * @param compound
   *   CompoundBody for which to build the plan.
   * @return
   *   Iterator through collection of statements to be executed.
   */
  def buildExecutionPlan(compound: CompoundBody): Iterator[CompoundStatementExec] = {
    transformTreeIntoExecutable(compound).asInstanceOf[CompoundBodyExec].getTreeIterator
  }

  /**
   * Fetch the name of the Create Variable plan.
   * @param plan
   *   Plan to fetch the name from.
   * @return
   *   Name of the variable.
   */
  private def getDeclareVarNameFromPlan(plan: LogicalPlan): Option[UnresolvedIdentifier] =
    plan match {
      case CreateVariable(name: UnresolvedIdentifier, _, _) => Some(name)
      case _ => None
    }

  /**
   * Transform the parsed tree to the executable node.
   * @param node
   *   Root node of the parsed tree.
   * @return
   *   Executable statement.
   */
  private def transformTreeIntoExecutable(node: CompoundPlanStatement): CompoundStatementExec =
    node match {
      case body: CompoundBody =>
        // TODO [SPARK-48530]: Current logic doesn't support scoped variables and shadowing.
        val variables = body.collection.flatMap {
          case st: SingleStatement => getDeclareVarNameFromPlan(st.parsedPlan)
          case _ => None
        }
        val dropVariables = variables
          .map(varName => DropVariable(varName, ifExists = true))
          .map(new SingleStatementExec(_, Origin(), isInternal = true))
          .reverse
        new CompoundBodyExec(
          body.collection.map(st => transformTreeIntoExecutable(st)) ++ dropVariables)
      case sparkStatement: SingleStatement =>
        new SingleStatementExec(
          sparkStatement.parsedPlan,
          sparkStatement.origin,
          isInternal = false)
    }
}
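To make the variable cleanup concrete: a compound that declares a variable yields the user statements followed by an interpreter-generated `DropVariable` marked as internal. A hedged sketch (again assuming the `parseScript` entry point from the parser PR):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.scripting.{SingleStatementExec, SqlScriptingInterpreter}

val statements = SqlScriptingInterpreter()
  .buildExecutionPlan(CatalystSqlParser.parseScript(
    """BEGIN
      |  DECLARE x INT;
      |  SELECT x;
      |END""".stripMargin))
  .collect { case s: SingleStatementExec => s }
  .toSeq

// Two user statements, then the automatic cleanup for `x` appended by
// transformTreeIntoExecutable (in reverse declaration order).
assert(statements.map(_.isInternal) == Seq(false, false, true))
```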