diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
index b9f8df5fc233..5c0e4a662cd4 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -44,7 +44,16 @@ object SparkHelpers {
     val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
       HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue);
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter))
-    val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
+    val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] =
+      new HoodieParquetConfig(
+        writeSupport,
+        CompressionCodecName.GZIP,
+        HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt,
+        HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt,
+        HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt,
+        fs.getConf,
+        HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble,
+        HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.defaultValue.toBoolean)
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
     parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
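Before this change, the CLI went through the seven-argument HoodieParquetConfig constructor, which delegated with dictionaryEnabled = false; that overload is removed further down, so every call site now states the flag explicitly. A minimal Java sketch of the same construction, assuming a prebuilt write support instance and string-typed config defaults (mirroring the Scala code's .toInt/.toDouble/.toBoolean parsing); the helper class itself is illustrative, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.config.HoodieStorageConfig; // package location may differ across Hudi versions
    import org.apache.hudi.io.storage.HoodieParquetConfig;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    final class ParquetConfigDefaults {
      // Illustrative helper: builds a HoodieParquetConfig from the
      // HoodieStorageConfig defaults, including the new trailing
      // dictionary-encoding flag.
      static <T> HoodieParquetConfig<T> fromDefaults(T writeSupport, Configuration conf) {
        return new HoodieParquetConfig<>(
            writeSupport,
            CompressionCodecName.GZIP,
            Integer.parseInt(HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue()),
            Integer.parseInt(HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue()),
            Long.parseLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue()),
            conf,
            Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()),
            // assumes a string-typed default, like the sibling settings parsed above
            Boolean.parseBoolean(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.defaultValue()));
      }
    }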
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 830c5743b800..b7b0c626d5a0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -608,7 +608,13 @@ private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
         return new HoodieHFileDataBlock(
             recordList, header, writeConfig.getHFileCompressionAlgorithm(), new Path(writeConfig.getBasePath()));
       case PARQUET_DATA_BLOCK:
-        return new HoodieParquetDataBlock(recordList, header, keyField, writeConfig.getParquetCompressionCodec());
+        return new HoodieParquetDataBlock(
+            recordList,
+            header,
+            keyField,
+            writeConfig.getParquetCompressionCodec(),
+            writeConfig.getParquetCompressionRatio(),
+            writeConfig.parquetDictionaryEnabled());
       default:
         throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented");
     }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
index df879dc81639..5b4c4df285c3 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
@@ -67,7 +67,7 @@ public void testProperWriting() throws IOException {
     HoodieParquetConfig parquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
-        ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1);
+        ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, hadoopConf, 0.1, true);

     Path filePath = new Path(tmpDir.resolve("test.parquet").toAbsolutePath().toString());
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index 8e7df833cc5d..fb6695752495 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -110,7 +110,7 @@ public Path withInserts(String partition, String fileId, List<HoodieRecord> records,
         new AvroSchemaConverter().convert(schema), schema, Option.of(filter));
     HoodieParquetConfig config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
         ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
-        new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()));
+        new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true);

     try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>(
         new Path(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime,
         contextSupplier, populateMetaFields)) {
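With the hunks above, the log-append path derives both the compression-ratio hint and the dictionary flag from the HoodieWriteConfig instead of hard-coding them per block. A sketch of how a writer job could flip the flag; the builder calls (newBuilder, withPath, withProperties) are standard HoodieWriteConfig API, while the key name is an assumption about what HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED resolves to:

    import java.util.Properties;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class DictionaryToggleExample {
      public static void main(String[] args) {
        // Assumed key, expected to back HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.
        Properties props = new Properties();
        props.setProperty("hoodie.parquet.dictionary.enabled", "false");

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi-table")
            .withProperties(props)
            .build();

        // getBlock(...) above reads both knobs off this config:
        System.out.println(writeConfig.parquetDictionaryEnabled());
        System.out.println(writeConfig.getParquetCompressionRatio());
      }
    }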
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index 98d4a866e0ee..816cec4f906c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -75,6 +75,7 @@ private static HoodieRowDataFileWriter newParquetInternalRowFileWriter(
             writeConfig.getParquetPageSize(),
             writeConfig.getParquetMaxFileSize(),
             writeSupport.getHadoopConf(),
-            writeConfig.getParquetCompressionRatio()));
+            writeConfig.getParquetCompressionRatio(),
+            writeConfig.parquetDictionaryEnabled()));
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index afb448f84489..eaad70fb7347 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -41,7 +41,6 @@
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.InputFile;

-import javax.annotation.Nonnull;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
@@ -54,6 +53,8 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {

   private final Option<CompressionCodecName> compressionCodecName;
+  private final Option<Double> expectedCompressionRatio;
+  private final Option<Boolean> useDictionaryEncoding;

   public HoodieParquetDataBlock(FSDataInputStream inputStream,
                                 Option<byte[]> content,
@@ -66,17 +67,22 @@ public HoodieParquetDataBlock(FSDataInputStream inputStream,
     super(content, inputStream, readBlockLazily, Option.of(logBlockContentLocation), readerSchema, header, footer, keyField, false);

     this.compressionCodecName = Option.empty();
+    this.expectedCompressionRatio = Option.empty();
+    this.useDictionaryEncoding = Option.empty();
   }

-  public HoodieParquetDataBlock(
-      @Nonnull List<IndexedRecord> records,
-      @Nonnull Map<HeaderMetadataType, String> header,
-      @Nonnull String keyField,
-      @Nonnull CompressionCodecName compressionCodecName
-  ) {
+  public HoodieParquetDataBlock(List<IndexedRecord> records,
+                                Map<HeaderMetadataType, String> header,
+                                String keyField,
+                                CompressionCodecName compressionCodecName,
+                                double expectedCompressionRatio,
+                                boolean useDictionaryEncoding
+  ) {
     super(records, header, new HashMap<>(), keyField);

     this.compressionCodecName = Option.of(compressionCodecName);
+    this.expectedCompressionRatio = Option.of(expectedCompressionRatio);
+    this.useDictionaryEncoding = Option.of(useDictionaryEncoding);
   }

   @Override
@@ -103,7 +109,8 @@ protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
         ParquetWriter.DEFAULT_PAGE_SIZE,
         1024 * 1024 * 1024,
         new Configuration(),
-        Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+        expectedCompressionRatio.get(),
+        useDictionaryEncoding.get());

     ByteArrayOutputStream baos = new ByteArrayOutputStream();
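serializeRecords previously pinned the compression ratio to 0.1 (note the leftover commented-out default) and exposed no dictionary choice; both now arrive through the new constructor arguments. For orientation, dictionary encoding is the standard parquet-mr writer knob, shown here through the public AvroParquetWriter builder rather than Hudi's internal writer; the class and method names below are illustrative:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class DictionaryEncodingSketch {
      // Sketch only: opens a writer with dictionary encoding disabled, the
      // same behavior the removed seven-argument HoodieParquetConfig implied.
      static ParquetWriter<GenericRecord> openWriter(Path path, Schema schema) throws java.io.IOException {
        return AvroParquetWriter.<GenericRecord>builder(path)
            .withSchema(schema)                        // an Avro Schema built elsewhere
            .withCompressionCodec(CompressionCodecName.GZIP)
            .withDictionaryEncoding(false)             // the flag this patch threads through
            .build();
      }
    }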
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
index 77fea6beee52..b5e567b7644e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
@@ -35,11 +35,6 @@ public class HoodieParquetConfig<T> {
   private final double compressionRatio;
   private final boolean dictionaryEnabled;

-  public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
-                             int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio) {
-    this(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, false);
-  }
-
   public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
                              int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) {
     this.writeSupport = writeSupport;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index f16eb52d3686..c74d3e5c4354 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -2347,7 +2347,7 @@ public void testDataBlockFormatAppendAndReadWithProjectedSchema(
       new HashMap<HoodieLogBlockType, Integer>() {{
        put(HoodieLogBlockType.AVRO_DATA_BLOCK, 0); // not supported
        put(HoodieLogBlockType.HFILE_DATA_BLOCK, 0); // not supported
-       put(HoodieLogBlockType.PARQUET_DATA_BLOCK, HoodieAvroUtils.gteqAvro1_9() ? 2593 : 2605);
+       put(HoodieLogBlockType.PARQUET_DATA_BLOCK, HoodieAvroUtils.gteqAvro1_9() ? 1802 : 1809);
      }};

     List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
@@ -2378,7 +2378,7 @@ private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List
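The new size expectations follow directly from the encoding change: the parquet data block used to be serialized through the removed seven-argument HoodieParquetConfig overload (dictionary encoding off) and now runs with dictionary encoding enabled, which compresses this test's repetitive records substantially. The arithmetic on the test's own numbers, as a self-contained check:

    public class BlockSizeDelta {
      // Expectations from the hunk above (Avro >= 1.9 case): dictionary
      // encoding cuts the serialized parquet data block by roughly 30%.
      public static void main(String[] args) {
        int before = 2593, after = 1802;
        System.out.printf("reduction: %.1f%%%n", 100.0 * (before - after) / before);
        // prints: reduction: 30.5%
      }
    }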