Skip to content

Commit

Permalink
[HUDI-4992] Fixing invalid min/max record key stats in Parquet metada…
Browse files Browse the repository at this point in the history
…ta (#6883)
  • Loading branch information
Alexey Kudinkin authored and yuzhaojing committed Oct 9, 2022
1 parent 28cb191 commit 5110562
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.engine.TaskContextSupplier;
Expand All @@ -44,9 +45,6 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;

public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
implements HoodieFileWriter<R>, Closeable {
Expand Down Expand Up @@ -155,11 +153,11 @@ public void close() throws IOException {
final BloomFilter bloomFilter = orcConfig.getBloomFilter();
writer.addUserMetadata(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()));
if (minRecordKey != null && maxRecordKey != null) {
writer.addUserMetadata(HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
writer.addUserMetadata(HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes()));
writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes()));
}
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
writer.addUserMetadata(HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
writer.addUserMetadata(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes()));
}
}
writer.addUserMetadata(HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY, ByteBuffer.wrap(avroSchema.toString().getBytes()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DummyTaskContextSupplier;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieAvroParquetWriter {

@TempDir java.nio.file.Path tmpDir;

@Test
public void testProperWriting() throws IOException {
Configuration hadoopConf = new Configuration();

HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEED);
List<GenericRecord> records = dataGen.generateGenericRecords(10);

Schema schema = records.get(0).getSchema();

BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000,
BloomFilterTypeCode.DYNAMIC_V0.name());
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
schema, Option.of(filter));

HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1);

Path filePath = new Path(tmpDir.resolve("test.parquet").toAbsolutePath().toString());

try (HoodieAvroParquetWriter<GenericRecord> writer =
new HoodieAvroParquetWriter<>(filePath, parquetConfig, "001", new DummyTaskContextSupplier(), true)) {
for (GenericRecord record : records) {
writer.writeAvro((String) record.get("_row_key"), record);
}
}

ParquetUtils utils = new ParquetUtils();

// Step 1: Make sure records are written appropriately
List<GenericRecord> readRecords = utils.readAvroRecords(hadoopConf, filePath);

assertEquals(toJson(records), toJson(readRecords));

// Step 2: Assert Parquet metadata was written appropriately
List<String> recordKeys = records.stream().map(r -> (String) r.get("_row_key")).collect(Collectors.toList());

String minKey = recordKeys.stream().min(Comparator.naturalOrder()).get();
String maxKey = recordKeys.stream().max(Comparator.naturalOrder()).get();

FileMetaData parquetMetadata = ParquetUtils.readMetadata(hadoopConf, filePath).getFileMetaData();

Map<String, String> extraMetadata = parquetMetadata.getKeyValueMetaData();

assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), minKey);
assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER), maxKey);
assertEquals(extraMetadata.get(HoodieBloomFilterWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE), BloomFilterTypeCode.DYNAMIC_V0.name());

// Step 3: Make sure Bloom Filter contains all the record keys
BloomFilter bloomFilter = utils.readBloomFilterFromMetadata(hadoopConf, filePath);
recordKeys.forEach(recordKey -> {
assertTrue(bloomFilter.mightContain(recordKey));
});
}

private static List<String> toJson(List<GenericRecord> records) {
return records.stream().map(r -> {
try {
return new String(HoodieAvroUtils.avroToJson(r, true));
} catch (IOException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.io.storage;

import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
Expand All @@ -37,8 +38,6 @@
import java.util.function.Supplier;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -78,8 +77,8 @@ protected HoodieFileReader<GenericRecord> createReader(
protected void verifyMetadata(Configuration conf) throws IOException {
Reader orcReader = OrcFile.createReader(getFilePath(), OrcFile.readerOptions(conf));
assertEquals(4, orcReader.getMetadataKeys().size());
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MIN_RECORD_KEY_FOOTER));
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MAX_RECORD_KEY_FOOTER));
assertTrue(orcReader.getMetadataKeys().contains(HoodieBloomFilterWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER));
assertTrue(orcReader.getMetadataKeys().contains(HoodieBloomFilterWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER));
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY));
assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY));
assertEquals(CompressionKind.ZLIB.name(), orcReader.getCompressionKind().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,32 @@

package org.apache.hudi.io.storage.row;

import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.util.Option;
import org.apache.parquet.hadoop.api.WriteSupport;

import java.util.HashMap;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

/**
* Hoodie Write Support for directly writing {@link RowData} to Parquet.
*/
public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport {

private final Configuration hadoopConf;
private final BloomFilter bloomFilter;
private String minRecordKey;
private String maxRecordKey;
private final Option<HoodieBloomFilterWriteSupport<String>> bloomFilterWriteSupportOpt;

public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, BloomFilter bloomFilter) {
super(rowType);
this.hadoopConf = new Configuration(conf);
this.bloomFilter = bloomFilter;
this.bloomFilterWriteSupportOpt = Option.ofNullable(bloomFilter)
.map(HoodieBloomFilterRowDataWriteSupport::new);
}

public Configuration getHadoopConf() {
Expand All @@ -55,32 +52,26 @@ public Configuration getHadoopConf() {

@Override
public WriteSupport.FinalizedWriteContext finalizeWrite() {
HashMap<String, String> extraMetaData = new HashMap<>();
if (bloomFilter != null) {
extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
if (minRecordKey != null && maxRecordKey != null) {
extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
}
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
}
}
return new WriteSupport.FinalizedWriteContext(extraMetaData);
Map<String, String> extraMetadata =
bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
.orElse(Collections.emptyMap());

return new WriteSupport.FinalizedWriteContext(extraMetadata);
}

public void add(String recordKey) {
this.bloomFilter.add(recordKey);
if (minRecordKey != null) {
minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey;
} else {
minRecordKey = recordKey;
this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
bloomFilterWriteSupport.addKey(recordKey));
}

private static class HoodieBloomFilterRowDataWriteSupport extends HoodieBloomFilterWriteSupport<String> {
public HoodieBloomFilterRowDataWriteSupport(BloomFilter bloomFilter) {
super(bloomFilter);
}

if (maxRecordKey != null) {
maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey;
} else {
maxRecordKey = recordKey;
@Override
protected byte[] getUTF8Bytes(String key) {
return key.getBytes(StandardCharsets.UTF_8);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,35 @@
package org.apache.hudi.io.storage.row;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.HashMap;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
import java.util.Collections;
import java.util.Map;

/**
* Hoodie Write Support for directly writing Row to Parquet.
*/
public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {

private final Configuration hadoopConf;
private final BloomFilter bloomFilter;

private UTF8String minRecordKey;
private UTF8String maxRecordKey;
private final Option<HoodieBloomFilterWriteSupport<UTF8String>> bloomFilterWriteSupportOpt;

public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieWriteConfig writeConfig) {
Configuration hadoopConf = new Configuration(conf);
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", writeConfig.parquetFieldIdWriteEnabled());
this.hadoopConf = hadoopConf;
setSchema(structType, hadoopConf);
this.bloomFilter = bloomFilterOpt.orElse(null);

this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}

public Configuration getHadoopConf() {
Expand All @@ -62,32 +56,35 @@ public Configuration getHadoopConf() {

@Override
public WriteSupport.FinalizedWriteContext finalizeWrite() {
HashMap<String, String> extraMetaData = new HashMap<>();
if (bloomFilter != null) {
extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
if (minRecordKey != null && maxRecordKey != null) {
extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString());
extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString());
}
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
}
}
return new WriteSupport.FinalizedWriteContext(extraMetaData);
Map<String, String> extraMetadata =
bloomFilterWriteSupportOpt.map(HoodieBloomFilterWriteSupport::finalizeMetadata)
.orElse(Collections.emptyMap());

return new WriteSupport.FinalizedWriteContext(extraMetadata);
}

public void add(UTF8String recordKey) {
this.bloomFilter.add(recordKey.getBytes());
this.bloomFilterWriteSupportOpt.ifPresent(bloomFilterWriteSupport ->
bloomFilterWriteSupport.addKey(recordKey));
}

if (minRecordKey == null || minRecordKey.compareTo(recordKey) < 0) {
private static class HoodieBloomFilterRowWriteSupport extends HoodieBloomFilterWriteSupport<UTF8String> {
public HoodieBloomFilterRowWriteSupport(BloomFilter bloomFilter) {
super(bloomFilter);
}

@Override
protected byte[] getUTF8Bytes(UTF8String key) {
return key.getBytes();
}

@Override
protected UTF8String dereference(UTF8String key) {
// NOTE: [[clone]] is performed here (rather than [[copy]]) to only copy underlying buffer in
// cases when [[UTF8String]] is pointing into a buffer storing the whole containing record,
// and simply do a pass over when it holds a (immutable) buffer holding just the string
minRecordKey = recordKey.clone();
}

if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) > 0) {
maxRecordKey = recordKey.clone();
return key.clone();
}
}

}
Loading

0 comments on commit 5110562

Please sign in to comment.