From 98b4e9796e1e3e1f69954afa698ace5b28bde4a0 Mon Sep 17 00:00:00 2001
From: ForwardXu
Date: Fri, 1 Apr 2022 10:01:41 +0800
Subject: [PATCH] =?UTF-8?q?[HUDI-3406]=20Rollback=20incorrectly=20relying?=
 =?UTF-8?q?=20on=20FS=20listing=20instead=20of=20Com=E2=80=A6=20(#4957)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [HUDI-3406] Rollback incorrectly relying on FS listing instead of Commit Metadata

* [HUDI-3406] Rollback incorrectly relying on FS listing instead of Commit Metadata

* [HUDI-3406] Rollback incorrectly relying on FS listing instead of Commit Metadata

* fix comments

* fix comments

* fix comments
---
 .../client/utils/MetadataConversionUtils.java |  17 +-
 .../action/rollback/BaseRollbackHelper.java   |   5 -
 .../rollback/ListingBasedRollbackHelper.java  | 150 ---------
 .../ListingBasedRollbackStrategy.java         | 302 ++++++++++++++++--
 .../table/action/rollback/RollbackUtils.java  | 167 ----------
 .../rollback/SerializablePathFilter.java      |  26 ++
 .../upgrade/ZeroToOneUpgradeHandler.java      |  34 +-
 ...TestMergeOnReadRollbackActionExecutor.java |   5 +-
 .../common/model/HoodieCommitMetadata.java    |  13 +
 9 files changed, 351 insertions(+), 368 deletions(-)
 delete mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
 create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index 8a9d0b3204e7..d588a9c5dd0c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -21,7 +21,6 @@
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
-
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
@@ -34,6 +33,7 @@
 import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -146,6 +146,19 @@ private static Option getRequestedReplaceMetadat
     return Option.of(TimelineMetadataUtils.deserializeRequestedReplaceMetadata(requestedContent.get()));
   }
 
+  public static Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant hoodieInstant) throws IOException {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
+
+    if (hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      return Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
+          HoodieReplaceCommitMetadata.class));
+    }
+    return Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
+        HoodieCommitMetadata.class));
+
+  }
+
   public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata(
       HoodieCommitMetadata hoodieCommitMetadata) {
     ObjectMapper mapper = new ObjectMapper();
@@ -160,4 +173,4 @@ public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetad
     avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
     return avroMetaData;
   }
-}
\ No newline at end of file
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
index 189de373d92d..8475afe16eea 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java
@@ -20,7 +20,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -214,8 +213,4 @@ protected Map generateHeader(String c
         String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
     return header;
   }
-
-  public interface SerializablePathFilter extends PathFilter, Serializable {
-
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
deleted file mode 100644
index 628b2fc3720f..000000000000
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ - -package org.apache.hudi.table.action.rollback; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hudi.avro.model.HoodieRollbackRequest; -import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING; - -/** - * Performs Rollback of Hoodie Tables. - */ -public class ListingBasedRollbackHelper implements Serializable { - private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class); - - private final HoodieTableMetaClient metaClient; - private final HoodieWriteConfig config; - - public ListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) { - this.metaClient = metaClient; - this.config = config; - } - - /** - * Collects info for Rollback plan. - */ - public List getRollbackRequestsForRollbackPlan(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests) { - int sparkPartitions = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1); - context.setJobStatus(this.getClass().getSimpleName(), "Creating Rollback Plan"); - return getListingBasedRollbackRequests(context, instantToRollback, rollbackRequests, sparkPartitions); - } - - /** - * May be delete interested files and collect stats or collect stats only. - * - * @param context instance of {@link HoodieEngineContext} to use. - * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested. - * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on. - * @param numPartitions number of spark partitions to use for parallelism. - * @return stats collected with or w/o actual deletions. 
- */ - private List getListingBasedRollbackRequests(HoodieEngineContext context, HoodieInstant instantToRollback, - List rollbackRequests, int numPartitions) { - return context.map(rollbackRequests, rollbackRequest -> { - switch (rollbackRequest.getType()) { - case DELETE_DATA_FILES_ONLY: { - final FileStatus[] filesToDeletedStatus = getBaseFilesToBeDeleted(metaClient, config, instantToRollback.getTimestamp(), - rollbackRequest.getPartitionPath(), metaClient.getFs()); - List filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> { - String fileToBeDeleted = fileStatus.getPath().toString(); - // strip scheme - return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1); - }).collect(Collectors.toList()); - return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), - EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP); - } - case DELETE_DATA_AND_LOG_FILES: { - final FileStatus[] filesToDeletedStatus = getBaseAndLogFilesToBeDeleted(instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), metaClient.getFs()); - List filesToBeDeleted = Arrays.stream(filesToDeletedStatus).map(fileStatus -> { - String fileToBeDeleted = fileStatus.getPath().toString(); - // strip scheme - return fileToBeDeleted.substring(fileToBeDeleted.indexOf(":") + 1); - }).collect(Collectors.toList()); - return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), EMPTY_STRING, EMPTY_STRING, filesToBeDeleted, Collections.EMPTY_MAP); - } - case APPEND_ROLLBACK_BLOCK: { - String fileId = rollbackRequest.getFileId().get(); - String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get(); - HoodieWriteStat writeStat = rollbackRequest.getWriteStat().get(); - - Path fullLogFilePath = FSUtils.getPartitionPath(config.getBasePath(), writeStat.getPath()); - - Map logFilesWithBlocksToRollback = - Collections.singletonMap(fullLogFilePath.toString(), writeStat.getTotalWriteBytes()); - - return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), fileId, latestBaseInstant, - Collections.EMPTY_LIST, logFilesWithBlocksToRollback); - } - default: - throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); - } - }, numPartitions); - } - - private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient metaClient, HoodieWriteConfig config, - String commit, String partitionPath, FileSystem fs) throws IOException { - LOG.info("Collecting files to be cleaned/rolledback up for path " + partitionPath + " and commit " + commit); - String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); - PathFilter filter = (path) -> { - if (path.toString().contains(basefileExtension)) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commit.equals(fileCommitTime); - } - return false; - }; - return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); - } - - private FileStatus[] getBaseAndLogFilesToBeDeleted(String commit, String partitionPath, FileSystem fs) throws IOException { - String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); - BaseRollbackHelper.SerializablePathFilter filter = (path) -> { - if (path.toString().endsWith(basefileExtension)) { - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commit.equals(fileCommitTime); - } else if (FSUtils.isLogFile(path)) { - // Since the baseCommitTime is the only commit for new log files, it's okay here - String fileCommitTime = 
FSUtils.getBaseCommitTimeFromLogPath(path); - return commit.equals(fileCommitTime); - } - return false; - }; - return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); - } -} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java index e6355526e523..ed37798607bd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java @@ -18,19 +18,42 @@ package org.apache.hudi.table.action.rollback; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.jetbrains.annotations.NotNull; -import java.io.IOException; -import java.util.List; +import static org.apache.hudi.client.utils.MetadataConversionUtils.getHoodieCommitMetadata; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; +import static org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING; /** * Listing based rollback strategy to fetch list of {@link HoodieRollbackRequest}s. 
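The heart of this change is the decision to prefer the paths recorded in the instant's commit metadata, falling back to a full partition listing only when that metadata is absent or incomplete. A simplified composite sketch of checkCommitMetadataCompleted(), fetchFilesFromInstant(), and getFilesFromCommitMetadata() from the hunks below; commitMetadataOpt, instantToRollback, basePath, partitionPath, fs, and pathFilter are assumed to be in scope:

    // Sketch only, not part of the patch: mirrors the new strategy's file-resolution logic.
    boolean useCommitMetadata = commitMetadataOpt.isPresent() && instantToRollback.isCompleted()
        && !WriteOperationType.UNKNOWN.equals(commitMetadataOpt.get().getOperationType());
    Path[] candidatePaths = useCommitMetadata
        // Completed instant with a known operation type: visit only the paths it actually wrote.
        ? commitMetadataOpt.get().getFullPathsByPartitionPath(basePath, partitionPath)
            .stream().map(Path::new).toArray(Path[]::new)
        // Otherwise fall back to scanning the whole partition directory.
        : new Path[] {FSUtils.getPartitionPath(basePath, partitionPath)};
    FileStatus[] filesToDelete = fs.listStatus(candidatePaths, pathFilter);

Either way the same path filter (by commit time for base files, by base commit for log files) is applied; the metadata branch simply avoids listing files the instant never touched.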
@@ -39,12 +62,15 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
 
   private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackStrategy.class);
 
-  protected final HoodieTable table;
-  protected final HoodieEngineContext context;
+  protected final HoodieTable table;
+
+  protected final transient HoodieEngineContext context;
+
   protected final HoodieWriteConfig config;
+
   protected final String instantTime;
 
-  public ListingBasedRollbackStrategy(HoodieTable table,
+  public ListingBasedRollbackStrategy(HoodieTable table,
                                       HoodieEngineContext context,
                                       HoodieWriteConfig config,
                                       String instantTime) {
@@ -57,20 +83,260 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths =
+          FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan");
+
+      HoodieTableType tableType = table.getMetaClient().getTableType();
+      String baseFileExtension = getBaseFileExtension(metaClient);
+      Option<HoodieCommitMetadata> commitMetadataOptional = getHoodieCommitMetadata(metaClient, instantToRollback);
+      Boolean isCommitMetadataCompleted = checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>(partitionPaths.size());
+        FileStatus[] filesToDelete =
+            fetchFilesFromInstant(instantToRollback, partitionPath, metaClient.getBasePath(), baseFileExtension,
+                metaClient.getFs(), commitMetadataOptional, isCommitMetadataCompleted);
+
+        if (HoodieTableType.COPY_ON_WRITE == tableType) {
+          hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
+        } else if (HoodieTableType.MERGE_ON_READ == tableType) {
+          String commit = instantToRollback.getTimestamp();
+          HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline();
+          switch (instantToRollback.getAction()) {
+            case HoodieTimeline.COMMIT_ACTION:
+            case HoodieTimeline.REPLACE_COMMIT_ACTION:
+              hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
+              break;
+            case HoodieTimeline.COMPACTION_ACTION:
+              // If there is no delta commit present after the current commit (if compaction), no action, else we
+              // need to make sure that a compaction commit rollback also deletes any log files written as part of the
+              // succeeding deltacommit.
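+              // Illustration (hypothetical instants): given a timeline [dc1, c2.compaction (inflight), dc3],
+              // rolling back c2 must keep the log files whose base commit is c2, because dc3 appended its
+              // updates to them; only the base files written by c2 itself are deleted. Without a dc3, the
+              // log files based on c2 can be deleted as well. That is the check made below.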
+              boolean higherDeltaCommits =
+                  !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1)
+                      .empty();
+              if (higherDeltaCommits) {
+                // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled
+                // and has not yet finished. In this scenario we should delete only the newly created base files
+                // and not corresponding base commit log files created with this as baseCommit since updates would
+                // have been written to the log files.
+                hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
+                    listFilesToBeDeleted(instantToRollback.getTimestamp(), baseFileExtension, partitionPath,
+                        metaClient.getFs())));
+              } else {
+                // No deltacommits present after this compaction commit (inflight or requested). In this case, we
+                // can also delete any log files that were created with this compaction commit as base
+                // commit.
+                hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
+              }
+              break;
+            case HoodieTimeline.DELTA_COMMIT_ACTION:
+              // --------------------------------------------------------------------------------------------------
+              // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal
+              // --------------------------------------------------------------------------------------------------
+              // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In
+              // this scenario we would want to delete these log files.
+              // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario,
+              // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks.
+              // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is
+              // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime,
+              // and hence we will end up deleting these log files. This is done so there are no orphan log files
+              // lying around.
+              // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back; the actions
+              // taken in this scenario are a combination of (A.2) and (A.3)
+              // ---------------------------------------------------------------------------------------------------
+              // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal
+              // ---------------------------------------------------------------------------------------------------
+              // (B.1) Failed first commit - Inserts were written to base files and HoodieWriteStat has no entries.
+              // In this scenario, we delete all the base files written for the failed commit.
+              // (B.2) Failed recurring commits - Inserts were written to base files and updates to log files. In
+              // this scenario, perform (A.1) and for updates written to log files, write rollback blocks.
+              // (B.3) Rollback triggered for first commit - Same as (B.1)
+              // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files
+              // as well if the base file gets deleted.
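+              // Worked example (hypothetical file names): rolling back deltacommit dc2 that inserted base file
+              // f1_dc2.parquet (case B.1) and appended updates to log file .f2_dc1.log (cases A.2/B.2) yields
+              // one delete request covering f1_dc2.parquet plus one rollback-block append request for f2.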
+              HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+                  table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+                  HoodieCommitMetadata.class);
+
+              // In case all data was inserts and the commit failed, delete the file belonging to that commit
+              // We do not know fileIds for inserts (first inserts are either log files or base files),
+              // delete all files for the corresponding failed commit, if present (same as COW)
+              hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, filesToDelete));
+
+              // append rollback blocks for updates and inserts as A.2 and B.2
+              if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+                hoodieRollbackRequests.addAll(
+                    getRollbackRequestToAppend(partitionPath, instantToRollback, commitMetadata, table));
+              }
+              break;
+            default:
+              throw new HoodieRollbackException("Unknown listing type, during rollback of " + instantToRollback);
+          }
+        } else {
+          throw new HoodieRollbackException(
+              String.format("Unsupported table type: %s, during listing rollback of %s", tableType, instantToRollback));
+        }
+        return hoodieRollbackRequests.stream();
+      }, numPartitions);
+    } catch (Exception e) {
       LOG.error("Generating rollback requests failed for " + instantToRollback.getTimestamp(), e);
       throw new HoodieRollbackException("Generating rollback requests failed for " + instantToRollback.getTimestamp(),
           e);
     }
   }
+
+  private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
+    return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+  }
+
+  @NotNull
+  private HoodieRollbackRequest getHoodieRollbackRequest(String partitionPath, FileStatus[] filesToDeletedStatus) {
+    List<String> filesToDelete = getFilesToBeDeleted(filesToDeletedStatus);
+    return new HoodieRollbackRequest(
+        partitionPath, EMPTY_STRING, EMPTY_STRING, filesToDelete, Collections.emptyMap());
+  }
+
+  @NotNull
+  private List<String> getFilesToBeDeleted(FileStatus[] dataFilesToDeletedStatus) {
+    return Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+      String dataFileToBeDeleted = fileStatus.getPath().toString();
+      // strip scheme E.g: file:/var/folders
+      return dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 1);
+    }).collect(Collectors.toList());
+  }
+
+  private FileStatus[] listFilesToBeDeleted(String commit, String basefileExtension, String partitionPath,
+                                            FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolled back for path " + partitionPath + " and commit " + commit);
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
+  }
+
+  private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, String partitionPath, String basePath,
+                                             String baseFileExtension, HoodieWrapperFileSystem fs,
+                                             Option<HoodieCommitMetadata> commitMetadataOptional,
+                                             Boolean isCommitMetadataCompleted) throws IOException {
+    if (isCommitMetadataCompleted) {
+      return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, basePath, commitMetadataOptional.get(),
+          baseFileExtension, fs);
+    } else {
+      return fetchFilesFromListFiles(instantToRollback, partitionPath, basePath, baseFileExtension, fs);
+    }
+  }
+
+  private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant instantToRollback, String partitionPath,
+                                                    String basePath, HoodieCommitMetadata commitMetadata,
+                                                    String baseFileExtension, HoodieWrapperFileSystem fs)
+      throws IOException {
+    SerializablePathFilter pathFilter = getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
+    Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, partitionPath);
+
+    return fs.listStatus(filePaths, pathFilter);
+  }
+
+  private FileStatus[] fetchFilesFromListFiles(HoodieInstant instantToRollback, String partitionPath, String basePath,
+                                               String baseFileExtension, HoodieWrapperFileSystem fs)
+      throws IOException {
+    SerializablePathFilter pathFilter = getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
+    Path[] filePaths = listFilesToBeDeleted(basePath, partitionPath);
+
+    return fs.listStatus(filePaths, pathFilter);
+  }
+
+  private Boolean checkCommitMetadataCompleted(HoodieInstant instantToRollback,
+                                               Option<HoodieCommitMetadata> commitMetadataOptional) {
+    return commitMetadataOptional.isPresent() && instantToRollback.isCompleted()
+        && !WriteOperationType.UNKNOWN.equals(commitMetadataOptional.get().getOperationType());
+  }
+
+  private static Path[] listFilesToBeDeleted(String basePath, String partitionPath) {
+    return new Path[] {FSUtils.getPartitionPath(basePath, partitionPath)};
+  }
+
+  private static Path[] getFilesFromCommitMetadata(String basePath, HoodieCommitMetadata commitMetadata, String partitionPath) {
+    List<String> fullPaths = commitMetadata.getFullPathsByPartitionPath(basePath, partitionPath);
+    return fullPaths.stream().map(Path::new).toArray(Path[]::new);
+  }
+
+  @NotNull
+  private static SerializablePathFilter getSerializablePathFilter(String basefileExtension, String commit) {
+    return (path) -> {
+      if (path.toString().endsWith(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      } else if (FSUtils.isLogFile(path)) {
+        // Since the baseCommitTime is the only commit for new log files, it's okay here
+        String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path);
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+  }
+
+  public static List<HoodieRollbackRequest> getRollbackRequestToAppend(String partitionPath, HoodieInstant rollbackInstant,
+                                                                       HoodieCommitMetadata commitMetadata, HoodieTable table) {
+    List<HoodieRollbackRequest> hoodieRollbackRequests = new ArrayList<>();
+    checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION));
+
+    // wStat.getPrevCommit() might not give the right commit time in the following
+    // scenario: if a compaction was scheduled, the new commitTime associated with the requested compaction will be
+    // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
+    // But the index (global) might store the baseCommit of the base file and not the requested time; hence, always get
+    // the baseCommit by listing the file slice.
+    // With multi writers, rollbacks could be lazy, and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices()
+    Map<String, FileSlice> latestFileSlices = table.getSliceView()
+        .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true)
+        .collect(Collectors.toMap(FileSlice::getFileId, Function.identity()));
+
+    List<HoodieWriteStat> hoodieWriteStats = commitMetadata.getPartitionToWriteStats().get(partitionPath)
+        .stream()
+        .filter(writeStat -> {
+          // Filter out stats without prevCommit since they are all inserts
+          boolean validForRollback = (writeStat != null) && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT))
+              && (writeStat.getPrevCommit() != null) && latestFileSlices.containsKey(writeStat.getFileId());
+
+          if (!validForRollback) {
+            return false;
+          }
+
+          FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
+
+          // For sanity, log-file base-instant time can never be less than base-commit on which we are rolling back
+          checkArgument(
+              HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(),
+                  HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()),
+              "Log-file base-instant cannot be less than the instant being rolled back");
+
+          // Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK}
+          // w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less
+          // than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up
+          // in a different branch of the flow.
+          return HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
+        })
+        .collect(Collectors.toList());
+
+    for (HoodieWriteStat writeStat : hoodieWriteStats) {
+      FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
+      String fileId = writeStat.getFileId();
+      String latestBaseInstant = latestFileSlice.getBaseInstantTime();
+
+      Path fullLogFilePath = FSUtils.getPartitionPath(table.getConfig().getBasePath(), writeStat.getPath());
+
+      Map<String, Long> logFilesWithBlocksToRollback =
+          Collections.singletonMap(fullLogFilePath.toString(), writeStat.getTotalWriteBytes());
+
+      hoodieRollbackRequests.add(new HoodieRollbackRequest(partitionPath, fileId, latestBaseInstant,
+          Collections.emptyList(), logFilesWithBlocksToRollback));
+    }
+
+    return hoodieRollbackRequests;
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
index 2bc9b59b0d1f..ce7a18515137 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
@@ -21,21 +21,13 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import
org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -44,9 +36,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.function.Function; -import java.util.stream.Collectors; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; @@ -102,160 +91,4 @@ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRoll return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount); } - /** - * Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type. - * @param engineContext instance of {@link HoodieEngineContext} to use. - * @param basePath base path of interest. - * @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected. - */ - public static List generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext, String basePath) { - return FSUtils.getAllPartitionPaths(engineContext, basePath, false, false).stream() - .map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction) - .collect(Collectors.toList()); - } - - /** - * Generate all rollback requests that we need to perform for rolling back this action without actually performing rolling back for MOR table type. - * - * @param instantToRollback Instant to Rollback - * @param table instance of {@link HoodieTable} to use. - * @param context instance of {@link HoodieEngineContext} to use. - * @return list of rollback requests - */ - public static List generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException { - String commit = instantToRollback.getTimestamp(); - HoodieWriteConfig config = table.getConfig(); - List partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false); - if (partitions.isEmpty()) { - return new ArrayList<>(); - } - int sparkPartitions = Math.max(Math.min(partitions.size(), config.getRollbackParallelism()), 1); - context.setJobStatus(RollbackUtils.class.getSimpleName(), "Generate all rollback requests"); - return context.flatMap(partitions, partitionPath -> { - HoodieActiveTimeline activeTimeline = table.getMetaClient().reloadActiveTimeline(); - List partitionRollbackRequests = new ArrayList<>(); - switch (instantToRollback.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - case HoodieTimeline.REPLACE_COMMIT_ACTION: - LOG.info("Rolling back commit action."); - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - break; - case HoodieTimeline.COMPACTION_ACTION: - // If there is no delta commit present after the current commit (if compaction), no action, else we - // need to make sure that a compaction commit rollback also deletes any log files written as part of the - // succeeding deltacommit. 
- boolean higherDeltaCommits = - !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); - if (higherDeltaCommits) { - // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled - // and has not yet finished. In this scenario we should delete only the newly created base files - // and not corresponding base commit log files created with this as baseCommit since updates would - // have been written to the log files. - LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files"); - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataFilesOnlyAction(partitionPath)); - } else { - // No deltacommits present after this compaction commit (inflight or requested). In this case, we - // can also delete any log files that were created with this compaction commit as base - // commit. - LOG.info("Rolling back compaction plan. There are NO higher delta commits. So deleting both data and" - + " log files"); - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - } - break; - case HoodieTimeline.DELTA_COMMIT_ACTION: - // -------------------------------------------------------------------------------------------------- - // (A) The following cases are possible if index.canIndexLogFiles and/or index.isGlobal - // -------------------------------------------------------------------------------------------------- - // (A.1) Failed first commit - Inserts were written to log files and HoodieWriteStat has no entries. In - // this scenario we would want to delete these log files. - // (A.2) Failed recurring commit - Inserts/Updates written to log files. In this scenario, - // HoodieWriteStat will have the baseCommitTime for the first log file written, add rollback blocks. - // (A.3) Rollback triggered for first commit - Inserts were written to the log files but the commit is - // being reverted. In this scenario, HoodieWriteStat will be `null` for the attribute prevCommitTime and - // and hence will end up deleting these log files. This is done so there are no orphan log files - // lying around. - // (A.4) Rollback triggered for recurring commits - Inserts/Updates are being rolled back, the actions - // taken in this scenario is a combination of (A.2) and (A.3) - // --------------------------------------------------------------------------------------------------- - // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal - // --------------------------------------------------------------------------------------------------- - // (B.1) Failed first commit - Inserts were written to base files and HoodieWriteStat has no entries. - // In this scenario, we delete all the base files written for the failed commit. - // (B.2) Failed recurring commits - Inserts were written to base files and updates to log files. In - // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. - // (B.3) Rollback triggered for first commit - Same as (B.1) - // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files - // as well if the base base file gets deleted. 
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(), - HoodieCommitMetadata.class); - - // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or base files), - // delete all files for the corresponding failed commit, if present (same as COW) - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - - // append rollback blocks for updates and inserts as A.2 and B.2 - if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - partitionRollbackRequests - .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table)); - } - break; - default: - break; - } - return partitionRollbackRequests.stream(); - }, Math.min(partitions.size(), sparkPartitions)).stream().filter(Objects::nonNull).collect(Collectors.toList()); - } - - private static List generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, - HoodieCommitMetadata commitMetadata, HoodieTable table) { - checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); - - // wStat.getPrevCommit() might not give the right commit time in the following - // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be - // used to write the new log files. In this case, the commit time for the log file is the compaction requested time. - // But the index (global) might store the baseCommit of the base and not the requested, hence get the - // baseCommit always by listing the file slice - // With multi writers, rollbacks could be lazy. and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices() - Map latestFileSlices = table.getSliceView() - .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true) - .collect(Collectors.toMap(FileSlice::getFileId, Function.identity())); - - return commitMetadata.getPartitionToWriteStats().get(partitionPath) - .stream() - .filter(writeStat -> { - // Filter out stats without prevCommit since they are all inserts - boolean validForRollback = (writeStat != null) && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) - && (writeStat.getPrevCommit() != null) && latestFileSlices.containsKey(writeStat.getFileId()); - - if (!validForRollback) { - return false; - } - - FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId()); - - // For sanity, log-file base-instant time can never be less than base-commit on which we are rolling back - checkArgument( - HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), - HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()), - "Log-file base-instant could not be less than the instant being rolled back"); - - // Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK} - // w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less - // than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up - // in a different branch of the flow. 
-          return HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp());
-        })
-        .map(writeStat -> {
-          FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId());
-          return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath,
-              writeStat.getFileId(), latestFileSlice.getBaseInstantTime(), writeStat);
-        })
-        .collect(Collectors.toList());
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
new file mode 100644
index 000000000000..e2affdf5ca89
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/SerializablePathFilter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.Serializable;
+
+public interface SerializablePathFilter extends PathFilter, Serializable {
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 6a114154c877..42add690f29e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -18,14 +18,14 @@
 package org.apache.hudi.table.upgrade;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -35,15 +35,10 @@
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.rollback.BaseRollbackHelper;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
-import org.apache.hudi.table.action.rollback.ListingBasedRollbackRequest;
-import org.apache.hudi.table.action.rollback.RollbackUtils;
+import org.apache.hudi.table.action.rollback.ListingBasedRollbackStrategy;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -100,14 +95,7 @@ protected void recreateMarkers(final String commitInstantTime,
       writeMarkers.quietDeleteMarkerDir(context, parallelism);
 
       // generate rollback stats
-      List<ListingBasedRollbackRequest> rollbackRequests;
-      if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
-      }
-      List<HoodieRollbackStat> rollbackStats = getListBasedRollBackStats(table.getMetaClient(), table.getConfig(),
-          context, commitInstantOpt, rollbackRequests);
+      List<HoodieRollbackStat> rollbackStats = getListBasedRollBackStats(table, context, commitInstantOpt);
 
       // recreate markers adhering to marker based rollback
       for (HoodieRollbackStat rollbackStat : rollbackStats) {
@@ -126,12 +114,12 @@ protected void recreateMarkers(final String commitInstantTime,
     }
   }
 
-  List<HoodieRollbackStat> getListBasedRollBackStats(
-      HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context,
-      Option<HoodieInstant> commitInstantOpt, List<ListingBasedRollbackRequest> rollbackRequests) {
-    List<HoodieRollbackRequest> hoodieRollbackRequests = new ListingBasedRollbackHelper(metaClient, config)
-        .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(), rollbackRequests);
-    return new BaseRollbackHelper(metaClient, config).collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
+  List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTable table, HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt) {
+    List<HoodieRollbackRequest> hoodieRollbackRequests =
+        new ListingBasedRollbackStrategy(table, context, table.getConfig(), commitInstantOpt.get().getTimestamp())
+            .getRollbackRequests(commitInstantOpt.get());
+    return new BaseRollbackHelper(table.getMetaClient(), table.getConfig())
+        .collectRollbackStats(context, commitInstantOpt.get(), hoodieRollbackRequests);
   }
 
   /**
@@ -143,7 +131,7 @@ List getListBasedRollBackStats(
    * @param table {@link HoodieTable} instance to use
    * @return the marker file name thus curated.
   */
-  private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) {
+  private static String getFileNameForMarkerFromLogFile(String logFilePath, HoodieTable table) {
     Path logPath = new Path(table.getMetaClient().getBasePath(), logFilePath);
     String fileId = FSUtils.getFileIdFromLogPath(logPath);
     String baseInstant = FSUtils.getBaseCommitTimeFromLogPath(logPath);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index c9e3fed871ac..d8ce6612a443 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -45,7 +45,6 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
-
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -125,8 +124,8 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws
 
     for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
       HoodieRollbackPartitionMetadata meta = entry.getValue();
-      assertTrue(meta.getFailedDeleteFiles() == null || meta.getFailedDeleteFiles().size() == 0);
-      assertTrue(meta.getSuccessDeleteFiles() == null || meta.getSuccessDeleteFiles().size() == 0);
+      assertEquals(0, meta.getFailedDeleteFiles().size());
+      assertEquals(0, meta.getSuccessDeleteFiles().size());
     }
 
     //4. assert file group after rollback, and compare to the rollbackstat
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index c57965d72721..09996dbb337e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -137,6 +137,19 @@ public HashMap getFileIdAndFullPaths(String basePath) {
     return fullPaths;
   }
 
+  public List<String> getFullPathsByPartitionPath(String basePath, String partitionPath) {
+    HashSet<String> fullPaths = new HashSet<>();
+    if (getPartitionToWriteStats().get(partitionPath) != null) {
+      for (HoodieWriteStat stat : getPartitionToWriteStats().get(partitionPath)) {
+        if (stat.getFileId() != null) {
+          String fullPath = FSUtils.getPartitionPath(basePath, stat.getPath()).toString();
+          fullPaths.add(fullPath);
+        }
+      }
+    }
+    return new ArrayList<>(fullPaths);
+  }
+
   public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath) {
     Map<HoodieFileGroupId, String> fileGroupIdToFullPaths = new HashMap<>();
     for (Map.Entry<String, List<HoodieWriteStat>> entry : getPartitionToWriteStats().entrySet()) {
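For context, a sketch of how the pieces above compose once the patch is applied; the caller shown here (table, context, and the choice of last completed instant) is hypothetical and not part of the change:

    // Hypothetical caller, for illustration only.
    HoodieTableMetaClient metaClient = table.getMetaClient();
    HoodieInstant instantToRollback = metaClient.getActiveTimeline()
        .getCommitsTimeline().filterCompletedInstants().lastInstant().get();
    String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
    // The strategy now derives the files to delete from the instant's commit metadata where
    // possible, instead of listing every partition on the file system.
    List<HoodieRollbackRequest> requests =
        new ListingBasedRollbackStrategy(table, context, table.getConfig(), rollbackInstantTime)
            .getRollbackRequests(instantToRollback);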