Squashed commits:
  - 193497a5e8e309d8a75bcaba4c68b795dc47e049 Address the review by Hao Jiang <hao.jiang@databricks.com>
  - b5a0eb1a8cf1e940a1919396c7f781cab2518b8f Address the review by Hao Jiang <hao.jiang@databricks.com>
  - 8591d13565a34b6ffa20bee240cc699359bb71c2 Address the review by Hao Jiang <hao.jiang@databricks.com>
  - 4096babc9674103d4fae3cd2a9713406c2e8dfd4 Address review comments by Hao Jiang <hao.jiang@databricks.com>
  - b9ae52a7577b6d990023bc526ddc9f34f662da25 Address Review Comments by Hao Jiang <hao.jiang@databricks.com>
  - 4e18c64afed8fc606274cecdff41d231b55ea077 Address Review Comments by Hao Jiang <hao.jiang@databricks.com>
  - 5e3c24cc7aeab2b34d70c23cbaaffaa02604be20 Add IcebergCompatV2 by Hao Jiang <hao.jiang@databricks.com>

GitOrigin-RevId: 193497a5e8e309d8a75bcaba4c68b795dc47e049
harperjiang authored and scottsand-db committed Jan 31, 2024
1 parent 11cd832 commit 3d04e37
Showing 95 changed files with 557 additions and 15,571 deletions.
111 changes: 5 additions & 106 deletions PROTOCOL.md
@@ -46,8 +46,6 @@
- [Writer Requirement for Deletion Vectors](#writer-requirement-for-deletion-vectors)
- [Iceberg Compatibility V1](#iceberg-compatibility-v1)
- [Writer Requirements for IcebergCompatV1](#writer-requirements-for-icebergcompatv1)
- [Iceberg Compatibility V2](#iceberg-compatibility-v2)
- [Writer Requirements for IcebergCompatV2](#writer-requirements-for-icebergcompatv2)
- [Timestamp without timezone (TimestampNtz)](#timestamp-without-timezone-timestampntz)
- [V2 Checkpoint Table Feature](#v2-checkpoint-table-feature)
- [Row Tracking](#row-tracking)
@@ -104,6 +102,8 @@

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

<font color="red">THIS IS AN IN-PROGRESS DRAFT</font>

# Overview
This document is a specification for the Delta Transaction Protocol, which brings [ACID](https://en.wikipedia.org/wiki/ACID) properties to large collections of data, stored as files, in a distributed file system or object store. The protocol was designed with the following goals in mind:

@@ -905,7 +905,9 @@ To support this feature:
- The table must be on Writer Version 7.
- The feature `icebergCompatV1` must exist in the table `protocol`'s `writerFeatures`.

This table feature is enabled when the table property `delta.enableIcebergCompatV1` is set to `true`.
Activation: Set table property `delta.enableIcebergCompatV1` to `true`.

Deactivation: Unset table property `delta.enableIcebergCompatV1`, or set it to `false`.
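
For illustration only (not part of the protocol): an engine such as Apache Spark with Delta Lake might toggle this property as sketched below. The table name `my_table` and the use of Spark SQL are assumptions for the sketch, not requirements of this spec.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch; `my_table` is a placeholder table name.
val spark = SparkSession.builder().appName("iceberg-compat-demo").getOrCreate()

// Activate IcebergCompatV1:
spark.sql("ALTER TABLE my_table SET TBLPROPERTIES ('delta.enableIcebergCompatV1' = 'true')")

// Deactivate: unset the property (or set it to 'false'):
spark.sql("ALTER TABLE my_table UNSET TBLPROPERTIES ('delta.enableIcebergCompatV1')")
```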

## Writer Requirements for IcebergCompatV1

@@ -919,109 +921,6 @@ When supported and active, writers must:
- e.g. replacing a table partitioned by `part_a INT` with partition spec `part_b INT` must be blocked
- e.g. replacing a table partitioned by `part_a INT` with partition spec `part_a LONG` is allowed

# Iceberg Compatibility V2

This table feature (`icebergCompatV2`) ensures that Delta tables can be converted to Apache Iceberg™ format, though this table feature does not implement or specify that conversion.

To support this feature:
- Since this table feature depends on Column Mapping, the table must be on Reader Version = 2, or it must be on Reader Version >= 3 and the feature `columnMapping` must exist in the `protocol`'s `readerFeatures`.
- The table must be on Writer Version 7.
- The feature `icebergCompatV2` must exist in the table protocol's `writerFeatures`.

This table feature is enabled when the table property `delta.enableIcebergCompatV2` is set to `true`.
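
As a non-normative illustration: because IcebergCompatV2 depends on Column Mapping, an engine would typically enable both properties together. The table name `my_table` and the Spark SQL surface below are assumptions for the sketch (assuming an existing SparkSession `spark`), not part of this spec.

```scala
// Hypothetical sketch; `my_table` is a placeholder table name.
spark.sql("""
  ALTER TABLE my_table SET TBLPROPERTIES (
    'delta.columnMapping.mode'    = 'name',
    'delta.enableIcebergCompatV2' = 'true'
  )
""")
```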

## Writer Requirements for IcebergCompatV2

When this feature is supported and enabled, writers must:
- Require that Column Mapping be enabled and set to either `name` or `id` mode
- Require that the nested `element` field of ArrayTypes and the nested `key` and `value` fields of MapTypes be assigned 32-bit integer identifiers:
  - The identifiers must be unique and different from those used in [Column Mapping](#column-mapping), and must be stored in the metadata of their nearest ancestor [StructField](#struct-field) of the Delta table schema.
  - Identifiers belonging to the same `StructField` must be organized as a `Map[String, Long]` and stored in metadata with key `parquet.field.nested.ids`. The keys of the map are "element", "key", or "value", prefixed by the name of the nearest ancestor StructField, separated by dots; keys for fields in nested arrays or nested maps are additionally prefixed by their parents' keys, separated by dots. The values are the identifiers. An [example](#example-of-storing-identifiers-for-nested-fields-in-arraytype-and-maptype) is provided below to demonstrate how the identifiers are stored, followed by a short sketch of the key-construction logic.
  - These identifiers must also be written to the `field_id` field of the `SchemaElement` struct in the [Parquet Thrift specification](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift) when writing parquet files.
- Require that IcebergCompatV1 is not active, which means either the `icebergCompatV1` table feature is not present in the table protocol or the table property `delta.enableIcebergCompatV1` is not set to `true`
- Require that Deletion Vectors are not active, which means either the `deletionVectors` table feature is not present in the table protocol or the table property `delta.enableDeletionVectors` is not set to `true`
- Require that partition column values be materialized when writing Parquet data files
- Require that all new `AddFile`s committed to the table have the `numRecords` statistic populated in their `stats` field
- Require writing timestamp columns as int64 (an illustrative sketch follows this list)
- Require that the table schema contains only data types in the following allow-list: [`byte`, `short`, `integer`, `long`, `float`, `double`, `decimal`, `string`, `binary`, `boolean`, `timestamp`, `timestampNTZ`, `date`, `array`, `map`, `struct`].
- Block replacing partitioned tables with a differently-named partition spec
- e.g. replacing a table partitioned by `part_a INT` with partition spec `part_b INT` must be blocked
- e.g. replacing a table partitioned by `part_a INT` with partition spec `part_a LONG` is allowed
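
How the int64 timestamp requirement above is satisfied is engine-specific. As one illustrative, non-normative possibility, Spark can be asked to write Parquet timestamps as int64 microseconds rather than legacy int96 (assuming an existing SparkSession `spark`):

```scala
// Illustrative only: request int64 (microsecond) Parquet timestamps in Spark.
// Whether a given writer needs or honors this setting is engine-specific.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
```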

### Example of storing identifiers for nested fields in ArrayType and MapType
The following example shows how the identifiers for nested fields in `ArrayType` and `MapType` are stored for a table with the following schema:
```
|-- col1: array[array[int]]
|-- col2: map[int, array[int]]
|-- col3: map[int, struct]
|    |-- subcol1: array[int]
```
The identifiers for the nested fields are stored in the metadata as follows:
```json
[
  {
    "name": "col1",
    "type": {
      "type": "array",
      "elementType": {
        "type": "array",
        "elementType": "int"
      }
    },
    "metadata": {
      "parquet.field.nested.ids": {
        "col1.element": 100,
        "col1.element.element": 101
      }
    }
  },
  {
    "name": "col2",
    "type": {
      "type": "map",
      "keyType": "int",
      "valueType": {
        "type": "array",
        "elementType": "int"
      }
    },
    "metadata": {
      "parquet.field.nested.ids": {
        "col2.key": 102,
        "col2.value": 103,
        "col2.value.element": 104
      }
    }
  },
  {
    "name": "col3",
    "type": {
      "type": "map",
      "keyType": "int",
      "valueType": {
        "type": "struct",
        "fields": [
          {
            "name": "subcol1",
            "type": {
              "type": "array",
              "elementType": "int"
            },
            "metadata": {
              "parquet.field.nested.ids": {
                "subcol1.element": 107
              }
            }
          }
        ]
      }
    },
    "metadata": {
      "parquet.field.nested.ids": {
        "col3.key": 105,
        "col3.value": 106
      }
    }
  }
]
```
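
The key-naming rule can be made concrete with a small sketch. The following is a minimal, non-normative Scala illustration (the toy type model, the `assign` helper, and the id counter are invented for this example and are not part of Delta) showing how the dotted keys and ids in the JSON above can be derived:

```scala
// Minimal sketch only: a toy type model (not Delta's) for deriving the dotted
// keys and ids stored under `parquet.field.nested.ids`.
sealed trait DType
case object IntT extends DType
final case class ArrayT(element: DType) extends DType
final case class MapT(key: DType, value: DType) extends DType
final case class StructT(fields: Seq[(String, DType)]) extends DType

object NestedIdSketch {
  // Emits (dottedKey -> id) for every array `element` and map `key`/`value`
  // reachable from `prefix`. It does not descend into nested structs, because
  // a nested struct's children store their ids on that nested StructField.
  def assign(prefix: String, t: DType, nextId: () => Long): Map[String, Long] = t match {
    case ArrayT(e) =>
      val k = s"$prefix.element"
      Map(k -> nextId()) ++ assign(k, e, nextId)
    case MapT(kt, vt) =>
      val kk = s"$prefix.key"
      val kv = s"$prefix.value"
      Map(kk -> nextId(), kv -> nextId()) ++ assign(kk, kt, nextId) ++ assign(kv, vt, nextId)
    case StructT(_) => Map.empty // handled by the nested StructField itself
    case _          => Map.empty // primitives carry no nested ids
  }

  def main(args: Array[String]): Unit = {
    var i = 99L
    val next = () => { i += 1; i }

    println(assign("col1", ArrayT(ArrayT(IntT)), next))
    // Map(col1.element -> 100, col1.element.element -> 101)
    println(assign("col2", MapT(IntT, ArrayT(IntT)), next))
    // Map(col2.key -> 102, col2.value -> 103, col2.value.element -> 104)
    println(assign("col3", MapT(IntT, StructT(Seq("subcol1" -> ArrayT(IntT)))), next))
    // Map(col3.key -> 105, col3.value -> 106)
    println(assign("subcol1", ArrayT(IntT), next))
    // Map(subcol1.element -> 107)  (stored on the nested field `subcol1` itself)
  }
}
```
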
# Timestamp without timezone (TimestampNtz)
This feature introduces a new data type to support timestamps without timezone information. For example: `1970-01-01 00:00:00`, or `1970-01-01 00:00:00.123456`.
The serialization method is described in Sections [Partition Value Serialization](#partition-value-serialization) and [Schema Serialization Format](#schema-serialization-format).
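
As an illustrative aside (not part of the serialization spec), engines that expose this type, such as Apache Spark 3.3+, surface it as `TimestampNTZType`; a schema using it might look like:

```scala
import org.apache.spark.sql.types.{StructField, StructType, TimestampNTZType}

// Illustrative only: a Spark schema with a timestamp-without-timezone column.
val eventSchema = StructType(Seq(
  StructField("event_time", TimestampNTZType, nullable = true)
))
```
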
26 changes: 1 addition & 25 deletions build.sbt
@@ -213,30 +213,6 @@ lazy val contribs = (project in file("contribs"))
Compile / compile := ((Compile / compile) dependsOn createTargetClassesDir).value
).configureUnidoc()

lazy val sharing = (project in file("sharing"))
.dependsOn(spark % "compile->compile;test->test;provided->provided")
.settings(
name := "delta-sharing-spark",
commonSettings,
scalaStyleSettings,
Test / javaOptions ++= Seq("-ea"),
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",

"io.delta" %% "delta-sharing-client" % "1.0.3",

// Test deps
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test",
"junit" % "junit" % "4.12" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests",
)
).configureUnidoc()

lazy val kernelApi = (project in file("kernel/kernel-api"))
.settings(
name := "delta-kernel-api",
@@ -1108,7 +1084,7 @@ val createTargetClassesDir = taskKey[Unit]("create target classes dir")

// Don't use these groups for any other projects
lazy val sparkGroup = project
.aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar, sharing)
.aggregate(spark, contribs, storage, storageS3DynamoDB, iceberg, testDeltaIcebergJar)
.settings(
// crossScalaVersions must be set to Nil on the aggregating project
crossScalaVersions := Nil,
@@ -43,13 +43,6 @@ object IcebergSchemaUtils extends DeltaLogging {
new IcebergSchema(icebergStruct.fields())
}

private[delta] def getNestedFieldId(field: Option[StructField], path: Seq[String]): Int = {
field.get.metadata
.getMetadata(DeltaColumnMapping.COLUMN_MAPPING_METADATA_NESTED_IDS_KEY)
.getLong(path.mkString("."))
.toInt
}

////////////////////
// Helper Methods //
////////////////////
@@ -65,8 +58,7 @@
* - MapType -> IcebergTypes.MapType
* - primitive -> IcebergType.PrimitiveType
*/
def transform[E <: DataType](elem: E, field: Option[StructField], name: Seq[String])
: IcebergType = elem match {
def transform[E <: DataType](elem: E): IcebergType = elem match {
case StructType(fields) =>
IcebergTypes.StructType.of(fields.map { f =>
if (!DeltaColumnMapping.hasColumnId(f)) {
@@ -77,48 +69,24 @@
DeltaColumnMapping.getColumnId(f),
f.nullable,
f.name,
transform(f.dataType, Some(f), Seq(DeltaColumnMapping.getPhysicalName(f))),
transform(f.dataType),
f.getComment().orNull
)
}.toList.asJava)

case ArrayType(elementType, containsNull) =>
val currName = name :+ DeltaColumnMapping.PARQUET_LIST_ELEMENT_FIELD_NAME
val id = getNestedFieldId(field, currName)
if (containsNull) {
IcebergTypes.ListType.ofOptional(id, transform(elementType, field, currName))
} else {
IcebergTypes.ListType.ofRequired(id, transform(elementType, field, currName))
}
throw new UnsupportedOperationException("UniForm doesn't support Array columns")

case MapType(keyType, valueType, valueContainsNull) =>
val currKeyName = name :+ DeltaColumnMapping.PARQUET_MAP_KEY_FIELD_NAME
val currValName = name :+ DeltaColumnMapping.PARQUET_MAP_VALUE_FIELD_NAME
val keyId = getNestedFieldId(field, currKeyName)
val valId = getNestedFieldId(field, currValName)
if (valueContainsNull) {
IcebergTypes.MapType.ofOptional(
keyId,
valId,
transform(keyType, field, currKeyName),
transform(valueType, field, currValName)
)
} else {
IcebergTypes.MapType.ofRequired(
keyId,
valId,
transform(keyType, field, currKeyName),
transform(valueType, field, currValName)
)
}
throw new UnsupportedOperationException("UniForm doesn't support Map columns")

case atomicType: AtomicType => convertAtomic(atomicType)

case other =>
throw new UnsupportedOperationException(s"Cannot convert Delta type $other to Iceberg")
}

transform(deltaSchema, None, Seq.empty).asStructType()
transform(deltaSchema).asStructType()
}

/**
