Support Spark 3.4
Makes changes to support Spark 3.4. These include the necessary compilation changes, as well as test and code changes driven by changes in Spark behavior.

Some of the bigger changes include
- A lot of changes regarding error classes. These include...
  - Spark 3.4 changed `class ErrorInfo` to private. This means the current approach in `DeltaThrowableHelper` can no longer work. We now use `ErrorClassJsonReader` instead (hence the changes to `DeltaThrowableHelper` and `DeltaThrowableSuite`).
  - Many error functions switched the first argument from `message: String` to `errorClass: String`, which **does not** cause a compile error but instead causes an error-class-not-found `SparkException` when called. Affected calls include `ParseException(...)` and `a.failAnalysis(..)`.
  - Supports error subclasses
- Spark 3.4 supports insert-into-by-name and no longer reorders such queries to be insert-into-by-ordinal. See apache/spark#39334. In `DeltaAnalysis.scala` we need to perform schema validation checks and schema evolution for such queries; right now we only match when `!isByName`
- SPARK-27561 added support for lateral column aliases. This broke our generation expression validation checks for generated columns. We now separately check in `GeneratedColumn.scala` for generated columns that reference other generated columns (see the sketch after this list)
- `DelegatingCatalogExtension` deprecates `createTable(..., schema: StructType, ...)` in favor of `createTable(..., columns: Array[Column], ...)`
- `_metadata.file_path` is not always encoded. We update `DeleteWithDeletionVectorsHelper.scala` to accommodate this.
- Support for SQL `REPLACE WHERE`. Tests are added to `DeltaSuite`.
- Misc test changes due to minor changes in Spark behavior or error messages
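
As an illustration of what the generated-column check must now reject explicitly, here is a hedged sketch using the `DeltaTable` builder API (table and column names are hypothetical; assumes an active `SparkSession` named `spark`):

```scala
import io.delta.tables.DeltaTable

// Hypothetical table: `c` is generated from `b`, which is itself generated.
// With lateral column aliases (SPARK-27561) the expression `b + 1` now resolves
// during analysis, so GeneratedColumn.scala must reject this case explicitly.
DeltaTable.create(spark)
  .tableName("lateral_alias_demo")
  .addColumn("a", "INT")
  .addColumn(
    DeltaTable.columnBuilder(spark, "b")
      .dataType("INT")
      .generatedAlwaysAs("a + 1")
      .build())
  .addColumn(
    DeltaTable.columnBuilder(spark, "c")
      .dataType("INT")
      .generatedAlwaysAs("b + 1") // references another generated column: rejected
      .build())
  .execute()
```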

Resolves delta-io#1696

Existing tests should suffice since there are no major Delta behavior changes _besides_ support for `REPLACE WHERE`, for which we have added tests.

Yes. Spark 3.4 will be supported. `REPLACE WHERE` is supported in SQL.

GitOrigin-RevId: b282c95c4e6a7a1915c2a4ae9841b5e43ed4724d

Fix a test in DeltaVacuumSuite to pass locally

"vacuum after purging deletion vectors" in `DeltaVacuumSuite` fails locally because the local filesystem only writes modification times to second accuracy.

This means a transaction might have timestamp `1683694325000` but the tombstone for a file removed in that transaction could have deletionTimestamp `1683694325372`.

As a result, the test fails: we set the clock to the transaction timestamp plus the retention period, which is not late enough to expire the tombstones in that transaction.
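
A minimal sketch of the arithmetic, using the timestamps from the example above (the 1-week retention period is an assumed default):

```scala
// Tombstone deletion timestamp, recorded with millisecond precision.
val deletionTimestamp = 1683694325372L
// The transaction timestamp comes from the commit file's modification time,
// which a local filesystem may truncate to whole seconds.
val transactionTimestamp = 1683694325000L
val retentionMillis = 7L * 24 * 60 * 60 * 1000 // assumed 1-week retention
// The test sets the clock to transactionTimestamp + retention, but the tombstone
// only expires once the clock reaches deletionTimestamp + retention, 372 ms later.
val clock = transactionTimestamp + retentionMillis
assert(clock < deletionTimestamp + retentionMillis)
```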

GitOrigin-RevId: 63018c48524edb0f8edd9e40f1b21cc97bc546cc

Add estLogicalFileSize to FileAction

Add estLogicalFileSize to FileAction for easier Deletion Vector processing.

GitOrigin-RevId: c7cf0ad32e378bcfc4e4c046c5d76667bb8659c7

Support insert-into-by-name for generated columns

Spark 3.4 no longer requires users to provide _all_ columns in insert-by-name queries. This means Delta can now support omitting generated columns from the column list in such queries.
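
A hedged sketch of the new behavior (hypothetical table and columns; assumes an active `SparkSession` named `spark`):

```scala
// Assume a Delta table `events` with columns
// (a INT, b INT, c INT GENERATED ALWAYS AS (a + b)).
// With Spark 3.4, an insert-by-name query may omit the generated column `c`;
// Delta evaluates its generation expression instead of requiring it in the list.
spark.sql("INSERT INTO events (a, b) VALUES (1, 2)")
```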

This PR adds support for this and adds some additional tests related to the changed by-name support.

Resolves delta-io#1215

Adds unit tests.

Yes. Users will be able to omit generated columns from the column list when inserting by name.

Closes delta-io#1743

GitOrigin-RevId: 8694fab3d93b71b4230bf6f5dd0f2a21be6f3634

Implement PURGE to remove DVs from Delta tables

This PR introduces a `REORG TABLE ... APPLY (PURGE)` SQL command that can materialize soft deletes performed via DVs.

The command works by rewriting and bin-packing (if applicable) only the files that have DVs attached, unlike the `OPTIMIZE` command, which bin-packs all files (with and without DVs). To achieve this, we hack the `OPTIMIZE` logic so that files with DVs are rewritten regardless of size.
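
For example (hypothetical table name; assumes an active `SparkSession` named `spark`):

```scala
// Rewrite only the files that carry DVs, materializing the soft deletes.
spark.sql("REORG TABLE events APPLY (PURGE)")
// An optional partition predicate limits which files are considered.
spark.sql("REORG TABLE events WHERE part = '2023-01-01' APPLY (PURGE)")
```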

Follow-up:
- Set the correct commit info. Now the resulting version is marked as `optimize` rather than `purge`.
- Clean up DVs from the filesystem.

New tests.

Closes delta-io#1732

Signed-off-by: Venki Korukanti <venki.korukanti@databricks.com>
GitOrigin-RevId: 98ef156d62698986bfb54681e386971e2fec08b8

Unify predicate strings in CommitInfo to record the information in a consistent way.

GitOrigin-RevId: 043a6a4181c112b9c9a45906c1275fbbdbbb1388

Minor refactoring to Delta source.

GitOrigin-RevId: 3625a5c44999139ef4976c62473b233167a4aa83

Add Option.whenNot Scala extension helper and replace usage of Option.when(!cond).
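
A minimal sketch of such an extension (illustrative only; the real helper in the codebase may differ):

```scala
object OptionExtensions {
  implicit class OptionCompanionOps(val option: Option.type) {
    /** Counterpart of Option.when for a negated condition. */
    def whenNot[A](cond: Boolean)(a: => A): Option[A] =
      if (!cond) Some(a) else None
  }
}

// With `import OptionExtensions._` in scope:
//   Option.whenNot(predicates.isEmpty)(buildFilter(predicates))
// reads more directly than Option.when(!predicates.isEmpty)(...).
```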

GitOrigin-RevId: e26244544cadeeff1d55862f840d4c6c5570e83b

Introduce DomainMetadata action to delta spec

We propose to introduce a new Action type called DomainMetadata to the Delta spec. In a nutshell, DomainMetadata allows specifying configurations (string key/value pairs) per metadata domain, and a custom conflict handler can be registered to a metadata domain. More details can be found in the design doc [here](https://docs.google.com/document/d/16MHP7P78cq9g8SWhAyfgFlUe-eH0xhnMAymgBVR1fCA/edit?usp=sharing).

GitHub issue delta-io#1741 was created for this change.

Spec-only change; no tests are needed.

Closes delta-io#1742

GitOrigin-RevId: 5d33d8b99e33c5c1e689672a8ca2ab3863feab54

DV stress test: Delete from a table of a large number of rows with DVs

This PR tests DELETE on a table of 2 billion rows (`2<<31 + 10`), some of which are marked as deleted by a DV. The goal is to ensure that DVs can still be read and manipulated in such a scenario.

We don't test `delete a large number of rows` or `materialize DV` because they run too slowly to fit in a unit test (9 and 20 minutes, respectively).

GitOrigin-RevId: 1273c9372907be0345465c2176a7f76115adbb47

RESTORE support for Delta tables with deletion vectors

This PR is part of the feature: Support Delta tables with deletion vectors (more details at delta-io#1485)

It adds support for running RESTORE on a Delta table with deletion vectors. The main change is to take `AddFile.deletionVector` into consideration when comparing the target version being restored to against the current version, in order to find the list of data files to add and remove.
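
For example (hypothetical table name; assumes an active `SparkSession` named `spark`):

```scala
// Files must now be compared by (path, deletionVector.uniqueId) rather than by
// path alone, because the same data file can appear in both versions with
// different DVs attached.
spark.sql("RESTORE TABLE events TO VERSION AS OF 3")
```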

Added tests

Closes delta-io#1735

GitOrigin-RevId: b722e0b058ede86f652cd4e4229a7217916511da

Disallow overwriteSchema with dynamic partitions overwrite

Disallow overwriteSchema when partitionOverwriteMode is set to dynamic. Otherwise, the table might become corrupted, as the schemas of newly written partitions would not match the non-overwritten partitions.
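
A sketch of the now-rejected combination (hypothetical path; assumes a partitioned Delta table already exists there and an active `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.functions.col

// Dynamic partition overwrite rewrites only the touched partitions, so changing
// the schema in the same write would leave untouched partitions on the old schema.
// This combination now fails with DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE.
spark.range(10).toDF("id")
  .withColumn("part", col("id") % 2)
  .write.format("delta")
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .option("overwriteSchema", "true")
  .save("/tmp/delta/dpo-demo")
```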

GitOrigin-RevId: 1012793448c1ffed9a3f8bde507d9fe1ee183803

SHALLOW CLONE support for Delta tables with deletion vectors.

This PR is part of the feature: Support Delta tables with deletion vectors (more details at delta-io#1485)

This PR adds support for SHALLOW CLONE on a Delta table with deletion vectors. The main change is to convert the relative path of the DV file in `AddFile` to an absolute path when cloning the table.
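
For example (hypothetical table names; assumes an active `SparkSession` named `spark`):

```scala
// The clone's AddFile entries point back at the source's data and DV files, so
// relative DV paths are rewritten as absolute paths when the clone is created.
spark.sql("CREATE TABLE events_clone SHALLOW CLONE events")
```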

Added tests

Closes delta-io#1733

GitOrigin-RevId: b634496b57b93fc4b7a7cc16e33c200e3a83ba64

Adds tests for REPLACE WHERE SQL syntax

Spark 3.4 added REPLACE WHERE SQL support for INSERT. This PR adds tests for the feature after upgrading to Spark 3.4.
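
The tested syntax looks roughly like this (hypothetical table names; assumes an active `SparkSession` named `spark`):

```scala
// Atomically replace only the rows matching the predicate with the query's output.
spark.sql("""
  INSERT INTO events REPLACE WHERE date >= '2023-01-01'
  SELECT * FROM staged_events
""")
```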

Closes delta-io#1737

GitOrigin-RevId: 8bf0e7423a6f0846d5f9ef4e637ee9ced9bef8d1

Fix a test in `DeltaThrowableSuite.scala`

Fix a test in `DeltaThrowableSuite.scala`

GitOrigin-RevId: 28acd5fe8d8cadd569c479fe0f02d99dac1c13b3

Fix statistics computation issues with Delta tables with DVs

This PR makes the following changes:
- The Delta protocol [requires](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#writer-requirement-for-deletion-vectors) that every `AddFile` with a DV must have `numRecords` in its file statistics. The current implementation of DELETE with DVs violates this requirement when the source `AddFile` has no statistics to begin with. This PR fixes it by computing stats for `AddFile`s that are missing stats and that get DVs generated as part of the DELETE-with-DV operation. The stats are generated by reading the Parquet file footer (see the sketch after this list).
- DELETE with DVs currently has a bug where setting `tightBounds=false` for `AddFile`s with DVs doesn't correctly set the `NULL_COUNT` for columns containing only nulls.
- Throw an error when the stats re-computation command is run on Delta tables with DVs. This is a TODO: we need to implement it, but for now we throw an error to avoid calculating wrong statistics for Delta tables with DVs.
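
A hedged sketch of deriving `numRecords` from the Parquet footer (the helper name is made up; the actual implementation may differ):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

// Sum the row counts recorded in the file's row-group metadata; this gives the
// physical record count needed for the AddFile's `numRecords` statistic.
def numRecordsFromFooter(conf: Configuration, file: Path): Long = {
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))
  try reader.getFooter.getBlocks.asScala.map(_.getRowCount).sum
  finally reader.close()
}
```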

GitOrigin-RevId: f69968961dcf4766b6847a191b66aae7f9ff295d

Remove the check that disables writes to Delta tables with deletion vectors

Given that we now have support for writing into DV tables and for table utility operations as part of delta-io#1485 and delta-io#1591, we should remove the check.

Closes delta-io#1736

Signed-off-by: Venki Korukanti <venki.korukanti@databricks.com>
GitOrigin-RevId: 17e7e9c6796229ada77148a730c69348a55890b9

Regex based table matching in DeleteScalaSuite

Use a more reliable regex-based approach to obtain a `DeltaTable` instance from a SQL identifier string in `DeleteScalaSuite`.
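
A sketch of the approach (the regex and helper name are illustrative, not the exact code in the suite):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Matches path-based identifiers such as delta.`/path/to/table`; anything else
// is treated as a table name.
val PathIdentifier = """(?i)delta\.`(.+)`""".r

def deltaTableFor(spark: SparkSession, identifier: String): DeltaTable =
  identifier.trim match {
    case PathIdentifier(path) => DeltaTable.forPath(spark, path)
    case name                 => DeltaTable.forName(spark, name)
  }
```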

GitOrigin-RevId: 1d0e1477a7d22373e8478d7debc3565c092090da

Enable SQL support for WHEN NOT MATCHED BY SOURCE

The SQL syntax for MERGE with WHEN NOT MATCHED BY SOURCE clauses shipped with Spark 3.4. Now that Delta has picked up Spark 3.4, we can enable SQL support and mix in SQL tests for WHEN NOT MATCHED BY SOURCE.
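
For example (hypothetical table and column names; assumes an active `SparkSession` named `spark`):

```scala
// Target rows with no match in the source can now be updated or deleted from SQL.
spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET t.value = s.value
  WHEN NOT MATCHED THEN INSERT *
  WHEN NOT MATCHED BY SOURCE THEN DELETE
""")
```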

Existing tests for WHEN NOT MATCHED BY SOURCE are now run in the Merge SQL suite.

Closes delta-io#1740

GitOrigin-RevId: 1ddd1216e13f854901da47896936527618ea4dca

Minor refactor to DeltaCatalog.scala

GitOrigin-RevId: 53b083f9abf92330d253fbdd9208d2783428dd98

Correctly recurse into nested arrays & maps in add/drop columns

It is not possible today in Delta tables to add or drop nested fields under two or more levels of directly nested arrays or maps.
The following is a valid use case but fails today:
```
CREATE TABLE test (data array<array<struct<a: int>>>)
ALTER TABLE test ADD COLUMNS (data.element.element.b string)
```

This change updates helper methods `findColumnPosition`, `addColumn` and `dropColumn` in `SchemaUtils` to correctly recurse into directly nested maps and arrays.

Note that changes in Spark are also required for `ALTER TABLE ADD/CHANGE/DROP COLUMN` to work: apache/spark#40879. The fix is merged in Spark but will only be available in Delta in the next Spark release.

In addition, `findColumnPosition`, which currently returns both the position of a nested field and the size of its parent (making it overly complex), is split into two distinct and generic methods: `findColumnPosition` and `getNestedTypeFromPosition`.

- Tests for `findColumnPosition`, `addColumn` and `dropColumn` with two levels of nested maps and arrays are added to `SchemaUtilsSuite`. Other cases for these methods are already covered by existing tests.
- Tested locally that ALTER TABLE ADD/CHANGE/DROP COLUMN(S) works correctly with the Spark fix apache/spark#40879
- Added missing tests coverage for ALTER TABLE ADD/CHANGE/DROP COLUMN(S) with a single map or array.

Closes delta-io#1731

GitOrigin-RevId: 53ed05813f4002ae986926506254d780e2ecddfa
allisonport-db authored and sirsha-chatterjee committed May 16, 2023
1 parent 9f217a5 commit 1dbc04c
Showing 103 changed files with 3,894 additions and 946 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -41,7 +41,7 @@ jobs:
pyenv install 3.7.4
pyenv global system 3.7.4
pipenv --python 3.7 install
pipenv run pip install pyspark==3.3.2
pipenv run pip install pyspark==3.4.0
pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
pipenv run pip install importlib_metadata==3.10.0
pipenv run pip install mypy==0.910
2 changes: 1 addition & 1 deletion Dockerfile
@@ -23,7 +23,7 @@ RUN apt-get update && apt-get install -y python3-pip
# cryptography. Otherwise, building wheels for these packages fails.
RUN pip3 install --upgrade pip

RUN pip3 install pyspark==3.3.2
RUN pip3 install pyspark==3.4.0

RUN pip3 install mypy==0.910

54 changes: 50 additions & 4 deletions PROTOCOL.md
@@ -24,6 +24,9 @@
- [Protocol Evolution](#protocol-evolution)
- [Commit Provenance Information](#commit-provenance-information)
- [Increase Row ID High-Water Mark](#increase-row-id-high-watermark)
- [Domain Metadata](#domain-metadata)
- [Reader Requirements for Domain Metadata](#reader-requirements-for-domain-metadata)
- [Writer Requirements for Domain Metadata](#writer-requirements-for-domain-metadata)
- [Action Reconciliation](#action-reconciliation)
- [Table Features](#table-features)
- [Table Features for new and Existing Tables](#table-features-for-new-and-existing-tables)
@@ -475,7 +478,7 @@ Protocol versioning allows a newer client to exclude older readers and/or writer
The _protocol version_ will be increased whenever non-forward-compatible changes are made to this specification.
In the case where a client is running an invalid protocol version, an error should be thrown instructing the user to upgrade to a newer protocol version of their Delta client library.

Since breaking changes must be accompanied by an increase in the protocol version recorded in a table or by the addition of a table feature, clients can assume that unrecognized fields or actions are never required in order to correctly interpret the transaction log. Clients must ignore such unrecognized fields, and should not produce an error when reading a table that contains unrecognized fields.
Since breaking changes must be accompanied by an increase in the protocol version recorded in a table or by the addition of a table feature, clients can assume that unrecognized actions, fields, and/or metadata domains are never required in order to correctly interpret the transaction log. Clients must ignore such unrecognized fields, and should not produce an error when reading a table that contains unrecognized fields.

Reader Version 3 and Writer Version 7 add two lists of table features to the protocol action. The capability for readers and writers to operate on such a table is not only dependent on their supported protocol versions, but also on whether they support all features listed in `readerFeatures` and `writerFeatures`. See [Table Features](#table-features) section for more information.

@@ -564,22 +567,63 @@ The following is an example `rowIdHighWaterMark` action:
}
```

### Domain Metadata
The domain metadata action contains a configuration (string-string map) for a named metadata domain. Two overlapping transactions conflict if they both contain a domain metadata action for the same metadata domain.

There are two types of metadata domains:
1. **User-controlled metadata domains** have names that start with anything other than the `delta.` prefix. Any Delta client implementation or user application can modify these metadata domains, and can allow users to modify them arbitrarily. Delta clients and user applications are encouraged to use a naming convention designed to avoid conflicts with other clients' or users' metadata domains (e.g. `com.databricks.*` or `org.apache.*`).
2. **System-controlled metadata domains** have names that start with the `delta.` prefix. This prefix is reserved for metadata domains defined by the Delta spec, and Delta client implementations must not allow users to modify the metadata for system-controlled domains. A Delta client implementation should only update metadata for system-controlled domains that it knows about and understands. System-controlled metadata domains are used by various table features and each table feature may impose additional semantics on the metadata domains it uses.

The schema of the `domainMetadata` action is as follows:

Field Name | Data Type | Description
-|-|-
domain | String | Identifier for this domain (system- or user-provided)
configuration | Map[String, String] | A map containing configuration for the metadata domain
removed | Boolean | When `true`, the action serves as a tombstone to logically delete a metadata domain. Writers should preserve an accurate pre-image of the configuration.

Enablement:
- The table must be on Writer Version 7.
- A feature name `domainMetadata` must exist in the table's `writerFeatures`.

#### Reader Requirements for Domain Metadata
- Readers must preserve all domains even if they don't understand them, i.e. the snapshot read must include them.
- Any system-controlled domain that requires special attention from a reader is a [breaking change](#protocol-evolution), and must be part of a reader-writer table feature that specifies the desired behavior.

#### Writer Requirements for Domain Metadata
- Writers must not allow users to modify or delete system-controlled domains.
- Writers must only modify or delete system-controlled domains they understand.
- Any system-controlled domain that needs special attention from a writer is a [breaking change](#protocol-evolution), and must be part of a writer table feature that specifies the desired behavior.

The following is an example `domainMetadata` action:
```json
{
  "domainMetadata": {
    "domain": "delta.deltaTableFeatureX",
    "configuration": {"key1": "..."},
    "removed": false
  }
}
```

# Action Reconciliation
A given snapshot of the table can be computed by replaying the events committed to the table in ascending order by commit version. A given snapshot of a Delta table consists of:

- A single `protocol` action
- A single `metaData` action
- At most one `rowIdHighWaterMark` action
- A map from `appId` to transaction `version`
- A collection of `add` actions with unique `path`s.
- A collection of `txn` actions with unique `appId`s
- A collection of `domainMetadata` actions with unique `domain`s.
- A collection of `add` actions with unique `(path, deletionVector.uniqueId)` keys.
- A collection of `remove` actions with unique `(path, deletionVector.uniqueId)` keys. The intersection of the primary keys in the `add` collection and `remove` collection must be empty. That means a logical file cannot exist in both the `remove` and `add` collections at the same time; however, the same *data file* can exist with *different* DVs in the `remove` collection, as logically they represent different content. The `remove` actions act as _tombstones_, and only exist for the benefit of the VACUUM command. Snapshot reads only return `add` actions on the read path.

To achieve the requirements above, related actions from different delta files need to be reconciled with each other:

- The latest `protocol` action seen wins
- The latest `metaData` action seen wins
- The latest `rowIdHighWaterMark` action seen wins
- For transaction identifiers, the latest `version` seen for a given `appId` wins
- For `txn` actions, the latest `version` seen for a given `appId` wins
- For `domainMetadata`, the latest `domainMetadata` seen for a given `domain` wins. The actions with `removed=true` act as tombstones to suppress earlier versions. Snapshot reads do _not_ return removed `domainMetadata` actions.
- Logical files in a table are identified by their `(path, deletionVector.uniqueId)` primary key. File actions (`add` or `remove`) reference logical files, and a log can contain any number of references to a single file.
- To replay the log, scan all file actions and keep only the newest reference for each logical file.
- `add` actions in the result identify logical files currently present in the table (for queries). `remove` actions in the result identify tombstones of logical files no longer present in the table (for VACUUM).
@@ -861,6 +905,7 @@ Checkpoint files must be written in [Apache Parquet](https://parquet.apache.org/
* The [metadata](#Change-Metadata) of the table
* Files that have been [added and removed](#Add-File-and-Remove-File)
* [Transaction identifiers](#Transaction-Identifiers)
* [Domain Metadata](#Domain-Metadata)

Commit provenance information does not need to be included in the checkpoint. All of these actions are stored as their individual columns in parquet as struct fields.

@@ -1019,6 +1064,7 @@ Feature | Name | Readers or Writers?
[Deletion Vectors](#deletion-vectors) | `deletionVectors` | Readers and writers
[Row IDs](#row-ids) | `rowIds` | Writers only
[Timestamp without Timezone](#timestamp-ntz) | `timestampNTZ` | Readers and writers
[Domain Metadata](#domain-metadata) | `domainMetadata` | Writers only

## Deletion Vector Format

5 changes: 4 additions & 1 deletion build.sbt
@@ -17,7 +17,7 @@
import java.nio.file.Files
import TestParallelization._

val sparkVersion = "3.3.2"
val sparkVersion = "3.4.0"
val scala212 = "2.12.15"
val scala213 = "2.13.5"
val default_scala_version = scala212
@@ -204,6 +204,8 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb"))
)
)

// Requires iceberg release on 3.4
/**
lazy val deltaIceberg = (project in file("delta-iceberg"))
.dependsOn(core % "compile->compile;test->test;provided->provided")
.settings (
@@ -221,6 +223,7 @@ lazy val deltaIceberg = (project in file("delta-iceberg"))
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1"
)
)
*/

/**
* Get list of python files and return the mapping between source files and target paths
9 changes: 8 additions & 1 deletion core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -87,8 +87,11 @@ statement
| ALTER TABLE table=qualifiedName
DROP CONSTRAINT (IF EXISTS)? name=identifier #dropTableConstraint
| OPTIMIZE (path=STRING | table=qualifiedName)
(WHERE partitionPredicate = predicateToken)?
(WHERE partitionPredicate=predicateToken)?
(zorderSpec)? #optimizeTable
| REORG TABLE table=qualifiedName
(WHERE partitionPredicate=predicateToken)?
APPLY LEFT_PAREN PURGE RIGHT_PAREN #reorgTable
| SHOW COLUMNS (IN | FROM) tableName=qualifiedName
((IN | FROM) schemaName=identifier)? #showColumns
| cloneTableHeader SHALLOW CLONE source=qualifiedName clause=temporalClause?
@@ -210,6 +213,7 @@ nonReserved
| CONVERT | TO | DELTA | PARTITIONED | BY
| DESC | DESCRIBE | LIMIT | DETAIL
| GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE
| REORG | APPLY | PURGE
| RESTORE | AS | OF
| ZORDER | LEFT_PAREN | RIGHT_PAREN
| SHOW | COLUMNS | IN | FROM | NO | STATISTICS
@@ -219,6 +223,7 @@
// Define how the keywords above should appear in a user's SQL statement.
ADD: 'ADD';
ALTER: 'ALTER';
APPLY: 'APPLY';
AS: 'AS';
BY: 'BY';
CHECK: 'CHECK';
@@ -255,7 +260,9 @@ NULL: 'NULL';
OF: 'OF';
OR: 'OR';
OPTIMIZE: 'OPTIMIZE';
REORG: 'REORG';
PARTITIONED: 'PARTITIONED';
PURGE: 'PURGE';
REPLACE: 'REPLACE';
RESTORE: 'RESTORE';
RETAIN: 'RETAIN';
36 changes: 36 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -17,6 +17,12 @@
],
"sqlState" : "0A000"
},
"DELTA_ADDING_DELETION_VECTORS_WITH_TIGHT_BOUNDS_DISALLOWED" : {
"message" : [
"All operations that add deletion vectors should set the tightBounds column in statistics to false. Please file a bug report."
],
"sqlState" : "42000"
},
"DELTA_ADD_COLUMN_AT_INDEX_LESS_THAN_ZERO" : {
"message" : [
"Index <columnIndex> to add column <columnName> is lower than 0"
@@ -1023,12 +1029,24 @@
],
"sqlState" : "42601"
},
"DELTA_MERGE_RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT" : {
"message" : [
"Resolved attribute(s) <missingAttributes> missing from <input> in operator <merge>"
],
"sqlState" : "42601"
},
"DELTA_MERGE_UNEXPECTED_ASSIGNMENT_KEY" : {
"message" : [
"Unexpected assignment key: <unexpectedKeyClass> - <unexpectedKeyObject>"
],
"sqlState" : "22005"
},
"DELTA_MERGE_UNRESOLVED_EXPRESSION" : {
"message" : [
"Cannot resolve <sqlExpr> in <clause> given <cols>"
],
"sqlState" : "42601"
},
"DELTA_METADATA_ABSENT" : {
"message" : [
"Couldn't find Metadata while committing the first version of the Delta table."
@@ -1328,6 +1346,12 @@
],
"sqlState" : "0A000"
},
"DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE" : {
"message" : [
"'overwriteSchema' cannot be used in dynamic partition overwrite mode."
],
"sqlState" : "42613"
},
"DELTA_PARTITION_COLUMN_CAST_FAILED" : {
"message" : [
"Failed to cast value `<value>` to `<dataType>` for partition column `<columnName>`"
@@ -1557,6 +1581,12 @@
],
"sqlState" : "XXKDS"
},
"DELTA_STATS_COLLECTION_COLUMN_NOT_FOUND" : {
"message" : [
"<statsType> stats not found for column in Parquet metadata: <columnPath>."
],
"sqlState" : "42000"
},
"DELTA_STREAMING_CANNOT_CONTINUE_PROCESSING_POST_SCHEMA_EVOLUTION" : {
"message" : [
"We've detected a non-additive schema change (<opType>) at Delta version <schemaChangeVersion> in the Delta streaming source. Please check if you want to manually propagate this schema change to the sink table before we proceed with stream processing.",
@@ -2100,6 +2130,12 @@
],
"sqlState" : "0AKDD"
},
"DELTA_UNSUPPORTED_STATS_RECOMPUTE_WITH_DELETION_VECTORS" : {
"message" : [
"Statistics re-computation on a Delta table with deletion vectors is not yet supported."
],
"sqlState" : "0AKDD"
},
"DELTA_UNSUPPORTED_SUBQUERY" : {
"message" : [
"Subqueries are not supported in the <operation> (condition = <cond>)."
34 changes: 28 additions & 6 deletions core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -257,7 +257,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
case replaceHeader: ReplaceTableHeaderContext =>
(visitTableIdentifier(replaceHeader.table), replaceHeader.CREATE() != null, true, false)
case _ =>
throw new ParseException("Incorrect CLONE header expected REPLACE or CREATE table", ctx)
throw new DeltaParseException("Incorrect CLONE header expected REPLACE or CREATE table", ctx)
}
}

@@ -273,7 +273,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
val (target, isCreate, isReplace, ifNotExists) = visitCloneTableHeader(ctx.cloneTableHeader())

if (!isCreate && ifNotExists) {
throw new ParseException(
throw new DeltaParseException(
"IF NOT EXISTS cannot be used together with REPLACE", ctx.cloneTableHeader())
}

@@ -340,13 +340,35 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
*/
override def visitOptimizeTable(ctx: OptimizeTableContext): AnyRef = withOrigin(ctx) {
if (ctx.path == null && ctx.table == null) {
throw new ParseException("OPTIMIZE command requires a file path or table name.", ctx)
throw new DeltaParseException("OPTIMIZE command requires a file path or table name.", ctx)
}
val interleaveBy = Option(ctx.zorderSpec).map(visitZorderSpec).getOrElse(Seq.empty)
OptimizeTableCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq, Map.empty)(interleaveBy)
Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq,
Map.empty)(interleaveBy)
}

/**
* Creates a [[DeltaReorgTable]] logical plan.
* Examples:
* {{{
* -- Physically delete dropped rows and columns of target table
* REORG TABLE (delta.`/path/to/table` | delta_table_name)
* [WHERE partition_predicate] APPLY (PURGE)
* }}}
*/
override def visitReorgTable(ctx: ReorgTableContext): AnyRef = withOrigin(ctx) {
if (ctx.table == null) {
throw new ParseException("REORG command requires a file path or table name.", ctx)
}

val targetIdentifier = visitTableIdentifier(ctx.table)
val tableNameParts = targetIdentifier.database.toSeq :+ targetIdentifier.table
val targetTable = createUnresolvedTable(tableNameParts, "REORG")

DeltaReorgTable(targetTable)(Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq)
}

override def visitDescribeDeltaDetail(
@@ -406,7 +428,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
ctx.identifier.asScala.toSeq match {
case Seq(tbl) => TableIdentifier(tbl.getText)
case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", ctx)
case _ => throw new DeltaParseException(s"Illegal table name ${ctx.getText}", ctx)
}
}

@@ -537,7 +559,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
case ("interval", Nil) => CalendarIntervalType
case (dt, params) =>
val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
throw new ParseException(s"DataType $dtStr is not supported.", ctx)
throw new DeltaParseException(s"DataType $dtStr is not supported.", ctx)
}
}
}
6 changes: 2 additions & 4 deletions core/src/main/scala/io/delta/tables/DeltaTableBuilder.scala
@@ -339,17 +339,15 @@ class DeltaTableBuilder private[tables](

val stmt = builderOption match {
case CreateTableOptions(ifNotExists) =>
val unresolvedTable: LogicalPlan =
org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName(table, isNamespace = false)
val unresolvedTable = org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier(table)
CreateTable(
unresolvedTable,
StructType(columns.toSeq),
partitioning,
tableSpec,
ifNotExists)
case ReplaceTableOptions(orCreate) =>
val unresolvedTable: LogicalPlan =
org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName(table, isNamespace = false)
val unresolvedTable = org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier(table)
ReplaceTable(
unresolvedTable,
StructType(columns.toSeq),
@@ -424,7 +424,10 @@ object DeltaMergeInto {
// Note: This will throw error only on unresolved attribute issues,
// not other resolution errors like mismatched data types.
val cols = "columns " + plan.children.flatMap(_.output).map(_.sql).mkString(", ")
a.failAnalysis(msg = s"cannot resolve ${a.sql} in $mergeClauseType given $cols")
throw new DeltaAnalysisException(
errorClass = "DELTA_MERGE_UNRESOLVED_EXPRESSION",
messageParameters = Array(a.sql, mergeClauseType, cols),
origin = Some(a.origin))
}
resolvedExpr
}
@@ -536,7 +539,8 @@ object DeltaMergeInto {
Seq(d)

case _ =>
action.failAnalysis(msg = s"Unexpected action expression '$action' in clause $clause")
action.failAnalysis("INTERNAL_ERROR",
Map("message" -> s"Unexpected action expression '$action' in clause $clause"))
}
}

@@ -625,9 +629,12 @@
if (resolvedMerge.missingInput.nonEmpty) {
val missingAttributes = resolvedMerge.missingInput.mkString(",")
val input = resolvedMerge.inputSet.mkString(",")
val msgForMissingAttributes = s"Resolved attribute(s) $missingAttributes missing " +
s"from $input in operator ${resolvedMerge.simpleString(SQLConf.get.maxToStringFields)}."
resolvedMerge.failAnalysis(msg = msgForMissingAttributes)
throw new DeltaAnalysisException(
errorClass = "DELTA_MERGE_RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT",
messageParameters = Array(missingAttributes, input,
resolvedMerge.simpleString(SQLConf.get.maxToStringFields)),
origin = Some(resolvedMerge.origin)
)
}

resolvedMerge