Skip to content

Commit

Permalink
[3.3][Kernel] Disable setting columnMapping/icebergCompatV2 for 3.3.0 (
Browse files Browse the repository at this point in the history
…#3980)

There are a couple of pieces missing from fully supporting these
features
- Data path PR is not yet merged (#3475), waiting on the
expression fixup which is going to affect how column mapping is done.
- Auto enabling of the protocol version based on the current table
version and column mapping/iceberg compat v2 enabled.

Disable these in 3.3 to avoid writing a partial column mapping data.
  • Loading branch information
vkorukanti authored Dec 19, 2024
1 parent 2c210de commit 6177288
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,20 @@ public static Map<String, String> validateProperties(Map<String, String> configu
String key = kv.getKey().toLowerCase(Locale.ROOT);
String value = kv.getValue();
if (key.startsWith("delta.") && VALID_PROPERTIES.containsKey(key)) {
// Disable `columnMapping` and `icebergCompatV2` for Delta 3.3 release
// There are a couple of pieces missing from fully supporting these features
// 1. Data path PR is not yet merged (delta-io/delta#3475), waiting on the
// expression fixup which is going to affect how column mapping is done.
// 2. Auto enabling of the protocol version based on the current table version and
// column mapping/iceberg compat v2 enabled.
if ((key.equalsIgnoreCase(COLUMN_MAPPING_MODE.getKey())
|| key.equalsIgnoreCase(ICEBERG_COMPAT_V2_ENABLED.getKey()))
&&
// Ignore the check if tests are running
!Boolean.getBoolean("ENABLE_COLUMN_MAPPING_TESTS")) {
throw DeltaErrors.unknownConfigurationException(kv.getKey());
}

TableConfig<?> tableConfig = VALID_PROPERTIES.get(key);
if (tableConfig.editable) {
tableConfig.validate(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class TableFeatures {
{
add("appendOnly");
add("inCommitTimestamp");
add("columnMapping");
// add("columnMapping"); comment for Delta 3.3
add("typeWidening-preview");
add("typeWidening");
add(DOMAIN_METADATA_FEATURE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ class TableConfigSuite extends AnyFunSuite {
TableConfig.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.getKey -> "1",
TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "{in-memory}",
TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> "{\"1\": \"1\"}",
TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey -> "{\"1\": \"1\"}",
TableConfig.COLUMN_MAPPING_MODE.getKey -> "name",
TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true").asJava)
TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey -> "{\"1\": \"1\"}"
// TableConfig.COLUMN_MAPPING_MODE.getKey -> "name",
// TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true"
).asJava)
}

test("check TableConfig.MAX_COLUMN_ID.editable is false") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TableFeaturesSuite extends AnyFunSuite {
checkSupported(createTestProtocol(minWriterVersion = 7))
}

Seq("appendOnly", "inCommitTimestamp", "columnMapping", "typeWidening-preview", "typeWidening",
Seq("appendOnly", "inCommitTimestamp", "typeWidening-preview", "typeWidening",
"domainMetadata")
.foreach { supportedWriterFeature =>
test(s"validateWriteSupported: protocol 7 with $supportedWriterFeature") {
Expand All @@ -78,7 +78,7 @@ class TableFeaturesSuite extends AnyFunSuite {

Seq("invariants", "checkConstraints", "generatedColumns", "allowColumnDefaults", "changeDataFeed",
"identityColumns", "deletionVectors", "rowTracking", "timestampNtz",
"v2Checkpoint", "icebergCompatV1", "icebergCompatV2", "clustering",
"v2Checkpoint", "icebergCompatV1", "icebergCompatV2", "clustering", "columnMapping",
"vacuumProtocolCheck").foreach { unsupportedWriterFeature =>
test(s"validateWriteSupported: protocol 7 with $unsupportedWriterFeature") {
checkUnsupported(createTestProtocol(minWriterVersion = 7, unsupportedWriterFeature))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
txnBuilder.build(engine)
}

test("create table with unsupported column mapping mode") {
withColumnMappingEnabledInTests("create table with unsupported column mapping mode") {
withTempDirAndEngine { (tablePath, engine) =>
val ex = intercept[InvalidConfigurationValueException] {
createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty,
Expand All @@ -1017,7 +1017,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("create table with column mapping mode = none") {
withColumnMappingEnabledInTests("create table with column mapping mode = none") {
withTempDirAndEngine { (tablePath, engine) =>
createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty,
tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "none"))
Expand All @@ -1028,7 +1028,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("cannot update table with unsupported column mapping mode") {
withColumnMappingEnabledInTests("cannot update table with unsupported column mapping mode") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty)
Expand All @@ -1046,7 +1046,8 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("cannot update table with unsupported column mapping mode change") {
withColumnMappingEnabledInTests(
"cannot update table with unsupported column mapping mode change") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty,
Expand All @@ -1065,7 +1066,8 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("cannot update column mapping mode from id to name on existing table") {
withColumnMappingEnabledInTests(
"cannot update column mapping mode from id to name on existing table") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand Down Expand Up @@ -1093,7 +1095,8 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("cannot update column mapping mode from name to id on existing table") {
withColumnMappingEnabledInTests(
"cannot update column mapping mode from name to id on existing table") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand Down Expand Up @@ -1121,7 +1124,8 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("cannot update column mapping mode from none to id on existing table") {
withColumnMappingEnabledInTests(
"cannot update column mapping mode from none to id on existing table") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand Down Expand Up @@ -1156,7 +1160,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
// TODO
}

test("new table with column mapping mode = name") {
withColumnMappingEnabledInTests("new table with column mapping mode = name") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand All @@ -1173,7 +1177,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("new table with column mapping mode = id") {
withColumnMappingEnabledInTests("new table with column mapping mode = id") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand All @@ -1190,7 +1194,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("can update existing table to column mapping mode = name") {
withColumnMappingEnabledInTests("can update existing table to column mapping mode = name") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand All @@ -1216,7 +1220,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("new table with column mapping mode = id and nested schema") {
withColumnMappingEnabledInTests("new table with column mapping mode = id and nested schema") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand Down Expand Up @@ -1256,4 +1260,52 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
assert(meta.get(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY) == expPhyName)
}
}

test("Delta 3.3 only - ensure column mapping is disabled in create") {
Seq(
(TableConfig.COLUMN_MAPPING_MODE.getKey, "id"),
(TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey, "true")
).foreach { case (key, value) =>
withTempDirAndEngine { (tablePath, engine) =>
val ex = intercept[UnknownConfigurationException] {
createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty,
tableProperties = Map(key -> value))
.commit(engine, emptyIterable())
}
assert(ex.getMessage.contains("Unknown configuration was specified:"))
}
}
}

test("Delta 3.3 only - ensure not writable to an existing column mapping enabled table") {
// Use Spark to create a column mapping enabled table
withTempDirAndEngine { (tablePath, engine) =>
spark.sql(
"CREATE TABLE delta.`" + tablePath + "` USING delta" +
" TBLPROPERTIES ('delta.columnMapping.mode' = 'name') AS VALUES(1, 'a')")

val table = Table.forPath(engine, tablePath)
val ex = intercept[KernelException] {
table.createTransactionBuilder(engine, testEngineInfo, Operation.WRITE)
.build(engine)
.commit(engine, emptyIterable())
}
assert(ex.getMessage.contains("requires writer table feature \"columnMapping\" which is " +
"unsupported by this version of Delta Kernel."))
}

}

// helper method to run columnmapping tests
def withColumnMappingEnabledInTests(testName: String)(fun: => Unit): Unit = {
test(testName) {
try {
// this property is needed to enable column mapping
System.setProperty("ENABLE_COLUMN_MAPPING_TESTS", "true")
fun
} finally {
System.clearProperty("ENABLE_COLUMN_MAPPING_TESTS")
}
}
}
}

0 comments on commit 6177288

Please sign in to comment.