From 9a5e7ddf4973268ab242852de9dd1bc9f843936e Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Fri, 16 Feb 2024 09:39:53 +0100 Subject: [PATCH 1/5] Type Widening in ALTER TABLE CHANGE COLUMN --- .../apache/spark/sql/delta/DeltaConfig.scala | 11 + .../apache/spark/sql/delta/TableFeature.scala | 15 +- .../apache/spark/sql/delta/TypeWidening.scala | 59 +++ .../commands/alterDeltaTableCommands.scala | 20 +- .../spark/sql/delta/schema/SchemaUtils.scala | 12 +- .../sql/delta/DeltaTypeWideningSuite.scala | 367 ++++++++++++++++++ 6 files changed, 477 insertions(+), 7 deletions(-) create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala index f4a9d7cdce0..9d484d93cc7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala @@ -725,6 +725,17 @@ trait DeltaConfigsBase extends DeltaLogging { "needs to be a boolean." ) + /** + * Whether widening the type of an existing column or field is allowed, either manually using + * ALTER TABLE CHANGE COLUMN or automatically if automatic schema evolution is enabled. + */ + val ENABLE_TYPE_WIDENING = buildConfig[Boolean]( + key = "enableTypeWidening", + defaultValue = false.toString, + fromString = _.toBoolean, + validationFunction = _ => true, + helpMessage = "needs to be a boolean.") + val MANAGED_COMMIT_OWNER_NAME = buildConfig[Option[String]]( "managedCommits.commitOwner-dev", null, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 48382089060..430c6f0e76c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -355,7 +355,8 @@ object TableFeature { // managed-commits are under development and only available in testing. ManagedCommitTableFeature, // Row IDs are still under development and only available in testing. - RowTrackingFeature) + RowTrackingFeature, + TypeWideningTableFeature) } val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap require(features.size == featureMap.size, "Lowercase feature names must not duplicate.") @@ -625,6 +626,18 @@ object ManagedCommitTableFeature } } +object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening-dev") + with FeatureAutomaticallyEnabledByMetadata { + override def automaticallyUpdateProtocolOfExistingTables: Boolean = true + + private def isTypeWideningSupportNeededByMetadata(metadata: Metadata): Boolean = + DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata) + + override def metadataRequiresFeatureToBeEnabled( + metadata: Metadata, + spark: SparkSession): Boolean = isTypeWideningSupportNeededByMetadata(metadata) +} + /** * Features below are for testing only, and are being registered to the system only in the testing * environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration. 
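For context, a minimal sketch of the user-facing behavior this config/feature pair is meant to enable, assuming the config above surfaces as the table property 'delta.enableTypeWidening' (the usual "delta." prefix for DeltaConfigs keys); the table path and column name are placeholders, not part of the patch:

    // Setting the property is expected to automatically add the
    // "typeWidening-dev" reader/writer feature to the table protocol, since the
    // feature declares automaticallyUpdateProtocolOfExistingTables = true and
    // keys its metadataRequiresFeatureToBeEnabled check on ENABLE_TYPE_WIDENING.
    spark.sql(
      "ALTER TABLE delta.`/path/to/table` " +
        "SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
    // With the property set, widening changes such as TINYINT -> SMALLINT or
    // TINYINT/SMALLINT -> INT become legal in ALTER TABLE ... CHANGE COLUMN.
    spark.sql("ALTER TABLE delta.`/path/to/table` CHANGE COLUMN col TYPE INT")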
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala new file mode 100644 index 00000000000..c5754d726d5 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -0,0 +1,59 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils} + +import org.apache.spark.sql.types._ + +object TypeWidening { + + /** + * Returns whether the protocol version supports the Type Widening table feature. + */ + def isSupported(protocol: Protocol): Boolean = + protocol.isFeatureSupported(TypeWideningTableFeature) + + /** + * Returns whether Type Widening is enabled on this table version. Checks that Type Widening is + * supported, which is a pre-requisite for enabling Type Widening, throws an error if + * not. When Type Widening is enabled, the type of existing columns or fields can be widened + * using ALTER TABLE CHANGE COLUMN. + */ + def isEnabled(protocol: Protocol, metadata: Metadata): Boolean = { + val isEnabled = DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata) + if (isEnabled && !isSupported(protocol)) { + throw new IllegalStateException( + s"Table property '${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' is " + + s"set on the table but this table version doesn't support table feature " + + s"'${TableFeatureProtocolUtils.propertyKey(TypeWideningTableFeature)}'.") + } + isEnabled + } + + /** + * Returns whether the given type change is eligible for widening. This only checks atomic types, + * it is the responsibility of the caller to recurse into structs, maps and arrays. + */ + def isAtomicTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean = + (fromType, toType) match { + case (from, to) if from == to => true + case (ByteType, ShortType) => true + case (ByteType | ShortType, IntegerType) => true + case _ => false + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala index 4b9221eb0cd..008400b0671 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala @@ -685,8 +685,14 @@ case class AlterTableChangeColumnDeltaCommand( // first (original data type is already normalized as we store char/varchar as string type with // special metadata in the Delta log), then apply Delta-specific checks. 
val newType = CharVarcharUtils.replaceCharVarcharWithString(newColumn.dataType) - if (SchemaUtils.canChangeDataType(originalField.dataType, newType, resolver, - txn.metadata.columnMappingMode, columnPath :+ originalField.name).nonEmpty) { + if (SchemaUtils.canChangeDataType( + originalField.dataType, + newType, + resolver, + txn.metadata.columnMappingMode, + columnPath :+ originalField.name, + allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata) + ).nonEmpty) { throw DeltaErrors.alterTableChangeColumnException( s"'${UnresolvedAttribute(columnPath :+ originalField.name).name}' with type " + s"'${originalField.dataType}" + @@ -738,8 +744,14 @@ case class AlterTableReplaceColumnsDeltaCommand( val resolver = sparkSession.sessionState.conf.resolver val changingSchema = StructType(columns) - SchemaUtils.canChangeDataType(existingSchema, changingSchema, resolver, - txn.metadata.columnMappingMode, failOnAmbiguousChanges = true).foreach { operation => + SchemaUtils.canChangeDataType( + existingSchema, + changingSchema, + resolver, + txn.metadata.columnMappingMode, + allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata), + failOnAmbiguousChanges = true + ).foreach { operation => throw DeltaErrors.alterTableReplaceColumnsException( existingSchema, changingSchema, operation) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala index b028f97a977..15f0b916073 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping} +import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping, TypeWidening} import org.apache.spark.sql.delta.actions.Protocol import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.metering.DeltaLogging @@ -904,6 +904,8 @@ def normalizeColumnNamesInDataType( * @param failOnAmbiguousChanges Throw an error if a StructField both has columns dropped and new * columns added. These are ambiguous changes, because we don't * know if a column needs to be renamed, dropped, or added. + * @param allowTypeWidening Whether widening type changes as defined in [[TypeWidening]] + * can be applied. * @return None if the data types can be changed, otherwise Some(err) containing the reason. 
*/ def canChangeDataType( @@ -912,7 +914,8 @@ def normalizeColumnNamesInDataType( resolver: Resolver, columnMappingMode: DeltaColumnMappingMode, columnPath: Seq[String] = Nil, - failOnAmbiguousChanges: Boolean = false): Option[String] = { + failOnAmbiguousChanges: Boolean = false, + allowTypeWidening: Boolean = false): Option[String] = { def verify(cond: Boolean, err: => String): Unit = { if (!cond) { throw DeltaErrors.cannotChangeDataType(err) @@ -963,6 +966,11 @@ def normalizeColumnNamesInDataType( (if (columnPath.nonEmpty) s" from $columnName" else "")) } + case (fromDataType: AtomicType, toDataType: AtomicType) if allowTypeWidening => + verify(TypeWidening.isAtomicTypeChangeSupported(fromDataType, toDataType), + s"changing data type of ${UnresolvedAttribute(columnPath).name} " + + s"from $fromDataType to $toDataType") + case (fromDataType, toDataType) => verify(fromDataType == toDataType, s"changing data type of ${UnresolvedAttribute(columnPath).name} " + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala new file mode 100644 index 00000000000..ac80515a0e9 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -0,0 +1,367 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} +import org.apache.spark.sql.errors.QueryErrorsBase +import org.apache.spark.sql.execution.datasources.parquet.ParquetTest +import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +/** + * Suite covering the type widening table feature. + */ +class DeltaTypeWideningSuite + extends QueryTest + with ParquetTest + with DeltaDMLTestUtils + with DeltaTypeWideningTestMixin + with DeltaTypeWideningAlterTableTests + with DeltaTypeWideningTableFeatureTests + +/** + * Test mixin that enables type widening by default for all tests in the suite. + */ +trait DeltaTypeWideningTestMixin extends SharedSparkSession { + protected override def sparkConf: SparkConf = { + super.sparkConf.set(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey, "true") + } + + /** Enable (or disable) type widening for the table under the given path. */ + protected def enableTypeWidening(tablePath: String, enabled: Boolean = true): Unit = + sql(s"ALTER TABLE delta.`$tablePath` " + + s"SET TBLPROPERTIES('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = '${enabled.toString}')") +} + +/** + * Trait collecting a subset of tests providing core coverage for type widening using ALTER TABLE + * CHANGE COLUMN TYPE. 
+ */ +trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { + self: QueryTest with ParquetTest with DeltaTypeWideningTestMixin with DeltaDMLTestUtils => + + import testImplicits._ + + /** + * Represents the input of a type change test. + * @param fromType The original type of the column 'value' in the test table. + * @param toType The type to use when changing the type of column 'value' + * @param initialValues The initial values to insert in column 'value after table creation, + * using type `fromType` + * @param additionalValues Additional values to insert after changing the type of the column + * 'value' to `toType`. + */ + case class TypeEvolutionTestCase( + fromType: DataType, + toType: DataType, + initialValues: Seq[String], + additionalValues: Seq[String] = Seq.empty) { + def initialValuesDF: DataFrame = + initialValues.toDF("value").select($"value".cast(fromType)) + + def additionalValuesDF: DataFrame = + additionalValues.toDF("value").select($"value".cast(toType)) + + def expectedResult: DataFrame = + initialValuesDF.union(additionalValuesDF).select($"value".cast(toType)) + } + + // Type changes that are supported by all Parquet readers. Byte, Short, Int are all stored as + // INT32 in parquet so these changes are guaranteed to be supported. + private val supportedTestCases: Seq[TypeEvolutionTestCase] = Seq( + TypeEvolutionTestCase(ByteType, ShortType, + Seq("1", "2", Byte.MinValue.toString), + Seq("4", "5", Int.MaxValue.toString)), + TypeEvolutionTestCase(ByteType, IntegerType, + Seq("1", "2", Byte.MinValue.toString), + Seq("4", "5", Int.MaxValue.toString)), + TypeEvolutionTestCase(ShortType, IntegerType, + Seq("1", "2", Byte.MinValue.toString), + Seq("4", "5", Int.MaxValue.toString)) + ) + + for { + testCase <- supportedTestCases + partitioned <- BOOLEAN_DOMAIN + } { + test(s"type widening ${testCase.fromType.sql} -> ${testCase.toType.sql}, " + + s"partitioned=$partitioned") { + def writeData(df: DataFrame): Unit = if (partitioned) { + // The table needs to have at least 1 non-partition column, use a dummy one. + append(df.withColumn("dummy", lit(1)), partitionBy = Seq("value")) + } else { + append(df) + } + + writeData(testCase.initialValuesDF) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE ${testCase.toType.sql}") + withAllParquetReaders { + assert(readDeltaTable(tempPath).schema("value").dataType === testCase.toType) + checkAnswer(readDeltaTable(tempPath).select("value"), + testCase.initialValuesDF.select($"value".cast(testCase.toType))) + } + writeData(testCase.additionalValuesDF) + withAllParquetReaders { + checkAnswer(readDeltaTable(tempPath).select("value"), testCase.expectedResult) + } + } + } + + // Test type changes that aren't supported. 
+ private val unsupportedNonTestCases: Seq[TypeEvolutionTestCase] = Seq( + TypeEvolutionTestCase(IntegerType, ByteType, + Seq("1", "2", Int.MinValue.toString)), + TypeEvolutionTestCase(LongType, IntegerType, + Seq("4", "5", Long.MaxValue.toString)), + TypeEvolutionTestCase(DoubleType, FloatType, + Seq("987654321.987654321", Double.NaN.toString, Double.NegativeInfinity.toString, + Double.PositiveInfinity.toString, Double.MinPositiveValue.toString, + Double.MinValue.toString, Double.MaxValue.toString)), + TypeEvolutionTestCase(IntegerType, DecimalType(6, 0), + Seq("1", "2", Int.MinValue.toString)), + TypeEvolutionTestCase(TimestampNTZType, DateType, + Seq("2020-03-17 15:23:15", "2023-12-31 23:59:59", "0001-01-01 00:00:00")), + // Reduce scale + TypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), + DecimalType(Decimal.MAX_INT_DIGITS, 3), + Seq("-67.89", "9" * (Decimal.MAX_INT_DIGITS - 2) + ".99")), + // Reduce precision + TypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), + DecimalType(Decimal.MAX_INT_DIGITS - 1, 2), + Seq("-67.89", "9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99")), + // Reduce precision & scale + TypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), + DecimalType(Decimal.MAX_INT_DIGITS - 1, 1), + Seq("-67.89", "9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99")), + // Increase scale more than precision + TypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), + DecimalType(Decimal.MAX_INT_DIGITS + 1, 4), + Seq("-67.89", "9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99")) + ) + + for { + testCase <- unsupportedNonTestCases + partitioned <- BOOLEAN_DOMAIN + } { + test(s"unsupported type changes ${testCase.fromType.sql} -> ${testCase.toType.sql}, " + + s"partitioned=$partitioned") { + if (partitioned) { + // The table needs to have at least 1 non-partition column, use a dummy one. 
+ append(testCase.initialValuesDF.withColumn("dummy", lit(1)), partitionBy = Seq("value")) + } else { + append(testCase.initialValuesDF) + } + sql(s"ALTER TABLE delta.`$tempPath` " + + s"SET TBLPROPERTIES('delta.feature.timestampNtz' = 'supported')") + + val alterTableSql = + s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE ${testCase.toType.sql}" + checkError( + exception = intercept[AnalysisException] { + sql(alterTableSql) + }, + errorClass = "NOT_SUPPORTED_CHANGE_COLUMN", + sqlState = None, + parameters = Map( + "table" -> s"`spark_catalog`.`delta`.`$tempPath`", + "originName" -> toSQLId("value"), + "originType" -> toSQLType(testCase.fromType), + "newName" -> toSQLId("value"), + "newType" -> toSQLType(testCase.toType)), + context = ExpectedContext( + fragment = alterTableSql, + start = 0, + stop = alterTableSql.length - 1) + ) + } + } + + test("type widening in nested fields") { + sql(s"CREATE TABLE delta.`$tempPath` " + + "(s struct, m map, a array) USING DELTA") + append(Seq((1, 2, 3, 4)) + .toDF("a", "b", "c", "d") + .selectExpr( + "named_struct('a', cast(a as byte)) as s", + "map(cast(b as byte), cast(c as short)) as m", + "array(cast(d as short)) as a")) + + assert(readDeltaTable(tempPath).schema === new StructType() + .add("s", new StructType().add("a", ByteType)) + .add("m", MapType(ByteType, ShortType)) + .add("a", ArrayType(ShortType))) + + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN s.a TYPE short") + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN m.key TYPE int") + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN m.value TYPE int") + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a.element TYPE int") + + assert(readDeltaTable(tempPath).schema === new StructType() + .add("s", new StructType().add("a", ShortType)) + .add("m", MapType(IntegerType, IntegerType)) + .add("a", ArrayType(IntegerType))) + + append(Seq((5, 6, 7, 8)) + .toDF("a", "b", "c", "d") + .selectExpr("named_struct('a', cast(a as short)) as s", "map(b, c) as m", "array(d) as a")) + + checkAnswer( + readDeltaTable(tempPath), + Seq((1, 2, 3, 4), (5, 6, 7, 8)) + .toDF("a", "b", "c", "d") + .selectExpr("named_struct('a', cast(a as short)) as s", "map(b, c) as m", "array(d) as a")) + } + + test("type widening using ALTER TABLE REPLACE COLUMNS") { + append(Seq(1, 2).toDF("value").select($"value".cast(ShortType))) + assert(readDeltaTable(tempPath).schema === new StructType().add("value", ShortType)) + sql(s"ALTER TABLE delta.`$tempPath` REPLACE COLUMNS (value INT)") + assert(readDeltaTable(tempPath).schema === new StructType().add("value", IntegerType)) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2))) + append(Seq(3, 4).toDF("value")) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3), Row(4))) + } + + test("row group skipping Int -> Long") { + withSQLConf( + SQLConf.FILES_MAX_PARTITION_BYTES.key -> 1024.toString) { + append((0 until 1024).toDF("value").select($"value".cast(ShortType))) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE INT") + append((Short.MinValue + 1 until Short.MaxValue + 2048).toDF("value")) + withAllParquetReaders { + checkAnswer( + sql(s"SELECT * FROM delta.`$tempPath` WHERE value >= ${Short.MaxValue}::INT + 1000"), + (Short.MaxValue + 1000 until Short.MaxValue + 2048).map(Row(_))) + } + } + } + +} + +trait DeltaTypeWideningTableFeatureTests { + self: QueryTest with ParquetTest with DeltaDMLTestUtils with DeltaTypeWideningTestMixin + with SharedSparkSession => + + def isTypeWideningSupported: Boolean = { + val snapshot = 
DeltaLog.forTable(spark, tempPath).unsafeVolatileSnapshot + TypeWidening.isSupported(snapshot.protocol) + } + + def isTypeWideningEnabled: Boolean = { + val snapshot = DeltaLog.forTable(spark, tempPath).unsafeVolatileSnapshot + TypeWidening.isEnabled(snapshot.protocol, snapshot.metadata) + } + + test("enable type widening at table creation then disable it") { + sql(s"CREATE TABLE delta.`$tempPath` (a int) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true')") + assert(isTypeWideningSupported) + assert(isTypeWideningEnabled) + enableTypeWidening(tempPath, enabled = false) + assert(isTypeWideningSupported) + assert(!isTypeWideningEnabled) + } + + test("enable type widening after table creation then disable it") { + sql(s"CREATE TABLE delta.`$tempPath` (a int) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'false')") + assert(!isTypeWideningSupported) + assert(!isTypeWideningEnabled) + // Setting the property to false shouldn't add the table feature if it's not present. + enableTypeWidening(tempPath, enabled = false) + assert(!isTypeWideningSupported) + assert(!isTypeWideningEnabled) + + enableTypeWidening(tempPath) + assert(isTypeWideningSupported) + assert(isTypeWideningEnabled) + enableTypeWidening(tempPath, enabled = false) + assert(isTypeWideningSupported) + assert(!isTypeWideningEnabled) + } + + test("set table property to incorrect value") { + val ex = intercept[IllegalArgumentException] { + sql(s"CREATE TABLE delta.`$tempPath` (a int) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'bla')") + } + assert(ex.getMessage.contains("For input string: \"bla\"")) + sql(s"CREATE TABLE delta.`$tempPath` (a int) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'false')") + checkError( + exception = intercept[SparkException] { + sql(s"ALTER TABLE delta.`$tempPath` " + + s"SET TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'bla')") + }, + errorClass = "_LEGACY_ERROR_TEMP_2045", + parameters = Map( + "message" -> "For input string: \"bla\"" + ) + ) + assert(!isTypeWideningSupported) + assert(!isTypeWideningEnabled) + } + + test("change column type without table feature") { + sql(s"CREATE TABLE delta.`$tempPath` (a TINYINT) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'false')") + + checkError( + exception = intercept[AnalysisException] { + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE SMALLINT") + }, + errorClass = "DELTA_UNSUPPORTED_ALTER_TABLE_CHANGE_COL_OP", + parameters = Map( + "fieldPath" -> "a", + "oldField" -> "TINYINT", + "newField" -> "SMALLINT" + ) + ) + } + + test("change column type with type widening table feature supported but table property set to " + + "false") { + sql(s"CREATE TABLE delta.`$tempPath` (a SMALLINT) USING DELTA") + sql(s"ALTER TABLE delta.`$tempPath` " + + s"SET TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'false')") + + checkError( + exception = intercept[AnalysisException] { + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE INT") + }, + errorClass = "DELTA_UNSUPPORTED_ALTER_TABLE_CHANGE_COL_OP", + parameters = Map( + "fieldPath" -> "a", + "oldField" -> "SMALLINT", + "newField" -> "INT" + ) + ) + } + + test("no-op type changes are always allowed") { + sql(s"CREATE TABLE delta.`$tempPath` (a int) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'false')") + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE INT") + 
enableTypeWidening(tempPath, enabled = true) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE INT") + enableTypeWidening(tempPath, enabled = false) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE INT") + } +} From 73eb480cc35e3eaed7a6f7958d1d3651b08fc4dc Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Mon, 19 Feb 2024 08:56:31 +0100 Subject: [PATCH 2/5] Set up Delta extension in test --- .../org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index ac80515a0e9..8828be97764 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest + import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.errors.QueryErrorsBase @@ -32,6 +34,7 @@ class DeltaTypeWideningSuite extends QueryTest with ParquetTest with DeltaDMLTestUtils + with DeltaSQLCommandTest with DeltaTypeWideningTestMixin with DeltaTypeWideningAlterTableTests with DeltaTypeWideningTableFeatureTests From 95c4fe241a0fe9614d25fa6c39e070f0b5b31e22 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Mon, 19 Feb 2024 12:08:59 +0100 Subject: [PATCH 3/5] Fix row group skipping test --- .../org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index 8828be97764..ca29a6983ce 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -242,7 +242,7 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3), Row(4))) } - test("row group skipping Int -> Long") { + test("row group skipping Short -> Int") { withSQLConf( SQLConf.FILES_MAX_PARTITION_BYTES.key -> 1024.toString) { append((0 until 1024).toDF("value").select($"value".cast(ShortType))) @@ -250,7 +250,8 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { append((Short.MinValue + 1 until Short.MaxValue + 2048).toDF("value")) withAllParquetReaders { checkAnswer( - sql(s"SELECT * FROM delta.`$tempPath` WHERE value >= ${Short.MaxValue}::INT + 1000"), + sql(s"SELECT * FROM delta.`$tempPath` " + + s"WHERE value >= CAST(${Short.MaxValue} AS INT) + 1000"), (Short.MaxValue + 1000 until Short.MaxValue + 2048).map(Row(_))) } } From a227fa6987842edfc14e747a2edbf30dcd5765d3 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Mon, 26 Feb 2024 17:39:13 +0100 Subject: [PATCH 4/5] Address comments --- .../apache/spark/sql/delta/TypeWidening.scala | 10 +- .../spark/sql/delta/schema/SchemaUtils.scala | 2 +- .../sql/delta/DeltaTypeWideningSuite.scala | 258 +++++++++++++----- 3 files changed, 194 insertions(+), 76 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index c5754d726d5..b3dd6b2b1cc 100644 --- 
a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils} +import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.types._ object TypeWidening { @@ -46,12 +47,15 @@ object TypeWidening { } /** - * Returns whether the given type change is eligible for widening. This only checks atomic types, - * it is the responsibility of the caller to recurse into structs, maps and arrays. + * Returns whether the given type change is eligible for widening. This only checks atomic types. + * It is the responsibility of the caller to recurse into structs, maps and arrays. */ - def isAtomicTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean = + def isTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean = (fromType, toType) match { case (from, to) if from == to => true + // All supported type changes below are supposed to be widening, but to be safe, reject any + // non-widening change upfront. + case (from, to) if !Cast.canUpCast(from, to) => false case (ByteType, ShortType) => true case (ByteType | ShortType, IntegerType) => true case _ => false diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala index 31eacda4435..784a5798494 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala @@ -974,7 +974,7 @@ def normalizeColumnNamesInDataType( } case (fromDataType: AtomicType, toDataType: AtomicType) if allowTypeWidening => - verify(TypeWidening.isAtomicTypeChangeSupported(fromDataType, toDataType), + verify(TypeWidening.isTypeChangeSupported(fromDataType, toDataType), s"changing data type of ${UnresolvedAttribute(columnPath).name} " + s"from $fromDataType to $toDataType") diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index ca29a6983ce..3ab96605685 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, DataFrame, Encoder, QueryTest, Row} import org.apache.spark.sql.errors.QueryErrorsBase import org.apache.spark.sql.execution.datasources.parquet.ParquetTest import org.apache.spark.sql.functions.lit @@ -37,6 +37,7 @@ class DeltaTypeWideningSuite with DeltaSQLCommandTest with DeltaTypeWideningTestMixin with DeltaTypeWideningAlterTableTests + with DeltaTypeWideningNestedFieldsTests with DeltaTypeWideningTableFeatureTests /** @@ -44,7 +45,10 @@ class DeltaTypeWideningSuite */ trait DeltaTypeWideningTestMixin extends SharedSparkSession { protected override def sparkConf: SparkConf = { - super.sparkConf.set(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey, "true") + super.sparkConf + .set(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey, "true") + // Ensure we don't silently cast test 
inputs to null on overflow. + .set(SQLConf.ANSI_ENABLED.key, "true") } /** Enable (or disable) type widening for the table under the given path. */ @@ -65,39 +69,54 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { /** * Represents the input of a type change test. * @param fromType The original type of the column 'value' in the test table. - * @param toType The type to use when changing the type of column 'value' - * @param initialValues The initial values to insert in column 'value after table creation, - * using type `fromType` - * @param additionalValues Additional values to insert after changing the type of the column - * 'value' to `toType`. + * @param toType The type to use when changing the type of column 'value'. */ - case class TypeEvolutionTestCase( - fromType: DataType, - toType: DataType, - initialValues: Seq[String], - additionalValues: Seq[String] = Seq.empty) { - def initialValuesDF: DataFrame = + abstract class TypeEvolutionTestCase( + val fromType: DataType, + val toType: DataType) { + /** The initial values to insert with type `fromType` in column 'value' after table creation. */ + def initialValuesDF: DataFrame + /** Additional values to insert after changing the type of the column 'value' to `toType`. */ + def additionalValuesDF: DataFrame + /** Expected content of the table after inserting the additional values. */ + def expectedResult: DataFrame + } + + /** + * Represents the input of a supported type change test. Handles converting the test values from + * scala types to a dataframe. + */ + case class SupportedTypeEvolutionTestCase[ + FromType <: DataType, ToType <: DataType, + FromVal : Encoder, ToVal: Encoder + ]( + override val fromType: FromType, + override val toType: ToType, + initialValues: Seq[FromVal], + additionalValues: Seq[ToVal] + ) extends TypeEvolutionTestCase(fromType, toType) { + override def initialValuesDF: DataFrame = initialValues.toDF("value").select($"value".cast(fromType)) - def additionalValuesDF: DataFrame = + override def additionalValuesDF: DataFrame = additionalValues.toDF("value").select($"value".cast(toType)) - def expectedResult: DataFrame = + override def expectedResult: DataFrame = initialValuesDF.union(additionalValuesDF).select($"value".cast(toType)) } // Type changes that are supported by all Parquet readers. Byte, Short, Int are all stored as // INT32 in parquet so these changes are guaranteed to be supported. 
private val supportedTestCases: Seq[TypeEvolutionTestCase] = Seq( - TypeEvolutionTestCase(ByteType, ShortType, - Seq("1", "2", Byte.MinValue.toString), - Seq("4", "5", Int.MaxValue.toString)), - TypeEvolutionTestCase(ByteType, IntegerType, - Seq("1", "2", Byte.MinValue.toString), - Seq("4", "5", Int.MaxValue.toString)), - TypeEvolutionTestCase(ShortType, IntegerType, - Seq("1", "2", Byte.MinValue.toString), - Seq("4", "5", Int.MaxValue.toString)) + SupportedTypeEvolutionTestCase(ByteType, ShortType, + Seq(1, -1, Byte.MinValue, Byte.MaxValue, null.asInstanceOf[Byte]), + Seq(4, -4, Short.MinValue, Short.MaxValue, null.asInstanceOf[Short])), + SupportedTypeEvolutionTestCase(ByteType, IntegerType, + Seq(1, -1, Byte.MinValue, Byte.MaxValue, null.asInstanceOf[Byte]), + Seq(4, -4, Int.MinValue, Int.MaxValue, null.asInstanceOf[Int])), + SupportedTypeEvolutionTestCase(ShortType, IntegerType, + Seq(1, -1, Short.MinValue, Short.MaxValue, null.asInstanceOf[Short]), + Seq(4, -4, Int.MinValue, Int.MaxValue, null.asInstanceOf[Int])) ) for { @@ -117,50 +136,80 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE ${testCase.toType.sql}") withAllParquetReaders { assert(readDeltaTable(tempPath).schema("value").dataType === testCase.toType) - checkAnswer(readDeltaTable(tempPath).select("value"), - testCase.initialValuesDF.select($"value".cast(testCase.toType))) + checkAnswer(readDeltaTable(tempPath).select("value").sort("value"), + testCase.initialValuesDF.select($"value".cast(testCase.toType)).sort("value")) } writeData(testCase.additionalValuesDF) withAllParquetReaders { - checkAnswer(readDeltaTable(tempPath).select("value"), testCase.expectedResult) + checkAnswer( + readDeltaTable(tempPath).select("value").sort("value"), + testCase.expectedResult.sort("value")) } } } + /** + * Represents the input of an unsupported type change test. Handles converting the test values + * from scala types to a dataframe. Additional values to insert are always empty since the type + * change is expected to fail. + */ + case class UnsupportedTypeEvolutionTestCase[ + FromType <: DataType, ToType <: DataType, FromVal : Encoder]( + override val fromType: FromType, + override val toType: ToType, + initialValues: Seq[FromVal]) extends TypeEvolutionTestCase(fromType, toType) { + override def initialValuesDF: DataFrame = + initialValues.toDF("value").select($"value".cast(fromType)) + + override def additionalValuesDF: DataFrame = + spark.createDataFrame( + sparkContext.emptyRDD[Row], + new StructType().add(StructField("value", toType))) + + override def expectedResult: DataFrame = + initialValuesDF.select($"value".cast(toType)) + } + // Test type changes that aren't supported. 
- private val unsupportedNonTestCases: Seq[TypeEvolutionTestCase] = Seq( - TypeEvolutionTestCase(IntegerType, ByteType, - Seq("1", "2", Int.MinValue.toString)), - TypeEvolutionTestCase(LongType, IntegerType, - Seq("4", "5", Long.MaxValue.toString)), - TypeEvolutionTestCase(DoubleType, FloatType, - Seq("987654321.987654321", Double.NaN.toString, Double.NegativeInfinity.toString, - Double.PositiveInfinity.toString, Double.MinPositiveValue.toString, - Double.MinValue.toString, Double.MaxValue.toString)), - TypeEvolutionTestCase(IntegerType, DecimalType(6, 0), - Seq("1", "2", Int.MinValue.toString)), - TypeEvolutionTestCase(TimestampNTZType, DateType, + private val unsupportedTestCases: Seq[TypeEvolutionTestCase] = Seq( + UnsupportedTypeEvolutionTestCase(IntegerType, ByteType, + Seq(1, 2, Int.MinValue)), + UnsupportedTypeEvolutionTestCase(LongType, IntegerType, + Seq(4, 5, Long.MaxValue)), + UnsupportedTypeEvolutionTestCase(DoubleType, FloatType, + Seq(987654321.987654321d, Double.NaN, Double.NegativeInfinity, + Double.PositiveInfinity, Double.MinPositiveValue, + Double.MinValue, Double.MaxValue)), + UnsupportedTypeEvolutionTestCase(IntegerType, DecimalType(9, 0), + Seq(1, -1, Int.MinValue)), + UnsupportedTypeEvolutionTestCase(LongType, DecimalType(19, 0), + Seq(1, -1, Long.MinValue)), + UnsupportedTypeEvolutionTestCase(TimestampNTZType, DateType, Seq("2020-03-17 15:23:15", "2023-12-31 23:59:59", "0001-01-01 00:00:00")), // Reduce scale - TypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), + UnsupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), DecimalType(Decimal.MAX_INT_DIGITS, 3), - Seq("-67.89", "9" * (Decimal.MAX_INT_DIGITS - 2) + ".99")), + Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_INT_DIGITS - 2) + ".99"))), // Reduce precision - TypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), + UnsupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), DecimalType(Decimal.MAX_INT_DIGITS - 1, 2), - Seq("-67.89", "9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99")), + Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_INT_DIGITS - 2) + ".99"))), // Reduce precision & scale - TypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), + UnsupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_LONG_DIGITS, 2), DecimalType(Decimal.MAX_INT_DIGITS - 1, 1), - Seq("-67.89", "9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99")), + Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99"))), // Increase scale more than precision - TypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), + UnsupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_INT_DIGITS, 2), DecimalType(Decimal.MAX_INT_DIGITS + 1, 4), - Seq("-67.89", "9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99")) + Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_INT_DIGITS - 2) + ".99"))), + // Smaller scale and larger precision. 
+ UnsupportedTypeEvolutionTestCase(DecimalType(Decimal.MAX_LONG_DIGITS, 2), + DecimalType(Decimal.MAX_INT_DIGITS + 3, 1), + Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99"))) ) for { - testCase <- unsupportedNonTestCases + testCase <- unsupportedTestCases partitioned <- BOOLEAN_DOMAIN } { test(s"unsupported type changes ${testCase.fromType.sql} -> ${testCase.toType.sql}, " + @@ -196,7 +245,29 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { } } - test("type widening in nested fields") { + test("type widening using ALTER TABLE REPLACE COLUMNS") { + append(Seq(1, 2).toDF("value").select($"value".cast(ShortType))) + assert(readDeltaTable(tempPath).schema === new StructType().add("value", ShortType)) + sql(s"ALTER TABLE delta.`$tempPath` REPLACE COLUMNS (value INT)") + assert(readDeltaTable(tempPath).schema === new StructType().add("value", IntegerType)) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2))) + append(Seq(3, 4).toDF("value")) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3), Row(4))) + } + +} + +/** + * Tests covering type changes on nested fields in structs, maps and arrays. + */ +trait DeltaTypeWideningNestedFieldsTests { + self: QueryTest with ParquetTest with DeltaDMLTestUtils with DeltaTypeWideningTestMixin + with SharedSparkSession => + + import testImplicits._ + + /** Create a table with a struct, map and array for each test. */ + protected def createNestedTable(): Unit = { sql(s"CREATE TABLE delta.`$tempPath` " + "(s struct, m map, a array) USING DELTA") append(Seq((1, 2, 3, 4)) @@ -210,7 +281,56 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { .add("s", new StructType().add("a", ByteType)) .add("m", MapType(ByteType, ShortType)) .add("a", ArrayType(ShortType))) + } + + test("unsupported ALTER TABLE CHANGE COLUMN on non-leaf fields") { + createNestedTable() + // Running ALTER TABLE CHANGE COLUMN on non-leaf fields is invalid. 
+ var alterTableSql = s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN s TYPE struct" + checkError( + exception = intercept[AnalysisException] { sql(alterTableSql) }, + errorClass = "CANNOT_UPDATE_FIELD.STRUCT_TYPE", + parameters = Map( + "table" -> s"`spark_catalog`.`delta`.`$tempPath`", + "fieldName" -> "`s`" + ), + context = ExpectedContext( + fragment = alterTableSql, + start = 0, + stop = alterTableSql.length - 1) + ) + + alterTableSql = s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN m TYPE map" + checkError( + exception = intercept[AnalysisException] { sql(alterTableSql) }, + errorClass = "CANNOT_UPDATE_FIELD.MAP_TYPE", + parameters = Map( + "table" -> s"`spark_catalog`.`delta`.`$tempPath`", + "fieldName" -> "`m`" + ), + context = ExpectedContext( + fragment = alterTableSql, + start = 0, + stop = alterTableSql.length - 1) + ) + alterTableSql = s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE array" + checkError( + exception = intercept[AnalysisException] { sql(alterTableSql) }, + errorClass = "CANNOT_UPDATE_FIELD.ARRAY_TYPE", + parameters = Map( + "table" -> s"`spark_catalog`.`delta`.`$tempPath`", + "fieldName" -> "`a`" + ), + context = ExpectedContext( + fragment = alterTableSql, + start = 0, + stop = alterTableSql.length - 1) + ) + } + + test("type widening with ALTER TABLE on nested fields") { + createNestedTable() sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN s.a TYPE short") sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN m.key TYPE int") sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN m.value TYPE int") @@ -232,31 +352,25 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { .selectExpr("named_struct('a', cast(a as short)) as s", "map(b, c) as m", "array(d) as a")) } - test("type widening using ALTER TABLE REPLACE COLUMNS") { - append(Seq(1, 2).toDF("value").select($"value".cast(ShortType))) - assert(readDeltaTable(tempPath).schema === new StructType().add("value", ShortType)) - sql(s"ALTER TABLE delta.`$tempPath` REPLACE COLUMNS (value INT)") - assert(readDeltaTable(tempPath).schema === new StructType().add("value", IntegerType)) - checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2))) - append(Seq(3, 4).toDF("value")) - checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3), Row(4))) - } + test("type widening using ALTER TABLE REPLACE COLUMNS on nested fields") { + createNestedTable() + sql(s"ALTER TABLE delta.`$tempPath` REPLACE COLUMNS " + + "(s struct, m map, a array)") + assert(readDeltaTable(tempPath).schema === new StructType() + .add("s", new StructType().add("a", ShortType)) + .add("m", MapType(IntegerType, IntegerType)) + .add("a", ArrayType(IntegerType))) - test("row group skipping Short -> Int") { - withSQLConf( - SQLConf.FILES_MAX_PARTITION_BYTES.key -> 1024.toString) { - append((0 until 1024).toDF("value").select($"value".cast(ShortType))) - sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE INT") - append((Short.MinValue + 1 until Short.MaxValue + 2048).toDF("value")) - withAllParquetReaders { - checkAnswer( - sql(s"SELECT * FROM delta.`$tempPath` " + - s"WHERE value >= CAST(${Short.MaxValue} AS INT) + 1000"), - (Short.MaxValue + 1000 until Short.MaxValue + 2048).map(Row(_))) - } - } - } + append(Seq((5, 6, 7, 8)) + .toDF("a", "b", "c", "d") + .selectExpr("named_struct('a', cast(a as short)) as s", "map(b, c) as m", "array(d) as a")) + checkAnswer( + readDeltaTable(tempPath), + Seq((1, 2, 3, 4), (5, 6, 7, 8)) + .toDF("a", "b", "c", "d") + .selectExpr("named_struct('a', cast(a as short)) as s", "map(b, c) as m", 
"array(d) as a")) + } } trait DeltaTypeWideningTableFeatureTests { From d462e327921eeef82f76f07c76367083a60d33d0 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 27 Feb 2024 15:07:49 +0100 Subject: [PATCH 5/5] formatting: whitespaces --- .../org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index 3ab96605685..b47158fc3a3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -88,7 +88,7 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { */ case class SupportedTypeEvolutionTestCase[ FromType <: DataType, ToType <: DataType, - FromVal : Encoder, ToVal: Encoder + FromVal: Encoder, ToVal: Encoder ]( override val fromType: FromType, override val toType: ToType,