Support timestamp type in Iceberg migrate procedure #17391

Merged
@@ -157,14 +157,14 @@ private static org.apache.iceberg.types.Type toIcebergTypeInternal(Type type, Op
         if (type.equals(UUID)) {
             return Types.UUIDType.get();
         }
-        if (type instanceof RowType) {
-            return fromRow((RowType) type, columnIdentity, nextFieldId);
+        if (type instanceof RowType rowType) {
+            return fromRow(rowType, columnIdentity, nextFieldId);
         }
-        if (type instanceof ArrayType) {
-            return fromArray((ArrayType) type, columnIdentity, nextFieldId);
+        if (type instanceof ArrayType arrayType) {
+            return fromArray(arrayType, columnIdentity, nextFieldId);
         }
-        if (type instanceof MapType) {
-            return fromMap((MapType) type, columnIdentity, nextFieldId);
+        if (type instanceof MapType mapType) {
+            return fromMap(mapType, columnIdentity, nextFieldId);
         }
         if (type instanceof TimeType) {
             throw new TrinoException(NOT_SUPPORTED, format("Time precision (%s) not supported for Iceberg. Use \"time(6)\" instead.", ((TimeType) type).getPrecision()));
@@ -42,6 +42,12 @@
 import io.trino.spi.connector.TableNotFoundException;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.procedure.Procedure;
+import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.MapType;
+import io.trino.spi.type.RowType;
+import io.trino.spi.type.SmallintType;
+import io.trino.spi.type.TimestampType;
+import io.trino.spi.type.TinyintType;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
 import org.apache.iceberg.AppendFiles;
@@ -60,9 +66,11 @@
 import java.lang.invoke.MethodHandle;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -71,6 +79,7 @@
 import static io.trino.metastore.PrincipalPrivileges.NO_PRIVILEGES;
 import static io.trino.plugin.hive.HiveMetadata.TRANSACTIONAL;
 import static io.trino.plugin.hive.HiveMetadata.extractHiveStorageFormat;
+import static io.trino.plugin.hive.HiveTimestampPrecision.MILLISECONDS;
 import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
 import static io.trino.plugin.hive.util.HiveTypeUtil.getTypeSignature;
 import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
@@ -81,10 +90,9 @@
 import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
 import static io.trino.plugin.iceberg.TypeConverter.toIcebergTypeForNewColumn;
 import static io.trino.plugin.iceberg.procedure.MigrationUtils.buildDataFiles;
+import static io.trino.spi.StandardErrorCode.DUPLICATE_COLUMN_NAME;
 import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
-import static io.trino.spi.type.SmallintType.SMALLINT;
-import static io.trino.spi.type.TinyintType.TINYINT;
 import static io.trino.spi.type.VarcharType.VARCHAR;
 import static java.lang.Boolean.parseBoolean;
 import static java.lang.invoke.MethodHandles.lookup;
@@ -191,9 +199,9 @@ public void doMigrate(ConnectorSession session, String schemaName, String tableN
             throw new TrinoException(NOT_SUPPORTED, "The table is already an Iceberg table");
         }
 
-        Schema schema = toIcebergSchema(concat(hiveTable.getDataColumns().stream(), hiveTable.getPartitionColumns().stream()).toList());
-        NameMapping nameMapping = MappingUtil.create(schema);
         HiveStorageFormat storageFormat = extractHiveStorageFormat(hiveTable.getStorage().getStorageFormat());
+        Schema schema = toIcebergSchema(concat(hiveTable.getDataColumns().stream(), hiveTable.getPartitionColumns().stream()).toList(), toIcebergFileFormat(storageFormat));
+        NameMapping nameMapping = MappingUtil.create(schema);
         String location = hiveTable.getStorage().getLocation();
 
         Map<String, String> properties = icebergTableProperties(location, hiveTable.getParameters(), nameMapping, toIcebergFileFormat(storageFormat));
@@ -271,13 +279,13 @@ private Map<String, String> icebergTableProperties(String location, Map<String,
         return ImmutableMap.copyOf(icebergTableProperties);
     }
 
-    private Schema toIcebergSchema(List<Column> columns)
+    private Schema toIcebergSchema(List<Column> columns, IcebergFileFormat storageFormat)
     {
         AtomicInteger nextFieldId = new AtomicInteger(1);
         List<Types.NestedField> icebergColumns = new ArrayList<>();
         for (Column column : columns) {
             int index = icebergColumns.size();
-            org.apache.iceberg.types.Type type = toIcebergType(typeManager.getType(getTypeSignature(column.getType())), nextFieldId);
+            org.apache.iceberg.types.Type type = toIcebergType(typeManager.getType(getTypeSignature(column.getType(), MILLISECONDS)), nextFieldId, storageFormat);
             Types.NestedField field = Types.NestedField.of(index, false, column.getName(), type, column.getComment().orElse(null));
             icebergColumns.add(field);
         }
@@ -288,12 +296,55 @@ private Schema toIcebergSchema(List<Column> columns)
         return new Schema(icebergSchema.asStructType().fields());
     }
 
-    private static org.apache.iceberg.types.Type toIcebergType(Type type, AtomicInteger nextFieldId)
+    private static org.apache.iceberg.types.Type toIcebergType(Type type, AtomicInteger nextFieldId, IcebergFileFormat storageFormat)
     {
-        if (type.equals(TINYINT) || type.equals(SMALLINT)) {
-            return Types.IntegerType.get();
+        return switch (type) {
+            case TinyintType _, SmallintType _ -> Types.IntegerType.get();
+            case TimestampType _ -> switch (storageFormat) {
+                case ORC -> Types.TimestampType.withoutZone();
+                case PARQUET -> Types.TimestampType.withZone();
+                case AVRO -> // TODO https://github.com/trinodb/trino/issues/20481
+                        throw new TrinoException(NOT_SUPPORTED, "Migrating timestamp type with Avro format is not supported.");
+            };
+            case RowType rowType -> fromRow(rowType, nextFieldId, storageFormat);
+            case ArrayType arrayType -> fromArray(arrayType, nextFieldId, storageFormat);
+            case MapType mapType -> fromMap(mapType, nextFieldId, storageFormat);
+            default -> toIcebergTypeForNewColumn(type, nextFieldId);
+        };
     }
+
+    private static org.apache.iceberg.types.Type fromRow(RowType type, AtomicInteger nextFieldId, IcebergFileFormat storageFormat)
+    {
+        Set<String> fieldNames = new HashSet<>();
+        List<Types.NestedField> fields = new ArrayList<>();
+        for (int i = 0; i < type.getFields().size(); i++) {
+            int id = nextFieldId.getAndIncrement();
+            RowType.Field field = type.getFields().get(i);
+            String name = field.getName().orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Row type field does not have a name: " + type.getDisplayName()));
+            if (!fieldNames.add(name.toLowerCase(ENGLISH))) {
+                throw new TrinoException(DUPLICATE_COLUMN_NAME, "Field name '%s' specified more than once".formatted(name.toLowerCase(ENGLISH)));
+            }
+            org.apache.iceberg.types.Type icebergTypeInternal = toIcebergType(field.getType(), nextFieldId, storageFormat);
+            fields.add(Types.NestedField.optional(id, name, icebergTypeInternal));
+        }
-        return toIcebergTypeForNewColumn(type, nextFieldId);
+        return Types.StructType.of(fields);
     }
 
+    private static org.apache.iceberg.types.Type fromArray(ArrayType type, AtomicInteger nextFieldId, IcebergFileFormat storageFormat)
+    {
+        int id = nextFieldId.getAndIncrement();
+        return Types.ListType.ofOptional(id, toIcebergType(type.getElementType(), nextFieldId, storageFormat));
+    }
+
+    private static org.apache.iceberg.types.Type fromMap(MapType type, AtomicInteger nextFieldId, IcebergFileFormat storageFormat)
+    {
+        int keyId = nextFieldId.getAndIncrement();
+        int valueId = nextFieldId.getAndIncrement();
+        return Types.MapType.ofOptional(
+                keyId,
+                valueId,
+                toIcebergType(type.getKeyType(), nextFieldId, storageFormat),
+                toIcebergType(type.getValueType(), nextFieldId, storageFormat));
+    }
+
     public Map<String, Optional<Partition>> listAllPartitions(HiveMetastore metastore, io.trino.metastore.Table table)
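For orientation, a minimal usage sketch of the behavior introduced above; the schema and table names are hypothetical, and the outcomes follow the switch in the hunk above and the tests below:

    -- Hypothetical Hive source table with a millisecond-precision timestamp column
    CREATE TABLE hive.tpch.events (ts timestamp(3)) WITH (format = 'PARQUET');
    -- Migrating turns it into an Iceberg table: with PARQUET the column surfaces as
    -- timestamp(6) with time zone, with ORC as timestamp(6), and with AVRO the call
    -- fails with "Migrating timestamp type with Avro format is not supported."
    CALL iceberg.system.migrate('tpch', 'events');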
@@ -168,6 +168,38 @@ public void testMigrateTableWithComplexType(IcebergFileFormat fileFormat)
         assertUpdate("DROP TABLE " + tableName);
     }
 
+    @Test
+    public void testMigrateTimestampHiveTableInComplexType()
+    {
+        String inputValue = "2021-01-01 10:11:12.123"; // Tests are run with America/Bahia_Banderas timezone
+        String expectedValue = "2021-01-01 16:11:12.123000 UTC";
+
+        String tableName = "test_migrate_timestamp_complex_type_" + randomNameSuffix();
+        String hiveTableName = "hive.tpch." + tableName;
+        String icebergTableName = "iceberg.tpch." + tableName;
+
+        assertUpdate("CREATE TABLE " + hiveTableName + " WITH (format='PARQUET') AS " +
+                "SELECT CAST(row(timestamp '" + inputValue + "') AS row(t timestamp(3))) r," +
+                "array[timestamp '" + inputValue + "'] a, " +
+                "CAST(map(array[1], array[timestamp '" + inputValue + "']) AS map(int, timestamp(3))) m", 1);
+
+        assertThat(query("SELECT a, m, r.t FROM " + hiveTableName))
+                .matches("VALUES (" +
+                        " ARRAY[timestamp '" + inputValue + "'], " +
+                        " CAST(map(ARRAY[1], ARRAY[timestamp '" + inputValue + "']) AS map(int, timestamp(3)))," +
+                        " timestamp '" + inputValue + "')");
+
+        assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "')");
+
+        assertThat(query("SELECT a, m, r.t FROM " + icebergTableName))
+                .matches("VALUES (" +
+                        " ARRAY[timestamp '" + expectedValue + "'], " +
+                        " CAST(map(ARRAY[1], ARRAY[timestamp '" + expectedValue + "']) AS map(int, timestamp(6) with time zone))," +
+                        " timestamp '" + expectedValue + "')");
+
+        assertUpdate("DROP TABLE " + tableName);
+    }
+
     @ParameterizedTest
     @MethodSource("fileFormats")
     public void testMigrateTableSchemaEvolution(IcebergFileFormat fileFormat)
@@ -410,20 +442,15 @@ private String getColumnComment(String tableName, String columnName)
     }
 
     @Test
-    public void testMigrateUnsupportedColumnType()
+    public void testMigrateTimestampMillisTypeWithAvro()
     {
-        String tableName = "test_migrate_unsupported_column_type_" + randomNameSuffix();
+        String tableName = "test_migrate_timestamp_millis_avro" + randomNameSuffix();
         String hiveTableName = "hive.tpch." + tableName;
         String icebergTableName = "iceberg.tpch." + tableName;
 
-        assertUpdate("CREATE TABLE " + hiveTableName + " AS SELECT timestamp '2021-01-01 00:00:00.000' x", 1);
-
-        assertQueryFails(
-                "CALL iceberg.system.migrate('tpch', '" + tableName + "')",
-                "\\QTimestamp precision (3) not supported for Iceberg. Use \"timestamp(6)\" instead.");
-
+        assertUpdate("CREATE TABLE " + hiveTableName + " WITH (format='AVRO') AS SELECT timestamp '2021-01-01 00:00:00.000' x", 1);
-        assertQuery("SELECT * FROM " + hiveTableName, "VALUES timestamp '2021-01-01 00:00:00.000'");
-        assertQueryFails("SELECT * FROM " + icebergTableName, "Not an Iceberg table: .*");
 
+        assertQueryFails("CALL iceberg.system.migrate('tpch', '" + tableName + "')", "Migrating timestamp type with Avro format is not supported.");
+
         assertUpdate("DROP TABLE " + hiveTableName);
     }
@@ -59,6 +59,82 @@ public void testMigrateHiveTable()
         onTrino().executeQuery("DROP TABLE IF EXISTS " + icebergTableName);
     }
 
+    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
+    public void testMigrateTimestampHiveTableWithOrc()
+    {
+        testMigrateTimestampHiveTable(
+                "MILLISECONDS",
+                "2021-01-01 10:11:12.123",
+                "2021-01-01 10:11:12.123000",
+                "2021-01-01 10:11:12.123",
+                "ORC");
+
+        testMigrateTimestampHiveTable(
+                "MICROSECONDS",
+                "2021-01-01 10:11:12.123456",
+                "2021-01-01 10:11:12.123456",
+                "2021-01-01 10:11:12.123456",
+                "ORC");
+
+        testMigrateTimestampHiveTable(
+                "NANOSECONDS",
+                "2021-01-01 10:11:12.123456789",
+                "2021-01-01 10:11:12.123457",
+                "2021-01-01 10:11:12.123456",
+                "ORC");
+    }
+
+    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
+    public void testMigrateTimestampHiveTableWithParquet()
+    {
+        testMigrateTimestampHiveTable(
+                "MILLISECONDS",
+                "2021-01-01 10:11:12.123",
+                "2021-01-01 10:11:12.123000 UTC",
+                "2021-01-01 10:11:12.123",
+                "PARQUET");
+
+        testMigrateTimestampHiveTable(
+                "MICROSECONDS",
+                "2021-01-01 10:11:12.123456",
+                "2021-01-01 10:11:12.123456 UTC",
+                "2021-01-01 10:11:12.123456",
+                "PARQUET");
+
+        testMigrateTimestampHiveTable(
+                "NANOSECONDS",
+                "2021-01-01 10:11:12.123456789",
+                "2021-01-01 10:11:12.123457 UTC",
+                "2021-01-01 10:11:12.123456",
+                "PARQUET");
+    }
+
+    private void testMigrateTimestampHiveTable(String precisionName, String inputValue, String expectedValueInTrino, String expectedValueInSpark, String format)
+    {
+        String tableName = "test_migrate_timestamp_" + randomNameSuffix();
+        String hiveTableName = "hive.default." + tableName;
+        String icebergTableName = "iceberg.default." + tableName;
+        String sparkTableName = "iceberg_test.default." + tableName;
+
+        onTrino().executeQuery("DROP TABLE IF EXISTS default." + tableName);
+        onTrino().executeQuery("SET SESSION hive.timestamp_precision = '" + precisionName + "'");
+        onTrino().executeQuery("CREATE TABLE " + hiveTableName + " WITH (format='" + format + "') AS SELECT TIMESTAMP '" + inputValue + "' x ");
+
+        assertThat(onHive().executeQuery("SELECT CAST(x AS string) FROM default." + tableName))
+                .containsOnly(row(inputValue));
+        assertThat(onTrino().executeQuery("SELECT CAST(x AS varchar) FROM " + hiveTableName))
+                .containsOnly(row(inputValue));
+
+        onTrino().executeQuery("CALL iceberg.system.migrate('default', '" + tableName + "')");
+
+        assertThat(onTrino().executeQuery("SELECT CAST(x AS varchar) FROM " + icebergTableName))
+                .containsOnly(row(expectedValueInTrino));
+        assertThat(onSpark().executeQuery("SELECT CAST(x AS string) FROM " + sparkTableName))
+                .containsOnly(row(expectedValueInSpark));
+
+        onTrino().executeQuery("DROP TABLE " + icebergTableName);
+    }
+
     @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "fileFormats")
     public void testMigrateHiveTableWithTinyintType(String fileFormat)
     {