[HUDI-4038] Avoid calling getDataSize after every record written (apache#5497)

- getDataSize has non-trivial overhead in the current ParquetWriter implementation, since it has to traverse the column groups already composed in memory. Instead, we sample the calls to getDataSize, re-checking the file size only once every 1000 written records, to amortize its cost.
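In essence, the new base writer caches the last getDataSize() result and refreshes it only at a fixed record interval. A minimal, self-contained sketch of that pattern (illustrative names only, e.g. SizedWriter and SAMPLE_INTERVAL; not the Hudi API) could look like this:

import java.io.IOException;

// Sketch only: caches an expensive size probe and refreshes it every SAMPLE_INTERVAL writes.
abstract class SizedWriter<R> {
  private static final int SAMPLE_INTERVAL = 1000;

  private final long maxFileSize;
  private long writtenRecords = 0;
  private long lastCachedDataSize = -1;

  protected SizedWriter(long maxFileSize) {
    this.maxFileSize = maxFileSize;
  }

  // The expensive probe, e.g. ParquetWriter#getDataSize walking the in-memory column data.
  protected abstract long dataSize();

  protected abstract void doWrite(R record) throws IOException;

  public boolean canWrite() {
    if (lastCachedDataSize == -1 || writtenRecords % SAMPLE_INTERVAL == 0) {
      lastCachedDataSize = dataSize(); // cost amortized over SAMPLE_INTERVAL records
    }
    return lastCachedDataSize < maxFileSize;
  }

  public void write(R record) throws IOException {
    doWrite(record);
    writtenRecords++;
  }
}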

Co-authored-by: sivabalan <n.siva.b@gmail.com>
2 people authored and cdmikechen committed May 13, 2022
1 parent daaa170 commit 0bd4539
Showing 12 changed files with 124 additions and 97 deletions.
@@ -28,7 +28,7 @@ import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieParquetWriter}
import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -50,8 +50,7 @@ object SparkHelpers {
// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)

val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier(),
true)
val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {
@@ -18,22 +18,15 @@

package org.apache.hudi.io.storage;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
@@ -42,45 +35,24 @@
* ATTENTION: HoodieParquetWriter is not thread safe and developer should take care of the order of write and close
*/
@NotThreadSafe
public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
extends ParquetWriter<IndexedRecord> implements HoodieFileWriter<R> {
public class HoodieAvroParquetWriter<R extends IndexedRecord>
extends HoodieBaseParquetWriter<IndexedRecord>
implements HoodieFileWriter<R> {

private static AtomicLong recordIndex = new AtomicLong(1);

private final Path file;
private final HoodieWrapperFileSystem fs;
private final long maxFileSize;
private final HoodieAvroWriteSupport writeSupport;
private final String fileName;
private final String instantTime;
private final TaskContextSupplier taskContextSupplier;
private final boolean populateMetaFields;
private final HoodieAvroWriteSupport writeSupport;

public HoodieParquetWriter(String instantTime,
Path file,
HoodieAvroParquetConfig parquetConfig,
Schema schema,
TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE,
parquetConfig.getWriteSupport(),
parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(),
parquetConfig.getPageSize(),
parquetConfig.getPageSize(),
parquetConfig.dictionaryEnabled(),
DEFAULT_IS_VALIDATING_ENABLED,
DEFAULT_WRITER_VERSION,
FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
this.fs =
(HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
// We cannot accurately measure the snappy compressed output file size. We are choosing a
// conservative 10%
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
// stream and the actual file size reported by HDFS
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
@SuppressWarnings({"unchecked", "rawtypes"})
public HoodieAvroParquetWriter(Path file,
HoodieAvroParquetConfig parquetConfig,
String instantTime,
TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
super(file, (HoodieBaseParquetConfig) parquetConfig);
this.fileName = file.getName();
this.writeSupport = parquetConfig.getWriteSupport();
this.instantTime = instantTime;
this.taskContextSupplier = taskContextSupplier;
@@ -91,19 +63,14 @@ public HoodieParquetWriter(String instantTime,
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
if (populateMetaFields) {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
super.write(avroRecord);
writeSupport.add(key.getRecordKey());
} else {
super.write(avroRecord);
}
}

@Override
public boolean canWrite() {
return getDataSize() < maxFileSize;
}

@Override
public void writeAvro(String key, IndexedRecord object) throws IOException {
super.write(object);
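For reference, call sites updated in this commit (SparkHelpers above, HoodieFileWriterFactory and HoodieWriteableTestTable below) now pass the target Path first and the instant time third when constructing the renamed HoodieAvroParquetWriter. A hedged Java rendering of that usage, with a hypothetical helper not part of the commit, assuming the records already carry populated meta fields:

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;

import java.io.IOException;

class RewriteExample {
  // Hypothetical helper mirroring the SparkHelpers loop above: copies Avro records
  // into a new Parquet file, keyed by the _hoodie_record_key meta field.
  static void rewriteRecords(Path path, HoodieAvroParquetConfig parquetConfig, String instantTime,
                             TaskContextSupplier contextSupplier,
                             Iterable<GenericRecord> sourceRecords) throws IOException {
    // The writer is Closeable, so try-with-resources flushes and closes the Parquet file.
    try (HoodieAvroParquetWriter<IndexedRecord> writer =
             new HoodieAvroParquetWriter<>(path, parquetConfig, instantTime, contextSupplier, true)) {
      for (GenericRecord rec : sourceRecords) {
        String key = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
        writer.writeAvro(key, rec);
      }
    }
  }
}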
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.storage;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* Base class of Hudi's custom {@link ParquetWriter} implementations
*
* @param <R> target type of the object being written into Parquet files (for ex,
* {@code IndexedRecord}, {@code InternalRow})
*/
public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {

private static final int WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK = 1000;

private final AtomicLong writtenRecordCount = new AtomicLong(0);
private final long maxFileSize;
private long lastCachedDataSize = -1;

public HoodieBaseParquetWriter(Path file,
HoodieBaseParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE,
parquetConfig.getWriteSupport(),
parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(),
parquetConfig.getPageSize(),
parquetConfig.getPageSize(),
parquetConfig.dictionaryEnabled(),
DEFAULT_IS_VALIDATING_ENABLED,
DEFAULT_WRITER_VERSION,
FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));

// We cannot accurately measure the snappy compressed output file size. We are choosing a
// conservative 10%
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
// stream and the actual file size reported by HDFS
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
}

public boolean canWrite() {
// TODO we can actually do evaluation more accurately:
// if we cache last data size check, since we account for how many records
// were written we can accurately project avg record size, and therefore
// estimate how many more records we can write before cut off
if (lastCachedDataSize == -1 || getWrittenRecordCount() % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) {
lastCachedDataSize = getDataSize();
}
return lastCachedDataSize < maxFileSize;
}

@Override
public void write(R object) throws IOException {
super.write(object);
writtenRecordCount.incrementAndGet();
}

protected long getWrittenRecordCount() {
return writtenRecordCount.get();
}
}
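To make the size cut-off above concrete, here is an illustrative calculation (the 120 MB figure matches the test configuration further down in this diff; both numbers are examples, not recommended settings):

public class FileSizeCutOffExample {
  public static void main(String[] args) {
    long configuredMax = 120L * 1024 * 1024;   // configured max Parquet file size: 125,829,120 bytes
    double compressionRatio = 0.1;             // the "conservative 10%" from the comment above
    long cutOff = configuredMax + Math.round(configuredMax * compressionRatio);
    // canWrite() compares the cached getDataSize() against this padded cut-off; the cache is
    // refreshed whenever the written-record count is a multiple of
    // WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK (1000).
    System.out.println(cutOff);                // 138412032 bytes, i.e. 132 MB
  }
}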
@@ -18,14 +18,12 @@

package org.apache.hudi.io.storage;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;

import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;

public interface HoodieFileWriter<R extends IndexedRecord> {
@@ -38,8 +36,8 @@ public interface HoodieFileWriter<R extends IndexedRecord> {

void writeAvro(String key, R oldRecord) throws IOException;

default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, long recordIndex, String fileName) {
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
return;
@@ -81,7 +81,7 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFi
config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());

return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, taskContextSupplier, populateMetaFields);
return new HoodieAvroParquetWriter<>(path, parquetConfig, instantTime, taskContextSupplier, populateMetaFields);
}

static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newHFileFileWriter(
@@ -113,7 +113,7 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
if (populateMetaFields) {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement(), file.getName());
writeAvro(key.getRecordKey(), avroRecord);
} else {
writeAvro(key.getRecordKey(), avroRecord);
@@ -97,7 +97,7 @@ public HoodieOrcWriter(String instantTime, Path file, HoodieOrcConfig config, Sc
@Override
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX.getAndIncrement(), file.getName());
writeAvro(key.getRecordKey(), avroRecord);
}

@@ -41,7 +41,7 @@
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieOrcWriter;
import org.apache.hudi.io.storage.HoodieParquetWriter;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;

import org.apache.avro.Schema;
@@ -113,10 +113,9 @@ public Path withInserts(String partition, String fileId, List<HoodieRecord> reco
HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()));
try (HoodieParquetWriter writer = new HoodieParquetWriter(
currentInstantTime,
new Path(Paths.get(basePath, partition, fileName).toString()),
config, schema, contextSupplier, populateMetaFields)) {
try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>(
new Path(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime,
contextSupplier, populateMetaFields)) {
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) ((HoodieRecordPayload) record.getData()).getInsertValue(schema).get();
@@ -379,7 +379,7 @@ public void testFileSizeUpsertRecords() throws Exception {

List<HoodieRecord> records = new ArrayList<>();
// Approx 1150 records are written for block size of 64KB
for (int i = 0; i < 2000; i++) {
for (int i = 0; i < 2050; i++) {
String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString()
+ "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}";
RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
@@ -402,7 +402,8 @@ public void testFileSizeUpsertRecords() throws Exception {
counts++;
}
}
assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file");
// canWrite() refreshes the cached data size only once every 1000 records, so we expect 2 files with 1000 records each and a 3rd file with the remaining 50.
assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file");
}

@Test
@@ -19,44 +19,24 @@
package org.apache.hudi.io.storage.row;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;

import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
import org.apache.spark.sql.catalyst.InternalRow;

import java.io.IOException;

/**
* Parquet's impl of {@link HoodieInternalRowFileWriter} to write {@link InternalRow}s.
*/
public class HoodieInternalRowParquetWriter extends ParquetWriter<InternalRow>
public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter<InternalRow>
implements HoodieInternalRowFileWriter {

private final Path file;
private final HoodieWrapperFileSystem fs;
private final long maxFileSize;
private final HoodieRowParquetWriteSupport writeSupport;

public HoodieInternalRowParquetWriter(Path file, HoodieRowParquetConfig parquetConfig)
throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(),
DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED,
DEFAULT_WRITER_VERSION, FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file,
parquetConfig.getHadoopConf()));
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
this.writeSupport = parquetConfig.getWriteSupport();
}
super(file, parquetConfig);

@Override
public boolean canWrite() {
return getDataSize() < maxFileSize;
this.writeSupport = parquetConfig.getWriteSupport();
}

@Override
@@ -69,9 +49,4 @@ public void writeRow(String key, InternalRow row) throws IOException {
public void writeRow(InternalRow row) throws IOException {
super.write(row);
}

@Override
public void close() throws IOException {
super.close();
}
}
@@ -49,7 +49,7 @@ public void testGetFileWriter() throws IOException {
SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
HoodieFileWriter<IndexedRecord> parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
assertTrue(parquetWriter instanceof HoodieParquetWriter);
assertTrue(parquetWriter instanceof HoodieAvroParquetWriter);

// hfile format.
final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile");
@@ -419,7 +419,7 @@ public void testFileSizeUpsertRecords() throws Exception {

List<HoodieRecord> records = new ArrayList<>();
// Approx 1150 records are written for block size of 64KB
for (int i = 0; i < 2000; i++) {
for (int i = 0; i < 2050; i++) {
String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString()
+ "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}";
RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
@@ -441,7 +441,8 @@ public void testFileSizeUpsertRecords() throws Exception {
counts++;
}
}
assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file");
// canWrite() refreshes the cached data size only once every 1000 records, so we expect 2 files with 1000 records each and a 3rd file with the remaining 50.
assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file");
}

@Test