From 2b4ff68abcaec3e07c16bdf24b71873fc46db564 Mon Sep 17 00:00:00 2001
From: regadas
Date: Fri, 6 Dec 2024 16:53:41 -0500
Subject: [PATCH] Add Iceberg support for name-based mapping schema

---
 sdks/java/io/iceberg/build.gradle               |   2 +
 .../beam/sdk/io/iceberg/ScanTaskReader.java     |  36 +++--
 .../sdk/io/iceberg/IcebergIOReadTest.java       | 149 ++++++++++++++++++
 .../sdk/io/iceberg/TestDataWarehouse.java       |   4 +
 4 files changed, 182 insertions(+), 9 deletions(-)

diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index a2d192b67208..e352acb49cf2 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -61,6 +61,8 @@ dependencies {
   testImplementation library.java.bigdataoss_gcsio
   testImplementation library.java.bigdataoss_gcs_connector
   testImplementation library.java.bigdataoss_util_hadoop
+  testImplementation "org.apache.parquet:parquet-avro:$parquet_version"
+  testImplementation "org.apache.parquet:parquet-common:$parquet_version"
   testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
   testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index b7cb42b2eacb..fc49b275d80f 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -29,6 +29,7 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
@@ -40,6 +41,7 @@
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.checkerframework.checker.nullness.qual.NonNull;
@@ -88,6 +90,7 @@ public boolean advance() throws IOException {
     // which are not null-safe.
     @SuppressWarnings("nullness")
     org.apache.iceberg.@NonNull Schema project = this.project;
+    String nameMapping = source.getTable().properties().get(TableProperties.DEFAULT_NAME_MAPPING);
 
     do {
       // If our current iterator is working... do that.
@@ -117,33 +120,48 @@ public boolean advance() throws IOException {
       switch (file.format()) {
         case ORC:
           LOG.info("Preparing ORC input");
-          iterable =
+          ORC.ReadBuilder orcReader =
               ORC.read(input)
                   .split(fileTask.start(), fileTask.length())
                   .project(project)
                   .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(project, fileSchema))
-                  .filter(fileTask.residual())
-                  .build();
+                  .filter(fileTask.residual());
+
+          if (nameMapping != null) {
+            orcReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
+          }
+
+          iterable = orcReader.build();
           break;
         case PARQUET:
           LOG.info("Preparing Parquet input.");
-          iterable =
+          Parquet.ReadBuilder parquetReader =
               Parquet.read(input)
                   .split(fileTask.start(), fileTask.length())
                   .project(project)
                   .createReaderFunc(
                       fileSchema -> GenericParquetReaders.buildReader(project, fileSchema))
-                  .filter(fileTask.residual())
-                  .build();
+                  .filter(fileTask.residual());
+
+          if (nameMapping != null) {
+            parquetReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
+          }
+
+          iterable = parquetReader.build();
           break;
         case AVRO:
           LOG.info("Preparing Avro input.");
-          iterable =
+          Avro.ReadBuilder avroReader =
               Avro.read(input)
                   .split(fileTask.start(), fileTask.length())
                   .project(project)
-                  .createReaderFunc(DataReader::create)
-                  .build();
+                  .createReaderFunc(DataReader::create);
+
+          if (nameMapping != null) {
+            avroReader.withNameMapping(NameMappingParser.fromJson(nameMapping));
+          }
+
+          iterable = avroReader.build();
           break;
         default:
           throw new UnsupportedOperationException("Cannot read format: " + file.format());
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index fe4a07dedfdf..02a995dfee13 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -20,11 +20,15 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
@@ -33,10 +37,28 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -122,4 +144,131 @@ public void testSimpleScan() throws Exception {
 
     testPipeline.run();
   }
+
+  @Test
+  public void testNameMappingScan() throws Exception {
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createRecord(
+            "test",
+            null,
+            null,
+            false,
+            ImmutableList.of(
+                new org.apache.avro.Schema.Field(
+                    "data", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING)),
+                new org.apache.avro.Schema.Field(
+                    "id", org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG))));
+
+    List<Map<String, Object>> recordData =
+        ImmutableList.<Map<String, Object>>builder()
+            .add(ImmutableMap.of("id", 0L, "data", "clarification"))
+            .add(ImmutableMap.of("id", 1L, "data", "risky"))
+            .add(ImmutableMap.of("id", 2L, "data", "falafel"))
+            .build();
+
+    List<GenericRecord> avroRecords =
+        recordData.stream()
+            .map(data -> avroGenericRecord(avroSchema, data))
+            .collect(Collectors.toList());
+
+    Configuration hadoopConf = new Configuration();
+    String path = createParquetFile(avroSchema, avroRecords);
+    HadoopInputFile inputFile = HadoopInputFile.fromLocation(path, hadoopConf);
+
+    NameMapping defaultMapping = NameMapping.of(MappedField.of(1, "id"), MappedField.of(2, "data"));
+    ImmutableMap<String, String> tableProperties =
+        ImmutableMap.<String, String>builder()
+            .put(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(defaultMapping))
+            .build();
+
+    TableIdentifier tableId =
+        TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
+    Table simpleTable =
+        warehouse
+            .buildTable(tableId, TestFixtures.SCHEMA)
+            .withProperties(tableProperties)
+            .withPartitionSpec(PartitionSpec.unpartitioned())
+            .create();
+
+    MetricsConfig metricsConfig = MetricsConfig.forTable(simpleTable);
+    Metrics metrics = ParquetUtil.fileMetrics(inputFile, metricsConfig);
+    DataFile dataFile =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withFormat(FileFormat.PARQUET)
+            .withInputFile(inputFile)
+            .withMetrics(metrics)
+            .build();
+
+    final Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+
+    simpleTable.newFastAppend().appendFile(dataFile).commit();
+
+    Map<String, String> catalogProps =
+        ImmutableMap.<String, String>builder()
+            .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .put("warehouse", warehouse.location)
+            .build();
+
+    IcebergCatalogConfig catalogConfig =
+        IcebergCatalogConfig.builder()
+            .setCatalogName("name")
+            .setCatalogProperties(catalogProps)
+            .build();
+
+    PCollection<Row> output =
+        testPipeline
+            .apply(IcebergIO.readRows(catalogConfig).from(tableId))
+            .apply(ParDo.of(new PrintRow()))
+            .setCoder(RowCoder.of(beamSchema));
+
+    final Row[] expectedRows =
+        recordData.stream()
+            .map(data -> icebergGenericRecord(TestFixtures.SCHEMA, data))
+            .map(record -> IcebergUtils.icebergRecordToBeamRow(beamSchema, record))
+            .toArray(Row[]::new);
+
+    PAssert.that(output)
+        .satisfies(
+            (Iterable<Row> rows) -> {
+              assertThat(rows, containsInAnyOrder(expectedRows));
+              return null;
+            });
+
+    testPipeline.run();
+  }
+
+  public static GenericRecord avroGenericRecord(
+      org.apache.avro.Schema schema, Map<String, Object> values) {
+    GenericRecord record = new GenericData.Record(schema);
+    values.forEach(record::put);
+    return record;
+  }
+
+  public static Record icebergGenericRecord(
+      org.apache.iceberg.Schema schema, Map<String, Object> values) {
+    return org.apache.iceberg.data.GenericRecord.create(schema).copy(values);
+  }
+
+  public static String createParquetFile(org.apache.avro.Schema schema, List<GenericRecord> records)
+      throws IOException {
+
+    File tempFile = createTempFile();
+    Path file = new Path(tempFile.getPath());
+
+    AvroParquetWriter.Builder<GenericRecord> builder = AvroParquetWriter.<GenericRecord>builder(file);
+    ParquetWriter<GenericRecord> parquetWriter = builder.withSchema(schema).build();
+    for (GenericRecord record : records) {
+      parquetWriter.write(record);
+    }
+    parquetWriter.close();
+
+    return tempFile.getPath();
+  }
+
+  private static File createTempFile() throws IOException {
+    File tempFile = File.createTempFile(ScanSourceTest.class.getSimpleName(), ".tmp");
+    tempFile.deleteOnExit();
+    boolean unused = tempFile.delete();
+    return tempFile;
+  }
 }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java
index 1e1c84d31de9..ff4fde1d7480 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java
@@ -150,6 +150,10 @@ public Table createTable(
     return catalog.createTable(tableId, schema, partitionSpec);
   }
 
+  public Catalog.TableBuilder buildTable(TableIdentifier tableId, Schema schema) {
+    return catalog.buildTable(tableId, schema);
+  }
+
   public Table loadTable(TableIdentifier tableId) {
     return catalog.loadTable(tableId);
   }
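
Usage sketch: the patched ScanTaskReader only consults the table's
schema.name-mapping.default property, so an existing table whose data files were
written without Iceberg field IDs needs that property set before IcebergIO can
resolve columns by name. The sketch below is a minimal illustration under
assumptions: the HadoopCatalog, warehouse path, and table name are hypothetical,
and the new test achieves the same thing at creation time via
buildTable(...).withProperties(...).

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;

public class AttachNameMapping {
  public static void main(String[] args) {
    // Field IDs must match the Iceberg table schema; names must match the
    // columns that actually appear in the ID-less data files.
    NameMapping mapping = NameMapping.of(MappedField.of(1, "id"), MappedField.of(2, "data"));

    // Hypothetical Hadoop catalog and table identifier.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "file:///tmp/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("default", "my_table"));

    // ScanTaskReader reads this property and hands the parsed mapping to the
    // ORC/Parquet/Avro read builders via withNameMapping(...).
    table
        .updateProperties()
        .set(TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(mapping))
        .commit();
  }
}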