Skip to content

Commit

Permalink
Merge branch 'delta-io:master' into docs
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Dec 19, 2023
2 parents 439d06a + a6a7c3c commit c6fb4c9
Show file tree
Hide file tree
Showing 134 changed files with 6,002 additions and 2,246 deletions.
69 changes: 56 additions & 13 deletions PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,9 @@ tags | Map[String, String] | Map containing metadata about this logical file | o
deletionVector | [DeletionVectorDescriptor Struct](#Deletion-Vectors) | Either null (or absent in JSON) when no DV is associated with this data file, or a struct (described below) that contains necessary information about the DV that is part of this logical file. | optional
baseRowId | Long | Default generated Row ID of the first row in the file. The default generated Row IDs of the other rows in the file can be reconstructed by adding the physical index of the row within the file to the base Row ID. See also [Row IDs](#row-ids) | optional
defaultRowCommitVersion | Long | First commit version in which an `add` action with the same `path` was committed to the table. | optional
clusteringProvider | String | The name of the clustering implementation. See also [Clustered Table](#clustered-table)| optional

The following is an example `add` action:
The following is an example `add` action for a partitioned table:
```json
{
"add": {
Expand All @@ -425,6 +426,23 @@ The following is an example `add` action:
}
```

The following is an example `add` action for a clustered table:
```json
{
"add": {
"path": "date=2017-12-10/part-000...c000.gz.parquet",
"partitionValues": {},
"size": 841454,
"modificationTime": 1512909768000,
"dataChange": true,
"baseRowId": 4071,
"defaultRowCommitVersion": 41,
"clusteringProvider": "liquid",
"stats": "{\"numRecords\":1,\"minValues\":{\"val..."
}
}
```

The schema of the `remove` action is as follows:

Field Name | Data Type | Description | optional/required
Expand Down Expand Up @@ -492,12 +510,16 @@ When available, change data readers should use the `cdc` actions in a given tabl
Specifically, to read the row-level changes made in a version, the following strategy should be used:
1. If there are `cdc` actions in this version, then read only those to get the row-level changes, and skip the remaining `add` and `remove` actions in this version.
2. Otherwise, if there are no `cdc` actions in this version, read and treat all the rows in the `add` and `remove` actions as inserted and deleted rows, respectively.
3. The following extra columns should also be generated:
3. Change data readers should return the following extra columns:

Field Name | Data Type | Description
-|-|-
_commit_version|`Long`| The table version containing the change. This can be got from the name of the Delta log file that contains actions.
_commit_timestamp|`Timestamp`| The timestamp associated when the commit was created. This can be got from the file modification time of the Delta log file that contains actions.
Field Name | Data Type | Description
-|-|-
_commit_version|`Long`| The table version containing the change. This can be derived from the name of the Delta log file that contains actions.
_commit_timestamp|`Timestamp`| The timestamp associated when the commit was created. This can be derived from the file modification time of the Delta log file that contains actions.

##### Note for non-change data readers

In a table with Change Data Feed enabled, the data Parquet files referenced by `add` and `remove` actions are allowed to contain an extra column `_change_type`. This column is not present in the table's schema and will consistently have a `null` value. When accessing these files, readers should disregard this column and only process columns defined within the table's schema.

### Transaction Identifiers
Incremental processing systems (e.g., streaming systems) that track progress using their own application-specific versions need to record what progress has been made, in order to avoid duplicating data in the face of failures and retries during a write.
Expand Down Expand Up @@ -1060,27 +1082,48 @@ When Row Tracking is enabled (when the table property `delta.enableRowTracking`

The Clustered Table feature facilitates the physical clustering of rows that share similar values on a predefined set of clustering columns.
This enhances query performance when selective filters are applied to these clustering columns through data skipping.
Clustering columns must be specified during the initial definition of a clustered table, and they can be modified after the table has been created.
Clustering columns can be specified during the initial creation of a table, or they can be added later, provided that the table doesn't have partition columns.

A table is defined as a clustered table through the following criteria:
- When the feature `clustering` exists in the table `protocol`'s `writerFeatures`, then we say that the table is a clustered table.
The feature `domainMetadata` is required in the table `protocol`'s `writerFeatures`.

Enablement:
- The table must be on Writer Version 7.
- The feature `clustering` must exist in the table `protocol`'s `writerFeatures`.
- The feature `clustering` must exist in the table `protocol`'s `writerFeatures`, either during its creation or at a later stage, provided the table does not have partition columns.

## Writer Requirements for Clustered Table

When the Clustered Table is supported (when the `writerFeatures` field of a table's `protocol` action contains `clustering`), then:
- Writers must write out [per-file statistics](#per-file-statistics) and per-column statistics for clustering columns in `add` action.
Failure to collect per-column statistics for clustering columns will result in an error when defining a clustered table or making changes to the clustering columns.
- Writers must track clustering column names in a `domainMetadata` action with `delta.clustering` as the `domain` and a `configuration` containing all clustering column names.
If [Column Mapping](#column-mapping) is enabled, the physical column names should be used.
- When a clustering implementation clusters files, writers must incorporate a `tag` with `CLUSTERED_BY` as the key and the name of the clustering implementation as the corresponding value in `add` action.
- A clustering implementation must only cluster files that belong to the implementation or files that do not have the `CLUSTERED_BY` tag (i.e., unclustered).
- Writer is not required to cluster a specific file at any specific moment though it is still obligated to record accurate statistics. However, if it decides to cluster a particular file, it must include the CLUSTERED_BY tag.
- Writers must write out [per-file statistics](#per-file-statistics) and per-column statistics for clustering columns in `add` action.
If a new column is included in the clustering columns list, it is required for all table files to have statistics for these added columns.
- When a clustering implementation clusters files, writers must set the name of the clustering implementation in the `clusteringProvider` field when adding `add` actions for clustered files.
- By default, a clustering implementation must only recluster files that have the field `clusteringProvider` set to the name of the same clustering implementation, or to the names of other clustering implementations that are superseded by the current clustering implementation. In addition, a clustering implementation may cluster any files with an unset `clusteringProvider` field (i.e., unclustered files).
- Writer is not required to cluster a specific file at any specific moment.
- A clustering implementation is free to add additional information such as adding a new user-controlled metadata domain to keep track of its metadata.
- Writers must not define clustered and partitioned table at the same time.

The following is an example for the `domainMetadata` action defintion of a table that leverages column mapping.
```json
{
"domainMetadata": {
"domain": "delta.clustering",
"configuration": "{\"clusteringColumns\":[\"col-daadafd7-7c20-4697-98f8-bff70199b1f9\", \"col-5abe0e80-cf57-47ac-9ffc-a861a3d1077e\"]}",
"removed": false
}
}
```
The example above converts `configuration` field into JSON format, including escaping characters. Here's how it looks in plain JSON for better understanding.
```json
{
"clusteringColumns": [
"col-daadafd7-7c20-4697-98f8-bff70199b1f9",
"col-5abe0e80-cf57-47ac-9ffc-a861a3d1077e"
]
}
```

# Requirements for Writers
This section documents additional requirements that writers must follow in order to preserve some of the higher level guarantees that Delta provides.
Expand Down
7 changes: 5 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,13 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
Test / javaOptions ++= Seq("-ea"),
libraryDependencies ++= Seq(
"org.roaringbitmap" % "RoaringBitmap" % "0.9.25",
"org.slf4j" % "slf4j-api" % "2.0.9",

"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.slf4j" % "slf4j-log4j12" % "2.0.9" % "test"
),
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
Expand All @@ -251,7 +253,8 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13" % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.slf4j" % "slf4j-log4j12" % "2.0.9" % "test"
),
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
Expand Down
4 changes: 2 additions & 2 deletions connectors/flink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ See the [Java API docs](https://delta-io.github.io/connectors/latest/delta-flink

### Known limitations

- For Azure Blob Storage, the current version only supports reading. Writing to Azure Blob Storage is not supported by Flink due to [issue](https://issues.apache.org/jira/browse/FLINK-17444) with class shading
and will probably be added along with [Azure Data Lake Store Gen 2 support](https://issues.apache.org/jira/browse/FLINK-18568).
- For Azure Blob Storage, the current version only supports reading. Writing to Azure Blob Storage is not supported by Flink due to [issue](https://issues.apache.org/jira/browse/FLINK-17444) with class shading.
However, since Flink 1.17 Azure Data Lake Gen2 is supported – see [FLINK-30128](https://issues.apache.org/jira/browse/FLINK-30128).
- For AWS S3 storage, in order to ensure concurrent transactional writes from different clusters, use [multi-cluster configuration guidelines](https://docs.delta.io/latest/delta-storage.html#multi-cluster-setup). Please see [example](#3-sink-creation-with-multi-cluster-support-for-delta-standalone) for how to use this configuration in Flink Delta Sink.

## Delta Sink
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit c6fb4c9

Please sign in to comment.