# Support Spark 3.4
## Description

Makes changes to support Spark 3.4. These include the changes necessary to compile against Spark 3.4, as well as test _and_ code changes due to changes in Spark behavior.

Some of the bigger changes include:
- A lot of changes regarding error classes. These include...
  - Spark 3.4 changed `class ErrorInfo` to private, so the current approach in `DeltaThrowableHelper` can no longer work. We now use Spark's `ErrorClassesJsonReader` (these are the changes to `DeltaThrowableHelper` and `DeltaThrowableSuite`); a sketch follows this list
  - Many error functions switched their first argument from `message: String` to `errorClass: String`. This **does not** cause a compile error, but instead causes a "SparkException: error class not found" when called. Affected call sites include `ParseException(...)` and `a.failAnalysis(..)`
  - Adds support for error subclasses
- Spark 3.4 supports insert-into-by-name and no longer reorders such queries to insert-into-by-ordinal. See apache/spark#39334. In `DeltaAnalysis.scala` we need to perform schema validation checks and schema evolution for such queries; right now we only match when `!isByName` (see the example after this list)
- SPARK-27561 added support for lateral column aliases. This broke our generation-expression validation checks for generated columns. We now separately check for generated columns that reference other generated columns in `GeneratedColumn.scala` (illustrated after this list)
- `DelegatingCatalogExtension` deprecates `createTable(..., schema: StructType, ...)` in favor of `createTable(..., columns: Array[Column], ...)` (signature sketched after this list)
- `_metadata.file_path` is not always encoded. We update `DeleteWithDeletionVectorsHelper.scala` to accommodate this.
- Adds SQL support for `REPLACE WHERE`. Tests are added to `DeltaSuite` (usage example after this list).
- Misc test changes due to minor changes in Spark behavior or error messages
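
As a sketch of the error-class change: with `ErrorInfo` private in Spark 3.4, a helper can load `delta-error-classes.json` through Spark's `ErrorClassesJsonReader` instead. The object name and resource lookup below are illustrative assumptions, not the exact Delta code:

```scala
import org.apache.spark.ErrorClassesJsonReader

object DeltaThrowableHelperSketch {
  // Assumption: the error-class definitions ship as a classpath resource.
  private lazy val errorClassReader = new ErrorClassesJsonReader(
    Seq(getClass.getClassLoader.getResource("error/delta-error-classes.json")))

  // Map positional parameters onto the named <placeholders> of the template.
  def getMessage(errorClass: String, messageParameters: Array[String]): String = {
    val paramNames = errorClassReader.getMessageParameters(errorClass)
    errorClassReader.getErrorMessage(errorClass, paramNames.zip(messageParameters).toMap)
  }

  def getSqlState(errorClass: String): String = errorClassReader.getSqlState(errorClass)
}
```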
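
For insert-into-by-name, the behavioral difference is visible directly in SQL: Spark 3.4 now hands the second statement below to Delta with its by-name shape intact instead of rewriting it into an ordinal projection, so `DeltaAnalysis` must run schema validation and evolution for it too. Table and column names are illustrative, and an active `spark` session is assumed:

```scala
// By ordinal: columns are matched by position.
spark.sql("INSERT INTO target SELECT id, value FROM source")

// By name: in Spark 3.4 the explicit column list is no longer reordered into
// an ordinal projection, so Delta must also validate this shape.
spark.sql("INSERT INTO target (value, id) SELECT value, id FROM source")
```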
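
For the lateral-column-alias interaction, the separate check in `GeneratedColumn.scala` has to keep rejecting generation expressions that reference other generated columns, which SPARK-27561 would otherwise let resolve. A hypothetical table definition that must still fail:

```scala
import io.delta.tables.DeltaTable

// Assumes an active `spark` session; names are illustrative.
DeltaTable.create(spark)
  .tableName("events")
  .addColumn("ts", "TIMESTAMP")
  .addColumn(
    DeltaTable.columnBuilder("day").dataType("DATE")
      .generatedAlwaysAs("CAST(ts AS DATE)").build())
  .addColumn(
    DeltaTable.columnBuilder("next_day").dataType("DATE")
      // References the generated column `day`: must be rejected even though
      // lateral column aliases would now let it resolve during analysis.
      .generatedAlwaysAs("DATE_ADD(day, 1)").build())
  .execute()
```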
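
For the catalog API change, Spark 3.4 routes DDL through a `Column`-based `createTable` overload. A minimal sketch of the new signature, assuming a hypothetical catalog class (not Delta's actual catalog code):

```scala
import java.util
import org.apache.spark.sql.connector.catalog.{Column, DelegatingCatalogExtension, Identifier, Table}
import org.apache.spark.sql.connector.expressions.Transform

class ColumnBasedCatalogSketch extends DelegatingCatalogExtension {
  // The StructType-based overload is deprecated in Spark 3.4; this one is called instead.
  override def createTable(
      ident: Identifier,
      columns: Array[Column],
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    super.createTable(ident, columns, partitions, properties)
  }
}
```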
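
Finally, a usage example of the newly supported SQL `REPLACE WHERE`; the table names and predicate are illustrative:

```scala
// Atomically replaces only the rows matching the predicate with the query result.
spark.sql("""
  INSERT INTO events
  REPLACE WHERE eventDate >= '2023-01-01'
  SELECT * FROM staged_events
""")
```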

Resolves #1696

## How was this patch tested?

Existing tests should suffice since there are no major Delta behavior changes _besides_ support for `REPLACE WHERE`, for which we have added tests.

## Does this PR introduce _any_ user-facing changes?

Yes. Spark 3.4 will be supported. `REPLACE WHERE` is supported in SQL.

GitOrigin-RevId: b282c95c4e6a7a1915c2a4ae9841b5e43ed4724d
allisonport-db committed May 3, 2023
1 parent 9f217a5 commit 5c3f4d3

Showing 49 changed files with 361 additions and 338 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -41,7 +41,7 @@ jobs:
pyenv install 3.7.4
pyenv global system 3.7.4
pipenv --python 3.7 install
-pipenv run pip install pyspark==3.3.2
+pipenv run pip install pyspark==3.4.0
pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
pipenv run pip install importlib_metadata==3.10.0
pipenv run pip install mypy==0.910
2 changes: 1 addition & 1 deletion Dockerfile
@@ -23,7 +23,7 @@ RUN apt-get update && apt-get install -y python3-pip
# cryptography. Otherwise, building wheels for these packages fails.
RUN pip3 install --upgrade pip

-RUN pip3 install pyspark==3.3.2
+RUN pip3 install pyspark==3.4.0

RUN pip3 install mypy==0.910

5 changes: 4 additions & 1 deletion build.sbt
@@ -17,7 +17,7 @@
import java.nio.file.Files
import TestParallelization._

val sparkVersion = "3.3.2"
val sparkVersion = "3.4.0"
val scala212 = "2.12.15"
val scala213 = "2.13.5"
val default_scala_version = scala212
@@ -204,6 +204,8 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb"))
)
)

+// Requires iceberg release on 3.4
+/**
lazy val deltaIceberg = (project in file("delta-iceberg"))
.dependsOn(core % "compile->compile;test->test;provided->provided")
.settings (
@@ -221,6 +223,7 @@ lazy val deltaIceberg = (project in file("delta-iceberg"))
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1"
)
)
+*/

/**
* Get list of python files and return the mapping between source files and target paths
12 changes: 12 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -1023,12 +1023,24 @@
],
"sqlState" : "42601"
},
"DELTA_MERGE_RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT" : {
"message" : [
"Resolved attribute(s) <missingAttributes> missing from <input> in operator <merge>"
],
"sqlState" : "42601"
},
"DELTA_MERGE_UNEXPECTED_ASSIGNMENT_KEY" : {
"message" : [
"Unexpected assignment key: <unexpectedKeyClass> - <unexpectedKeyObject>"
],
"sqlState" : "22005"
},
"DELTA_MERGE_UNRESOLVED_EXPRESSION" : {
"message" : [
"Cannot resolve <sqlExpr> in <clause> given <cols>"
],
"sqlState" : "42601"
},
"DELTA_METADATA_ABSENT" : {
"message" : [
"Couldn't find Metadata while committing the first version of the Delta table."
10 changes: 5 additions & 5 deletions core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -257,7 +257,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
case replaceHeader: ReplaceTableHeaderContext =>
(visitTableIdentifier(replaceHeader.table), replaceHeader.CREATE() != null, true, false)
case _ =>
throw new ParseException("Incorrect CLONE header expected REPLACE or CREATE table", ctx)
throw new DeltaParseException("Incorrect CLONE header expected REPLACE or CREATE table", ctx)
}
}

@@ -273,7 +273,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
val (target, isCreate, isReplace, ifNotExists) = visitCloneTableHeader(ctx.cloneTableHeader())

if (!isCreate && ifNotExists) {
-throw new ParseException(
+throw new DeltaParseException(
"IF NOT EXISTS cannot be used together with REPLACE", ctx.cloneTableHeader())
}

@@ -340,7 +340,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
*/
override def visitOptimizeTable(ctx: OptimizeTableContext): AnyRef = withOrigin(ctx) {
if (ctx.path == null && ctx.table == null) {
throw new ParseException("OPTIMIZE command requires a file path or table name.", ctx)
throw new DeltaParseException("OPTIMIZE command requires a file path or table name.", ctx)
}
val interleaveBy = Option(ctx.zorderSpec).map(visitZorderSpec).getOrElse(Seq.empty)
OptimizeTableCommand(
@@ -406,7 +406,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
ctx.identifier.asScala.toSeq match {
case Seq(tbl) => TableIdentifier(tbl.getText)
case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
-case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", ctx)
+case _ => throw new DeltaParseException(s"Illegal table name ${ctx.getText}", ctx)
}
}

@@ -537,7 +537,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
case ("interval", Nil) => CalendarIntervalType
case (dt, params) =>
val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
throw new ParseException(s"DataType $dtStr is not supported.", ctx)
throw new DeltaParseException(s"DataType $dtStr is not supported.", ctx)
}
}
}
6 changes: 2 additions & 4 deletions core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
@@ -339,17 +339,15 @@ class DeltaTableBuilder private[tables](

val stmt = builderOption match {
case CreateTableOptions(ifNotExists) =>
-val unresolvedTable: LogicalPlan =
-org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName(table, isNamespace = false)
+val unresolvedTable = org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier(table)
CreateTable(
unresolvedTable,
StructType(columns.toSeq),
partitioning,
tableSpec,
ifNotExists)
case ReplaceTableOptions(orCreate) =>
-val unresolvedTable: LogicalPlan =
-org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName(table, isNamespace = false)
+val unresolvedTable = org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier(table)
ReplaceTable(
unresolvedTable,
StructType(columns.toSeq),
@@ -424,7 +424,10 @@ object DeltaMergeInto {
// Note: This will throw error only on unresolved attribute issues,
// not other resolution errors like mismatched data types.
val cols = "columns " + plan.children.flatMap(_.output).map(_.sql).mkString(", ")
a.failAnalysis(msg = s"cannot resolve ${a.sql} in $mergeClauseType given $cols")
throw new DeltaAnalysisException(
errorClass = "DELTA_MERGE_UNRESOLVED_EXPRESSION",
messageParameters = Array(a.sql, mergeClauseType, cols),
origin = Some(a.origin))
}
resolvedExpr
}
@@ -536,7 +539,8 @@ object DeltaMergeInto {
Seq(d)

case _ =>
-action.failAnalysis(msg = s"Unexpected action expression '$action' in clause $clause")
+action.failAnalysis("INTERNAL_ERROR",
+Map("message" -> s"Unexpected action expression '$action' in clause $clause"))
}
}

@@ -625,9 +629,12 @@ object DeltaMergeInto {
if (resolvedMerge.missingInput.nonEmpty) {
val missingAttributes = resolvedMerge.missingInput.mkString(",")
val input = resolvedMerge.inputSet.mkString(",")
-val msgForMissingAttributes = s"Resolved attribute(s) $missingAttributes missing " +
-s"from $input in operator ${resolvedMerge.simpleString(SQLConf.get.maxToStringFields)}."
-resolvedMerge.failAnalysis(msg = msgForMissingAttributes)
+throw new DeltaAnalysisException(
+errorClass = "DELTA_MERGE_RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT",
+messageParameters = Array(missingAttributes, input,
+resolvedMerge.simpleString(SQLConf.get.maxToStringFields)),
+origin = Some(resolvedMerge.origin)
+)
}

resolvedMerge
@@ -194,6 +194,7 @@ object ColumnWithDefaultExprUtils extends DeltaLogging {
incrementalExecution.queryId,
incrementalExecution.runId,
incrementalExecution.currentBatchId,
+incrementalExecution.prevOffsetSeqMetadata,
incrementalExecution.offsetSeqMetadata
)
newIncrementalExecution.executedPlan // Force the lazy generation of execution plan