Skip to content

Commit

Permalink
[HUDI-4792] Batch clean files to delete (#6580)
Browse files Browse the repository at this point in the history
This  patch makes use of batch call to get fileGroup to delete during cleaning instead of 1 call per partition.
This limit the number of call to the view and should fix the trouble with metadata table in context of lot of partitions.
Fixes issue #6373

Co-authored-by: sivabalan <n.siva.b@gmail.com>
  • Loading branch information
parisni and nsivabalan authored Sep 21, 2022
1 parent 84b05c8 commit cbf9b83
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -116,9 +117,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());

Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
.parallelize(partitionsToClean, cleanerParallelism)
.mapPartitions(partitionIterator -> {
List<String> partitionList = new ArrayList<>();
partitionIterator.forEachRemaining(partitionList::add);
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList);
return cleanResult.entrySet().iterator();
}, false).collectAsList()
.stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
HoodieWriteConfig writeConfig = getConfigBuilder(true)
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
Expand All @@ -81,7 +81,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce
client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);

// 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit
// 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit
for (int i = 5; i < 9; i++) {
String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
client.startCommitWithTime(instantTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi

/**
* Refresh commits timeline.
*
*
* @param visibleActiveTimeline Visible Active Timeline
*/
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
Expand Down Expand Up @@ -750,6 +750,20 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
}

@Override
public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
return getAllFileGroupsIncludingReplaced(partitionPaths)
.map(pair -> Pair.of(pair.getLeft(), pair.getRight().stream().filter(fg -> !isFileGroupReplaced(fg)).collect(Collectors.toList())));
}

private Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroupsIncludingReplaced(final List<String> partitionStrList) {
List<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
for (String partitionStr : partitionStrList) {
fileGroupPerPartitionList.add(Pair.of(partitionStr, getAllFileGroupsIncludingReplaced(partitionStr).collect(Collectors.toList())));
}
return fileGroupPerPartitionList.stream();
}

private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
try {
readLock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}

@Override
public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -377,6 +379,16 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
}
}

@Override
public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
for (String partitionPath : partitionPaths) {
Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath);
fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList())));
}
return fileGroupPerPartitionList.stream();
}

@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,18 @@ interface SliceViewWithLatestSlice {
/**
* Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
*
* @param partitionPath Partition path
* @param maxCommitTime Max Instant Time
* @param partitionPath Partition path
* @param maxCommitTime Max Instant Time
* @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
*/
Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
boolean includeFileSlicesInPendingCompaction);
boolean includeFileSlicesInPendingCompaction);

/**
* Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
* file-slice before and after compaction request instant is merged and returned.
*
* @param partitionPath Partition Path
*
* @param partitionPath Partition Path
* @param maxInstantTime Max Instant Time
* @return
*/
Expand Down Expand Up @@ -149,10 +149,12 @@ interface SliceView extends SliceViewWithLatestSlice {
*/
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);

Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths);

/**
* Return Pending Compaction Operations.
*
* @return Pair<Pair<InstantTime,CompactionOperation>>
* @return Pair<Pair < InstantTime, CompactionOperation>>
*/
Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();

Expand Down

0 comments on commit cbf9b83

Please sign in to comment.