Removed superfluous overrides of the HoodieBaseParquetConfig;
`HoodieBaseParquetConfig` > `HoodieParquetConfig`
Alexey Kudinkin committed Jun 24, 2022
1 parent a00533a commit 85ea89b
Showing 15 changed files with 38 additions and 235 deletions.
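
In effect, the format-specific config wrappers (`HoodieAvroParquetConfig`, `HoodieRowParquetConfig`, `HoodieRowDataParquetConfig`) are collapsed into the single generic `HoodieParquetConfig`, typed by the Parquet write-support implementation, so each writer now takes a `HoodieParquetConfig<SomeWriteSupport>` directly. A minimal sketch of an Avro call site after the change — `buildExampleConfig` is a hypothetical helper, with constructor arguments mirroring the seven-argument form visible in the hunks below (write support, codec, block/page/max-file sizes, Hadoop conf, compression ratio):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetConfigExample {

  // Illustrative only: not part of this commit.
  static HoodieParquetConfig<HoodieAvroWriteSupport> buildExampleConfig(
      HoodieAvroWriteSupport writeSupport, Configuration hadoopConf) {
    // Before: new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, ...)
    // After:  the generic config, parameterized by the write-support type.
    return new HoodieParquetConfig<>(
        writeSupport,                      // Avro write support (schema + optional bloom filter)
        CompressionCodecName.GZIP,         // compression codec
        ParquetWriter.DEFAULT_BLOCK_SIZE,  // row-group (block) size
        ParquetWriter.DEFAULT_PAGE_SIZE,   // page size
        120 * 1024 * 1024,                 // max base-file size, as in the test-table hunk below
        hadoopConf,                        // Hadoop configuration
        0.1);                              // assumed compression-ratio fraction (illustrative value)
  }
}
```

Where dictionary encoding matters, the diff also uses an eight-argument variant that appends `parquetDictionaryEnabled()`, as in the file-writer factory and Spark row-writer hunks.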
@@ -23,12 +23,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter}
import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -45,7 +44,7 @@ object SparkHelpers {
val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_FPP_VALUE.defaultValue.toDouble,
HoodieIndexConfig.BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue.toInt, HoodieIndexConfig.BLOOM_FILTER_TYPE.defaultValue);
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(fs.getConf).convert(schema), schema, org.apache.hudi.common.util.Option.of(filter))
val parquetConfig: HoodieAvroParquetConfig = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)
val parquetConfig: HoodieParquetConfig[HoodieAvroWriteSupport] = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt, HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt, fs.getConf, HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble)

// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)

@@ -47,11 +47,11 @@ public class HoodieAvroParquetWriter<R extends IndexedRecord>

@SuppressWarnings({"unchecked", "rawtypes"})
public HoodieAvroParquetWriter(Path file,
HoodieAvroParquetConfig parquetConfig,
HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig,
String instantTime,
TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
super(file, (HoodieBaseParquetConfig) parquetConfig);
super(file, (HoodieParquetConfig) parquetConfig);
this.fileName = file.getName();
this.writeSupport = parquetConfig.getWriteSupport();
this.instantTime = instantTime;

@@ -43,7 +43,7 @@ public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
private long lastCachedDataSize = -1;

public HoodieBaseParquetWriter(Path file,
HoodieBaseParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE,
parquetConfig.getWriteSupport(),

@@ -77,7 +77,7 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFi
Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter(conf).convert(schema), schema, filter);

HoodieAvroParquetConfig parquetConfig = new HoodieAvroParquetConfig(writeSupport, config.getParquetCompressionCodec(),
HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport, config.getParquetCompressionCodec(),
config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());


@@ -19,6 +19,12 @@

package org.apache.hudi.testutils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
@@ -38,18 +44,11 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieOrcWriter;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.CompressionKind;
@@ -110,7 +109,7 @@ public Path withInserts(String partition, String fileId, List<HoodieRecord> reco
if (HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.PARQUET)) {
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
new AvroSchemaConverter().convert(schema), schema, Option.of(filter));
HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
HoodieParquetConfig<HoodieAvroWriteSupport> config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()));
try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>(

@@ -22,6 +22,7 @@
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.table.HoodieTable;

import org.apache.flink.table.types.logical.RowType;
@@ -67,7 +68,7 @@ private static HoodieRowDataFileWriter newParquetInternalRowFileWriter(
HoodieRowDataParquetWriteSupport writeSupport =
new HoodieRowDataParquetWriteSupport(table.getHadoopConf(), rowType, filter);
return new HoodieRowDataParquetWriter(
path, new HoodieRowDataParquetConfig(
path, new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),

This file was deleted.

@@ -23,6 +23,7 @@

import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;

Expand All @@ -39,7 +40,7 @@ public class HoodieRowDataParquetWriter extends ParquetWriter<RowData>
private final long maxFileSize;
private final HoodieRowDataParquetWriteSupport writeSupport;

public HoodieRowDataParquetWriter(Path file, HoodieRowDataParquetConfig parquetConfig)
public HoodieRowDataParquetWriter(Path file, HoodieParquetConfig<HoodieRowDataParquetWriteSupport> parquetConfig)
throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),

@@ -23,6 +23,7 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.Path;
@@ -68,7 +69,7 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(
HoodieRowParquetWriteSupport writeSupport =
new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter, writeConfig);
return new HoodieInternalRowParquetWriter(
path, new HoodieRowParquetConfig(
path,
new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),
@@ -95,13 +97,15 @@ private static HoodieInternalRowFileWriter newParquetInternalRowFileWriterWithou
HoodieRowParquetWriteSupport writeSupport =
new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null, writeConfig);
return new HoodieInternalRowParquetWriter(
path, new HoodieRowParquetConfig(
path, new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
writeSupport.getHadoopConf(),
writeConfig.getParquetCompressionRatio()));
writeConfig.getParquetCompressionRatio(),
writeConfig.parquetDictionaryEnabled())
);
}
}

@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage.row;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
import org.apache.spark.sql.catalyst.InternalRow;

@@ -32,7 +33,7 @@ public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter<Inte

private final HoodieRowParquetWriteSupport writeSupport;

public HoodieInternalRowParquetWriter(Path file, HoodieRowParquetConfig parquetConfig)
public HoodieInternalRowParquetWriter(Path file, HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig)
throws IOException {
super(file, parquetConfig);


This file was deleted.

@@ -18,21 +18,20 @@

package org.apache.hudi.common.table.log.block;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetStreamWriter;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -43,7 +42,6 @@
import org.apache.parquet.io.InputFile;

import javax.annotation.Nonnull;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
@@ -97,8 +95,8 @@ protected byte[] serializeRecords(List<IndexedRecord> records) throws IOExceptio
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty());

HoodieAvroParquetConfig avroParquetConfig =
new HoodieAvroParquetConfig(
HoodieParquetConfig<HoodieAvroWriteSupport> avroParquetConfig =
new HoodieParquetConfig<>(
writeSupport,
compressionCodecName.get(),
ParquetWriter.DEFAULT_BLOCK_SIZE,

This file was deleted.
