Record type widening metadata
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

This change is part of the type widening table feature.
Type widening feature request: #2622
Type Widening protocol RFC: #2624

It introduces metadata to record information about type changes applied using `ALTER TABLE`. This metadata is stored in the table schema, as specified in https://github.com/delta-io/delta/pull/2624/files#diff-114dec1ec600a6305fe7117bed7acb46e94180cdb1b8da63b47b12d6c40760b9R28

For example, changing a top-level column `a` from `int` to `long` updates the schema to include the following metadata:
```json
{
    "name" : "a",
    "type" : "long",
    "nullable" : true,
    "metadata" : {
      "delta.typeChanges": [
        {
          "tableVersion": 1,
          "fromType": "integer",
          "toType": "long"
        },
        {
          "tableVersion": 5,
          "fromType": "integer",
          "toType": "long"
        }
      ]
    }
  }
```
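
A minimal sketch of the kind of command that produces this metadata, assuming a Spark session with Delta Lake configured (e.g. a spark-shell); the table name `t` is hypothetical, and `delta.enableTypeWidening` is assumed to be the table property that enables the feature:

```scala
// Hypothetical table with an int column and the type widening feature enabled.
spark.sql("CREATE TABLE t (a INT) USING delta TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")

// Widen `a` from int to long; the commit for this ALTER TABLE records a
// `delta.typeChanges` entry like the one above on column `a`, with `tableVersion`
// set to the version of that commit.
spark.sql("ALTER TABLE t CHANGE COLUMN a TYPE LONG")
```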

- A new test suite `DeltaTypeWideningMetadataSuite` is created to cover the methods handling type widening metadata (the sketch below illustrates the kind of call it exercises).
- Tests covering adding metadata to the schema when running `ALTER TABLE CHANGE COLUMN` are added to `DeltaTypeWideningSuite`.
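
To illustrate, here is a hedged sketch of manipulating type widening metadata on a schema; the column name, types, and versions are made up, and since `TypeWideningMetadata` is `private[delta]` this only compiles from code in the `org.apache.spark.sql.delta` package, as the test suites are:

```scala
import org.apache.spark.sql.delta.TypeWideningMetadata
import org.apache.spark.sql.types._

// A schema with one recorded type change on column `a`, made at table version 1.
val typeChange = new MetadataBuilder()
  .putLong("tableVersion", 1L)
  .putString("fromType", "integer")
  .putString("toType", "long")
  .build()
val fieldMetadata = new MetadataBuilder()
  .putMetadataArray("delta.typeChanges", Array(typeChange))
  .build()
val schema = StructType(Seq(StructField("a", LongType, nullable = true, fieldMetadata)))

// Re-stamp type changes recorded at version 1 with version 2, as conflict
// resolution does when the transaction's commit version moves forward.
val updated = TypeWideningMetadata.updateTypeChangeVersion(schema, fromVersion = 1L, toVersion = 2L)
```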

Closes #2708

GitOrigin-RevId: cdbb7589f10a8355b66058e156bb7d1894268f4d
johanl-db authored and allisonport-db committed Mar 7, 2024
1 parent 4e9a15c commit e076974
Showing 7 changed files with 1,014 additions and 10 deletions.
@@ -161,6 +161,9 @@ private[delta] class ConflictChecker(
reassignOverlappingRowIds()
reassignRowCommitVersions()

// Update the table version in newly added type widening metadata.
updateTypeWideningMetadata()

// Data file checks.
checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn()
checkForDeletedFilesAgainstCurrentTxnReadFiles()
@@ -487,6 +490,26 @@
actions = updatedActions)
}

/**
* Metadata is recorded in the table schema on type changes. This includes the table version that
* the change was made in, which needs to be updated when there's a conflict.
*/
private def updateTypeWideningMetadata(): Unit = {
if (!TypeWidening.isEnabled(currentTransactionInfo.protocol, currentTransactionInfo.metadata)) {
return
}
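    // For example: a type change stamped with the version this transaction was about to commit at
    // (equal to `winningCommitVersion`) is re-stamped with `winningCommitVersion + 1`, the next
    // version this transaction will attempt.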
val newActions = currentTransactionInfo.actions.map {
case metadata: Metadata =>
val updatedSchema = TypeWideningMetadata.updateTypeChangeVersion(
schema = metadata.schema,
fromVersion = winningCommitVersion,
toVersion = winningCommitVersion + 1L)
metadata.copy(schemaString = updatedSchema.json)
case a => a
}
currentTransactionInfo = currentTransactionInfo.copy(actions = newActions)
}

/**
* Checks whether the Row IDs assigned by the current transaction overlap with the Row IDs
* assigned by the winning transaction. I.e. this function checks whether both the winning and the
@@ -2003,7 +2003,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}

/** Returns the version that the first attempt will try to commit at. */
protected def getFirstAttemptVersion: Long = readVersion + 1L
private[delta] def getFirstAttemptVersion: Long = readVersion + 1L

/** Returns the conflicting commit information */
protected def getConflictingVersions(previousAttemptVersion: Long): Seq[FileStatus] = {
@@ -0,0 +1,195 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.spark.sql.types._

/**
* Information corresponding to a single type change.
* @param version The version of the table where the type change was made.
* @param fromType The original type before the type change.
* @param toType The new type after the type change.
 * @param fieldPath The path inside nested maps and arrays to the field where the type change was
 *                  made. Each path element is either `key`/`value` for maps or `element` for
 *                  arrays. The path is empty if the type change was applied directly to the field
 *                  itself rather than inside a nested map or array. For example, widening the
 *                  values of a `map<string, int>` column to `long` yields
 *                  `fieldPath = Seq("value")`.
*/
private[delta] case class TypeChange(
version: Long,
fromType: DataType,
toType: DataType,
fieldPath: Seq[String]) {
import TypeChange._

/** Serialize this type change to a [[Metadata]] object. */
def toMetadata: Metadata = {
val builder = new MetadataBuilder()
.putLong(TABLE_VERSION_METADATA_KEY, version)
.putString(FROM_TYPE_METADATA_KEY, fromType.typeName)
.putString(TO_TYPE_METADATA_KEY, toType.typeName)
if (fieldPath.nonEmpty) {
builder.putString(FIELD_PATH_METADATA_KEY, fieldPath.mkString("."))
}
builder.build()
}
}

private[delta] object TypeChange {
val TABLE_VERSION_METADATA_KEY: String = "tableVersion"
val FROM_TYPE_METADATA_KEY: String = "fromType"
val TO_TYPE_METADATA_KEY: String = "toType"
val FIELD_PATH_METADATA_KEY: String = "fieldPath"

/** Deserialize a type change from a [[Metadata]] object. */
def fromMetadata(metadata: Metadata): TypeChange = {
val fieldPath = if (metadata.contains(FIELD_PATH_METADATA_KEY)) {
metadata.getString(FIELD_PATH_METADATA_KEY).split("\\.").toSeq
} else {
Seq.empty
}
TypeChange(
version = metadata.getLong(TABLE_VERSION_METADATA_KEY),
fromType = DataType.fromDDL(metadata.getString(FROM_TYPE_METADATA_KEY)),
toType = DataType.fromDDL(metadata.getString(TO_TYPE_METADATA_KEY)),
fieldPath
)
}
}

/**
* Represents all type change information for a single struct field
* @param typeChanges The type changes that have been applied to the field.
*/
private[delta] case class TypeWideningMetadata(typeChanges: Seq[TypeChange]) {

import TypeWideningMetadata._

/**
* Add the type changes to the metadata of the given field, preserving any pre-existing type
* widening metadata.
*/
def appendToField(field: StructField): StructField = {
if (typeChanges.isEmpty) return field

val existingTypeChanges = fromField(field).map(_.typeChanges).getOrElse(Seq.empty)
val allTypeChanges = existingTypeChanges ++ typeChanges

val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
.putMetadataArray(TYPE_CHANGES_METADATA_KEY, allTypeChanges.map(_.toMetadata).toArray)
.build()
field.copy(metadata = newMetadata)
}
}

private[delta] object TypeWideningMetadata {
val TYPE_CHANGES_METADATA_KEY: String = "delta.typeChanges"

/** Read the type widening metadata from the given field. */
def fromField(field: StructField): Option[TypeWideningMetadata] = {
Option.when(field.metadata.contains(TYPE_CHANGES_METADATA_KEY)) {
val typeChanges = field.metadata.getMetadataArray(TYPE_CHANGES_METADATA_KEY)
.map { changeMetadata =>
TypeChange.fromMetadata(changeMetadata)
}.toSeq
TypeWideningMetadata(typeChanges)
}
}

/**
* Computes the type changes from `oldSchema` to `schema` and adds corresponding type change
* metadata to `schema`.
*/
def addTypeWideningMetadata(
txn: OptimisticTransaction,
schema: StructType,
oldSchema: StructType): StructType = {

if (!TypeWidening.isEnabled(txn.protocol, txn.metadata)) return schema

if (DataType.equalsIgnoreNullability(schema, oldSchema)) return schema

SchemaMergingUtils.transformColumns(schema, oldSchema) {
case (_, newField, Some(oldField), _) =>
// Record the version the transaction will attempt to use in the type change metadata. If
// there's a conflict with another transaction, the version in the metadata will be updated
// during conflict resolution. See [[ConflictChecker.updateTypeWideningMetadata()]].
val typeChanges =
collectTypeChanges(oldField.dataType, newField.dataType, txn.getFirstAttemptVersion)
TypeWideningMetadata(typeChanges).appendToField(newField)
case (_, newField, None, _) =>
// The field was just added, no need to process.
newField
}
}

/**
 * Recursively collect primitive type changes between `fromType` and `toType`, descending into
 * nested maps and arrays. The `version` is the version of the table where the type change was
 * made.
*/
private def collectTypeChanges(fromType: DataType, toType: DataType, version: Long)
: Seq[TypeChange] = (fromType, toType) match {
case (from: MapType, to: MapType) =>
collectTypeChanges(from.keyType, to.keyType, version).map { typeChange =>
typeChange.copy(fieldPath = "key" +: typeChange.fieldPath)
} ++
collectTypeChanges(from.valueType, to.valueType, version).map { typeChange =>
typeChange.copy(fieldPath = "value" +: typeChange.fieldPath)
}
case (from: ArrayType, to: ArrayType) =>
collectTypeChanges(from.elementType, to.elementType, version).map { typeChange =>
typeChange.copy(fieldPath = "element" +: typeChange.fieldPath)
}
case (fromType: AtomicType, toType: AtomicType) if fromType != toType =>
Seq(TypeChange(
version,
fromType,
toType,
fieldPath = Seq.empty
))
case (_: AtomicType, _: AtomicType) => Seq.empty
    // Don't recurse inside structs: `collectTypeChanges` should be called directly on each
    // struct field instead, so that type changes are only collected inside that field.
case (_: StructType, _: StructType) => Seq.empty
}

/**
 * Change the `tableVersion` value in the type change metadata present in `schema` from
 * `fromVersion` to `toVersion`. Used during conflict resolution, when the version the
 * transaction will commit at is incremented.
*/
def updateTypeChangeVersion(schema: StructType, fromVersion: Long, toVersion: Long): StructType =
SchemaMergingUtils.transformColumns(schema) {
case (_, field, _) =>
fromField(field) match {
case Some(typeWideningMetadata) =>
val updatedTypeChanges = typeWideningMetadata.typeChanges.map {
case typeChange if typeChange.version == fromVersion =>
typeChange.copy(version = toVersion)
case olderTypeChange => olderTypeChange
}
val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
.putMetadataArray(
TYPE_CHANGES_METADATA_KEY,
updatedTypeChanges.map(_.toMetadata).toArray)
.build()
field.copy(metadata = newMetadata)

case None => field
}
}
}
@@ -614,8 +614,11 @@ case class AlterTableChangeColumnDeltaCommand(
val newConfiguration = metadata.configuration ++
StatisticsCollection.renameDeltaStatsColumn(metadata, oldColumnPath, newColumnPath)

val newSchemaWithTypeWideningMetadata =
TypeWideningMetadata.addTypeWideningMetadata(txn, schema = newSchema, oldSchema = oldSchema)

val newMetadata = metadata.copy(
schemaString = newSchema.json,
schemaString = newSchemaWithTypeWideningMetadata.json,
partitionColumns = newPartitionColumns,
configuration = newConfiguration
)
@@ -816,7 +819,13 @@ case class AlterTableReplaceColumnsDeltaCommand(
SchemaMergingUtils.checkColumnNameDuplication(newSchema, "in replacing columns")
SchemaUtils.checkSchemaFieldNames(newSchema, metadata.columnMappingMode)

val newMetadata = metadata.copy(schemaString = newSchema.json)
val newSchemaWithTypeWideningMetadata = TypeWideningMetadata.addTypeWideningMetadata(
txn,
schema = newSchema,
oldSchema = existingSchema
)

val newMetadata = metadata.copy(schemaString = newSchemaWithTypeWideningMetadata.json)
txn.updateMetadata(newMetadata)
txn.commit(Nil, DeltaOperations.ReplaceColumns(columns))

@@ -333,6 +333,52 @@ object SchemaMergingUtils {
transform(Seq.empty, schema)
}

/**
* Transform (nested) columns in `schema` by walking down `schema` and `other` simultaneously.
* This allows comparing the two schemas and transforming `schema` based on the comparison.
 * Columns or fields present only in `other` are ignored, while `None` is passed to the transform
 * function for columns or fields missing from `other`.
* @param schema Schema to transform.
* @param other Schema to compare with.
* @param tf Function to apply. The function arguments are the full path of the current field to
* transform, the current field in `schema` and, if present, the corresponding field in
* `other`.
*/
def transformColumns(
schema: StructType,
other: StructType)(
tf: (Seq[String], StructField, Option[StructField], Resolver) => StructField): StructType = {
def transform[E <: DataType](path: Seq[String], dt: E, otherDt: E): E = {
val newDt = (dt, otherDt) match {
case (struct: StructType, otherStruct: StructType) =>
val otherFields = SchemaMergingUtils.toFieldMap(otherStruct.fields, caseSensitive = true)
StructType(struct.map { field =>
val otherField = otherFields.get(field.name)
val newField = tf(path, field, otherField, DELTA_COL_RESOLVER)
otherField match {
case Some(other) =>
newField.copy(
dataType = transform(path :+ field.name, field.dataType, other.dataType)
)
case None => newField
}
})
case (map: MapType, otherMap: MapType) =>
map.copy(
keyType = transform(path :+ "key", map.keyType, otherMap.keyType),
valueType = transform(path :+ "value", map.valueType, otherMap.valueType)
)
case (array: ArrayType, otherArray: ArrayType) =>
array.copy(
elementType = transform(path :+ "element", array.elementType, otherArray.elementType)
)
case _ => dt
}
newDt.asInstanceOf[E]
}
transform(Seq.empty, schema, other)
}
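  // `TypeWideningMetadata.addTypeWideningMetadata` (added in this commit) uses this overload to
  // walk the new and old schemas together and detect fields whose type was widened.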

/**
*
* Taken from DataType
