Commit ad7d5f8
Squashed commits:
  - 09e2ec4a95be0c66204b80f36ed2f3f0e8f76831 scalastyle import by vkorukanti <venki.korukanti@gmail.com>
  - babe0920ef1b507a94289b92fef3aec12570fd2c import ordering to match with the OSS one by vkorukanti <venki.korukanti@gmail.com>
  - 659d6d3abd1ab9f2402d2d14059eb5f71ce32d87 Make the OptimizeMetadataOnlyDeltaQueryDeletionVectorSuit... by vkorukanti <venki.korukanti@gmail.com>
  - 997b335960fea03d4a3ffb7accc5117da89b0c12 disable tests that don't run in DBR by vkorukanti <venki.korukanti@gmail.com>
  - 1bca4b2d7792334dda3d636103109cf2bf9044b0 fix build by vkorukanti <venki.korukanti@gmail.com>
  - 6280c6d5e65983d515501b6e25b72881c39db50c fix build by vkorukanti <venki.korukanti@gmail.com>
  - 967f093594d745f8b37a5f4b8088e1bfdabae186 conflicts by vkorukanti <venki.korukanti@gmail.com>
  - 87e81252a21bcf3039181f9df8d8d3d3e8908a1c [DELTA-OSS-EXTERNAL] Optimize Min/Max using Delta metadata by Felipe Pessoto <felipepessoto@hotmail.com>

GitOrigin-RevId: 09e2ec4a95be0c66204b80f36ed2f3f0e8f76831
vkorukanti authored and scottsand-db committed Jan 31, 2024
1 parent 11cd832 commit ad7d5f8
Showing 77 changed files with 194 additions and 11,804 deletions.
111 changes: 5 additions & 106 deletions PROTOCOL.md
@@ -46,8 +46,6 @@
- [Writer Requirement for Deletion Vectors](#writer-requirement-for-deletion-vectors)
- [Iceberg Compatibility V1](#iceberg-compatibility-v1)
- [Writer Requirements for IcebergCompatV1](#writer-requirements-for-icebergcompatv1)
- [Iceberg Compatibility V2](#iceberg-compatibility-v2)
- [Writer Requirement for IcebergCompatV2](#iceberg-compatibility-v2)
- [Timestamp without timezone (TimestampNtz)](#timestamp-without-timezone-timestampntz)
- [V2 Checkpoint Table Feature](#v2-checkpoint-table-feature)
- [Row Tracking](#row-tracking)
@@ -104,6 +102,8 @@

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

<font color="red">THIS IS AN IN-PROGRESS DRAFT</font>

# Overview
This document is a specification for the Delta Transaction Protocol, which brings [ACID](https://en.wikipedia.org/wiki/ACID) properties to large collections of data, stored as files, in a distributed file system or object store. The protocol was designed with the following goals in mind:

@@ -905,7 +905,9 @@ To support this feature:
- The table must be on Writer Version 7.
- The feature `icebergCompatV1` must exist in the table `protocol`'s `writerFeatures`.

This table feature is enabled when the table property `delta.enableIcebergCompatV1` is set to `true`.
Activation: Set table property `delta.enableIcebergCompatV1` to `true`.

Deactivation: Unset table property `delta.enableIcebergCompatV1`, or set it to `false`.
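
For illustration only, a minimal sketch of toggling this property from Spark (assumes a running SparkSession `spark`; the table name `my_table` is hypothetical, the property name is taken from the text above):

```scala
// Activation: set the property to 'true' (table name is hypothetical).
spark.sql("ALTER TABLE my_table SET TBLPROPERTIES ('delta.enableIcebergCompatV1' = 'true')")

// Deactivation: unset the property (or set it to 'false').
spark.sql("ALTER TABLE my_table UNSET TBLPROPERTIES ('delta.enableIcebergCompatV1')")
```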

## Writer Requirements for IcebergCompatV1

@@ -919,109 +921,6 @@ When supported and active, writers must:
- e.g. replacing a table partitioned by `part_a INT` with partition spec `part_b INT` must be blocked
- e.g. replacing a table partitioned by `part_a INT` with partition spec `part_a LONG` is allowed

# Iceberg Compatibility V2

This table feature (`icebergCompatV2`) ensures that Delta tables can be converted to Apache Iceberg™ format, though this table feature does not implement or specify that conversion.

To support this feature:
- Since this table feature depends on Column Mapping, the table must be on Reader Version = 2, or it must be on Reader Version >= 3 and the feature `columnMapping` must exist in the `protocol`'s `readerFeatures`.
- The table must be on Writer Version 7.
- The feature `icebergCompatV2` must exist in the table protocol's `writerFeatures`.

This table feature is enabled when the table property `delta.enableIcebergCompatV2` is set to `true`.
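
Illustrative sketch only (SparkSession `spark` and table name `my_table` are assumptions): since this feature depends on Column Mapping, a writer would typically set both properties together, as the requirements below spell out:

```scala
// Sketch: enable column mapping ('name' or 'id' mode) alongside icebergCompatV2.
// Table name is hypothetical; property names are from the spec text.
spark.sql("""
  ALTER TABLE my_table SET TBLPROPERTIES (
    'delta.columnMapping.mode' = 'name',
    'delta.enableIcebergCompatV2' = 'true'
  )
""")
```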

## Writer Requirements for IcebergCompatV2

When this feature is supported and enabled, writers must:
- Require that Column Mapping be enabled and set to either `name` or `id` mode
- Require that the nested `element` field of ArrayTypes and the nested `key` and `value` fields of MapTypes be assigned 32 bit integer identifiers. These identifiers must be unique and different from those used in [Column Mapping](#column-mapping), and must be stored in the metadata of their nearest ancestor [StructField](#struct-field) of the Delta table schema. Identifiers belonging to the same `StructField` must be organized as a `Map[String, Long]` and stored in metadata with key `parquet.field.nested.ids`. The keys of the map are "element", "key", or "value", prefixed by the name of the nearest ancestor StructField, separated by dots. The values are the identifiers. The keys for fields in nested arrays or nested maps are prefixed by their parents' key, separated by dots. An [example](#example-of-storing-identifiers-for-nested-fields-in-arraytype-and-maptype) is provided below to demonstrate how the identifiers are stored. These identifiers must be also written to the `field_id` field of the `SchemaElement` struct in the [Parquet Thrift specification](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift) when writing parquet files.
- Require that IcebergCompatV1 is not active, which means either the `icebergCompatV1` table feature is not present in the table protocol or the table property `delta.enableIcebergCompatV1` is not set to `true`
- Require that Deletion Vectors are not active, which means either the `deletionVectors` table feature is not present in the table protocol or the table property `delta.enableDeletionVectors` is not set to `true`
- Require that partition column values be materialized when writing Parquet data files
- Require that all new `AddFile`s committed to the table have the `numRecords` statistic populated in their `stats` field
- Require writing timestamp columns as int64
- Require that the table schema contains only data types in the following allow-list: [`byte`, `short`, `integer`, `long`, `float`, `double`, `decimal`, `string`, `binary`, `boolean`, `timestamp`, `timestampNTZ`, `date`, `array`, `map`, `struct`].
- Block replacing partitioned tables with a differently-named partition spec
- e.g. replacing a table partitioned by `part_a INT` with partition spec `part_b INT` must be blocked
- e.g. replacing a table partitioned by `part_a INT` with partition spec `part_a LONG` is allowed

### Example of storing identifiers for nested fields in ArrayType and MapType
The following example shows how the identifiers for nested fields in `ArrayType` and `MapType` are stored, for a table with the following schema:
```
|-- col1: array[array[int]]
|-- col2: map[int, array[int]]
|-- col3: map[int, struct]
|-- subcol1: array[int]
```
The identifiers for the nested fields are stored in the metadata as follows:
```json
[
{
"name": "col1",
"type": {
"type": "array",
"elementType": {
"type": "array",
"elementType": "int"
}
},
"metadata": {
"parquet.field.nested.ids": {
"col1.element": 100,
"col1.element.element": 101
}
}
},
{
"name": "col2",
"type": {
"type": "map",
"keyType": "int",
"valueType": {
"type": "array",
"elementType": "int"
}
},
"metadata": {
"parquet.field.nested.ids": {
"col2.key": 102,
"col2.value": 103,
"col2.value.element": 104
}
}
},
{
"name": "col3",
"type": {
"type": "map",
"keyType": "int",
"valueType": {
"type": "struct",
"fields": [
{
"name": "subcol1",
"type": {
"type": "array",
"elementType": "int"
},
"metadata": {
"parquet.field.nested.ids": {
"subcol1.element": 107
}
}
}
]
}
},
"metadata": {
"parquet.field.nested.ids": {
"col3.key": 105,
"col3.value": 106
}
}
}
]
```
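
To make the lookup rule concrete, here is a hedged sketch (the helper `nestedFieldId` is hypothetical, not part of the spec) of resolving an identifier from the `col2` map above:

```scala
// The nested-ID map for col2 from the example above, keyed by the
// dot-separated path from the nearest ancestor StructField.
val nestedIds: Map[String, Long] = Map(
  "col2.key" -> 102L,
  "col2.value" -> 103L,
  "col2.value.element" -> 104L
)

// Hypothetical helper: resolve the 32-bit identifier for a nested field path.
def nestedFieldId(path: Seq[String]): Int = nestedIds(path.mkString(".")).toInt

assert(nestedFieldId(Seq("col2", "value", "element")) == 104)
```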
# Timestamp without timezone (TimestampNtz)
This feature introduces a new data type to support timestamps without timezone information. For example: `1970-01-01 00:00:00` or `1970-01-01 00:00:00.123456`.
The serialization method is described in Sections [Partition Value Serialization](#partition-value-serialization) and [Schema Serialization Format](#schema-serialization-format).
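
As an illustration of the semantics (not part of the spec), `java.time.LocalDateTime` models the same zone-free notion in plain Scala, whereas an ordinary timestamp is anchored to the timeline:

```scala
import java.time.{Instant, LocalDateTime}

// TimestampNtz-like value: wall-clock date and time, no timezone attached.
val ntz: LocalDateTime = LocalDateTime.parse("1970-01-01T00:00:00.123456")

// A regular timestamp, by contrast, denotes a point on the UTC timeline.
val ts: Instant = Instant.parse("1970-01-01T00:00:00.123456Z")
```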
IcebergSchemaUtils.scala
@@ -43,13 +43,6 @@ object IcebergSchemaUtils extends DeltaLogging {
new IcebergSchema(icebergStruct.fields())
}

private[delta] def getNestedFieldId(field: Option[StructField], path: Seq[String]): Int = {
field.get.metadata
.getMetadata(DeltaColumnMapping.COLUMN_MAPPING_METADATA_NESTED_IDS_KEY)
.getLong(path.mkString("."))
.toInt
}

////////////////////
// Helper Methods //
////////////////////
@@ -65,8 +58,7 @@ object IcebergSchemaUtils extends DeltaLogging {
* - MapType -> IcebergTypes.MapType
* - primitive -> IcebergType.PrimitiveType
*/
def transform[E <: DataType](elem: E, field: Option[StructField], name: Seq[String])
: IcebergType = elem match {
def transform[E <: DataType](elem: E): IcebergType = elem match {
case StructType(fields) =>
IcebergTypes.StructType.of(fields.map { f =>
if (!DeltaColumnMapping.hasColumnId(f)) {
@@ -77,48 +69,24 @@
DeltaColumnMapping.getColumnId(f),
f.nullable,
f.name,
transform(f.dataType, Some(f), Seq(DeltaColumnMapping.getPhysicalName(f))),
transform(f.dataType),
f.getComment().orNull
)
}.toList.asJava)

case ArrayType(elementType, containsNull) =>
val currName = name :+ DeltaColumnMapping.PARQUET_LIST_ELEMENT_FIELD_NAME
val id = getNestedFieldId(field, currName)
if (containsNull) {
IcebergTypes.ListType.ofOptional(id, transform(elementType, field, currName))
} else {
IcebergTypes.ListType.ofRequired(id, transform(elementType, field, currName))
}
throw new UnsupportedOperationException("UniForm doesn't support Array columns")

case MapType(keyType, valueType, valueContainsNull) =>
val currKeyName = name :+ DeltaColumnMapping.PARQUET_MAP_KEY_FIELD_NAME
val currValName = name :+ DeltaColumnMapping.PARQUET_MAP_VALUE_FIELD_NAME
val keyId = getNestedFieldId(field, currKeyName)
val valId = getNestedFieldId(field, currValName)
if (valueContainsNull) {
IcebergTypes.MapType.ofOptional(
keyId,
valId,
transform(keyType, field, currKeyName),
transform(valueType, field, currValName)
)
} else {
IcebergTypes.MapType.ofRequired(
keyId,
valId,
transform(keyType, field, currKeyName),
transform(valueType, field, currValName)
)
}
throw new UnsupportedOperationException("UniForm doesn't support Map columns")

case atomicType: AtomicType => convertAtomic(atomicType)

case other =>
throw new UnsupportedOperationException(s"Cannot convert Delta type $other to Iceberg")
}

transform(deltaSchema, None, Seq.empty).asStructType()
transform(deltaSchema).asStructType()
}

/**