Type Widening in ALTER TABLE CHANGE COLUMN #2645 (Closed)
spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala (11 additions, 0 deletions)

@@ -725,6 +725,17 @@ trait DeltaConfigsBase extends DeltaLogging {
"needs to be a boolean."
)

  /**
   * Whether widening the type of an existing column or field is allowed, either manually using
   * ALTER TABLE CHANGE COLUMN or automatically if automatic schema evolution is enabled.
   */
  val ENABLE_TYPE_WIDENING = buildConfig[Boolean](
    key = "enableTypeWidening",
    defaultValue = false.toString,
    fromString = _.toBoolean,
    validationFunction = _ => true,
    helpMessage = "needs to be a boolean.")

  val MANAGED_COMMIT_OWNER_NAME = buildConfig[Option[String]](
    "managedCommits.commitOwner-dev",
    null,
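For context, a minimal usage sketch of the new property (assumptions: `buildConfig` applies the standard `delta.` prefix to the key, so the user-facing property is `delta.enableTypeWidening`; the table name is illustrative):

// Sketch only: assumes the "delta." key prefix added by buildConfig and an
// existing Delta table named my_table.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("type-widening-config").getOrCreate()

// Opt the table into type widening via its table properties.
spark.sql("ALTER TABLE my_table SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")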
spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala

@@ -355,7 +355,8 @@ object TableFeature {
        // managed-commits are under development and only available in testing.
        ManagedCommitTableFeature,
        // Row IDs are still under development and only available in testing.
        RowTrackingFeature,
        TypeWideningTableFeature)
    }
    val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
    require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
@@ -625,6 +626,18 @@ object ManagedCommitTableFeature
  }
}

object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening-dev")
    with FeatureAutomaticallyEnabledByMetadata {
  override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

  private def isTypeWideningSupportNeededByMetadata(metadata: Metadata): Boolean =
    DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata)

  override def metadataRequiresFeatureToBeEnabled(
      metadata: Metadata,
      spark: SparkSession): Boolean = isTypeWideningSupportNeededByMetadata(metadata)
}

Review comment (Contributor), on the feature declaration: Since we are using -dev right now and it's not ready for users, is it behind the isTesting flag?

/**
 * Features below are for testing only, and are being registered to the system only in the testing
 * environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration.
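To illustrate the contract encoded by FeatureAutomaticallyEnabledByMetadata above: once the table property appears in the table metadata, metadataRequiresFeatureToBeEnabled returns true, and automaticallyUpdateProtocolOfExistingTables = true means existing tables get the required protocol upgrade automatically instead of failing the commit. A self-contained sketch with simplified stand-in types (the real Metadata and Protocol actions live in org.apache.spark.sql.delta.actions and carry far more state):

// Simplified model, not the real delta-spark types.
object TypeWideningEnablementSketch {
  // Stand-in for the table-property map carried by the Metadata action.
  final case class Metadata(configuration: Map[String, String])

  // Mirrors isTypeWideningSupportNeededByMetadata from the diff above.
  def isTypeWideningSupportNeededByMetadata(metadata: Metadata): Boolean =
    metadata.configuration.get("delta.enableTypeWidening").exists(_.toBoolean)

  def main(args: Array[String]): Unit = {
    val before = Metadata(Map.empty)
    val after = Metadata(Map("delta.enableTypeWidening" -> "true"))
    assert(!isTypeWideningSupportNeededByMetadata(before))
    // Setting the property makes the feature "required by metadata", which is
    // what triggers the automatic protocol upgrade on the next commit.
    assert(isTypeWideningSupportNeededByMetadata(after))
  }
}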
spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala (new file, 63 additions)

@@ -0,0 +1,63 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils}

import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.types._

object TypeWidening {

  /**
   * Returns whether the protocol version supports the Type Widening table feature.
   */
  def isSupported(protocol: Protocol): Boolean =
    protocol.isFeatureSupported(TypeWideningTableFeature)

  /**
   * Returns whether Type Widening is enabled on this table version. Checks that Type Widening is
   * supported, which is a prerequisite for enabling Type Widening, and throws an error if it is
   * not. When Type Widening is enabled, the type of existing columns or fields can be widened
   * using ALTER TABLE CHANGE COLUMN.
   */
  def isEnabled(protocol: Protocol, metadata: Metadata): Boolean = {
    val isEnabled = DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata)
    if (isEnabled && !isSupported(protocol)) {
      throw new IllegalStateException(
        s"Table property '${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' is " +
          s"set on the table but this table version doesn't support table feature " +
          s"'${TableFeatureProtocolUtils.propertyKey(TypeWideningTableFeature)}'.")
    }
    isEnabled
  }

  /**
   * Returns whether the given type change is eligible for widening. This only checks atomic types.
   * It is the responsibility of the caller to recurse into structs, maps and arrays.
   */
  def isTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean =
    (fromType, toType) match {
      case (from, to) if from == to => true
      // All supported type changes below are supposed to be widening, but to be safe, reject any
      // non-widening change upfront.
      case (from, to) if !Cast.canUpCast(from, to) => false
      case (ByteType, ShortType) => true
      case (ByteType | ShortType, IntegerType) => true
      case _ => false
    }
}

Review comment (Collaborator), on the IllegalStateException message in isEnabled: I guess this should be using the error framework?

Reply (Collaborator Author): This should never happen unless there's a bug in the implementation, so I wouldn't give it an error class. We typically wouldn't want to document that error as a user-facing error.
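To make the supported-change matrix in isTypeChangeSupported concrete, a small check (a sketch, assuming a delta-spark test classpath; the expected results follow directly from the match above):

import org.apache.spark.sql.delta.TypeWidening
import org.apache.spark.sql.types._

// Identity and the two whitelisted widenings are accepted.
assert(TypeWidening.isTypeChangeSupported(IntegerType, IntegerType))
assert(TypeWidening.isTypeChangeSupported(ByteType, ShortType))
assert(TypeWidening.isTypeChangeSupported(ByteType, IntegerType))
assert(TypeWidening.isTypeChangeSupported(ShortType, IntegerType))

// Narrowing changes fail the Cast.canUpCast guard; other upcasts such as
// int -> long are not (yet) in the allowed list, so they are rejected too.
assert(!TypeWidening.isTypeChangeSupported(IntegerType, ByteType))
assert(!TypeWidening.isTypeChangeSupported(IntegerType, LongType))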
spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala

@@ -706,7 +706,8 @@ case class AlterTableChangeColumnDeltaCommand(
        newType,
        resolver,
        txn.metadata.columnMappingMode,
        columnPath :+ originalField.name,
        allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata)
      ).nonEmpty) {
      throw DeltaErrors.alterTableChangeColumnException(
        fieldPath = UnresolvedAttribute(columnPath :+ originalField.name).name,
@@ -785,6 +786,7 @@ case class AlterTableReplaceColumnsDeltaCommand(
        changingSchema,
        resolver,
        txn.metadata.columnMappingMode,
        allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata),
        failOnAmbiguousChanges = true
      ).foreach { operation =>
      throw DeltaErrors.alterTableReplaceColumnsException(
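End to end, the two call-site changes above enable a DDL flow like the following (a sketch; table and column names are illustrative, and it assumes the table property from DeltaConfig.scala is set so TypeWidening.isEnabled returns true):

// Widen a byte column to int on a table that opted into type widening.
spark.sql(
  """CREATE TABLE demo (a BYTE) USING delta
    |TBLPROPERTIES ('delta.enableTypeWidening' = 'true')""".stripMargin)
spark.sql("ALTER TABLE demo CHANGE COLUMN a TYPE INT") // byte -> int: supported widening
// An unsupported change (e.g. int -> string, or any change without the table
// property) still throws DeltaErrors.alterTableChangeColumnException as before.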
spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala

@@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping, TypeWidening}
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -911,6 +911,8 @@ def normalizeColumnNamesInDataType(
   * @param failOnAmbiguousChanges Throw an error if a StructField both has columns dropped and new
   *                               columns added. These are ambiguous changes, because we don't
   *                               know if a column needs to be renamed, dropped, or added.
   * @param allowTypeWidening Whether widening type changes as defined in [[TypeWidening]]
   *                          can be applied.
   * @return None if the data types can be changed, otherwise Some(err) containing the reason.
   */
  def canChangeDataType(
@@ -919,7 +921,8 @@
      resolver: Resolver,
      columnMappingMode: DeltaColumnMappingMode,
      columnPath: Seq[String] = Nil,
      failOnAmbiguousChanges: Boolean = false,
      allowTypeWidening: Boolean = false): Option[String] = {
    def verify(cond: Boolean, err: => String): Unit = {
      if (!cond) {
        throw DeltaErrors.cannotChangeDataType(err)
@@ -970,6 +973,11 @@
            (if (columnPath.nonEmpty) s" from $columnName" else ""))
        }

      case (fromDataType: AtomicType, toDataType: AtomicType) if allowTypeWidening =>
        verify(TypeWidening.isTypeChangeSupported(fromDataType, toDataType),
          s"changing data type of ${UnresolvedAttribute(columnPath).name} " +
            s"from $fromDataType to $toDataType")

      case (fromDataType, toDataType) =>
        verify(fromDataType == toDataType,
          s"changing data type of ${UnresolvedAttribute(columnPath).name} " +
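And at the level this file changes, a hedged sketch of the new flag in action (internal API; assumes the elided leading parameters of canChangeDataType are the from/to data types, and uses caseInsensitiveResolution plus NoMapping as stand-ins for whatever resolver and column-mapping mode real callers pass):

import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.delta.NoMapping
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.types._

val from = StructType(Seq(StructField("a", ByteType)))
val to = StructType(Seq(StructField("a", IntegerType)))

// Default behaviour is unchanged: without the flag, byte -> int is still
// reported as an illegal change (Some(err)).
assert(SchemaUtils.canChangeDataType(
  from, to, caseInsensitiveResolution, NoMapping).nonEmpty)

// With allowTypeWidening = true the same change is accepted (None).
assert(SchemaUtils.canChangeDataType(
  from, to, caseInsensitiveResolution, NoMapping, allowTypeWidening = true).isEmpty)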