diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 17ad995a97a7a..b1902aab5f019 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -6,9 +6,9 @@ _Describe context and summary for this change. Highlight if any code was copied. _Describe any public API or user-facing feature change or any performance impact._ -**Risk level: none | low | medium | high** +### Risk level (write none, low medium or high below) -_Choose one. If medium or high, explain what verification was done to mitigate the risks._ +_If medium or high, explain what verification was done to mitigate the risks._ ### Documentation Update diff --git a/.github/workflows/pr_compliance.yml b/.github/workflows/pr_compliance.yml index 67affbb7b749f..542a0a54672da 100644 --- a/.github/workflows/pr_compliance.yml +++ b/.github/workflows/pr_compliance.yml @@ -15,7 +15,7 @@ jobs: - uses: actions/checkout@v3 - uses: actions/setup-python@v3 - name: run script - run: python3 scripts/pr_compliance.py > test.log || { echo "::error::pr_compliance.py $(cat test.log)" && exit 1; } + run: python3 scripts/pr_compliance.py diff --git a/docker/setup_demo.sh b/docker/setup_demo.sh index 81270bba75ffe..e847f913a5ac9 100755 --- a/docker/setup_demo.sh +++ b/docker/setup_demo.sh @@ -30,7 +30,7 @@ if [ "$HUDI_DEMO_ENV" != "dev" ]; then HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} pull fi sleep 5 -HUDI_WS=${WS_ROOT} docker-compose --verbose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} up -d +HUDI_WS=${WS_ROOT} docker-compose -f ${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} up -d sleep 15 docker exec -it adhoc-1 /bin/bash /var/hoodie/ws/docker/demo/setup_demo_container.sh diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java index 303eea76dbd0b..1b62eca76142c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.cdc.HoodieCDCOperation; import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; @@ -33,6 +32,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.SizeEstimator; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieWriteConfig; @@ -49,10 +49,12 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -64,6 +66,10 @@ public class HoodieCDCLogger implements Closeable { private final String keyField; + private final String partitionPath; + + private final FileSystem fs; + private final Schema dataSchema; // writer for cdc data @@ -73,35 +79,56 @@ public class HoodieCDCLogger implements Closeable { 
private final Schema cdcSchema; - private final String cdcSchemaString; - // the cdc data private final Map cdcData; + private final Map cdcDataBlockHeader; + // the cdc record transformer private final CDCTransformer transformer; + // Max block size to limit to for a log block + private final int maxBlockSize; + + // Average cdc record size. This size is updated at the end of every log block flushed to disk + private long averageCDCRecordSize = 0; + + // Number of records that must be written to meet the max block size for a log block + private AtomicInteger numOfCDCRecordsInMemory = new AtomicInteger(); + + private final SizeEstimator sizeEstimator; + + private final List cdcAbsPaths; + public HoodieCDCLogger( String commitTime, HoodieWriteConfig config, HoodieTableConfig tableConfig, + String partitionPath, + FileSystem fs, Schema schema, HoodieLogFormat.Writer cdcWriter, long maxInMemorySizeInBytes) { try { this.commitTime = commitTime; - this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema); this.keyField = config.populateMetaFields() ? HoodieRecord.RECORD_KEY_METADATA_FIELD : tableConfig.getRecordKeyFieldProp(); + this.partitionPath = partitionPath; + this.fs = fs; + this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema); this.cdcWriter = cdcWriter; this.cdcSupplementalLoggingMode = tableConfig.cdcSupplementalLoggingMode(); this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( cdcSupplementalLoggingMode, dataSchema ); - this.cdcSchemaString = this.cdcSchema.toString(); + this.cdcDataBlockHeader = new HashMap<>(); + this.cdcDataBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime); + this.cdcDataBlockHeader.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchema.toString()); + + this.sizeEstimator = new DefaultSizeEstimator<>(); this.cdcData = new ExternalSpillableMap<>( maxInMemorySizeInBytes, config.getSpillableMapBasePath(), @@ -110,6 +137,9 @@ public HoodieCDCLogger( config.getCommonConfig().getSpillableDiskMapType(), config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()); this.transformer = getTransformer(); + this.maxBlockSize = config.getLogFileDataBlockMaxSize(); + + this.cdcAbsPaths = new ArrayList<>(); } catch (IOException e) { throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", e); } @@ -136,49 +166,70 @@ public void put(HoodieRecord hoodieRecord, cdcRecord = this.transformer.transform(HoodieCDCOperation.DELETE, recordKey, oldRecord, null); } - cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord))); + + flushIfNeeded(false); + HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(cdcRecord)); + if (cdcData.isEmpty()) { + averageCDCRecordSize = sizeEstimator.sizeEstimate(payload); + } + cdcData.put(recordKey, payload); + numOfCDCRecordsInMemory.incrementAndGet(); } - public Option writeCDCData() { - if (isEmpty()) { - return Option.empty(); + private void flushIfNeeded(Boolean force) { + if (force || numOfCDCRecordsInMemory.get() * averageCDCRecordSize >= maxBlockSize) { + try { + List records = cdcData.values().stream() + .map(record -> { + try { + return record.getInsertValue(cdcSchema).get(); + } catch (IOException e) { + throw new HoodieIOException("Failed to get cdc record", e); + } + }).collect(Collectors.toList()); + + HoodieLogBlock block = new HoodieCDCDataBlock(records, cdcDataBlockHeader, keyField); + AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block)); + + Path cdcAbsPath = result.logFile().getPath(); + if (!cdcAbsPaths.contains(cdcAbsPath)) 
{ + cdcAbsPaths.add(cdcAbsPath); + } + + // reset stat + cdcData.clear(); + numOfCDCRecordsInMemory = new AtomicInteger(); + } catch (Exception e) { + throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e); + } } + } + public Map getCDCWriteStats() { + Map stats = new HashMap<>(); try { - List records = cdcData.values().stream() - .map(record -> { - try { - return record.getInsertValue(cdcSchema).get(); - } catch (IOException e) { - throw new HoodieIOException("Failed to get cdc record", e); - } - }).collect(Collectors.toList()); - - Map header = new HashMap<>(); - header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime); - header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchemaString); - - HoodieLogBlock block = new HoodieCDCDataBlock(records, header, keyField); - AppendResult result = cdcWriter.appendBlocks(Collections.singletonList(block)); - - // call close to trigger the data flush. - this.close(); - - return Option.of(result); - } catch (Exception e) { - throw new HoodieException("Failed to write the cdc data to " + cdcWriter.getLogFile().getPath(), e); + for (Path cdcAbsPath : cdcAbsPaths) { + String cdcFileName = cdcAbsPath.getName(); + String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? cdcFileName : partitionPath + "/" + cdcFileName; + stats.put(cdcPath, FSUtils.getFileSize(fs, cdcAbsPath)); + } + } catch (IOException e) { + throw new HoodieUpsertException("Failed to get cdc write stat", e); } + return stats; } @Override public void close() { try { + flushIfNeeded(true); if (cdcWriter != null) { cdcWriter.close(); } } catch (IOException e) { throw new HoodieIOException("Failed to close HoodieCDCLogger", e); } finally { + // in case that crash when call `flushIfNeeded`, do the cleanup again. cdcData.clear(); } } @@ -204,40 +255,6 @@ private GenericRecord removeCommitMetadata(GenericRecord record) { return record == null ? null : HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, Collections.emptyMap()); } - public boolean isEmpty() { - return this.cdcData.isEmpty(); - } - - public static Option writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger, - long recordsWritten, - long insertRecordsWritten) { - if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) { - // the following cases where we do not need to write out the cdc file: - // case 1: all the data from the previous file slice are deleted. and no new data is inserted; - // case 2: all the incoming data is INSERT. - return Option.empty(); - } - return cdcLogger.writeCDCData(); - } - - public static void setCDCStatIfNeeded(HoodieWriteStat stat, - Option cdcResult, - String partitionPath, - FileSystem fs) { - try { - if (cdcResult.isPresent()) { - Path cdcLogFile = cdcResult.get().logFile().getPath(); - String cdcFileName = cdcLogFile.getName(); - String cdcPath = StringUtils.isNullOrEmpty(partitionPath) ? 
cdcFileName : partitionPath + "/" + cdcFileName; - long cdcFileSizeInBytes = FSUtils.getFileSize(fs, cdcLogFile); - stat.setCdcPath(cdcPath); - stat.setCdcWriteBytes(cdcFileSizeInBytes); - } - } catch (IOException e) { - throw new HoodieUpsertException("Failed to set cdc write stat", e); - } - } - // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java index 910bc42158d8d..d7ab49a039a46 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java @@ -24,8 +24,8 @@ import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.cdc.HoodieCDCUtils; -import org.apache.hudi.common.table.log.AppendResult; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.keygen.BaseKeyGenerator; @@ -52,6 +52,8 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi instantTime, config, hoodieTable.getMetaClient().getTableConfig(), + partitionPath, + fs, tableSchema, createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX), IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)); @@ -68,6 +70,8 @@ public HoodieMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTi instantTime, config, hoodieTable.getMetaClient().getTableConfig(), + partitionPath, + fs, tableSchema, createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX), IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)); @@ -93,9 +97,17 @@ protected void writeInsertRecord(HoodieRecord hoodieRecord, Option close() { List writeStatuses = super.close(); // if there are cdc data written, set the CDC-related information. - Option cdcResult = - HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten); - HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs); + + if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) { + // the following cases where we do not need to write out the cdc file: + // case 1: all the data from the previous file slice are deleted. and no new data is inserted; + // case 2: all the incoming data is INSERT. 
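The flush logic HoodieCDCLogger gains above (sample the payload size once per batch, count buffered records, flush a data block once the estimate crosses `hoodie.logfile.data.block.max.size`) reduces to a small pattern. A simplified sketch with hypothetical names — the real logger buffers into an ExternalSpillableMap and appends HoodieCDCDataBlocks through the log writer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified sketch of the size-estimated batching in HoodieCDCLogger#flushIfNeeded.
// BatchingBuffer and its members are illustrative, not Hudi classes.
class BatchingBuffer<T> {
  private final long maxBlockSize;          // stands in for config.getLogFileDataBlockMaxSize()
  private final Consumer<List<T>> flusher;  // stands in for appending one HoodieCDCDataBlock
  private final List<T> buffer = new ArrayList<>();
  private long averageRecordSize = 0;       // sampled from the first record of each batch

  BatchingBuffer(long maxBlockSize, Consumer<List<T>> flusher) {
    this.maxBlockSize = maxBlockSize;
    this.flusher = flusher;
  }

  void add(T record, long estimatedSizeInBytes) {
    flushIfNeeded(false);                   // mirror the logger: flush before buffering more
    if (buffer.isEmpty()) {
      averageRecordSize = estimatedSizeInBytes;
    }
    buffer.add(record);
  }

  void flushIfNeeded(boolean force) {
    if (!buffer.isEmpty() && (force || buffer.size() * averageRecordSize >= maxBlockSize)) {
      flusher.accept(new ArrayList<>(buffer));  // one log block per flush
      buffer.clear();                            // reset the in-memory batch
    }
  }
}
```

Sampling the size once per batch rather than per record keeps the hot put() path cheap, at the cost of blocks that only approximately honor the configured limit.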
+ return writeStatuses; + } + + cdcLogger.close(); + HoodieWriteStat stat = writeStatuses.get(0).getStat(); + stat.setCdcStats(cdcLogger.getCDCWriteStats()); return writeStatuses; } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java index 62b5481cf9b8d..0c54f4e1a82ca 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java @@ -23,8 +23,8 @@ import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.cdc.HoodieCDCUtils; -import org.apache.hudi.common.table.log.AppendResult; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -54,6 +54,8 @@ public FlinkMergeAndReplaceHandleWithChangeLog(HoodieWriteConfig config, String instantTime, config, hoodieTable.getMetaClient().getTableConfig(), + partitionPath, + getFileSystem(), tableSchema, createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX), IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)); @@ -78,9 +80,9 @@ protected void writeInsertRecord(HoodieRecord hoodieRecord, Option close() { List writeStatuses = super.close(); - // if there are cdc data written, set the CDC-related information. - Option cdcResult = cdcLogger.writeCDCData(); - HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs); + cdcLogger.close(); + HoodieWriteStat stat = writeStatuses.get(0).getStat(); + stat.setCdcStats(cdcLogger.getCDCWriteStats()); return writeStatuses; } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java index f6adbbf0d492c..1dd5befa1d097 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java @@ -23,8 +23,8 @@ import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.cdc.HoodieCDCUtils; -import org.apache.hudi.common.table.log.AppendResult; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -57,6 +57,8 @@ public FlinkMergeHandleWithChangeLog(HoodieWriteConfig config, String instantTim instantTime, config, hoodieTable.getMetaClient().getTableConfig(), + partitionPath, + getFileSystem(), tableSchema, createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX), IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config)); @@ -81,9 +83,9 @@ protected void writeInsertRecord(HoodieRecord hoodieRecord, Option close() { List writeStatuses = super.close(); - // if there are cdc data written, set the CDC-related information. 
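All three change-log handles (the common merge handle above and the two Flink handles around here) now converge on the same close-time wiring. A sketch of that shared shape, assuming the handle fields shown in the diff — not the exact handle code:

```java
import java.util.List;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.io.HoodieCDCLogger;

// Sketch: close the cdc logger (which flushes any buffered blocks), then attach
// its relative-path -> size map to the first write stat. The skip conditions are
// the ones spelled out in HoodieMergeHandleWithChangeLog#close above.
final class CdcCloseSketch {
  static List<WriteStatus> attachCdcStats(HoodieCDCLogger cdcLogger,
                                          List<WriteStatus> writeStatuses,
                                          long recordsWritten,
                                          long insertRecordsWritten) {
    if (cdcLogger == null || recordsWritten == 0L || recordsWritten == insertRecordsWritten) {
      return writeStatuses;  // everything deleted, or pure inserts: no cdc file needed
    }
    cdcLogger.close();
    HoodieWriteStat stat = writeStatuses.get(0).getStat();
    stat.setCdcStats(cdcLogger.getCDCWriteStats());
    return writeStatuses;
  }
}
```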
- Option cdcResult = cdcLogger.writeCDCData(); - HoodieCDCLogger.setCDCStatIfNeeded(writeStatuses.get(0).getStat(), cdcResult, partitionPath, fs); + cdcLogger.close(); + HoodieWriteStat stat = writeStatuses.get(0).getStat(); + stat.setCdcStats(cdcLogger.getCDCWriteStats()); return writeStatuses; } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 171f54fc2dfa2..80b9f400f6143 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -137,7 +137,7 @@ public HoodieWriteMetadata> performClustering(final Hood /** * Execute clustering to write inputRecords into new files based on strategyParams. - * Different from {@link performClusteringWithRecordsRDD}, this method take {@link Dataset} + * Different from {@link MultipleSparkJobExecutionStrategy#performClusteringWithRecordsRDD}, this method take {@link Dataset} * as inputs. */ public abstract HoodieData performClusteringWithRecordsAsRow(final Dataset inputRecords, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 1f69bea2d8fdb..c3d8e9f8be617 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -451,9 +451,13 @@ public static Integer getTaskAttemptIdFromLogPath(Path path) { * Get the last part of the file name in the log file and convert to int. */ public static int getFileVersionFromLog(Path logPath) { - Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName()); + return getFileVersionFromLog(logPath.getName()); + } + + public static int getFileVersionFromLog(String logFileName) { + Matcher matcher = LOG_FILE_PATTERN.matcher(logFileName); if (!matcher.find()) { - throw new InvalidHoodiePathException(logPath, "LogFile"); + throw new HoodieIOException("Invalid log file name: " + logFileName); } return Integer.parseInt(matcher.group(4)); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java index 080f228f161e9..6031f29d907d3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java @@ -95,11 +95,11 @@ public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) { * input: "inlinefs://file1/s3a/?start_offset=20&length=40". * output: 20 */ - public static int startOffset(Path inlineFSPath) { + public static long startOffset(Path inlineFSPath) { assertInlineFSPath(inlineFSPath); String[] slices = inlineFSPath.toString().split("[?&=]"); - return Integer.parseInt(slices[slices.length - 3]); + return Long.parseLong(slices[slices.length - 3]); } /** @@ -108,11 +108,11 @@ public static int startOffset(Path inlineFSPath) { * input: "inlinefs:/file1/s3a/?start_offset=20&length=40". 
* output: 40 */ - public static int length(Path inlinePath) { + public static long length(Path inlinePath) { assertInlineFSPath(inlinePath); String[] slices = inlinePath.toString().split("[?&=]"); - return Integer.parseInt(slices[slices.length - 1]); + return Long.parseLong(slices[slices.length - 1]); } private static void assertInlineFSPath(Path inlinePath) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java index 4e8701244c2ad..fbd067c6c18cb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFsDataInputStream.java @@ -33,11 +33,11 @@ */ public class InLineFsDataInputStream extends FSDataInputStream { - private final int startOffset; + private final long startOffset; private final FSDataInputStream outerStream; - private final int length; + private final long length; - public InLineFsDataInputStream(int startOffset, FSDataInputStream outerStream, int length) throws IOException { + public InLineFsDataInputStream(long startOffset, FSDataInputStream outerStream, long length) throws IOException { super(outerStream.getWrappedStream()); this.startOffset = startOffset; this.outerStream = outerStream; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java index 45f7ecf541f22..91637102f0404 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java @@ -21,10 +21,12 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.util.JsonUtils; import javax.annotation.Nullable; import java.io.Serializable; +import java.util.Map; /** * Statistics about a single Hoodie write operation. @@ -45,9 +47,9 @@ public class HoodieWriteStat implements Serializable { private String path; /** - * Relative CDC file path that store the CDC data. + * Relative CDC file path that store the CDC data and its size. */ - private String cdcPath; + private Map cdcStats; /** * The previous version of the file. (null if this is the first version. i.e insert) @@ -75,11 +77,6 @@ public class HoodieWriteStat implements Serializable { */ private long numInserts; - /** - * Total number of cdc bytes written. - */ - private long cdcWriteBytes; - /** * Total number of bytes written. 
*/ @@ -205,18 +202,10 @@ public long getTotalWriteBytes() { return totalWriteBytes; } - public long getCdcWriteBytes() { - return cdcWriteBytes; - } - public void setTotalWriteBytes(long totalWriteBytes) { this.totalWriteBytes = totalWriteBytes; } - public void setCdcWriteBytes(long cdcWriteBytes) { - this.cdcWriteBytes = cdcWriteBytes; - } - public long getTotalWriteErrors() { return totalWriteErrors; } @@ -254,12 +243,12 @@ public String getPath() { } @Nullable - public String getCdcPath() { - return cdcPath; + public Map getCdcStats() { + return cdcStats; } - public void setCdcPath(String cdcPath) { - this.cdcPath = cdcPath; + public void setCdcStats(Map cdcStats) { + this.cdcStats = cdcStats; } public String getPartitionPath() { @@ -387,7 +376,7 @@ public String toString() { return "HoodieWriteStat{fileId='" + fileId + '\'' + ", path='" + path + '\'' + ", prevCommit='" + prevCommit + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + ", numUpdateWrites=" + numUpdateWrites + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + totalWriteErrors + ", tempPath='" + tempPath - + '\'' + ", cdcPath='" + cdcPath + ", cdcWriteBytes=" + cdcWriteBytes + + '\'' + ", cdcStats='" + JsonUtils.toString(cdcStats) + '\'' + ", partitionPath='" + partitionPath + '\'' + ", totalLogRecords=" + totalLogRecords + ", totalLogFilesCompacted=" + totalLogFilesCompacted + ", totalLogSizeCompacted=" + totalLogSizeCompacted + ", totalUpdatedRecordsCompacted=" + totalUpdatedRecordsCompacted + ", totalLogBlocks=" + totalLogBlocks diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java index a9da1be77a470..59f4a8779a58e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java @@ -33,8 +33,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; @@ -142,7 +142,7 @@ public Map> extractCDCFileSplits() { if (latestFileSliceOpt.isPresent()) { HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition, fileId); HoodieCDCFileSplit changeFile = new HoodieCDCFileSplit(instant.getTimestamp(), - REPLACE_COMMIT, null, latestFileSliceOpt, Option.empty()); + REPLACE_COMMIT, new ArrayList<>(), latestFileSliceOpt, Option.empty()); if (!fgToCommitChanges.containsKey(fileGroupId)) { fgToCommitChanges.put(fileGroupId, new ArrayList<>()); } @@ -254,7 +254,7 @@ private HoodieCDCFileSplit parseWriteStat( final String instantTs = instant.getTimestamp(); HoodieCDCFileSplit cdcFileSplit; - if (StringUtils.isNullOrEmpty(writeStat.getCdcPath())) { + if (CollectionUtils.isNullOrEmpty(writeStat.getCdcStats())) { // no cdc log files can be used directly. we reuse the existing data file to retrieve the change data. 
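With cdcPath/cdcWriteBytes collapsed into a single cdcStats map, the consumers touched in this diff only need the key set (relative cdc file names, fed into HoodieCDCFileSplit) or the whole map (merged into the metadata table's file-size map). A minimal sketch with hypothetical file names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch of how the new HoodieWriteStat#getCdcStats map is consumed:
// keys are relative cdc log file names, values are their sizes in bytes.
final class CdcStatsConsumers {
  public static void main(String[] args) {
    Map<String, Long> cdcStats = new HashMap<>();
    // Hypothetical relative names; the real ones carry the cdc log file suffix.
    cdcStats.put("par1/.f1_20220915000000001.log.1-cdc", 4096L);
    cdcStats.put("par1/.f1_20220915000000001.log.2-cdc", 1024L);

    if (cdcStats.isEmpty()) {
      return;  // mirrors CollectionUtils.isNullOrEmpty(writeStat.getCdcStats())
    }
    Set<String> cdcFiles = cdcStats.keySet();      // what HoodieCDCExtractor hands to the file split
    Map<String, Long> fileNameToSize = new HashMap<>();
    fileNameToSize.putAll(cdcStats);               // what HoodieTableMetadataUtil now does
    System.out.println(cdcFiles + " -> " + fileNameToSize);
  }
}
```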
String path = writeStat.getPath(); if (FSUtils.isBaseFile(new Path(path))) { @@ -271,7 +271,7 @@ private HoodieCDCFileSplit parseWriteStat( new HoodieIOException("Can not get the previous version of the base file") ); FileSlice beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, Collections.emptyList()); - cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE, null, Option.empty(), Option.of(beforeFileSlice)); + cdcFileSplit = new HoodieCDCFileSplit(instantTs, BASE_FILE_DELETE, new ArrayList<>(), Option.empty(), Option.of(beforeFileSlice)); } else if (writeStat.getNumUpdateWrites() == 0L && writeStat.getNumDeletes() == 0 && writeStat.getNumWrites() == writeStat.getNumInserts()) { // all the records in this file are new. @@ -287,7 +287,7 @@ private HoodieCDCFileSplit parseWriteStat( } else { // this is a cdc log if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) { - cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcPath()); + cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet()); } else { try { HoodieBaseFile beforeBaseFile = getOrCreateFsView().getBaseFileOn( @@ -301,7 +301,7 @@ private HoodieCDCFileSplit parseWriteStat( if (supplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.OP_KEY)) { beforeFileSlice = new FileSlice(fileGroupId, writeStat.getPrevCommit(), beforeBaseFile, new ArrayList<>()); } - cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcPath(), + cdcFileSplit = new HoodieCDCFileSplit(instantTs, AS_IS, writeStat.getCdcStats().keySet(), Option.ofNullable(beforeFileSlice), Option.ofNullable(currentFileSlice)); } catch (Exception e) { throw new HoodieException("Fail to parse HoodieWriteStat", e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java index ae082b19455d3..79aaf50f1c499 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java @@ -18,12 +18,18 @@ package org.apache.hudi.common.table.cdc; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.util.Option; import org.jetbrains.annotations.NotNull; import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; /** * This contains all the information that retrieve the change data at a single file group and @@ -55,7 +61,7 @@ public class HoodieCDCFileSplit implements Serializable, Comparable cdcFiles; /** * THe file slice that are required when retrieving the before data. 
@@ -71,15 +77,29 @@ public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, Strin this(instant, cdcInferCase, cdcFile, Option.empty(), Option.empty()); } + public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, Collection cdcFiles) { + this(instant, cdcInferCase, cdcFiles, Option.empty(), Option.empty()); + } + public HoodieCDCFileSplit( String instant, HoodieCDCInferCase cdcInferCase, String cdcFile, Option beforeFileSlice, Option afterFileSlice) { + this(instant, cdcInferCase, Collections.singletonList(cdcFile), beforeFileSlice, afterFileSlice); + } + + public HoodieCDCFileSplit( + String instant, + HoodieCDCInferCase cdcInferCase, + Collection cdcFiles, + Option beforeFileSlice, + Option afterFileSlice) { this.instant = instant; this.cdcInferCase = cdcInferCase; - this.cdcFile = cdcFile; + this.cdcFiles = cdcFiles.stream() + .sorted(Comparator.comparingInt(FSUtils::getFileVersionFromLog)).collect(Collectors.toList()); this.beforeFileSlice = beforeFileSlice; this.afterFileSlice = afterFileSlice; } @@ -92,8 +112,8 @@ public HoodieCDCInferCase getCdcInferCase() { return this.cdcInferCase; } - public String getCdcFile() { - return this.cdcFile; + public List getCdcFiles() { + return this.cdcFiles; } public Option getBeforeFileSlice() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java index f194ddf8f4017..3d22a68a0af65 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.table.log; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.log.block.HoodieDataBlock; import org.apache.hudi.common.util.ClosableIterator; @@ -27,58 +26,106 @@ import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; public class HoodieCDCLogRecordIterator implements ClosableIterator { - private final HoodieLogFormat.Reader reader; + private final FileSystem fs; + + private final Schema cdcSchema; + + private final Iterator cdcLogFileIter; + + private HoodieLogFormat.Reader reader; private ClosableIterator itr; - public HoodieCDCLogRecordIterator( - Configuration hadoopConf, - Path cdcLogPath, - Schema cdcSchema) throws IOException { - this(FSUtils.getFs(cdcLogPath, hadoopConf), cdcLogPath, cdcSchema); - } + private IndexedRecord record; - public HoodieCDCLogRecordIterator( - FileSystem fs, - Path cdcLogPath, - Schema cdcSchema) throws IOException { - this.reader = new HoodieLogFileReader(fs, new HoodieLogFile(fs.getFileStatus(cdcLogPath)), cdcSchema, - HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false); + public HoodieCDCLogRecordIterator(FileSystem fs, HoodieLogFile[] cdcLogFiles, Schema cdcSchema) { + this.fs = fs; + this.cdcSchema = cdcSchema; + this.cdcLogFileIter = Arrays.stream(cdcLogFiles).iterator(); } @Override public boolean hasNext() { + if (record != null) { + return true; + } if (itr == null || !itr.hasNext()) { - if (reader.hasNext()) { - HoodieDataBlock dataBlock = (HoodieDataBlock) reader.next(); - itr = dataBlock.getRecordIterator(); 
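The rewritten HoodieCDCLogRecordIterator (continued just below) is an instance of the usual chained-iterator shape: advance lazily from file to reader to block, and buffer exactly one record so hasNext() stays idempotent. A generic sketch of that shape, not the Hudi class itself:

```java
import java.util.Iterator;
import java.util.List;

// Generic sketch of the "iterator of iterators" pattern used by the new
// HoodieCDCLogRecordIterator: outer = cdc log files / data blocks,
// inner = records of one block; one record is buffered so hasNext() can be
// called any number of times before next().
class ChainedIterator<T> implements Iterator<T> {
  private final Iterator<List<T>> blocks;  // stands in for the file/block chain
  private Iterator<T> current;
  private T buffered;

  ChainedIterator(Iterator<List<T>> blocks) {
    this.blocks = blocks;
  }

  @Override
  public boolean hasNext() {
    if (buffered != null) {
      return true;
    }
    while (current == null || !current.hasNext()) {
      if (!blocks.hasNext()) {
        return false;                      // no more files/blocks to load
      }
      current = blocks.next().iterator();  // load the next block lazily
    }
    buffered = current.next();
    return true;
  }

  @Override
  public T next() {
    T ret = buffered;
    buffered = null;
    return ret;
  }
}
```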
- return itr.hasNext(); + if (reader == null || !reader.hasNext()) { + // step1: load new file reader first. + if (!loadReader()) { + return false; + } + } + // step2: load block records iterator + if (!loadItr()) { + return false; } - return false; } + record = itr.next(); return true; } + private boolean loadReader() { + try { + closeReader(); + if (cdcLogFileIter.hasNext()) { + reader = new HoodieLogFileReader(fs, cdcLogFileIter.next(), cdcSchema, + HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false); + return reader.hasNext(); + } + return false; + } catch (IOException e) { + throw new HoodieIOException(e.getMessage()); + } + } + + private boolean loadItr() { + HoodieDataBlock dataBlock = (HoodieDataBlock) reader.next(); + closeItr(); + itr = dataBlock.getRecordIterator(); + return itr.hasNext(); + } + @Override public IndexedRecord next() { - return itr.next(); + IndexedRecord ret = record; + record = null; + return ret; } @Override public void close() { try { - itr.close(); - reader.close(); + closeItr(); + closeReader(); } catch (IOException e) { throw new HoodieIOException(e.getMessage()); } } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void closeReader() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + } + + private void closeItr() { + if (itr != null) { + itr.close(); + itr = null; + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java index 3b94948707f5c..5d0e86cca2160 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java @@ -58,6 +58,10 @@ public static boolean isNullOrEmpty(Collection c) { return Objects.isNull(c) || c.isEmpty(); } + public static boolean isNullOrEmpty(Map m) { + return Objects.isNull(m) || m.isEmpty(); + } + public static boolean nonEmpty(Collection c) { return !isNullOrEmpty(c); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java index 7c41fe4f29d95..4dce10f454530 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java @@ -21,8 +21,10 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hudi.exception.HoodieIOException; public class JsonUtils { @@ -41,4 +43,13 @@ public class JsonUtils { public static ObjectMapper getObjectMapper() { return MAPPER; } + + public static String toString(Object value) { + try { + return MAPPER.writeValueAsString(value); + } catch (JsonProcessingException e) { + throw new HoodieIOException( + "Fail to convert the class: " + value.getClass().getName() + " to Json String", e); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index a8ce534d1f625..5896c1a5ebb74 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -333,9 +333,9 @@ public static List convertMetadataToFilesPartitionRecords(HoodieCo // monotonically increasing (ie file-size never goes down, unless deleted) map.merge(fileName, stat.getFileSizeInBytes(), Math::max); - String cdcPath = stat.getCdcPath(); - if (cdcPath != null) { - map.put(cdcPath, stat.getCdcWriteBytes()); + Map cdcPathAndSizes = stat.getCdcStats(); + if (cdcPathAndSizes != null && !cdcPathAndSizes.isEmpty()) { + map.putAll(cdcPathAndSizes); } return map; }, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/InLineFSUtilsTest.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/InLineFSUtilsTest.java new file mode 100644 index 0000000000000..7d704c91126d6 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/InLineFSUtilsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.fs.inline; + +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.testutils.FileSystemTestUtils; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Tests {@link InLineFileSystem}. 
+ */ +public class InLineFSUtilsTest { + + private static Stream configParams() { + Long[] data = new Long[] { + 0L, + 1000L, + (long) Integer.MAX_VALUE + 1, + Long.MAX_VALUE + }; + return Stream.of(data).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("configParams") + void startOffset(long startOffset) { + Path inlinePath = FileSystemTestUtils.getPhantomFile(FileSystemTestUtils.getRandomOuterFSPath(), startOffset, 0L); + assertEquals(startOffset, InLineFSUtils.startOffset(inlinePath)); + } + + @ParameterizedTest + @MethodSource("configParams") + void length(long inlineLength) { + Path inlinePath = FileSystemTestUtils.getPhantomFile(FileSystemTestUtils.getRandomOuterFSPath(), 0L, inlineLength); + assertEquals(inlineLength, InLineFSUtils.length(inlinePath)); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java index 217240666094d..d7e1a6146ad31 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java @@ -185,8 +185,8 @@ public void testUseLatestRecordMetaValue() throws IOException { record1.put("child", Arrays.asList("A")); GenericRecord record2 = new GenericData.Record(schema); - record1.put("_hoodie_commit_time", "20220915000000001"); - record1.put("_hoodie_commit_seqno", "20220915000000001_2_000"); + record2.put("_hoodie_commit_time", "20220915000000001"); + record2.put("_hoodie_commit_seqno", "20220915000000001_2_000"); record2.put("id", "1"); record2.put("partition", "partition1"); record2.put("ts", 1L); @@ -204,7 +204,7 @@ public void testUseLatestRecordMetaValue() throws IOException { // let payload2 as the latest one, then should use payload2's meta field's value as the result GenericRecord mergedRecord2 = (GenericRecord) payload2.preCombine(payload1, schema, properties).getInsertValue(schema, properties).get(); - assertEquals(mergedRecord2.get("_hoodie_commit_time").toString(), "20220915000000001"); - assertEquals(mergedRecord2.get("_hoodie_commit_seqno").toString(), "20220915000000001_2_000"); + assertEquals(mergedRecord2.get("_hoodie_commit_time").toString(), record2.get("_hoodie_commit_time").toString()); + assertEquals(mergedRecord2.get("_hoodie_commit_seqno").toString(), record2.get("_hoodie_commit_seqno").toString()); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java index 41b05c9f1e93c..f7af6f5ceddab 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java @@ -18,7 +18,9 @@ package org.apache.hudi.table.format.cdc; +import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieLogFile; @@ -33,6 +35,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.format.FormatUtils; 
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; @@ -152,9 +155,9 @@ private RecordIterator getRecordIterator( ImageManager imageManager) throws IOException { switch (fileSplit.getCdcInferCase()) { case BASE_FILE_INSERT: - ValidationUtils.checkState(fileSplit.getCdcFile() != null, - "CDC file path should exist"); - String path = new Path(tablePath, fileSplit.getCdcFile()).toString(); + ValidationUtils.checkState(fileSplit.getCdcFiles() != null && fileSplit.getCdcFiles().size() == 1, + "CDC file path should exist and be only one"); + String path = new Path(tablePath, fileSplit.getCdcFiles().get(0)).toString(); return new AddBaseFileIterator(getRequiredSchemaReader(path)); case BASE_FILE_DELETE: ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(), @@ -319,13 +322,21 @@ abstract static class BaseImageIterator implements RecordIterator { String tablePath, MergeOnReadTableState tableState, Schema cdcSchema, - HoodieCDCFileSplit fileSplit) throws IOException { + HoodieCDCFileSplit fileSplit) { this.requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema()); this.requiredPos = getRequiredPos(tableState.getAvroSchema(), this.requiredSchema); this.recordBuilder = new GenericRecordBuilder(requiredSchema); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); - Path cdcFilePath = new Path(tablePath, fileSplit.getCdcFile()); - this.cdcItr = new HoodieCDCLogRecordIterator(hadoopConf, cdcFilePath, cdcSchema); + Path hadoopTablePath = new Path(tablePath); + FileSystem fs = FSUtils.getFs(hadoopTablePath, hadoopConf); + HoodieLogFile[] cdcLogFiles = fileSplit.getCdcFiles().stream().map(cdcFile -> { + try { + return new HoodieLogFile(fs.getFileStatus(new Path(hadoopTablePath, cdcFile))); + } catch (IOException e) { + throw new HoodieIOException("Fail to call getFileStatus", e); + } + }).toArray(HoodieLogFile[]::new); + this.cdcItr = new HoodieCDCLogRecordIterator(fs, cdcLogFiles, cdcSchema); } private int[] getRequiredPos(String tableSchema, Schema required) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala index afa3189659e80..e484e5905b312 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala @@ -207,8 +207,8 @@ class HoodieCDCRDD( /** * Two cases will use this to iterator the records: - * 1) extract the change data from the base file directly, including 'ADD_BASE_File' and 'REMOVE_BASE_File'. - * 2) when the type of cdc file is 'REPLACED_FILE_GROUP', + * 1) extract the change data from the base file directly, including 'BASE_FILE_INSERT' and 'BASE_FILE_DELETE'. + * 2) when the type of cdc file is 'REPLACE_COMMIT', * use this to trace the records that are converted from the '[[beforeImageRecords]] */ private var recordIter: Iterator[InternalRow] = Iterator.empty @@ -404,7 +404,7 @@ class HoodieCDCRDD( } private def loadCdcFile(): Unit = { - // reset all the iterator or reader first. + // reset all the iterator first. 
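Both readers — the Flink CdcInputFormat above and the Spark HoodieCDCRDD below — resolve the split's relative cdc file names into HoodieLogFile handles before building the iterator. Roughly, as a sketch of that shared step:

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.exception.HoodieIOException;

// Sketch: resolve relative cdc file names (HoodieCDCFileSplit#getCdcFiles)
// against the table base path into HoodieLogFile handles for the iterator.
final class CdcLogFileResolver {
  static HoodieLogFile[] resolve(FileSystem fs, Path tablePath, List<String> cdcFiles) {
    return cdcFiles.stream().map(cdcFile -> {
      try {
        return new HoodieLogFile(fs.getFileStatus(new Path(tablePath, cdcFile)));
      } catch (IOException e) {
        throw new HoodieIOException("Failed to get file status for " + cdcFile, e);
      }
    }).toArray(HoodieLogFile[]::new);
  }
}
```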
recordIter = Iterator.empty logRecordIter = Iterator.empty beforeImageRecords.clear() @@ -420,8 +420,8 @@ class HoodieCDCRDD( currentChangeFile = split currentChangeFile.getCdcInferCase match { case BASE_FILE_INSERT => - assert(currentChangeFile.getCdcFile != null) - val absCDCPath = new Path(basePath, currentChangeFile.getCdcFile) + assert(currentChangeFile.getCdcFiles != null && currentChangeFile.getCdcFiles.size() == 1) + val absCDCPath = new Path(basePath, currentChangeFile.getCdcFiles.get(0)) val fileStatus = fs.getFileStatus(absCDCPath) val pf = PartitionedFile(InternalRow.empty, absCDCPath.toUri.toString, 0, fileStatus.getLen) recordIter = parquetReader(pf) @@ -429,14 +429,15 @@ class HoodieCDCRDD( assert(currentChangeFile.getBeforeFileSlice.isPresent) recordIter = loadFileSlice(currentChangeFile.getBeforeFileSlice.get) case LOG_FILE => - assert(currentChangeFile.getCdcFile != null && currentChangeFile.getBeforeFileSlice.isPresent) + assert(currentChangeFile.getCdcFiles != null && currentChangeFile.getCdcFiles.size() == 1 + && currentChangeFile.getBeforeFileSlice.isPresent) loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get) - val absLogPath = new Path(basePath, currentChangeFile.getCdcFile) + val absLogPath = new Path(basePath, currentChangeFile.getCdcFiles.get(0)) val morSplit = HoodieMergeOnReadFileSplit(None, List(new HoodieLogFile(fs.getFileStatus(absLogPath)))) val logFileIterator = new LogFileIterator(morSplit, originTableSchema, originTableSchema, tableState, conf) logRecordIter = logFileIterator.logRecordsIterator() case AS_IS => - assert(currentChangeFile.getCdcFile != null) + assert(currentChangeFile.getCdcFiles != null && !currentChangeFile.getCdcFiles.isEmpty) // load beforeFileSlice to beforeImageRecords if (currentChangeFile.getBeforeFileSlice.isPresent) { loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get) @@ -450,8 +451,11 @@ class HoodieCDCRDD( afterImageRecords.put(key, row.copy()) } } - val absCDCPath = new Path(basePath, currentChangeFile.getCdcFile) - cdcLogRecordIterator = new HoodieCDCLogRecordIterator(fs, absCDCPath, cdcAvroSchema) + + val cdcLogFiles = currentChangeFile.getCdcFiles.asScala.map { cdcFile => + new HoodieLogFile(fs.getFileStatus(new Path(basePath, cdcFile))) + }.toArray + cdcLogRecordIterator = new HoodieCDCLogRecordIterator(fs, cdcLogFiles, cdcAvroSchema) case REPLACE_COMMIT => if (currentChangeFile.getBeforeFileSlice.isPresent) { loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala index 5f447a9bce6cc..af0935b423fde 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala @@ -100,30 +100,40 @@ abstract class HoodieCDCTestBase extends HoodieClientTestBase { ) val hoodieWriteStats = commitMetadata.getWriteStats.asScala hoodieWriteStats.exists { hoodieWriteStat => - val cdcPath = hoodieWriteStat.getCdcPath - cdcPath != null && cdcPath.nonEmpty + val cdcPaths = hoodieWriteStat.getCdcStats + cdcPaths != null && cdcPaths.nonEmpty } } /** - * whether this instant will create a cdc log file. + * extract a list of cdc log file. 
*/ - protected def getCDCLogFIle(instant: HoodieInstant): List[String] = { + protected def getCDCLogFile(instant: HoodieInstant): List[String] = { val commitMetadata = HoodieCommitMetadata.fromBytes( metaClient.reloadActiveTimeline().getInstantDetails(instant).get(), classOf[HoodieCommitMetadata] ) - commitMetadata.getWriteStats.asScala.map(_.getCdcPath).toList + commitMetadata.getWriteStats.asScala.flatMap(_.getCdcStats.keys).toList } - protected def readCDCLogFile(relativeLogFile: String, cdcSchema: Schema): List[IndexedRecord] = { + protected def getCDCBlocks(relativeLogFile: String, cdcSchema: Schema): List[HoodieDataBlock] = { val logFile = new HoodieLogFile( metaClient.getFs.getFileStatus(new Path(metaClient.getBasePathV2, relativeLogFile))) - val reader = HoodieLogFormat.newReader(fs, logFile, cdcSchema); - assertTrue(reader.hasNext); + val reader = HoodieLogFormat.newReader(fs, logFile, cdcSchema) + val blocks = scala.collection.mutable.ListBuffer.empty[HoodieDataBlock] + while(reader.hasNext) { + blocks.add(reader.next().asInstanceOf[HoodieDataBlock]) + } + blocks.toList + } - val block = reader.next().asInstanceOf[HoodieDataBlock]; - block.getRecordIterator.asScala.toList + protected def readCDCLogFile(relativeLogFile: String, cdcSchema: Schema): List[IndexedRecord] = { + val records = scala.collection.mutable.ListBuffer.empty[IndexedRecord] + val blocks = getCDCBlocks(relativeLogFile, cdcSchema) + blocks.foreach { block => + records.addAll(block.getRecordIterator.asScala.toList) + } + records.toList } protected def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: String, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala index 40d156d6902fc..f5435e09adfc2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala @@ -93,7 +93,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { // part of data are updated, it will write out cdc log files assertTrue(hasCDCLogFile(instant2)) // check cdc data - val cdcDataFromCDCLogFile2 = getCDCLogFIle(instant2).flatMap(readCDCLogFile(_, cdcSchema)) + val cdcDataFromCDCLogFile2 = getCDCLogFile(instant2).flatMap(readCDCLogFile(_, cdcSchema)) // check the num of cdc data assertEquals(cdcDataFromCDCLogFile2.size, 50) // check record key, before, after according to the supplemental logging mode @@ -275,7 +275,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { // part of data are updated, it will write out cdc log files assertTrue(hasCDCLogFile(instant2)) - val cdcDataFromCDCLogFile2 = getCDCLogFIle(instant2).flatMap(readCDCLogFile(_, cdcSchema)) + val cdcDataFromCDCLogFile2 = getCDCLogFile(instant2).flatMap(readCDCLogFile(_, cdcSchema)) assertEquals(cdcDataFromCDCLogFile2.size, 50) // check op assertEquals(cdcDataFromCDCLogFile2.count(r => r.get(0).toString == "u"), 30) @@ -542,4 +542,74 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val cdcDataFrom2To3 = cdcDataFrame((commitTime2.toLong - 1).toString, commitTime3) assertCDCOpCnt(cdcDataFrom2To3, insertedCnt2, 0, deletedCnt2 + deletedCnt3) } + + @ParameterizedTest + @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after")) + def testCDCWithMultiBlocksAndLogFiles(cdcSupplementalLoggingMode: String): 
Unit = { + val (blockSize, logFileSize) = if (cdcSupplementalLoggingMode == "cdc_op_key") { + // only op and key will be stored in cdc log file, we set the smaller values for the two configs. + // so that it can also write out more than one cdc log file + // and each of cdc log file has more that one data block as we expect. + (256, 1024) + } else { + (2048, 5120) + } + val options = commonOpts ++ Map( + HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode, + "hoodie.logfile.data.block.max.size" -> blockSize.toString, + "hoodie.logfile.max.size" -> logFileSize.toString + ) + + // Insert Operation + val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Overwrite) + .save(basePath) + + metaClient = HoodieTableMetaClient.builder() + .setBasePath(basePath) + .setConf(spark.sessionState.newHadoopConf) + .build() + + val schemaResolver = new TableSchemaResolver(metaClient) + val dataSchema = schemaResolver.getTableAvroSchema(false) + val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode( + HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema) + + // Upsert Operation + val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50) + val records2 = recordsToStrings(hoodieRecords2).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(options) + .mode(SaveMode.Append) + .save(basePath) + val instant2 = metaClient.reloadActiveTimeline.lastInstant().get() + + val cdcLogFiles2 = getCDCLogFile(instant2) + // with a small value for 'hoodie.logfile.data.max.size', + // it will write out >1 cdc log files due to rollover. + assert(cdcLogFiles2.size > 1) + // with a small value for 'hoodie.logfile.data.block.max.size', + // it will write out >1 cdc data blocks in one single cdc log file. + assert(getCDCBlocks(cdcLogFiles2.head, cdcSchema).size > 1) + + // check cdc data + val cdcDataFromCDCLogFile2 = cdcLogFiles2.flatMap(readCDCLogFile(_, cdcSchema)) + // check the num of cdc data + assertEquals(cdcDataFromCDCLogFile2.size, 50) + // check record key, before, after according to the supplemental logging mode + checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema, + cdcDataFromCDCLogFile2, hoodieRecords2, HoodieCDCOperation.UPDATE) + + val commitTime2 = instant2.getTimestamp + var currentSnapshotData = spark.read.format("hudi").load(basePath) + // at the last commit, 100 records are inserted. 
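The rollover behaviour the new test exercises hinges on two write options; the values below are the ones the test uses for the before/after logging modes and are purely illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: shrink these two limits so a single upsert produces several cdc
// data blocks per log file and several cdc log files per file group.
final class CdcRolloverOptions {
  static Map<String, String> smallCdcLimits() {
    Map<String, String> opts = new HashMap<>();
    opts.put("hoodie.logfile.data.block.max.size", "2048"); // max bytes per data block
    opts.put("hoodie.logfile.max.size", "5120");            // log file rollover threshold
    return opts;
  }
}
```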
+ val insertedCnt2 = currentSnapshotData.count() - 100 + val updatedCnt2 = 50 - insertedCnt2 + val cdcDataOnly2 = cdcDataFrame((commitTime2.toLong - 1).toString) + assertCDCOpCnt(cdcDataOnly2, insertedCnt2, updatedCnt2, 0) + } } diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml index 747a9303dfdf4..0f8b799da772f 100644 --- a/packaging/hudi-integ-test-bundle/pom.xml +++ b/packaging/hudi-integ-test-bundle/pom.xml @@ -734,7 +734,7 @@ org.apache.thrift libthrift - 0.9.3 + 0.14.0 diff --git a/scripts/pr_compliance.py b/scripts/pr_compliance.py index ea1e2ce3e2241..5946a355872e5 100644 --- a/scripts/pr_compliance.py +++ b/scripts/pr_compliance.py @@ -18,7 +18,7 @@ import re import os import sys - +import inspect # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # _____ _ _ _ __ __ _ _ _ _ _ # # |_ _(_) |_| | ___ \ \ / /_ _| (_) __| | __ _| |_(_) ___ _ __ # @@ -130,14 +130,15 @@ class Outcomes: # identifier: str - line that signifies the start of a section # linesAfter: set of str - default lines in the template that we ignore when # verifying that the user filled out the section -# nextSection: str - The name of the next section or "SUCCESS" if this is the -# last section class ParseSectionData: - def __init__(self, name: str, identifier: str, linesAfter, nextSection: str): + def __init__(self, name: str, identifier: str, linesAfter: str): self.name = name self.identifier = identifier self.linesAfter = linesAfter - self.nextSection = nextSection + self.prevSection = "" + self.nextSection = "" + + #returns true if line matches the identifier def identify(self, line: str): @@ -151,8 +152,8 @@ def identifyAfter(self, line: str): #Special holder of data for risk level because the identifier line is modified #by the user class RiskLevelData(ParseSectionData): - def __init__(self, name: str, identifier: str, linesAfter, nextSection: str): - super().__init__(name, identifier, linesAfter, nextSection) + def __init__(self, name: str, identifier: str, linesAfter): + super().__init__(name, identifier, linesAfter) #we check that the line start with the identifier because the identifier #line will be modified when filled out correctly @@ -165,8 +166,23 @@ def identify(self, line: str): class ParseSections: def __init__(self, psd): self.sections = {} - for x in psd: - self.sections[x.name] = x + assert len(psd) > 0 + for i in range(len(psd)): + prevI = i - 1 + nextI = i + 1 + if prevI < 0: + psd[i].prevSection = "START" + else: + psd[i].prevSection = psd[prevI].name + + if nextI >= len(psd): + psd[i].nextSection = "END" + else: + psd[i].nextSection = psd[nextI].name + + self.sections[psd[i].name] = psd[i] + + #returns true if line is an identifier for a section that is not value # PARAMS @@ -179,6 +195,18 @@ def validateOthers(self, line: str, value: str): return True return False + #gets the name of the section identified in the line + # PARAMS + # line: str - the line that we are parsing + # RETURN + # string - name of the section if the identifier is found, else none + def getSectionName(self, line: str): + for name in self.sections: + if self.sections[name].identify(line): + return name + return None + + #returns the ParseSectionData that is named name def get(self, name): return self.sections.get(name) @@ -200,9 +228,13 @@ def __init__(self, data: ParseSectionData, sections: ParseSections, debug=False) self.sections = sections #prints error message if debug is set to true - def error(self, message): + def error(self, line: str, 
@@ -200,9 +228,13 @@ def __init__(self, data: ParseSectionData, sections: ParseSections, debug=False)
         self.sections = sections
 
     #prints error message if debug is set to true
-    def error(self, message):
+    def error(self, line: str, lineno: str, message: str):
         if self.debug:
-            print(f"ERROR:(state: {self.data.name}, found: {self.found}, message: {message}")
+            pyline = inspect.getframeinfo(inspect.stack()[1][0]).lineno
+            print(f"::error file=pr_compliance.py,line={pyline}::{message}")
+            if lineno != "" and line != "":
+                print(f"::error file=pr_compliance.py,line={pyline}::found on line {lineno}: {line}")
+            print(f"::debug::state: {self.data.name}, found: {self.found},")
 
     #returns the name of the next section
     def nextSection(self):
@@ -214,29 +246,55 @@ def validateAfter(self, line):
         return self.found and self.data.identifyAfter(line)
 
     #Decides what outcome occurs when the section identifier is found
-    def processIdentify(self, line):
+    def processIdentify(self, line, lineno):
         if self.found:
             #since we have already found the identifier, this means that we have
             #found a duplicate of the identifier
-            self.error(f"duplicate line \"{line}\"")
+            self.error(line, lineno, f"Duplicate {self.data.name} section found")
             return Outcomes.ERROR
         self.found = True
         return Outcomes.CONTINUE
+
+    def makeValidateOthersErrorMessage(self, line):
+        if self.found:
+            if self.nextSection() != "END" and self.sections.sections[self.nextSection()].identify(line):
+                #we found the next identifier but haven't found a description
+                #yet for this section
+                return f"Section {self.data.name} is missing a description"
+            #we found a section other than the next section
+            return f"Section {self.data.name} should be followed by section {self.data.nextSection}"
+
+        #section identifier has not been found yet
+        sectionFound = self.sections.getSectionName(line)
+        if sectionFound is None:
+            print("ERROR: none found even though validateOthers returned True")
+            exit(1)
+        elif sectionFound == self.data.prevSection:
+            #we have not found the current section identifier but we found the
+            #previous section identifier again
+            return f"Duplicate {self.data.prevSection} section found"
+
+        if self.data.prevSection == "START":
+            return f"Section {self.data.name} should be the first section"
+        if sectionFound == self.data.nextSection:
+            return f"Missing section {self.data.name} between {self.data.prevSection} and {self.data.nextSection}"
+        return f"Section {self.data.name} was expected after section {self.data.prevSection}"
 
     #Decides the outcome state by processing line
-    def validateLine(self,line):
+    def validateLine(self,line,lineno):
         if self.data.identify(line):
             #we have found the identifier so we should decide what to do
-            return self.processIdentify(line)
+            return self.processIdentify(line,lineno)
         elif self.sections.validateOthers(line, self.data.name):
             #we have found the identifier for another section
-            self.error(f"Out of order section or missing description \"{line}\"")
+            #figure out what the error is
+            self.error(line,lineno,self.makeValidateOthersErrorMessage(line))
             return Outcomes.ERROR
         elif self.validateAfter(line):
             #the pr author has added new text to this section so we consider it
             #to be filled out
-            if self.nextSection() == "SUCCESS":
-                #if next section is "SUCCESS" then there are no more sections
+            if self.nextSection() == "END":
+                #if next section is "END" then there are no more sections
                 #to process
                 return Outcomes.SUCCESS
             return Outcomes.NEXTSECTION
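error() now emits GitHub Actions workflow commands instead of a free-form message, so validation failures surface as annotations on the check run. A standalone sketch of the format being printed (the line number below is a placeholder, not taken from the patch):

    # Inside a GitHub Actions step, lines in this format become error/debug annotations.
    print("::error file=pr_compliance.py,line=42::Section CHANGELOGS is missing a description")
    print("::debug::state: CHANGELOGS, found: True,")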
@@ -250,47 +308,15 @@ def __init__(self, data: ParseSectionData, sections: ParseSections, debug=False)
 
     #After finding the identifier we don't need to look for anything else so we
     #can just go to the next section or terminate if this is the last
-    def processIdentify(self, line):
-        o = super().processIdentify(line)
+    def processIdentify(self, line, lineno):
+        o = super().processIdentify(line, lineno)
         if o == Outcomes.CONTINUE:
-            if self.nextSection() == "SUCCESS":
+            if self.nextSection() == "END":
                 return Outcomes.SUCCESS
             else:
                 return Outcomes.NEXTSECTION
         return o
 
-#Risk level has different processing because the pr author will modify the
-#identifier and doesn't need to add description if risk is none or low
-class RiskLevelSection(ParseSection):
-    def __init__(self, data: ParseSectionData, sections: ParseSections, debug=False):
-        super().__init__(data, sections, debug)
-
-    def processIdentify(self, line):
-        if self.found:
-            #since we have already found the identifier, this means that we have
-            #found a duplicate of the identifier
-            self.error(f"duplicate line starting with \"{self.identifier}\"")
-            return Outcomes.ERROR
-        if line == "**Risk level: none | low | medium | high**":
-            #the user has not modified this line so a risk level was not chosen
-            self.error("risk level not chosen")
-            return Outcomes.ERROR
-        if "NONE" in line.upper() or "LOW" in line.upper():
-            # an explanation is not required for none or low so we can just
-            # move on to the next section or terminate if this is the last
-            if self.nextSection() == "SUCCESS":
-                return Outcomes.SUCCESS
-            else:
-                return Outcomes.NEXTSECTION
-        elif "MEDIUM" in line.upper() or "HIGH" in line.upper():
-            # an explanation is required so we don't change state
-            self.found = True
-            return Outcomes.CONTINUE
-        else:
-            #the pr author put something weird in for risk level
-            self.error("invalid choice for risk level")
-            return Outcomes.ERROR
-
 #Class that orchestrates the validation of the entire body
 class ValidateBody:
     def __init__(self, body: str, firstSection: str, sections: ParseSections, debug=False):
@@ -319,13 +345,11 @@ def nextSection(self):
         #get the data for that section
         data = self.sections.get(sectionName)
         if data is None:
-            print(f"parse section {sectionName} not found")
+            print(f"ERROR with your parse section setup. Parse section {sectionName} not found")
             exit(-3)
 
         #create the section
-        if data.name == "RISKLEVEL":
-            self.section = RiskLevelSection(data=data, sections=self.sections, debug=self.debug)
-        elif data.name == "CHECKLIST":
+        if data.name == "CHECKLIST":
             self.section = NoDataSection(data=data, sections=self.sections, debug=self.debug)
         else:
             self.section = ParseSection(data=data, sections=self.sections, debug=self.debug)
@@ -336,13 +360,13 @@ def validate(self):
         self.nextSection()
 
         #validate each line
-        for line in self.body.splitlines():
+        for lineno, line in enumerate(self.body.splitlines(), 1):
             #ignore empty lines
             if len(line) == 0:
                 continue
 
             #run the parse section validation
-            o = self.section.validateLine(line)
+            o = self.section.validateLine(line, lineno)
 
             #decide what to do based on outcome
             if o == Outcomes.ERROR:
@@ -353,7 +377,13 @@ def validate(self):
                 self.nextSection()
         #if we get through all the lines without a success outcome, then the
         #body does not comply
-        self.section.error("template is not filled out properly")
+        if self.section.data.nextSection == "END":
+            if self.section.found:
+                self.section.error("","",f"Section {self.section.data.name} is missing a description")
+                return False
+            self.section.error("","",f"Missing section {self.section.data.name} at the end")
+            return False
+        self.section.error("","", "Please make sure you have filled out the template correctly. You can find a blank template in /.github/PULL_REQUEST_TEMPLATE.md")
         return False
 
 #Generate the validator for the current template.
@@ -361,20 +391,16 @@ def validate(self):
 def make_default_validator(body, debug=False):
     changelogs = ParseSectionData("CHANGELOGS",
         "### Change Logs",
-        {"_Describe context and summary for this change. Highlight if any code was copied._"},
-        "IMPACT")
+        {"_Describe context and summary for this change. Highlight if any code was copied._"})
     impact = ParseSectionData("IMPACT",
         "### Impact",
-        {"_Describe any public API or user-facing feature change or any performance impact._"},
-        "RISKLEVEL")
+        {"_Describe any public API or user-facing feature change or any performance impact._"})
     risklevel = RiskLevelData("RISKLEVEL",
-        "**Risk level:",
-        {"_Choose one. If medium or high, explain what verification was done to mitigate the risks._"},
-        "CHECKLIST")
+        "### Risk level ",
+        {"_If medium or high, explain what verification was done to mitigate the risks._"})
     checklist = ParseSectionData("CHECKLIST",
         "### Contributor's checklist",
-        {},
-        "SUCCESS")
+        {})
     parseSections = ParseSections([changelogs, impact, risklevel, checklist])
     return ValidateBody(body, "CHANGELOGS", parseSections, debug)
@@ -431,18 +457,14 @@ def test_body():
     good_impact[1] = "impact description"
 
     template_risklevel = [
-        "**Risk level: none | low | medium | high**",
+        "### Risk level (write none, low medium or high below)",
         "",
-        "_Choose one. If medium or high, explain what verification was done to mitigate the risks._",
+        "_If medium or high, explain what verification was done to mitigate the risks._",
         ""
     ]
 
-    good_risklevel_none = template_risklevel.copy()
-    good_risklevel_none[0] = "**Risk level: none**"
-
-    good_risklevel_medium = template_risklevel.copy()
-    good_risklevel_medium[0] = "**Risk level: medium**"
-    good_risklevel_medium[1] = "risklevel description"
+    good_risklevel = template_risklevel.copy()
+    good_risklevel[1] = "none"
 
     template_checklist = [
         "### Contributor's checklist",
@@ -454,13 +476,14 @@
     ]
 
     #list of sections that when combined form a valid body
-    good_sections = [good_changelogs, good_impact, good_risklevel_none, template_checklist]
+    good_sections = [good_changelogs, good_impact, good_risklevel, template_checklist]
 
     #list of sections that when combined form the template
     template_sections = [template_changelogs, template_impact, template_risklevel, template_checklist]
 
     tests_passed = True
     #Test section not filled out
+    #no need to test checklist section
     for i in range(len(good_sections)-1):
         test_sections = []
         for j in range(len(good_sections)):
@@ -480,13 +503,13 @@
             test_sections.append(good_sections[j].copy())
         tests_passed = run_test(f"duplicate section: {i}", build_body(test_sections), False, DEBUG_MESSAGES) and tests_passed
 
     #Test out of order section
-    for i in range(len(good_sections)):
+    for i in range(len(good_sections)-1):
         test_sections = []
         for j in range(len(good_sections)):
             test_sections.append(good_sections[j].copy())
-        k = (i + 3) % len(good_sections)
-        test_sections[i], test_sections[k] = test_sections[k],test_sections[i]
-        tests_passed = run_test(f"Swapped sections: {i}, {k}", build_body(test_sections), False, DEBUG_MESSAGES) and tests_passed
+        for k in range(i+1,len(good_sections)):
+            test_sections[i], test_sections[k] = test_sections[k],test_sections[i]
+            tests_passed = run_test(f"Swapped sections: {i}, {k}", build_body(test_sections), False, DEBUG_MESSAGES) and tests_passed
 
     #Test missing section
     for i in range(len(good_sections)):
@@ -496,28 +519,8 @@ def test_body():
             test_sections.append(good_sections[j].copy())
         tests_passed = run_test(f"Missing Section: {i}", build_body(test_sections), False, DEBUG_MESSAGES) and tests_passed
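For orientation, a sketch of the end-to-end flow these tests exercise: join one line per section into a body and run it through make_default_validator, which now expects the "### Risk level" heading with the level written on the following line. This assumes scripts/pr_compliance.py can be imported as a module without side effects; build_body/run_test in the tests do essentially the same thing (illustration only, not part of the patch):

    from pr_compliance import make_default_validator

    # A minimal PR body that fills out every section of the new template.
    body = "\n".join([
        "### Change Logs",
        "added a new CDC test",
        "### Impact",
        "no public API change",
        "### Risk level (write none, low medium or high below)",
        "none",
        "### Contributor's checklist",
    ])

    # Sections are validated in order: CHANGELOGS -> IMPACT -> RISKLEVEL -> CHECKLIST.
    print(make_default_validator(body, debug=True).validate())  # expected: True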
-    #Test good body with risk level of none:
-    tests_passed = run_test("good documentation. risk level none", build_body(good_sections), True, DEBUG_MESSAGES) and tests_passed
-
-    #Test good body with risk level of medium:
-    risk_level_index = 2
-    good_medium_risk_level = good_sections.copy()
-    good_medium_risk_level[risk_level_index] = good_risklevel_medium
-    tests_passed = run_test("good documentation. risk level medium", build_body(good_medium_risk_level), True, DEBUG_MESSAGES) and tests_passed
-
-    #Test good body except medium risk level and there is no description
-    bad_medium_risk_level = good_sections.copy()
-    bad_risklevel_medium = good_risklevel_medium.copy()
-    bad_risklevel_medium[1] = ""
-    bad_medium_risk_level[risk_level_index] = bad_risklevel_medium
-    tests_passed = run_test("medium risk level but no description", build_body(bad_medium_risk_level), False, DEBUG_MESSAGES) and tests_passed
-
-    #Test unknown risk level:
-    unknow_risk_level = good_sections.copy()
-    unknow_risk_level_section = template_risklevel.copy()
-    unknow_risk_level_section[0] = "**Risk level: unknown**"
-    unknow_risk_level[risk_level_index] = unknow_risk_level_section
-    tests_passed = run_test("unknown risk level", build_body(unknow_risk_level), False, DEBUG_MESSAGES) and tests_passed
+    #Test good body:
+    tests_passed = run_test("good documentation", build_body(good_sections), True, DEBUG_MESSAGES) and tests_passed
 
     print("*****")
     if tests_passed: