diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java index a1a7c91d55a7..6c57853c9e4b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrateProcedure.java @@ -97,6 +97,8 @@ import static io.trino.plugin.iceberg.TypeConverter.toIcebergTypeForNewColumn; import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TinyintType.TINYINT; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.Boolean.parseBoolean; import static java.lang.invoke.MethodHandles.lookup; @@ -312,6 +314,9 @@ private static org.apache.iceberg.types.Type toIcebergType(Type type, AtomicInte // TODO https://github.com/trinodb/trino/issues/17583 Add support for these complex types throw new TrinoException(NOT_SUPPORTED, "Migrating %s type is not supported".formatted(type)); } + if (type.equals(TINYINT) || type.equals(SMALLINT)) { + return Types.IntegerType.get(); + } return toIcebergTypeForNewColumn(type, nextFieldId); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java index 3437e4ea49fb..4fcf249e78da 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMigrateProcedure.java @@ -27,7 +27,9 @@ import java.util.stream.Stream; import static com.google.common.collect.MoreCollectors.onlyElement; +import static io.trino.plugin.iceberg.IcebergFileFormat.AVRO; import static io.trino.testing.TestingNames.randomNameSuffix; +import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; @@ -76,6 +78,60 @@ public void testMigrateTable(IcebergFileFormat fileFormat) assertUpdate("DROP TABLE " + tableName); } + @Test(dataProvider = "fileFormats") + public void testMigrateTableWithTinyintType(IcebergFileFormat fileFormat) + { + String tableName = "test_migrate_tinyint" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + String icebergTableName = "iceberg.tpch." + tableName; + + String createTable = "CREATE TABLE " + hiveTableName + "(col TINYINT) WITH (format = '" + fileFormat + "')"; + if (fileFormat == AVRO) { + assertQueryFails(createTable, "Column 'col' is tinyint, which is not supported by Avro. Use integer instead."); + return; + } + + assertUpdate(createTable); + assertUpdate("INSERT INTO " + hiveTableName + " VALUES NULL, -128, 127", 3); + + assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "')"); + + assertThat(getColumnType(tableName, "col")).isEqualTo("integer"); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (NULL), (-128), (127)"); + + assertUpdate("INSERT INTO " + icebergTableName + " VALUES -2147483648, 2147483647", 2); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (NULL), (-2147483648), (-128), (127), (2147483647)"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test(dataProvider = "fileFormats") + public void testMigrateTableWithSmallintType(IcebergFileFormat fileFormat) + { + String tableName = "test_migrate_smallint" + randomNameSuffix(); + String hiveTableName = "hive.tpch." + tableName; + String icebergTableName = "iceberg.tpch." + tableName; + + String createTable = "CREATE TABLE " + hiveTableName + "(col SMALLINT) WITH (format = '" + fileFormat + "')"; + if (fileFormat == AVRO) { + assertQueryFails(createTable, "Column 'col' is smallint, which is not supported by Avro. Use integer instead."); + return; + } + + assertUpdate(createTable); + assertUpdate("INSERT INTO " + hiveTableName + " VALUES NULL, -32768, 32767", 3); + + assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "')"); + + assertThat(getColumnType(tableName, "col")).isEqualTo("integer"); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (NULL), (-32768), (32767)"); + + assertUpdate("INSERT INTO " + icebergTableName + " VALUES -2147483648, 2147483647", 2); + assertQuery("SELECT * FROM " + icebergTableName, "VALUES (NULL), (-2147483648), (-32768), (32767), (2147483647)"); + + assertUpdate("DROP TABLE " + tableName); + } + @DataProvider public static Object[][] fileFormats() { @@ -335,4 +391,11 @@ public void testMigrateEmptyTable() assertUpdate("DROP TABLE " + tableName); } + + private String getColumnType(String tableName, String columnName) + { + return (String) computeScalar(format("SELECT data_type FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA AND table_name = '%s' AND column_name = '%s'", + tableName, + columnName)); + } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java index dbb5101964c5..f5b5bb77f834 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergProcedureCalls.java @@ -13,9 +13,13 @@ */ package io.trino.tests.product.iceberg; +import com.google.common.collect.ImmutableList; import io.trino.tempto.ProductTest; +import io.trino.tempto.assertions.QueryAssert.Row; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.List; import java.util.function.Consumer; import static io.trino.tempto.assertions.QueryAssert.Row.row; @@ -55,6 +59,58 @@ public void testMigrateHiveTable() onTrino().executeQuery("DROP TABLE IF EXISTS " + icebergTableName); } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "fileFormats") + public void testMigrateHiveTableWithTinyintType(String fileFormat) + { + String tableName = "test_migrate_tinyint" + randomNameSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + String sparkTableName = "iceberg_test.default." + tableName; + + String createTable = "CREATE TABLE " + hiveTableName + "(col TINYINT) WITH (format = '" + fileFormat + "')"; + if (fileFormat.equals("AVRO")) { + assertQueryFailure(() -> onTrino().executeQuery(createTable)) + .hasMessageContaining("Column 'col' is tinyint, which is not supported by Avro. Use integer instead."); + return; + } + onTrino().executeQuery(createTable); + onTrino().executeQuery("INSERT INTO " + hiveTableName + " VALUES -128, 127"); + + onTrino().executeQuery("CALL iceberg.system.migrate('default', '" + tableName + "')"); + + List expected = ImmutableList.of(row(-128), row(127)); + assertThat(onTrino().executeQuery("SELECT * FROM " + icebergTableName)).containsOnly(expected); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "fileFormats") + public void testMigrateHiveTableWithSmallintType(String fileFormat) + { + String tableName = "test_migrate_smallint" + randomNameSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + String sparkTableName = "iceberg_test.default." + tableName; + + String createTable = "CREATE TABLE " + hiveTableName + "(col SMALLINT) WITH (format = '" + fileFormat + "')"; + if (fileFormat.equals("AVRO")) { + assertQueryFailure(() -> onTrino().executeQuery(createTable)) + .hasMessageContaining("Column 'col' is smallint, which is not supported by Avro. Use integer instead."); + return; + } + onTrino().executeQuery(createTable); + onTrino().executeQuery("INSERT INTO " + hiveTableName + " VALUES -32768, 32767"); + + onTrino().executeQuery("CALL iceberg.system.migrate('default', '" + tableName + "')"); + + List expected = ImmutableList.of(row(-32768), row(32767)); + assertThat(onTrino().executeQuery("SELECT * FROM " + icebergTableName)).containsOnly(expected); + assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) public void testMigrateHivePartitionedTable() { @@ -228,4 +284,10 @@ private long getSecondOldestTableSnapshot(String tableName) format("SELECT snapshot_id FROM iceberg.default.\"%s$snapshots\" WHERE parent_id IS NOT NULL ORDER BY committed_at FETCH FIRST 1 ROW WITH TIES", tableName)) .getOnlyValue(); } + + @DataProvider + public static Object[][] fileFormats() + { + return new Object[][] {{"ORC"}, {"PARQUET"}, {"AVRO"}}; + } }