diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 87ea78f53c3c..fb4cae303f23 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -72,6 +72,7 @@
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.io.CloseableIterable;
@@ -181,6 +182,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
         }
 
         Optional<Long> snapshotId = getSnapshotId(table, name.getSnapshotId());
+        String nameMappingJson = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
         return new IcebergTableHandle(
                 tableName.getSchemaName(),
                 name.getTableName(),
@@ -188,7 +190,8 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
                 snapshotId,
                 TupleDomain.all(),
                 TupleDomain.all(),
-                ImmutableSet.of());
+                ImmutableSet.of(),
+                Optional.ofNullable(nameMappingJson));
     }
 
     @Override
@@ -732,7 +735,8 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
                         table.getSnapshotId(),
                         newUnenforcedConstraint,
                         newEnforcedConstraint,
-                        table.getProjectedColumns()),
+                        table.getProjectedColumns(),
+                        table.getNameMappingJson()),
                 remainingConstraint.transformKeys(ColumnHandle.class::cast),
                 false));
     }
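
A note on the property read in getTableHandle above: TableProperties.DEFAULT_NAME_MAPPING is Iceberg's "schema.name-mapping.default" table property, which the migrate procedure writes for tables converted from Hive. The handle only carries the raw JSON; IcebergPageSourceProvider parses it on demand with NameMappingParser. Below is a minimal, illustrative sketch (the class name is made up; the mapping shape mirrors testMigratedDataWithPartialNameMapping at the bottom of this change) of how that JSON behaves once parsed:

    import com.google.common.collect.ImmutableList;
    import org.apache.iceberg.mapping.MappedField;
    import org.apache.iceberg.mapping.NameMapping;
    import org.apache.iceberg.mapping.NameMappingParser;

    public class NameMappingJsonSketch
    {
        public static void main(String[] args)
        {
            // Same shape as the property value set in testMigratedDataWithPartialNameMapping
            String nameMappingJson = "[{\"field-id\": 1, \"names\": [\"a\"]}, {\"field-id\": 2, \"names\": [\"c\"]}]";
            NameMapping nameMapping = NameMappingParser.fromJson(nameMappingJson);

            // find() resolves a qualified column path to its mapped field
            MappedField a = nameMapping.find(ImmutableList.of("a"));
            System.out.println(a.id()); // 1

            // No entry covers "b", so find() returns null and the connector reads that column as NULL
            System.out.println(nameMapping.find(ImmutableList.of("b"))); // null
        }
    }
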
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index c043afcd5ceb..42c80218a07e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -15,10 +15,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import com.google.common.graph.Traverser;
 import io.trino.memory.context.AggregatedMemoryContext;
-import io.trino.orc.NameBasedFieldMapper;
 import io.trino.orc.OrcColumn;
 import io.trino.orc.OrcCorruptionException;
 import io.trino.orc.OrcDataSource;
@@ -28,6 +26,7 @@
 import io.trino.orc.OrcRecordReader;
 import io.trino.orc.TupleDomainOrcPredicate;
 import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder;
+import io.trino.orc.metadata.OrcType;
 import io.trino.parquet.Field;
 import io.trino.parquet.ParquetCorruptionException;
 import io.trino.parquet.ParquetDataSource;
@@ -48,7 +47,6 @@
 import io.trino.plugin.hive.orc.OrcPageSource.ColumnAdaptation;
 import io.trino.plugin.hive.orc.OrcReaderConfig;
 import io.trino.plugin.hive.parquet.HdfsParquetDataSource;
-import io.trino.plugin.hive.parquet.HiveParquetColumnIOConverter;
 import io.trino.plugin.hive.parquet.ParquetPageSource;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.iceberg.IcebergParquetColumnIOConverter.FieldContext;
@@ -77,6 +75,11 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockMissingException;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.MappedFields;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
@@ -94,6 +97,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -101,6 +105,7 @@
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.Maps.uniqueIndex;
 import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
 import static io.trino.orc.OrcReader.INITIAL_BATCH_SIZE;
@@ -108,7 +113,6 @@
 import static io.trino.orc.OrcReader.fullyProjectedLayout;
 import static io.trino.parquet.ParquetTypeUtils.getColumnIO;
 import static io.trino.parquet.ParquetTypeUtils.getDescriptors;
-import static io.trino.parquet.ParquetTypeUtils.getParquetTypeByName;
 import static io.trino.parquet.predicate.PredicateUtils.buildPredicate;
 import static io.trino.parquet.predicate.PredicateUtils.predicateMatches;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
@@ -201,7 +205,8 @@ public ConnectorPageSource createPageSource(
                 split.getFileSize(),
                 split.getFileFormat(),
                 regularColumns,
-                effectivePredicate);
+                effectivePredicate,
+                table.getNameMappingJson().map(NameMappingParser::fromJson));
 
         Optional<ReaderProjectionsAdapter> projectionsAdapter = dataPageSource.getReaderColumns().map(readerColumns ->
                 new ReaderProjectionsAdapter(
@@ -221,7 +226,8 @@ private ReaderPageSource createDataPageSource(
             long fileSize,
             FileFormat fileFormat,
             List<IcebergColumnHandle> dataColumns,
-            TupleDomain<IcebergColumnHandle> predicate)
+            TupleDomain<IcebergColumnHandle> predicate,
+            Optional<NameMapping> nameMapping)
     {
         if (!isUseFileSizeFromMetadata(session)) {
             try {
@@ -256,7 +262,8 @@ private ReaderPageSource createDataPageSource(
                                 .withNestedLazy(isOrcNestedLazy(session))
                                 .withBloomFiltersEnabled(isOrcBloomFiltersEnabled(session)),
                         fileFormatDataSourceStats,
-                        typeManager);
+                        typeManager,
+                        nameMapping);
            case PARQUET:
                return createParquetPageSource(
                        hdfsEnvironment,
@@ -270,7 +277,8 @@ private ReaderPageSource createDataPageSource(
                        parquetReaderOptions
                                .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
                        predicate,
-                       fileFormatDataSourceStats);
+                       fileFormatDataSourceStats,
+                       nameMapping);
            default:
                throw new TrinoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
        }
@@ -288,7 +296,8 @@ private static ReaderPageSource createOrcPageSource(
             TupleDomain<IcebergColumnHandle> effectivePredicate,
             OrcReaderOptions options,
             FileFormatDataSourceStats stats,
-            TypeManager typeManager)
+            TypeManager typeManager,
+            Optional<NameMapping> nameMapping)
     {
         OrcDataSource orcDataSource = null;
         try {
@@ -303,13 +312,16 @@ private static ReaderPageSource createOrcPageSource(
             OrcReader reader = OrcReader.createOrcReader(orcDataSource, options)
                     .orElseThrow(() -> new TrinoException(ICEBERG_BAD_DATA, "ORC file is zero length"));
+
             List<OrcColumn> fileColumns = reader.getRootColumn().getNestedColumns();
-            Map<Integer, OrcColumn> fileColumnsByIcebergId = mapIdsToOrcFileColumns(fileColumns);
-            Map<String, OrcColumn> fileColumnsByName = null;
-            if (fileColumnsByIcebergId.isEmpty()) {
-                fileColumnsByName = uniqueIndex(fileColumns, orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH));
+            if (nameMapping.isPresent() && !hasIds(reader.getRootColumn())) {
+                fileColumns = fileColumns.stream()
+                        .map(orcColumn -> setMissingFieldIds(orcColumn, nameMapping.get(), ImmutableList.of(orcColumn.getColumnName())))
+                        .collect(toImmutableList());
             }
+            Map<Integer, OrcColumn> fileColumnsByIcebergId = mapIdsToOrcFileColumns(fileColumns);
+
             TupleDomainOrcPredicateBuilder predicateBuilder = TupleDomainOrcPredicate.builder()
                     .setBloomFiltersEnabled(options.isBloomFiltersEnabled());
             Map<IcebergColumnHandle, Domain> effectivePredicateDomains = effectivePredicate.getDomains()
@@ -320,13 +332,6 @@ private static ReaderPageSource createOrcPageSource(
                     .collect(groupingBy(
                             column -> column.getBaseColumnIdentity().getId(),
                             mapping(IcebergColumnHandle::getPath, toUnmodifiableList())));
-            Map<String, List<List<Integer>>> projectionsByName = null;
-            if (fileColumnsByIcebergId.isEmpty()) {
-                projectionsByName = columns.stream()
-                        .collect(groupingBy(
-                                column -> column.getBaseColumnIdentity().getName(),
-                                mapping(IcebergColumnHandle::getPath, toUnmodifiableList())));
-            }
             List<IcebergColumnHandle> readColumns = columnProjections
                     .map(readerColumns -> (List<IcebergColumnHandle>) readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()))
                     .orElse(regularColumns);
@@ -337,14 +342,7 @@ private static ReaderPageSource createOrcPageSource(
             List<ColumnAdaptation> columnAdaptations = new ArrayList<>(readColumns.size());
             for (IcebergColumnHandle column : readColumns) {
                 verify(column.isBaseColumn(), "Column projections must be based from a root column");
-
-                OrcColumn orcColumn;
-                if (fileColumnsByIcebergId.isEmpty()) {
-                    orcColumn = fileColumnsByName.get(column.getName().toLowerCase(ENGLISH));
-                }
-                else {
-                    orcColumn = fileColumnsByIcebergId.get(column.getId());
-                }
+                OrcColumn orcColumn = fileColumnsByIcebergId.get(column.getId());
 
                 if (orcColumn != null) {
                     Type readType = getOrcReadType(column.getType(), typeManager);
@@ -353,9 +351,7 @@ private static ReaderPageSource createOrcPageSource(
                         throw new TrinoException(ICEBERG_BAD_DATA, format("Expected ORC column for UUID data to be annotated with %s=UUID: %s", ICEBERG_BINARY_TYPE, orcColumn));
                     }
 
-                    List<List<Integer>> fieldIdProjections = fileColumnsByIcebergId.isEmpty() ?
-                            projectionsByName.get(column.getBaseColumnIdentity().getName()) :
-                            projectionsByFieldId.get(column.getId());
+                    List<List<Integer>> fieldIdProjections = projectionsByFieldId.get(column.getId());
                     ProjectedLayout projectedLayout = IcebergOrcProjectedLayout.createProjectedLayout(orcColumn, fieldIdProjections);
 
                     int sourceIndex = fileReadColumns.size();
@@ -366,14 +362,7 @@ private static ReaderPageSource createOrcPageSource(
 
                 for (Map.Entry<IcebergColumnHandle, Domain> domainEntry : effectivePredicateDomains.entrySet()) {
                     IcebergColumnHandle predicateColumn = domainEntry.getKey();
-                    OrcColumn predicateOrcColumn;
-                    if (fileColumnsByIcebergId.isEmpty()) {
-                        predicateOrcColumn = fileColumnsByName.get(predicateColumn.getName().toLowerCase(ENGLISH));
-                    }
-                    else {
-                        predicateOrcColumn = fileColumnsByIcebergId.get(predicateColumn.getId());
-                    }
-
+                    OrcColumn predicateOrcColumn = fileColumnsByIcebergId.get(predicateColumn.getId());
                     if (predicateOrcColumn != null && column.getColumnIdentity().equals(predicateColumn.getBaseColumnIdentity())) {
                         predicateBuilder.addColumn(predicateOrcColumn.getColumnId(), domainEntry.getValue());
                     }
@@ -397,9 +386,7 @@ private static ReaderPageSource createOrcPageSource(
                     systemMemoryUsage,
                     INITIAL_BATCH_SIZE,
                     exception -> handleException(orcDataSourceId, exception),
-                    fileColumnsByIcebergId.isEmpty()
-                            ? NameBasedFieldMapper::create
-                            : new IdBasedFieldMapperFactory(readColumns));
+                    new IdBasedFieldMapperFactory(readColumns));
 
             return new ReaderPageSource(
                     new OrcPageSource(
@@ -431,6 +418,48 @@ private static ReaderPageSource createOrcPageSource(
         }
     }
 
+    private static boolean hasIds(OrcColumn column)
+    {
+        if (column.getAttributes().containsKey(ORC_ICEBERG_ID_KEY)) {
+            return true;
+        }
+
+        return column.getNestedColumns().stream().anyMatch(IcebergPageSourceProvider::hasIds);
+    }
+
+    private static OrcColumn setMissingFieldIds(OrcColumn column, NameMapping nameMapping, List<String> qualifiedPath)
+    {
+        MappedField mappedField = nameMapping.find(qualifiedPath);
+
+        ImmutableMap.Builder<String, String> attributes = ImmutableMap.<String, String>builder()
+                .putAll(column.getAttributes());
+        if (mappedField != null && mappedField.id() != null) {
+            attributes.put(ORC_ICEBERG_ID_KEY, String.valueOf(mappedField.id()));
+        }
+
+        return new OrcColumn(
+                column.getPath(),
+                column.getColumnId(),
+                column.getColumnName(),
+                column.getColumnType(),
+                column.getOrcDataSourceId(),
+                column.getNestedColumns().stream()
+                        .map(nestedColumn -> {
+                            ImmutableList.Builder<String> nextQualifiedPath = ImmutableList.<String>builder()
+                                    .addAll(qualifiedPath);
+                            if (column.getColumnType().equals(OrcType.OrcTypeKind.LIST)) {
+                                // The Trino ORC reader uses "item" for list element names, but the NameMapper expects "element"
+                                nextQualifiedPath.add("element");
+                            }
+                            else {
+                                nextQualifiedPath.add(nestedColumn.getColumnName());
+                            }
+                            return setMissingFieldIds(nestedColumn, nameMapping, nextQualifiedPath.build());
+                        })
+                        .collect(toImmutableList()),
+                attributes.build());
+    }
+
     /**
      * Gets the index based dereference chain to get from the readColumnHandle to the expectedColumnHandle
      */
@@ -456,7 +485,12 @@ private static Map<Integer, OrcColumn> mapIdsToOrcFileColumns(List<OrcColumn> co
         ImmutableMap.Builder<Integer, OrcColumn> columnsById = ImmutableMap.builder();
         Traverser.forTree(OrcColumn::getNestedColumns)
                 .depthFirstPreOrder(columns)
-                .forEach(column -> columnsById.put(getIcebergFieldId(column), column));
+                .forEach(column -> {
+                    String fieldId = (column.getAttributes().get(ORC_ICEBERG_ID_KEY));
+                    if (fieldId != null) {
+                        columnsById.put(Integer.parseInt(fieldId), column);
+                    }
+                });
         return columnsById.build();
     }
@@ -516,7 +550,7 @@ public IdBasedFieldMapperFactory(List<IcebergColumnHandle> columns)
         @Override
         public OrcReader.FieldMapper create(OrcColumn column)
         {
-            Map<Integer, OrcColumn> nestedColumns = Maps.uniqueIndex(
+            Map<Integer, OrcColumn> nestedColumns = uniqueIndex(
                     column.getNestedColumns(),
                     IcebergPageSourceProvider::getIcebergFieldId);
@@ -574,7 +608,8 @@ private static ReaderPageSource createParquetPageSource(
             List<IcebergColumnHandle> regularColumns,
             ParquetReaderOptions options,
             TupleDomain<IcebergColumnHandle> effectivePredicate,
-            FileFormatDataSourceStats fileFormatDataSourceStats)
+            FileFormatDataSourceStats fileFormatDataSourceStats,
+            Optional<NameMapping> nameMapping)
     {
         AggregatedMemoryContext systemMemoryContext = newSimpleAggregatedMemoryContext();
@@ -587,27 +622,23 @@ private static ReaderPageSource createParquetPageSource(
             ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(identity, () -> MetadataReader.readFooter(theDataSource));
             FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
             MessageType fileSchema = fileMetaData.getSchema();
+            if (nameMapping.isPresent() && !ParquetSchemaUtil.hasIds(fileSchema)) {
+                // NameMapping conversion is necessary because MetadataReader converts all column names to lowercase and NameMapping is case sensitive
+                fileSchema = ParquetSchemaUtil.applyNameMapping(fileSchema, convertToLowercase(nameMapping.get()));
+            }
 
             // Mapping from Iceberg field ID to Parquet fields.
             Map<Integer, org.apache.parquet.schema.Type> parquetIdToField = fileSchema.getFields().stream()
                     .filter(field -> field.getId() != null)
                     .collect(toImmutableMap(field -> field.getId().intValue(), Function.identity()));
 
-            // Map by name for a migrated table
-            boolean mapByName = parquetIdToField.isEmpty();
-
             Optional<ReaderColumns> columnProjections = projectColumns(regularColumns);
             List<IcebergColumnHandle> readColumns = columnProjections
                     .map(readerColumns -> (List<IcebergColumnHandle>) readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()))
                     .orElse(regularColumns);
 
             List<org.apache.parquet.schema.Type> parquetFields = readColumns.stream()
-                    .map(column -> {
-                        if (mapByName) {
-                            return getParquetTypeByName(column.getName(), fileSchema);
-                        }
-                        return parquetIdToField.get(column.getId());
-                    })
+                    .map(column -> parquetIdToField.get(column.getId()))
                     .collect(toList());
 
             MessageType requestedSchema = new MessageType(fileSchema.getName(), parquetFields.stream().filter(Objects::nonNull).collect(toImmutableList()));
@@ -651,9 +682,7 @@ private static ReaderPageSource createParquetPageSource(
                 else {
                     // The top level columns are already mapped by name/id appropriately.
                     ColumnIO columnIO = messageColumnIO.getChild(parquetField.getName());
-                    internalFields.add(mapByName
-                            ? HiveParquetColumnIOConverter.constructField(trinoType, columnIO)
-                            : IcebergParquetColumnIOConverter.constructField(new FieldContext(trinoType, column.getColumnIdentity()), columnIO));
+                    internalFields.add(IcebergParquetColumnIOConverter.constructField(new FieldContext(trinoType, column.getColumnIdentity()), columnIO));
                 }
             }
 
@@ -683,6 +712,33 @@ private static ReaderPageSource createParquetPageSource(
         }
     }
 
+    /**
+     * Create a new NameMapping with the same names but converted to lowercase.
+     * @param nameMapping The original NameMapping, potentially containing non-lowercase characters
+     */
+    private static NameMapping convertToLowercase(NameMapping nameMapping)
+    {
+        return NameMapping.of(convertToLowercase(nameMapping.asMappedFields().fields()));
+    }
+
+    private static MappedFields convertToLowercase(MappedFields mappedFields)
+    {
+        if (mappedFields == null) {
+            return null;
+        }
+        return MappedFields.of(convertToLowercase(mappedFields.fields()));
+    }
+
+    private static List<MappedField> convertToLowercase(List<MappedField> fields)
+    {
+        return fields.stream()
+                .map(mappedField -> {
+                    Set<String> lowercaseNames = mappedField.names().stream().map(name -> name.toLowerCase(ENGLISH)).collect(toImmutableSet());
+                    return MappedField.of(mappedField.id(), lowercaseNames, convertToLowercase(mappedField.nestedMapping()));
+                })
+                .collect(toImmutableList());
+    }
+
     private static class IcebergOrcProjectedLayout
             implements ProjectedLayout
     {
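
Why createParquetPageSource lowercases the mapping before applying it, shown as a small sketch: per the comment in the hunk above, Trino's parquet MetadataReader converts the column names it reads from the file footer to lowercase, while Iceberg name-mapping lookups are case sensitive, so a mapping written with the original column casing would not match. Illustrative values and class name only; this is not code from the change:

    import com.google.common.collect.ImmutableList;
    import org.apache.iceberg.mapping.NameMapping;
    import org.apache.iceberg.mapping.NameMappingParser;

    public class LowercaseNameMappingSketch
    {
        public static void main(String[] args)
        {
            // Mapping written with the original column name "Doc_Id"
            NameMapping original = NameMappingParser.fromJson("[{\"field-id\": 1, \"names\": [\"Doc_Id\"]}]");
            // Lookup with the lowercased name produced by the footer reader finds nothing
            System.out.println(original.find(ImmutableList.of("doc_id"))); // null

            // After lowercasing the mapping's names (what convertToLowercase does), the lookup succeeds
            NameMapping lowered = NameMappingParser.fromJson("[{\"field-id\": 1, \"names\": [\"doc_id\"]}]");
            System.out.println(lowered.find(ImmutableList.of("doc_id")).id()); // 1
        }
    }
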
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
index b0b156b668cb..11aae4c14d79 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableHandle.java
@@ -42,6 +42,7 @@ public class IcebergTableHandle
     private final TupleDomain<IcebergColumnHandle> enforcedPredicate;
     private final Set<IcebergColumnHandle> projectedColumns;
+    private final Optional<String> nameMappingJson;
 
     @JsonCreator
     public IcebergTableHandle(
@@ -51,7 +52,8 @@ public IcebergTableHandle(
             @JsonProperty("snapshotId") Optional<Long> snapshotId,
             @JsonProperty("unenforcedPredicate") TupleDomain<IcebergColumnHandle> unenforcedPredicate,
             @JsonProperty("enforcedPredicate") TupleDomain<IcebergColumnHandle> enforcedPredicate,
-            @JsonProperty("projectedColumns") Set<IcebergColumnHandle> projectedColumns)
+            @JsonProperty("projectedColumns") Set<IcebergColumnHandle> projectedColumns,
+            @JsonProperty("nameMappingJson") Optional<String> nameMappingJson)
     {
         this.schemaName = requireNonNull(schemaName, "schemaName is null");
         this.tableName = requireNonNull(tableName, "tableName is null");
@@ -60,6 +62,7 @@ public IcebergTableHandle(
         this.unenforcedPredicate = requireNonNull(unenforcedPredicate, "unenforcedPredicate is null");
         this.enforcedPredicate = requireNonNull(enforcedPredicate, "enforcedPredicate is null");
         this.projectedColumns = ImmutableSet.copyOf(requireNonNull(projectedColumns, "projectedColumns is null"));
+        this.nameMappingJson = requireNonNull(nameMappingJson, "nameMappingJson is null");
     }
 
     @JsonProperty
@@ -104,6 +107,12 @@ public Set<IcebergColumnHandle> getProjectedColumns()
         return projectedColumns;
     }
 
+    @JsonProperty
+    public Optional<String> getNameMappingJson()
+    {
+        return nameMappingJson;
+    }
+
     public SchemaTableName getSchemaTableName()
     {
         return new SchemaTableName(schemaName, tableName);
@@ -123,7 +132,8 @@ public IcebergTableHandle withProjectedColumns(Set<IcebergColumnHandle> projecte
                 snapshotId,
                 unenforcedPredicate,
                 enforcedPredicate,
-                projectedColumns);
+                projectedColumns,
+                nameMappingJson);
     }
 
     @Override
@@ -143,13 +153,14 @@ public boolean equals(Object o)
                 Objects.equals(snapshotId, that.snapshotId) &&
                 Objects.equals(unenforcedPredicate, that.unenforcedPredicate) &&
                 Objects.equals(enforcedPredicate, that.enforcedPredicate) &&
-                Objects.equals(projectedColumns, that.projectedColumns);
+                Objects.equals(projectedColumns, that.projectedColumns) &&
+                Objects.equals(nameMappingJson, that.nameMappingJson);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(schemaName, tableName, tableType, snapshotId, unenforcedPredicate, enforcedPredicate, projectedColumns);
+        return Objects.hash(schemaName, tableName, tableType, snapshotId, unenforcedPredicate, enforcedPredicate, projectedColumns, nameMappingJson);
     }
 
     @Override
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
index 36b319fbbee5..b724e784ee71 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java
@@ -110,7 +110,8 @@ public void testIncompleteDynamicFilterTimeout()
                 Optional.empty(),
                 TupleDomain.all(),
                 TupleDomain.all(),
-                ImmutableSet.of());
+                ImmutableSet.of(),
+                Optional.empty());
         Table nationTable = loadIcebergTable(metastore, operationsProvider, SESSION, schemaTableName);
         IcebergSplitSource splitSource = new IcebergSplitSource(
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg/spark-defaults.conf b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg/spark-defaults.conf
index b3b97b1fa5e6..92328e23c068 100644
--- a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg/spark-defaults.conf
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg/spark-defaults.conf
@@ -2,6 +2,9 @@ spark.sql.catalogImplementation=hive
 spark.sql.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse
 spark.sql.hive.thriftServer.singleSession=false
 
+spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
+spark.sql.catalog.spark_catalog.type = hive
+
 spark.sql.catalog.iceberg_test=org.apache.iceberg.spark.SparkCatalog
 spark.sql.catalog.iceberg_test.type=hive
 spark.sql.catalog.iceberg_test.uri=thrift://hadoop-master:9083
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
index ae78930adeb3..b2c2c6ccc1b2 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
@@ -1080,6 +1080,135 @@ private List<String> compressionCodecs()
                 "GZIP");
     }
 
+    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
+    public void testTrinoReadingMigratedNestedData(StorageFormat storageFormat)
+    {
+        String baseTableName = "test_trino_reading_migrated_nested_data_" + randomTableSuffix();
+        String defaultCatalogTableName = sparkDefaultCatalogTableName(baseTableName);
+
+        String sparkTableDefinition = "" +
+                "CREATE TABLE %s (\n" +
+                "  doc_id STRING\n" +
+                ", nested_map MAP<STRING, ARRAY<STRUCT<sName: STRING, sNumber: INT>>>\n" +
+                ", nested_array ARRAY<MAP<STRING, ARRAY<STRUCT<mName: STRING, mNumber: INT>>>>\n" +
+                ", nested_struct STRUCT<id: INT, name: STRING, address: STRUCT<street_number: INT, street_name: STRING>>)\n" +
+                " USING %s";
+        onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName, storageFormat.name()));
+
+        String insert = "" +
+                "INSERT INTO TABLE %s SELECT" +
+                "  'Doc213'" +
+                ", map('s1', array(named_struct('sName', 'ASName1', 'sNumber', 201), named_struct('sName', 'ASName2', 'sNumber', 202)))" +
+                ", array(map('m1', array(named_struct('mName', 'MAS1Name1', 'mNumber', 301), named_struct('mName', 'MAS1Name2', 'mNumber', 302)))" +
+                "       ,map('m2', array(named_struct('mName', 'MAS2Name1', 'mNumber', 401), named_struct('mName', 'MAS2Name2', 'mNumber', 402))))" +
+                ", named_struct('id', 1, 'name', 'P. Sherman', 'address', named_struct('street_number', 42, 'street_name', 'Wallaby Way'))";
+        onSpark().executeQuery(format(insert, defaultCatalogTableName));
+        onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName));
+
+        String sparkTableName = sparkTableName(baseTableName);
+        Row row = row("Doc213", "ASName2", 201, "MAS2Name1", 302, "P. Sherman", 42, "Wallaby Way");
Sherman", 42, "Wallaby Way"); + + String sparkSelect = "SELECT" + + " doc_id" + + ", nested_map['s1'][1].sName" + + ", nested_map['s1'][0].sNumber" + + ", nested_array[1]['m2'][0].mName" + + ", nested_array[0]['m1'][1].mNumber" + + ", nested_struct.name" + + ", nested_struct.address.street_number" + + ", nested_struct.address.street_name" + + " FROM "; + + QueryResult sparkResult = onSpark().executeQuery(sparkSelect + sparkTableName); + // The Spark behavior when the default name mapping does not exist is not consistent + assertThat(sparkResult).containsOnly(row); + + String trinoSelect = "SELECT" + + " doc_id" + + ", nested_map['s1'][2].sName" + + ", nested_map['s1'][1].sNumber" + + ", nested_array[2]['m2'][1].mName" + + ", nested_array[1]['m1'][2].mNumber" + + ", nested_struct.name" + + ", nested_struct.address.street_number" + + ", nested_struct.address.street_name" + + " FROM "; + + String trinoTableName = trinoTableName(baseTableName); + QueryResult trinoResult = onTrino().executeQuery(trinoSelect + trinoTableName); + assertThat(trinoResult).containsOnly(row); + + // After removing the name mapping, columns from migrated files should be null since they are missing the Iceberg Field IDs + onSpark().executeQuery(format("ALTER TABLE %s UNSET TBLPROPERTIES ('schema.name-mapping.default')", sparkTableName)); + assertThat(onTrino().executeQuery(trinoSelect + trinoTableName)).containsOnly(row(null, null, null, null, null, null, null, null)); + assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(row(null, null, null, null)); + assertThat(onTrino().executeQuery("SELECT nested_struct.address.street_number, nested_struct.address.street_name FROM " + trinoTableName)).containsOnly(row(null, null)); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testMigratedDataWithAlteredSchema(StorageFormat storageFormat) + { + String baseTableName = "test_migrated_data_with_altered_schema_" + randomTableSuffix(); + String defaultCatalogTableName = sparkDefaultCatalogTableName(baseTableName); + + String sparkTableDefinition = "" + + "CREATE TABLE %s (\n" + + " doc_id STRING\n" + + ", nested_struct STRUCT>)\n" + + " USING %s"; + onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName, storageFormat.name())); + + String insert = "" + + "INSERT INTO TABLE %s SELECT" + + " 'Doc213'" + + ", named_struct('id', 1, 'name', 'P. Sherman', 'address', named_struct('a', 42, 'b', 'Wallaby Way'))"; + onSpark().executeQuery(format(insert, defaultCatalogTableName)); + onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + + String sparkTableName = sparkTableName(baseTableName); + onSpark().executeQuery("ALTER TABLE " + sparkTableName + " RENAME COLUMN nested_struct TO nested_struct_moved"); + + String select = "SELECT" + + " nested_struct_moved.name" + + ", nested_struct_moved.address.a" + + ", nested_struct_moved.address.b" + + " FROM "; + Row row = row("P. 
Sherman", 42, "Wallaby Way"); + + QueryResult sparkResult = onSpark().executeQuery(select + sparkTableName); + assertThat(sparkResult).containsOnly(ImmutableList.of(row)); + + String trinoTableName = trinoTableName(baseTableName); + assertThat(onTrino().executeQuery(select + trinoTableName)).containsOnly(ImmutableList.of(row)); + + // After removing the name mapping, columns from migrated files should be null since they are missing the Iceberg Field IDs + onSpark().executeQuery(format("ALTER TABLE %s UNSET TBLPROPERTIES ('schema.name-mapping.default')", sparkTableName)); + assertThat(onTrino().executeQuery(select + trinoTableName)).containsOnly(row(null, null, null)); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testMigratedDataWithPartialNameMapping(StorageFormat storageFormat) + { + String baseTableName = "test_migrated_data_with_partial_name_mapping_" + randomTableSuffix(); + String defaultCatalogTableName = sparkDefaultCatalogTableName(baseTableName); + + String sparkTableDefinition = "CREATE TABLE %s (a INT, b INT) USING " + storageFormat.name(); + onSpark().executeQuery(format(sparkTableDefinition, defaultCatalogTableName)); + + String insert = "INSERT INTO TABLE %s SELECT 1, 2"; + onSpark().executeQuery(format(insert, defaultCatalogTableName)); + onSpark().executeQuery(format("CALL system.migrate('%s')", defaultCatalogTableName)); + + String sparkTableName = sparkTableName(baseTableName); + String trinoTableName = trinoTableName(baseTableName); + // Test missing entry for column 'b' + onSpark().executeQuery(format( + "ALTER TABLE %s SET TBLPROPERTIES ('schema.name-mapping.default'='[{\"field-id\": 1, \"names\": [\"a\"]}, {\"field-id\": 2, \"names\": [\"c\"]} ]')", + sparkTableName)); + assertThat(onTrino().executeQuery("SELECT a, b FROM " + trinoTableName)) + .containsOnly(row(1, null)); + } + private static String escapeSparkString(String value) { return value.replace("\\", "\\\\").replace("'", "\\'"); @@ -1095,6 +1224,11 @@ private static String sparkTableName(String tableName) return format("%s.%s.%s", SPARK_CATALOG, TEST_SCHEMA_NAME, tableName); } + private static String sparkDefaultCatalogTableName(String tableName) + { + return format("%s.%s", TEST_SCHEMA_NAME, tableName); + } + private static String trinoTableName(String tableName) { return format("%s.%s.%s", TRINO_CATALOG, TEST_SCHEMA_NAME, tableName);