diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java index a532ac66c987..4bcab2cec8a1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java @@ -23,6 +23,7 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; import org.apache.hudi.common.engine.TaskContextSupplier; @@ -44,9 +45,6 @@ import java.util.concurrent.atomic.AtomicLong; import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER; public class HoodieOrcWriter implements HoodieFileWriter, Closeable { @@ -155,11 +153,11 @@ public void close() throws IOException { final BloomFilter bloomFilter = orcConfig.getBloomFilter(); writer.addUserMetadata(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, ByteBuffer.wrap(bloomFilter.serializeToString().getBytes())); if (minRecordKey != null && maxRecordKey != null) { - writer.addUserMetadata(HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes())); - writer.addUserMetadata(HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes())); + writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes())); + writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes())); } if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) { - writer.addUserMetadata(HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes())); + writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes())); } } writer.addUserMetadata(HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY, ByteBuffer.wrap(avroSchema.toString().getBytes())); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java new file mode 100644 index 000000000000..df879dc81639 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.DummyTaskContextSupplier; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.io.storage.HoodieAvroParquetWriter; +import org.apache.hudi.io.storage.HoodieParquetConfig; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestHoodieAvroParquetWriter { + + @TempDir java.nio.file.Path tmpDir; + + @Test + public void testProperWriting() throws IOException { + Configuration hadoopConf = new Configuration(); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED); + List records = dataGen.generateGenericRecords(10); + + Schema schema = records.get(0).getSchema(); + + BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000, + BloomFilterTypeCode.DYNAMIC_V0.name()); + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), + schema, Option.of(filter)); + + HoodieParquetConfig parquetConfig = + new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, + ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1); + + Path filePath = new Path(tmpDir.resolve("test.parquet").toAbsolutePath().toString()); + + try (HoodieAvroParquetWriter writer = + new HoodieAvroParquetWriter<>(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) { + for (GenericRecord record : records) { + writer.writeAvro((String) record.get("_row_key"), record); + } + } + + ParquetUtils utils = new ParquetUtils(); + + // Step 1: Make sure records are written appropriately + List readRecords = utils.readAvroRecords(hadoopConf, filePath); + + assertEquals(toJson(records), toJson(readRecords)); + + // Step 2: Assert Parquet metadata was written appropriately + List recordKeys = records.stream().map(r -> (String) r.get("_row_key")).collect(Collectors.toList()); + + String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get(); + String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get(); + + FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData(); + + Map extraMetadata = parquetMetadata.getKeyValueMetaData(); + + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey); + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey); + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name()); + + // Step 3: Make sure Bloom Filter contains all the record keys + BloomFilter bloomFilter = utils.readBloomFilterFromMetadata(hadoopConf, filePath); + recordKeys.forEach(recordKey -> { + assertTrue(bloomFilter.mightContain(recordKey)); + }); + } + + private static List toJson(List records) { + return records.stream().map(r -> { + try { + return new String(HoodieAvroUtils.avroToJson(r, true)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java index 817fc25a5d63..373fc31a5627 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java @@ -18,6 +18,7 @@ package org.apache.hudi.io.storage; +import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; @@ -37,8 +38,6 @@ import java.util.function.Supplier; import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER; import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -78,8 +77,8 @@ protected HoodieFileReader createReader( protected void verifyMetadata(Configuration conf) throws IOException { Reader orcReader = OrcFile.createReader(getFilePath(), OrcFile.readerOptions(conf)); assertEquals(4, orcReader.getMetadataKeys().size()); - assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MIN_RECORD_KEY_FOOTER)); - assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MAX_RECORD_KEY_FOOTER)); + assertTrue(orcReader.getMetadataKeys().contains(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER)); + assertTrue(orcReader.getMetadataKeys().contains(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)); assertTrue(orcReader.getMetadataKeys().contains(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)); assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY)); assertEquals(CompressionKind.ZLIB.name(), orcReader.getCompressionKind().toString()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java index 035cb2eab973..b939498c3e24 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java @@ -18,20 +18,18 @@ package org.apache.hudi.io.storage.row; +import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.util.Option; import org.apache.parquet.hadoop.api.WriteSupport; -import java.util.HashMap; - -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; /** * Hoodie Write Support for directly writing {@link RowData} to Parquet. @@ -39,14 +37,13 @@ public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport { private final Configuration hadoopConf; - private final BloomFilter bloomFilter; - private String minRecordKey; - private String maxRecordKey; + private final Option> bloomFilterWriteSupportOpt; public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, BloomFilter bloomFilter) { super(rowType); this.hadoopConf = new Configuration(conf); - this.bloomFilter = bloomFilter; + this.bloomFilterWriteSupportOpt = Option.ofNullable(bloomFilter) + .map(HoodieBloomFilterRowDataWriteSupport::new); } public Configuration getHadoopConf() { @@ -55,32 +52,26 @@ public Configuration getHadoopConf() { @Override public WriteSupport.FinalizedWriteContext finalizeWrite() { - HashMap extraMetaData = new HashMap<>(); - if (bloomFilter != null) { - extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString()); - if (minRecordKey != null && maxRecordKey != null) { - extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey); - extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey); - } - if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) { - extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name()); - } - } - return new WriteSupport.FinalizedWriteContext(extraMetaData); + Map extraMetadata = + bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata) + .orElse(Collections.emptyMap()); + + return new WriteSupport.FinalizedWriteContext(extraMetadata); } public void add(String recordKey) { - this.bloomFilter.add(recordKey); - if (minRecordKey != null) { - minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey; - } else { - minRecordKey = recordKey; + this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport -> + bloomFilterWriteSupport.addKey(recordKey)); + } + + private static class HoodieBloomFilterRowDataWriteSupport extends HoodieBloomFilterWriteSupport { + public HoodieBloomFilterRowDataWriteSupport(BloomFilter bloomFilter) { + super(bloomFilter); } - if (maxRecordKey != null) { - maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey; - } else { - maxRecordKey = recordKey; + @Override + protected byte[] getUTF8Bytes(String key) { + return key.getBytes(StandardCharsets.UTF_8); } } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java index 28964ecc3f00..bb4dd9c61942 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java @@ -19,8 +19,8 @@ package org.apache.hudi.io.storage.row; import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.parquet.hadoop.api.WriteSupport; @@ -28,12 +28,8 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; -import java.util.HashMap; - -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER; +import java.util.Collections; +import java.util.Map; /** * Hoodie Write Support for directly writing Row to Parquet. @@ -41,19 +37,17 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport { private final Configuration hadoopConf; - private final BloomFilter bloomFilter; - - private UTF8String minRecordKey; - private UTF8String maxRecordKey; + private final Option> bloomFilterWriteSupportOpt; public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option bloomFilterOpt, HoodieWriteConfig writeConfig) { Configuration hadoopConf = new Configuration(conf); hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled()); hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType()); hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", writeConfig.parquetFieldIdWriteEnabled()); - this.hadoopConf = hadoopConf; setSchema(structType, hadoopConf); - this.bloomFilter = bloomFilterOpt.orElse(null); + + this.hadoopConf = hadoopConf; + this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new); } public Configuration getHadoopConf() { @@ -62,32 +56,35 @@ public Configuration getHadoopConf() { @Override public WriteSupport.FinalizedWriteContext finalizeWrite() { - HashMap extraMetaData = new HashMap<>(); - if (bloomFilter != null) { - extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString()); - if (minRecordKey != null && maxRecordKey != null) { - extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString()); - extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString()); - } - if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) { - extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name()); - } - } - return new WriteSupport.FinalizedWriteContext(extraMetaData); + Map extraMetadata = + bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata) + .orElse(Collections.emptyMap()); + + return new WriteSupport.FinalizedWriteContext(extraMetadata); } public void add(UTF8String recordKey) { - this.bloomFilter.add(recordKey.getBytes()); + this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport -> + bloomFilterWriteSupport.addKey(recordKey)); + } - if (minRecordKey == null || minRecordKey.compareTo(recordKey) < 0) { + private static class HoodieBloomFilterRowWriteSupport extends HoodieBloomFilterWriteSupport { + public HoodieBloomFilterRowWriteSupport(BloomFilter bloomFilter) { + super(bloomFilter); + } + + @Override + protected byte[] getUTF8Bytes(UTF8String key) { + return key.getBytes(); + } + + @Override + protected UTF8String dereference(UTF8String key) { // NOTE: [[clone]] is performed here (rather than [[copy]]) to only copy underlying buffer in // cases when [[UTF8String]] is pointing into a buffer storing the whole containing record, // and simply do a pass over when it holds a (immutable) buffer holding just the string - minRecordKey = recordKey.clone(); - } - - if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) > 0) { - maxRecordKey = recordKey.clone(); + return key.clone(); } } + } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java index 481cda00d6e7..dce0e2fad591 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java @@ -18,40 +18,44 @@ package org.apache.hudi.io.storage.row; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.SparkDatasetTestUtils; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.util.Comparator; import java.util.List; -import java.util.Random; -import java.util.UUID; +import java.util.Map; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Unit tests {@link HoodieInternalRowParquetWriter}. */ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness { - private static final Random RANDOM = new Random(); - @BeforeEach public void setUp() throws Exception { initSparkContexts("TestHoodieInternalRowParquetWriter"); @@ -68,44 +72,55 @@ public void tearDown() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void endToEndTest(boolean parquetWriteLegacyFormatEnabled) throws Exception { + public void testProperWriting(boolean parquetWriteLegacyFormatEnabled) throws Exception { + // Generate inputs + Dataset inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, 100, + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, false); + StructType schema = inputRows.schema(); + + List rows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER); + HoodieWriteConfig.Builder writeConfigBuilder = SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort); - for (int i = 0; i < 5; i++) { - // init write support and parquet config - HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled); - HoodieWriteConfig cfg = writeConfigBuilder.build(); - HoodieParquetConfig parquetConfig = new HoodieParquetConfig<>(writeSupport, - CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled()); - - // prepare path - String fileId = UUID.randomUUID().toString(); - Path filePath = new Path(basePath + "/" + fileId); - String partitionPath = HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; - metaClient.getFs().mkdirs(new Path(basePath)); - - // init writer - HoodieInternalRowParquetWriter writer = new HoodieInternalRowParquetWriter(filePath, parquetConfig); - - // generate input - int size = 10 + RANDOM.nextInt(100); - // Generate inputs - Dataset inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, size, partitionPath, false); - List internalRows = SparkDatasetTestUtils.toInternalRows(inputRows, SparkDatasetTestUtils.ENCODER); - - // issue writes - for (InternalRow internalRow : internalRows) { - writer.write(internalRow); - } - // close the writer - writer.close(); + HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled); + HoodieWriteConfig cfg = writeConfigBuilder.build(); + HoodieParquetConfig parquetConfig = new HoodieParquetConfig<>(writeSupport, + CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(), + writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled()); - // verify rows - Dataset result = sqlContext.read().parquet(basePath); - assertEquals(0, inputRows.except(result).count()); + Path filePath = new Path(basePath + "/internal_row_writer.parquet"); + + try (HoodieInternalRowParquetWriter writer = new HoodieInternalRowParquetWriter(filePath, parquetConfig)) { + for (InternalRow row : rows) { + writer.writeRow(row.getUTF8String(schema.fieldIndex("record_key")), row); + } } + + // Step 1: Verify rows written correctly + Dataset result = sqlContext.read().parquet(basePath); + assertEquals(0, inputRows.except(result).count()); + + // Step 2: Assert Parquet metadata was written appropriately + List recordKeys = + rows.stream().map(r -> r.getString(schema.fieldIndex("record_key"))).collect(Collectors.toList()); + + String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get(); + String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get(); + + FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData(); + + Map extraMetadata = parquetMetadata.getKeyValueMetaData(); + + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey); + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey); + assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name()); + + // Step 3: Make sure Bloom Filter contains all the record keys + BloomFilter bloomFilter = new ParquetUtils().readBloomFilterFromMetadata(hadoopConf, filePath); + recordKeys.forEach(recordKey -> { + assertTrue(bloomFilter.mightContain(recordKey)); + }); } private HoodieRowParquetWriteSupport getWriteSupport(HoodieWriteConfig.Builder writeConfigBuilder, Configuration hadoopConf, boolean parquetWriteLegacyFormatEnabled) { diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java index c3920211ae94..e87364fb9097 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java @@ -20,12 +20,14 @@ import org.apache.avro.Schema; import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.parquet.avro.AvroWriteSupport; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.schema.MessageType; +import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -34,55 +36,45 @@ */ public class HoodieAvroWriteSupport extends AvroWriteSupport { - private Option bloomFilterOpt; - private String minRecordKey; - private String maxRecordKey; - private Map footerMetadata = new HashMap<>(); + private final Option> bloomFilterWriteSupportOpt; + private final Map footerMetadata = new HashMap<>(); public static final String OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "com.uber.hoodie.bloomfilter"; public static final String HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY = "org.apache.hudi.bloomfilter"; - public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key"; - public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key"; - public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code"; public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, Option bloomFilterOpt) { super(schema, avroSchema, ConvertingGenericData.INSTANCE); - this.bloomFilterOpt = bloomFilterOpt; + this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterAvroWriteSupport::new); } @Override public WriteSupport.FinalizedWriteContext finalizeWrite() { - if (bloomFilterOpt.isPresent()) { - footerMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilterOpt.get().serializeToString()); - if (minRecordKey != null && maxRecordKey != null) { - footerMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey); - footerMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey); - } - if (bloomFilterOpt.get().getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) { - footerMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilterOpt.get().getBloomFilterTypeCode().name()); - } - } - return new WriteSupport.FinalizedWriteContext(footerMetadata); + Map extraMetadata = + CollectionUtils.combine(footerMetadata, + bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata) + .orElse(Collections.emptyMap()) + ); + + return new WriteSupport.FinalizedWriteContext(extraMetadata); } public void add(String recordKey) { - if (bloomFilterOpt.isPresent()) { - this.bloomFilterOpt.get().add(recordKey); - if (minRecordKey != null) { - minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey; - } else { - minRecordKey = recordKey; - } - - if (maxRecordKey != null) { - maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey; - } else { - maxRecordKey = recordKey; - } - } + this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport -> + bloomFilterWriteSupport.addKey(recordKey)); } public void addFooterMetadata(String key, String value) { footerMetadata.put(key, value); } + + private static class HoodieBloomFilterAvroWriteSupport extends HoodieBloomFilterWriteSupport { + public HoodieBloomFilterAvroWriteSupport(BloomFilter bloomFilter) { + super(bloomFilter); + } + + @Override + protected byte[] getUTF8Bytes(String key) { + return key.getBytes(StandardCharsets.UTF_8); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieBloomFilterWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieBloomFilterWriteSupport.java new file mode 100644 index 000000000000..1a689791ba3f --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieBloomFilterWriteSupport.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.avro; + +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY; + +/** + * This is write-support utility base-class taking up handling of + * + *
    + *
  • Adding record keys to the Bloom Filter
  • + *
  • Keeping track of min/max record key (w/in single file)
  • + *
+ * + * @param record-key type being ingested by this clas + */ +public abstract class HoodieBloomFilterWriteSupport> { + + public static final String HOODIE_MIN_RECORD_KEY_FOOTER = "hoodie_min_record_key"; + public static final String HOODIE_MAX_RECORD_KEY_FOOTER = "hoodie_max_record_key"; + public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code"; + + private final BloomFilter bloomFilter; + + private T minRecordKey; + private T maxRecordKey; + + public HoodieBloomFilterWriteSupport(BloomFilter bloomFilter) { + this.bloomFilter = bloomFilter; + } + + public void addKey(T recordKey) { + bloomFilter.add(getUTF8Bytes(recordKey)); + + if (minRecordKey == null || minRecordKey.compareTo(recordKey) > 0) { + minRecordKey = dereference(recordKey); + } + + if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) < 0) { + maxRecordKey = dereference(recordKey); + } + } + + public Map finalizeMetadata() { + HashMap extraMetadata = new HashMap<>(); + + extraMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString()); + if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) { + extraMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name()); + } + + if (minRecordKey != null && maxRecordKey != null) { + extraMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString()); + extraMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString()); + } + + return extraMetadata; + } + + /** + * Since Bloom Filter ingests record-keys represented as UTF8 encoded byte string, + * this method have to be implemented for converting the original record key into one + */ + protected abstract byte[] getUTF8Bytes(T key); + + /** + * This method allows to dereference the key object (t/h cloning, for ex) that might be + * pointing at a shared mutable buffer, to make sure that we're not keeping references + * to mutable objects + */ + protected T dereference(T key) { + return key; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java index d6391d178eb3..badb5e37a70f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.util; import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.avro.HoodieBloomFilterWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; @@ -83,7 +84,7 @@ public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path readFooter(configuration, false, filePath, HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, - HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE); + HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE); String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); if (null == footerVal) { // We use old style key "com.uber.hoodie.bloomfilter" @@ -91,9 +92,9 @@ public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path } BloomFilter toReturn = null; if (footerVal != null) { - if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) { + if (footerVals.containsKey(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) { toReturn = BloomFilterFactory.fromString(footerVal, - footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)); + footerVals.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)); } else { toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name()); } @@ -109,14 +110,14 @@ public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path */ public String[] readMinMaxRecordKeys(Configuration configuration, Path filePath) { Map minMaxKeys = readFooter(configuration, true, filePath, - HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER); + HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER); if (minMaxKeys.size() != 2) { throw new HoodieException( String.format("Could not read min/max record key out of footer correctly from %s. read) : %s", filePath, minMaxKeys)); } - return new String[] {minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), - minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)}; + return new String[] {minMaxKeys.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), + minMaxKeys.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)}; } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java deleted file mode 100644 index 16a77c145cf2..000000000000 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroWriteSupport.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.avro; - -import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.BloomFilterFactory; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.Option; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.avro.AvroSchemaConverter; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -public class TestHoodieAvroWriteSupport { - - @Test - public void testAddKey(@TempDir java.nio.file.Path tempDir) throws IOException { - List rowKeys = new ArrayList<>(); - for (int i = 0; i < 1000; i++) { - rowKeys.add(UUID.randomUUID().toString()); - } - String filePath = tempDir.resolve("test.parquet").toAbsolutePath().toString(); - Schema schema = HoodieAvroUtils.getRecordKeySchema(); - BloomFilter filter = BloomFilterFactory.createBloomFilter( - 1000, 0.0001, 10000, - BloomFilterTypeCode.SIMPLE.name()); - HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport( - new AvroSchemaConverter().convert(schema), schema, Option.of(filter)); - ParquetWriter writer = new ParquetWriter(new Path(filePath), writeSupport, CompressionCodecName.GZIP, - 120 * 1024 * 1024, ParquetWriter.DEFAULT_PAGE_SIZE); - for (String rowKey : rowKeys) { - GenericRecord rec = new GenericData.Record(schema); - rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey); - writer.write(rec); - writeSupport.add(rowKey); - } - writer.close(); - } -}