Skip to content

Commit

Permalink
Type Widening in ALTER TABLE CHANGE COLUMN
Browse files Browse the repository at this point in the history
  • Loading branch information
johanl-db committed Feb 16, 2024
1 parent 4ecfa45 commit 9a5e7dd
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 7 deletions.
11 changes: 11 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,17 @@ trait DeltaConfigsBase extends DeltaLogging {
"needs to be a boolean."
)

/**
 * Table property that gates type widening: when set to true, the type of an existing column
 * or field may be widened, either explicitly through ALTER TABLE CHANGE COLUMN or implicitly
 * when automatic schema evolution is enabled.
 *
 * The property defaults to false. Its value is parsed as a boolean and no validation beyond
 * that parsing is applied.
 */
val ENABLE_TYPE_WIDENING = buildConfig[Boolean](
  key = "enableTypeWidening",
  defaultValue = "false",
  fromString = rawValue => rawValue.toBoolean,
  // Every successfully parsed boolean is an acceptable value.
  validationFunction = (_: Boolean) => true,
  helpMessage = "needs to be a boolean.")

val MANAGED_COMMIT_OWNER_NAME = buildConfig[Option[String]](
"managedCommits.commitOwner-dev",
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ object TableFeature {
// managed-commits are under development and only available in testing.
ManagedCommitTableFeature,
// Row IDs are still under development and only available in testing.
RowTrackingFeature)
RowTrackingFeature,
TypeWideningTableFeature)
}
val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
Expand Down Expand Up @@ -625,6 +626,18 @@ object ManagedCommitTableFeature
}
}

/**
 * Reader-writer table feature for type widening, registered under the name "typeWidening-dev".
 *
 * The feature is driven by table metadata: it is required whenever the table property
 * [[DeltaConfigs.ENABLE_TYPE_WIDENING]] evaluates to true for the given metadata.
 */
object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening-dev")
    with FeatureAutomaticallyEnabledByMetadata {

  // NOTE(review): going by the member name, this opts existing tables in to automatic protocol
  // updates when the metadata requires the feature - confirm against
  // FeatureAutomaticallyEnabledByMetadata.
  override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

  /**
   * Returns true when the type widening table property is enabled in `metadata`. The Spark
   * session is not consulted.
   */
  override def metadataRequiresFeatureToBeEnabled(
      metadata: Metadata,
      spark: SparkSession): Boolean = {
    DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata)
  }
}

/**
* Features below are for testing only, and are being registered to the system only in the testing
* environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration.
Expand Down
59 changes: 59 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils}

import org.apache.spark.sql.types._

/**
 * Helpers for the Type Widening table feature: protocol support checks, the table-level
 * enablement check, and the list of atomic type changes that count as widening.
 */
object TypeWidening {

  /**
   * Tells whether `protocol` lists the Type Widening table feature as supported.
   */
  def isSupported(protocol: Protocol): Boolean = {
    protocol.isFeatureSupported(TypeWideningTableFeature)
  }

  /**
   * Tells whether Type Widening is enabled for the table version described by `protocol` and
   * `metadata`, i.e. whether the enabling table property is set. While enabled, ALTER TABLE
   * CHANGE COLUMN may widen the type of existing columns or fields.
   *
   * Protocol support is a pre-requisite for enablement: if the property is set but the
   * protocol does not support the table feature, an [[IllegalStateException]] is thrown
   * instead of returning a result.
   */
  def isEnabled(protocol: Protocol, metadata: Metadata): Boolean = {
    val enabledByProperty = DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata)
    // The protocol is only inspected once the property is known to be set.
    if (enabledByProperty) {
      if (!isSupported(protocol)) {
        val propertyName = DeltaConfigs.ENABLE_TYPE_WIDENING.key
        val featureKey = TableFeatureProtocolUtils.propertyKey(TypeWideningTableFeature)
        throw new IllegalStateException(
          s"Table property '$propertyName' is set on the table but this table version " +
            s"doesn't support table feature '$featureKey'.")
      }
    }
    enabledByProperty
  }

  /**
   * Tells whether changing `fromType` into `toType` is an accepted widening. Identical types
   * are always accepted; beyond that only Byte -> Short, Byte -> Integer and Short -> Integer
   * qualify. Only atomic types are handled here: callers must recurse into structs, maps and
   * arrays themselves.
   */
  def isAtomicTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean =
    fromType == toType || ((fromType, toType) match {
      case (ByteType, ShortType | IntegerType) => true
      case (ShortType, IntegerType) => true
      case _ => false
    })
}
Original file line number Diff line number Diff line change
Expand Up @@ -685,8 +685,14 @@ case class AlterTableChangeColumnDeltaCommand(
// first (original data type is already normalized as we store char/varchar as string type with
// special metadata in the Delta log), then apply Delta-specific checks.
val newType = CharVarcharUtils.replaceCharVarcharWithString(newColumn.dataType)
if (SchemaUtils.canChangeDataType(originalField.dataType, newType, resolver,
txn.metadata.columnMappingMode, columnPath :+ originalField.name).nonEmpty) {
if (SchemaUtils.canChangeDataType(
originalField.dataType,
newType,
resolver,
txn.metadata.columnMappingMode,
columnPath :+ originalField.name,
allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata)
).nonEmpty) {
throw DeltaErrors.alterTableChangeColumnException(
s"'${UnresolvedAttribute(columnPath :+ originalField.name).name}' with type " +
s"'${originalField.dataType}" +
Expand Down Expand Up @@ -738,8 +744,14 @@ case class AlterTableReplaceColumnsDeltaCommand(
val resolver = sparkSession.sessionState.conf.resolver
val changingSchema = StructType(columns)

SchemaUtils.canChangeDataType(existingSchema, changingSchema, resolver,
txn.metadata.columnMappingMode, failOnAmbiguousChanges = true).foreach { operation =>
SchemaUtils.canChangeDataType(
existingSchema,
changingSchema,
resolver,
txn.metadata.columnMappingMode,
allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata),
failOnAmbiguousChanges = true
).foreach { operation =>
throw DeltaErrors.alterTableReplaceColumnsException(
existingSchema, changingSchema, operation)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping}
import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping, TypeWidening}
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.DeltaLogging
Expand Down Expand Up @@ -904,6 +904,8 @@ def normalizeColumnNamesInDataType(
* @param failOnAmbiguousChanges Throw an error if a StructField both has columns dropped and new
* columns added. These are ambiguous changes, because we don't
* know if a column needs to be renamed, dropped, or added.
* @param allowTypeWidening Whether widening type changes as defined in [[TypeWidening]]
* can be applied.
* @return None if the data types can be changed, otherwise Some(err) containing the reason.
*/
def canChangeDataType(
Expand All @@ -912,7 +914,8 @@ def normalizeColumnNamesInDataType(
resolver: Resolver,
columnMappingMode: DeltaColumnMappingMode,
columnPath: Seq[String] = Nil,
failOnAmbiguousChanges: Boolean = false): Option[String] = {
failOnAmbiguousChanges: Boolean = false,
allowTypeWidening: Boolean = false): Option[String] = {
def verify(cond: Boolean, err: => String): Unit = {
if (!cond) {
throw DeltaErrors.cannotChangeDataType(err)
Expand Down Expand Up @@ -963,6 +966,11 @@ def normalizeColumnNamesInDataType(
(if (columnPath.nonEmpty) s" from $columnName" else ""))
}

case (fromDataType: AtomicType, toDataType: AtomicType) if allowTypeWidening =>
verify(TypeWidening.isAtomicTypeChangeSupported(fromDataType, toDataType),
s"changing data type of ${UnresolvedAttribute(columnPath).name} " +
s"from $fromDataType to $toDataType")

case (fromDataType, toDataType) =>
verify(fromDataType == toDataType,
s"changing data type of ${UnresolvedAttribute(columnPath).name} " +
Expand Down
Loading

0 comments on commit 9a5e7dd

Please sign in to comment.