Merge branch 'master' into duplicate-insert-test
* master:
  [HUDI-5003] Fix the type of InLineFileSystem`startOffset to long (apache#6916)
  [HUDI-5022] Make better error messages for pr compliance (apache#6934)
  [MINOR] Fixing verbosity of docker set up (apache#6944)
  [HUDI-5037] Upgrade org.apache.thrift:libthrift to 0.14.0 (apache#6941)
  [HUDI-5033] Fix Broken Link In MultipleSparkJobExecutionStrategy (apache#6951)
  [HUDI-5030] Fix TestPartialUpdateAvroPayload.testUseLatestRecordMetaValue (apache#6948)
  [HUDI-4948] Improve CDC Write (apache#6818)
Zouxxyy committed Oct 17, 2022
2 parents bb6868a + 76f3c6a commit a7b34a7
Showing 26 changed files with 550 additions and 285 deletions.
4 changes: 2 additions & 2 deletions .github/PULL_REQUEST_TEMPLATE.md
@@ -6,9 +6,9 @@ _Describe context and summary for this change. Highlight if any code was copied.

_Describe any public API or user-facing feature change or any performance impact._

**Risk level: none | low | medium | high**
### Risk level (write none, low medium or high below)

_Choose one. If medium or high, explain what verification was done to mitigate the risks._
_If medium or high, explain what verification was done to mitigate the risks._

### Documentation Update

2 changes: 1 addition & 1 deletion .github/workflows/pr_compliance.yml
@@ -15,7 +15,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
- name: run script
run: python3 scripts/pr_compliance.py > test.log || { echo "::error::pr_compliance.py $(cat test.log)" && exit 1; }
run: python3 scripts/pr_compliance.py



2 changes: 1 addition & 1 deletion docker/setup_demo.sh
@@ -30,7 +30,7 @@ if [ "$HUDI_DEMO_ENV" != "dev" ]; then
HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} pull
fi
sleep 5
HUDI_WS=${WS_ROOT} docker-compose --verbose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} up -d
HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} up -d
sleep 15

docker exec -it adhoc-1 /bin/bash /var/hoodie/ws/docker/demo/setup_demo_container.sh
HoodieCDCLogger.java
@@ -22,7 +22,6 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -33,6 +32,7 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -49,10 +49,12 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
@@ -64,6 +66,10 @@ public class HoodieCDCLogger implements Closeable {

private final String keyField;

private final String partitionPath;

private final FileSystem fs;

private final Schema dataSchema;

// writer for cdc data
@@ -73,35 +79,56 @@ public class HoodieCDCLogger implements Closeable {

private final Schema cdcSchema;

private final String cdcSchemaString;

// the cdc data
private final Map<String, HoodieAvroPayload> cdcData;

private final Map<HoodieLogBlock.HeaderMetadataType, String> cdcDataBlockHeader;

// the cdc record transformer
private final CDCTransformer transformer;

// Max block size to limit to for a log block
private final int maxBlockSize;

// Average cdc record size. This size is updated at the end of every log block flushed to disk
private long averageCDCRecordSize = 0;

// Number of records that must be written to meet the max block size for a log block
private AtomicInteger numOfCDCRecordsInMemory = new AtomicInteger();

private final SizeEstimator<HoodieAvroPayload> sizeEstimator;

private final List<Path> cdcAbsPaths;

public HoodieCDCLogger(
String commitTime,
HoodieWriteConfig config,
HoodieTableConfig tableConfig,
String partitionPath,
FileSystem fs,
Schema schema,
HoodieLogFormat.Writer cdcWriter,
long maxInMemorySizeInBytes) {
try {
this.commitTime = commitTime;
this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
this.keyField = config.populateMetaFields()
? HoodieRecord.RECORD_KEY_METADATA_FIELD
: tableConfig.getRecordKeyFieldProp();
this.partitionPath = partitionPath;
this.fs = fs;
this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
this.cdcWriter = cdcWriter;
this.cdcSupplementalLoggingMode = tableConfig.cdcSupplementalLoggingMode();
this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
cdcSupplementalLoggingMode,
dataSchema
);
this.cdcSchemaString = this.cdcSchema.toString();

this.cdcDataBlockHeader = new HashMap<>();
this.cdcDataBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
this.cdcDataBlockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchema.toString());

this.sizeEstimator = new DefaultSizeEstimator<>();
this.cdcData = new ExternalSpillableMap<>(
maxInMemorySizeInBytes,
config.getSpillableMapBasePath(),
@@ -110,6 +137,9 @@ public HoodieCDCLogger(
config.getCommonConfig().getSpillableDiskMapType(),
config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
this.transformer = getTransformer();
this.maxBlockSize = config.getLogFileDataBlockMaxSize();

this.cdcAbsPaths = new ArrayList<>();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e);
}
@@ -136,49 +166,70 @@ public void put(HoodieRecord hoodieRecord,
cdcRecord = this.transformer.transform(HoodieCDCOperation.DELETE, recordKey,
oldRecord, null);
}
cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));

flushIfNeeded(false);
HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(cdcRecord));
if (cdcData.isEmpty()) {
averageCDCRecordSize = sizeEstimator.sizeEstimate(payload);
}
cdcData.put(recordKey, payload);
numOfCDCRecordsInMemory.incrementAndGet();
}

public Option<AppendResult> writeCDCData() {
if (isEmpty()) {
return Option.empty();
private void flushIfNeeded(Boolean force) {
if (force || numOfCDCRecordsInMemory.get() * averageCDCRecordSize >= maxBlockSize) {
try {
List<IndexedRecord> records = cdcData.values().stream()
.map(record -> {
try {
return record.getInsertValue(cdcSchema).get();
} catch (IOException e) {
throw new HoodieIOException("Failed to get cdc record", e);
}
}).collect(Collectors.toList());

HoodieLogBlock block = new HoodieCDCDataBlock(records, cdcDataBlockHeader, keyField);
AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block));

Path cdcAbsPath = result.logFile().getPath();
if (!cdcAbsPaths.contains(cdcAbsPath)) {
cdcAbsPaths.add(cdcAbsPath);
}

// reset stat
cdcData.clear();
numOfCDCRecordsInMemory = new AtomicInteger();
} catch (Exception e) {
throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e);
}
}
}

public Map<String, Long> getCDCWriteStats() {
Map<String, Long> stats = new HashMap<>();
try {
List<IndexedRecord> records = cdcData.values().stream()
.map(record -> {
try {
return record.getInsertValue(cdcSchema).get();
} catch (IOException e) {
throw new HoodieIOException("Failed to get cdc record", e);
}
}).collect(Collectors.toList());

Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchemaString);

HoodieLogBlock block = new HoodieCDCDataBlock(records, header, keyField);
AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block));

// call close to trigger the data flush.
this.close();

return Option.of(result);
} catch (Exception e) {
throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e);
for (Path cdcAbsPath : cdcAbsPaths) {
String cdcFileName = cdcAbsPath.getName();
String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName;
stats.put(cdcPath, FSUtils.getFileSize(fs, cdcAbsPath));
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to get cdc write stat", e);
}
return stats;
}

@Override
public void close() {
try {
flushIfNeeded(true);
if (cdcWriter != null) {
cdcWriter.close();
}
} catch (IOException e) {
throw new HoodieIOException("Failed to close HoodieCDCLogger", e);
} finally {
// in case that crash when call `flushIfNeeded`, do the cleanup again.
cdcData.clear();
}
}
@@ -204,40 +255,6 @@ private GenericRecord removeCommitMetadata(GenericRecord record) {
return record == null ? null : HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, Collections.emptyMap());
}

public boolean isEmpty() {
return this.cdcData.isEmpty();
}

public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
long recordsWritten,
long insertRecordsWritten) {
if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// the following cases where we do not need to write out the cdc file:
// case 1: all the data from the previous file slice are deleted. and no new data is inserted;
// case 2: all the incoming data is INSERT.
return Option.empty();
}
return cdcLogger.writeCDCData();
}

public static void setCDCStatIfNeeded(HoodieWriteStat stat,
Option<AppendResult> cdcResult,
String partitionPath,
FileSystem fs) {
try {
if (cdcResult.isPresent()) {
Path cdcLogFile = cdcResult.get().logFile().getPath();
String cdcFileName = cdcLogFile.getName();
String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName;
long cdcFileSizeInBytes = FSUtils.getFileSize(fs, cdcLogFile);
stat.setCdcPath(cdcPath);
stat.setCdcWriteBytes(cdcFileSizeInBytes);
}
} catch (IOException e) {
throw new HoodieUpsertException("Failed to set cdc write stat", e);
}
}

// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
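Note on the HoodieCDCLogger change above: instead of buffering every CDC record until close() and writing a single block, the logger now samples the size of the first payload in each block and flushes whenever the estimated in-memory size (record count times average record size) reaches the configured log-block size. The standalone Java sketch below illustrates only that flush pattern; the SizeBoundBuffer class, its String records, and the println "sink" are hypothetical stand-ins, not Hudi's API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the flush-on-estimated-size pattern used by flushIfNeeded above.
class SizeBoundBuffer {
  private final long maxBlockSizeBytes;
  private final List<String> buffered = new ArrayList<>();
  private final AtomicInteger recordsInMemory = new AtomicInteger();
  private long averageRecordSize = 0;

  SizeBoundBuffer(long maxBlockSizeBytes) {
    this.maxBlockSizeBytes = maxBlockSizeBytes;
  }

  void put(String record) {
    if (buffered.isEmpty()) {
      // Sample the first record of each block, mirroring how HoodieCDCLogger
      // estimates averageCDCRecordSize from the first buffered payload.
      averageRecordSize = record.getBytes().length;
    }
    buffered.add(record);
    recordsInMemory.incrementAndGet();
    flushIfNeeded(false);
  }

  void close() {
    // Force a final flush of whatever remains in memory.
    flushIfNeeded(true);
  }

  private void flushIfNeeded(boolean force) {
    if (force || (long) recordsInMemory.get() * averageRecordSize >= maxBlockSizeBytes) {
      if (!buffered.isEmpty()) {
        System.out.println("flushing block of " + buffered.size() + " records");
      }
      buffered.clear();
      recordsInMemory.set(0);
    }
  }

  public static void main(String[] args) {
    SizeBoundBuffer buffer = new SizeBoundBuffer(64); // tiny threshold for demonstration
    for (int i = 0; i < 10; i++) {
      buffer.put("record-" + i);
    }
    buffer.close();
  }
}

On close, the handle classes in the following diffs attach the per-log-file sizes returned by getCDCWriteStats() to the first WriteStatus via HoodieWriteStat#setCdcStats, replacing the removed static helpers writeCDCDataIfNeeded and setCDCStatIfNeeded.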
HoodieMergeHandleWithChangeLog.java
@@ -24,8 +24,8 @@
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -52,6 +52,8 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
fs,
tableSchema,
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
@@ -68,6 +70,8 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
fs,
tableSchema,
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
@@ -93,9 +97,17 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRec
public List<WriteStatus> close() {
List<WriteStatus> writeStatuses = super.close();
// if there are cdc data written, set the CDC-related information.
Option<AppendResult> cdcResult =
HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs);

if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// the following cases where we do not need to write out the cdc file:
// case 1: all the data from the previous file slice are deleted. and no new data is inserted;
// case 2: all the incoming data is INSERT.
return writeStatuses;
}

cdcLogger.close();
HoodieWriteStat stat = writeStatuses.get(0).getStat();
stat.setCdcStats(cdcLogger.getCDCWriteStats());
return writeStatuses;
}
}
FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -23,8 +23,8 @@
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -54,6 +54,8 @@ public FlinkMergeAndReplaceHandleWithChangeLog(HoodieWriteConfig config, String
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
getFileSystem(),
tableSchema,
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
@@ -78,9 +80,9 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRec
@Override
public List<WriteStatus> close() {
List<WriteStatus> writeStatuses = super.close();
// if there are cdc data written, set the CDC-related information.
Option<AppendResult> cdcResult = cdcLogger.writeCDCData();
HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs);
cdcLogger.close();
HoodieWriteStat stat = writeStatuses.get(0).getStat();
stat.setCdcStats(cdcLogger.getCDCWriteStats());
return writeStatuses;
}
}
FlinkMergeHandleWithChangeLog.java
@@ -23,8 +23,8 @@
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -57,6 +57,8 @@ public FlinkMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTim
instantTime,
config,
hoodieTable.getMetaClient().getTableConfig(),
partitionPath,
getFileSystem(),
tableSchema,
createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
@@ -81,9 +83,9 @@ protected void writeInsertRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRec
@Override
public List<WriteStatus> close() {
List<WriteStatus> writeStatuses = super.close();
// if there are cdc data written, set the CDC-related information.
Option<AppendResult> cdcResult = cdcLogger.writeCDCData();
HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs);
cdcLogger.close();
HoodieWriteStat stat = writeStatuses.get(0).getStat();
stat.setCdcStats(cdcLogger.getCDCWriteStats());
return writeStatuses;
}
}
