Commit

[MINOR] Make sure Dictionary Encoding in Parquet enabled by default (#7052)

* Make sure Dictionary Encoding in Parquet allowed by default

* Fixing the test fixture

* Rebased all users of `HoodieParquetConfig` to rely on configured value from `HoodieWriteConfig`

* Tidying up

* Fixed tests
alexeykudinkin authored Nov 15, 2022
1 parent 3b8df4b commit 6b0b03b
Showing 10 changed files with 48 additions and 21 deletions.
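For orientation before the diffs: a minimal sketch (not part of this commit) of how a caller is now expected to thread the dictionary-encoding flag from HoodieWriteConfig into HoodieParquetConfig. The getters shown (getParquetCompressionCodec, getParquetPageSize, getParquetMaxFileSize, getParquetCompressionRatio, parquetDictionaryEnabled) all appear in the hunks below; the variables writeSupport, writeConfig and hadoopConf are assumed to be in scope, and the use of ParquetWriter.DEFAULT_BLOCK_SIZE is illustrative only.

// Sketch: build a HoodieParquetConfig from an existing HoodieWriteConfig (illustrative, not from the commit).
HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(
    writeSupport,                              // HoodieAvroWriteSupport for the table schema (assumed in scope)
    writeConfig.getParquetCompressionCodec(),  // e.g. CompressionCodecName.GZIP
    ParquetWriter.DEFAULT_BLOCK_SIZE,          // block size (illustrative choice)
    writeConfig.getParquetPageSize(),
    writeConfig.getParquetMaxFileSize(),
    hadoopConf,                                // org.apache.hadoop.conf.Configuration (assumed in scope)
    writeConfig.getParquetCompressionRatio(),
    writeConfig.parquetDictionaryEnabled());   // the flag this commit wires through; now defaults to true

Before this change, several call sites either hard-coded the trailing flag or relied on a convenience constructor (removed below) that silently defaulted it to false.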
11 changes: 10 additions & 1 deletion hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -44,7 +44,16 @@ object SparkHelpers {
     val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
       HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue);
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter))
-    val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
+    val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] =
+      new HoodieParquetConfig(
+        writeSupport,
+        CompressionCodecName.GZIP,
+        HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt,
+        HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt,
+        HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt,
+        fs.getConf,
+        HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble,
+        HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.defaultValue)
 
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
     parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
@@ -608,7 +608,13 @@ private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
         return new HoodieHFileDataBlock(
             recordList, header, writeConfig.getHFileCompressionAlgorithm(), new Path(writeConfig.getBasePath()));
       case PARQUET_DATA_BLOCK:
-        return new HoodieParquetDataBlock(recordList, header, keyField, writeConfig.getParquetCompressionCodec());
+        return new HoodieParquetDataBlock(
+            recordList,
+            header,
+            keyField,
+            writeConfig.getParquetCompressionCodec(),
+            writeConfig.getParquetCompressionRatio(),
+            writeConfig.parquetDictionaryEnabled());
       default:
         throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
     }
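Because the values above come from the table's write config, dictionary encoding can still be turned off per table. A hedged sketch of the opt-out, assuming these properties are later fed into the HoodieWriteConfig builder (wiring omitted); the config constants are the ones referenced in the SparkHelpers hunks:

// Opt out of Parquet dictionary encoding for a table (sketch only; builder wiring not shown).
Properties props = new Properties();
props.setProperty(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.key(), "false");
props.setProperty(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.key(), "0.1");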
@@ -67,7 +67,7 @@ public void testProperWriting() throws IOException {
 
     HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
         new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
-            ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1);
+            ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1, true);
 
     Path filePath = new Path(tmpDir.resolve("test.parquet").toAbsolutePath().toString());
 
@@ -110,7 +110,7 @@ public Path withInserts(String partition, String fileId, List<HoodieRecord> reco
         new AvroSchemaConverter().convert(schema), schema, Option.of(filter));
     HoodieParquetConfig<HoodieAvroWriteSupport> config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
         ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
-        new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()));
+        new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true);
     try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>(
         new Path(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime,
         contextSupplier, populateMetaFields)) {
@@ -75,6 +75,7 @@ private static HoodieRowDataFileWriter newParquetInternalRowFileWriter(
         writeConfig.getParquetPageSize(),
         writeConfig.getParquetMaxFileSize(),
         writeSupport.getHadoopConf(),
-        writeConfig.getParquetCompressionRatio()));
+        writeConfig.getParquetCompressionRatio(),
+        writeConfig.parquetDictionaryEnabled()));
   }
 }
@@ -41,7 +41,6 @@
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.InputFile;
 
-import javax.annotation.Nonnull;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
@@ -54,6 +53,8 @@
 public class HoodieParquetDataBlock extends HoodieDataBlock {
 
   private final Option<CompressionCodecName> compressionCodecName;
+  private final Option<Double> expectedCompressionRatio;
+  private final Option<Boolean> useDictionaryEncoding;
 
   public HoodieParquetDataBlock(FSDataInputStream inputStream,
                                 Option<byte[]> content,
@@ -66,17 +67,22 @@ public HoodieParquetDataBlock(FSDataInputStream inputStream,
     super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);
 
     this.compressionCodecName = Option.empty();
+    this.expectedCompressionRatio = Option.empty();
+    this.useDictionaryEncoding = Option.empty();
   }
 
-  public HoodieParquetDataBlock(
-      @Nonnull List<IndexedRecord> records,
-      @Nonnull Map<HeaderMetadataType, String> header,
-      @Nonnull String keyField,
-      @Nonnull CompressionCodecName compressionCodecName
+  public HoodieParquetDataBlock(List<IndexedRecord> records,
+                                Map<HeaderMetadataType, String> header,
+                                String keyField,
+                                CompressionCodecName compressionCodecName,
+                                double expectedCompressionRatio,
+                                boolean useDictionaryEncoding
   ) {
     super(records, header, new HashMap<>(), keyField);
 
     this.compressionCodecName = Option.of(compressionCodecName);
+    this.expectedCompressionRatio = Option.of(expectedCompressionRatio);
+    this.useDictionaryEncoding = Option.of(useDictionaryEncoding);
   }
 
   @Override
@@ -103,7 +109,8 @@ protected byte[] serializeRecords(List<IndexedRecord> records) throws IOExceptio
         ParquetWriter.DEFAULT_PAGE_SIZE,
         1024 * 1024 * 1024,
         new Configuration(),
-        Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+        expectedCompressionRatio.get(),
+        useDictionaryEncoding.get());
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
@@ -35,11 +35,6 @@ public class HoodieParquetConfig<T> {
   private final double compressionRatio;
   private final boolean dictionaryEnabled;
 
-  public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
-                             int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio) {
-    this(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, false);
-  }
-
   public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
                              int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) {
     this.writeSupport = writeSupport;
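With the seven-argument overload removed (it silently defaulted dictionaryEnabled to false), every caller must now pass the flag explicitly. A hedged example call, assuming writeSupport, maxFileSize and hadoopConf are defined elsewhere:

// All arguments explicit; the final boolean is the dictionary-encoding flag.
new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
    ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, hadoopConf, 0.1, true);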
@@ -2347,7 +2347,7 @@ public void testDataBlockFormatAppendAndReadWithProjectedSchema(
         new HashMap<HoodieLogBlockType, Integer>() {{
           put(HoodieLogBlockType.AVRO_DATA_BLOCK, 0); // not supported
           put(HoodieLogBlockType.HFILE_DATA_BLOCK, 0); // not supported
-          put(HoodieLogBlockType.PARQUET_DATA_BLOCK, HoodieAvroUtils.gteqAvro1_9() ? 2593 : 2605);
+          put(HoodieLogBlockType.PARQUET_DATA_BLOCK, HoodieAvroUtils.gteqAvro1_9() ? 1802 : 1809);
         }};
 
     List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
@@ -2378,7 +2378,7 @@ private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<Inde
       case HFILE_DATA_BLOCK:
         return new HoodieHFileDataBlock(records, header, Compression.Algorithm.GZ, pathForReader);
       case PARQUET_DATA_BLOCK:
-        return new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP);
+        return new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
       default:
         throw new RuntimeException("Unknown data block type " + dataBlockType);
     }
@@ -399,7 +399,7 @@ public static HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir,
       dataBlock = new HoodieHFileDataBlock(
           records, header, Compression.Algorithm.GZ, writer.getLogFile().getPath());
     } else if (logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
-      dataBlock = new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP);
+      dataBlock = new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
     } else {
       dataBlock = new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
     }
@@ -43,7 +43,16 @@ object SparkHelpers {
     val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
       HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue);
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter))
-    val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
+    val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] =
+      new HoodieParquetConfig(
+        writeSupport,
+        CompressionCodecName.GZIP,
+        HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt,
+        HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt,
+        HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt,
+        fs.getConf,
+        HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble,
+        HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.defaultValue)
 
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
     parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
