[HUDI-4038] Avoid calling getDataSize after every record written #5497

Merged (6 commits) on May 11, 2022

Changes from all commits

@@ -28,7 +28,7 @@ import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieParquetWriter}
import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -50,8 +50,7 @@ object SparkHelpers {
// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)

val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier(),
true)
val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {

@@ -18,22 +18,15 @@

package org.apache.hudi.io.storage;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
@@ -42,45 +42,24 @@
* ATTENTION: HoodieParquetWriter is not thread safe and developer should take care of the order of write and close
*/
@NotThreadSafe
public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
extends ParquetWriter<IndexedRecord> implements HoodieFileWriter<R> {
public class HoodieAvroParquetWriter<R extends IndexedRecord>
extends HoodieBaseParquetWriter<IndexedRecord>
implements HoodieFileWriter<R> {

private static AtomicLong recordIndex = new AtomicLong(1);

private final Path file;
private final HoodieWrapperFileSystem fs;
private final long maxFileSize;
private final HoodieAvroWriteSupport writeSupport;
private final String fileName;
private final String instantTime;
private final TaskContextSupplier taskContextSupplier;
private final boolean populateMetaFields;
private final HoodieAvroWriteSupport writeSupport;

public HoodieParquetWriter(String instantTime,
Path file,
HoodieAvroParquetConfig parquetConfig,
Schema schema,
TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE,
parquetConfig.getWriteSupport(),
parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(),
parquetConfig.getPageSize(),
parquetConfig.getPageSize(),
parquetConfig.dictionaryEnabled(),
DEFAULT_IS_VALIDATING_ENABLED,
DEFAULT_WRITER_VERSION,
FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
this.fs =
(HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
// We cannot accurately measure the snappy compressed output file size. We are choosing a
// conservative 10%
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
// stream and the actual file size reported by HDFS
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
@SuppressWarnings({"unchecked", "rawtypes"})
public HoodieAvroParquetWriter(Path file,
HoodieAvroParquetConfig parquetConfig,
String instantTime,
TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
super(file, (HoodieBaseParquetConfig) parquetConfig);
this.fileName = file.getName();
this.writeSupport = parquetConfig.getWriteSupport();
this.instantTime = instantTime;
this.taskContextSupplier = taskContextSupplier;
@@ -91,19 +63,14 @@ public HoodieParquetWriter(String instantTime,
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
if (populateMetaFields) {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
super.write(avroRecord);
writeSupport.add(key.getRecordKey());
} else {
super.write(avroRecord);
}
}

@Override
public boolean canWrite() {
return getDataSize() < maxFileSize;
}

@Override
public void writeAvro(String key, IndexedRecord object) throws IOException {
super.write(object);

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.storage;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* Base class of Hudi's custom {@link ParquetWriter} implementations
*
* @param <R> target type of the object being written into Parquet files (for ex,
* {@code IndexedRecord}, {@code InternalRow})
*/
public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {

private static final int WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK = 1000;

private final AtomicLong writtenRecordCount = new AtomicLong(0);
private final long maxFileSize;
private long lastCachedDataSize = -1;

public HoodieBaseParquetWriter(Path file,
HoodieBaseParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE,
parquetConfig.getWriteSupport(),
parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(),
parquetConfig.getPageSize(),
parquetConfig.getPageSize(),
parquetConfig.dictionaryEnabled(),
DEFAULT_IS_VALIDATING_ENABLED,
DEFAULT_WRITER_VERSION,
FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));

// We cannot accurately measure the snappy compressed output file size. We are choosing a
// conservative 10%
// TODO - compute this compression ratio dynamically by looking at the bytes written to the
// stream and the actual file size reported by HDFS
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
}

public boolean canWrite() {
// TODO we can actually do evaluation more accurately:

Member:

+1 to the overall idea, but here is the deal: the size may not update until a row group is actually flushed out to storage, so getDataSize() simply returns 0 until then. This on-the-fly file sizing is only useful for large files with multiple blocks/row groups. That was the behavior back in the day.

What pattern did you observe on writes to S3? Is getDataSize() real-time, i.e. does it reflect the last write's size between subsequent calls?

I see the code here, which should respect the buffered data:

  /**
   * @return the total size of data written to the file and buffered in memory
   */
  public long getDataSize() {
    return lastRowGroupEndPos + columnStore.getBufferedSize();
  }


Contributor (Author):

> +1 to the overall idea, but here is the deal: the size may not update until a row group is actually flushed out to storage, so getDataSize() simply returns 0 until then.

It won't: it always returns an accurate metric, because

  1. It keeps track of how many bytes were written (lastRowGroupEndPos)
  2. It calculates the actual buffered footprint (columnStore.getBufferedSize())

The second is the problem -- it always traverses all of the cached column data to accurately calculate the in-memory footprint (and there's no internal caching). So what ended up happening is that it kept growing the buffer for the whole file (120MB), not flushing until close, which made the traversals quadratic in runtime.
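
To make that concrete, here is a back-of-the-envelope illustration (illustration only, not Hudi or Parquet code; the assumption is that one getDataSize() call does work proportional to the number of currently buffered records): checking after every record sums to roughly n^2/2 units of work, while checking once every 1,000 records cuts that by about a factor of 1,000.

  // Illustration only: compares the total buffer-walking work of per-record
  // size checks vs. checking once every 1,000 records.
  class SizeCheckCostSketch {
    public static void main(String[] args) {
      final long records = 1_000_000L; // records buffered before a row group flush (assumed)
      final long checkEvery = 1_000L;  // mirrors WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK

      long perRecordWork = 0L;  // getDataSize() after every record
      long amortizedWork = 0L;  // getDataSize() once every `checkEvery` records
      for (long i = 1; i <= records; i++) {
        perRecordWork += i;     // each call walks all i buffered records
        if (i % checkEvery == 0) {
          amortizedWork += i;
        }
      }
      // perRecordWork ~= records^2 / 2; amortizedWork ~= records^2 / (2 * checkEvery)
      System.out.printf("per-record: %d, amortized: %d, ratio: %.0fx%n",
          perRecordWork, amortizedWork, (double) perRecordWork / amortizedWork);
    }
  }

This is why the new HoodieBaseParquetWriter below caches lastCachedDataSize and only refreshes it every WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK records.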

// if we cache last data size check, since we account for how many records

Member:

Can we file a JIRA for this follow-on work, after verifying the real-time-ness of getDataSize()?

Contributor (Author):

I validated that getDataSize() returns accurate results for buffered data.

// were written we can accurately project avg record size, and therefore
// estimate how many more records we can write before cut off
if (lastCachedDataSize == -1 || getWrittenRecordCount() % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) {
lastCachedDataSize = getDataSize();
}
return lastCachedDataSize < maxFileSize;
}

@Override
public void write(R object) throws IOException {
super.write(object);
writtenRecordCount.incrementAndGet();
}

protected long getWrittenRecordCount() {
return writtenRecordCount.get();
}
}
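
The TODO in canWrite() above hints at a further refinement: since the writer now tracks how many records it has written, it could project an average record size from the last measurement and estimate the remaining headroom without re-measuring. A hypothetical sketch of that idea, not part of this PR (the method name and projection math are illustrative), written as an extra method against the fields HoodieBaseParquetWriter already defines:

  // Hypothetical follow-up to the TODO above (not in this PR): estimate the current
  // size as the last measurement plus the records written since, priced at the
  // observed average record size, instead of re-reading getDataSize() each time.
  public boolean canWriteEstimated() {
    long recordsWritten = getWrittenRecordCount();
    if (lastCachedDataSize == -1
        || recordsWritten % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) {
      lastCachedDataSize = getDataSize();
    }
    if (recordsWritten == 0) {
      return true;
    }
    long avgRecordSize = Math.max(1, lastCachedDataSize / recordsWritten);
    long recordsSinceLastCheck = recordsWritten % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK;
    long projectedSize = lastCachedDataSize + recordsSinceLastCheck * avgRecordSize;
    return projectedSize < maxFileSize;
  }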

@@ -18,14 +18,12 @@

package org.apache.hudi.io.storage;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;

import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;

public interface HoodieFileWriter<R extends IndexedRecord> {
@@ -38,8 +36,8 @@ public interface HoodieFileWriter<R extends IndexedRecord> {

void writeAvro(String key, R oldRecord) throws IOException;

default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, long recordIndex, String fileName) {
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
return;

@@ -81,7 +81,7 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFi
config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());

return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, taskContextSupplier, populateMetaFields);
return new HoodieAvroParquetWriter<>(path, parquetConfig, instantTime, taskContextSupplier, populateMetaFields);
}

static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newHFileFileWriter(

@@ -113,7 +113,7 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
if (populateMetaFields) {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement(), file.getName());
writeAvro(key.getRecordKey(), avroRecord);
} else {
writeAvro(key.getRecordKey(), avroRecord);

@@ -97,7 +97,7 @@ public HoodieOrcWriter(String instantTime, Path file, HoodieOrcConfig config, Sc
@Override
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX.getAndIncrement(), file.getName());
writeAvro(key.getRecordKey(), avroRecord);
}


@@ -41,7 +41,7 @@
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieOrcConfig;
import org.apache.hudi.io.storage.HoodieOrcWriter;
import org.apache.hudi.io.storage.HoodieParquetWriter;
import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;

import org.apache.avro.Schema;
@@ -113,10 +113,9 @@ public Path withInserts(String partition, String fileId, List<HoodieRecord> reco
HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()));
try (HoodieParquetWriter writer = new HoodieParquetWriter(
currentInstantTime,
new Path(Paths.get(basePath, partition, fileName).toString()),
config, schema, contextSupplier, populateMetaFields)) {
try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>(
new Path(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime,
contextSupplier, populateMetaFields)) {
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) ((HoodieRecordPayload) record.getData()).getInsertValue(schema).get();

@@ -379,7 +379,7 @@ public void testFileSizeUpsertRecords() throws Exception {

List<HoodieRecord> records = new ArrayList<>();
// Approx 1150 records are written for block size of 64KB
for (int i = 0; i < 2000; i++) {
for (int i = 0; i < 2050; i++) {
String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString()
+ "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}";
RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
@@ -402,7 +402,8 @@ public void testFileSizeUpsertRecords() throws Exception {
counts++;
}
}
assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file");
// we check canWrite only once every 1000 records. and so 2 files with 1000 records and 3rd file with 50 records.
assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file");
}

@Test

@@ -19,44 +19,24 @@
package org.apache.hudi.io.storage.row;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;

import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
import org.apache.spark.sql.catalyst.InternalRow;

import java.io.IOException;

/**
* Parquet's impl of {@link HoodieInternalRowFileWriter} to write {@link InternalRow}s.
*/
public class HoodieInternalRowParquetWriter extends ParquetWriter<InternalRow>
public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter<InternalRow>
implements HoodieInternalRowFileWriter {

private final Path file;
private final HoodieWrapperFileSystem fs;
private final long maxFileSize;
private final HoodieRowParquetWriteSupport writeSupport;

public HoodieInternalRowParquetWriter(Path file, HoodieRowParquetConfig parquetConfig)
throws IOException {
super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(),
DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED,
DEFAULT_WRITER_VERSION, FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file,
parquetConfig.getHadoopConf()));
this.maxFileSize = parquetConfig.getMaxFileSize()
+ Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
this.writeSupport = parquetConfig.getWriteSupport();
}
super(file, parquetConfig);

@Override
public boolean canWrite() {
return getDataSize() < maxFileSize;
this.writeSupport = parquetConfig.getWriteSupport();
}

@Override
@@ -69,9 +49,4 @@ public void writeRow(String key, InternalRow row) throws IOException {
public void writeRow(InternalRow row) throws IOException {
super.write(row);
}

@Override
public void close() throws IOException {
super.close();
}
}

@@ -49,7 +49,7 @@ public void testGetFileWriter() throws IOException {
SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
HoodieFileWriter<IndexedRecord> parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
assertTrue(parquetWriter instanceof HoodieParquetWriter);
assertTrue(parquetWriter instanceof HoodieAvroParquetWriter);

// hfile format.
final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile");

@@ -419,7 +419,7 @@ public void testFileSizeUpsertRecords() throws Exception {

List<HoodieRecord> records = new ArrayList<>();
// Approx 1150 records are written for block size of 64KB
for (int i = 0; i < 2000; i++) {
for (int i = 0; i < 2050; i++) {
String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString()
+ "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}";
RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
@@ -441,7 +441,8 @@ public void testFileSizeUpsertRecords() throws Exception {
counts++;
}
}
assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file");
// we check canWrite only once every 1000 records. and so 2 files with 1000 records and 3rd file with 50 records.
assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file");
}

@Test