Skip to content

Commit

Permalink
[HUDI-3758] Fix duplicate fileId error in MOR table type with flink bucket hash Index (apache#5185)
Browse files Browse the repository at this point in the history

* Fix duplicate fileId with the bucket index
* Load all file groups from the FileSystemView instead of the latest file slices

(cherry picked from commit e421d53)
  • Loading branch information
wxplovecc authored and XuQianJin-Stars committed Jun 5, 2022
1 parent 60d836c commit 28f1e20
Showing 1 changed file with 3 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
*/
private Set<String> incBucketIndex;

/**
* Returns whether this is an empty table.
*/
private boolean isEmptyTable;

/**
* Constructs a BucketStreamWriteFunction.
*
Expand All @@ -99,7 +94,6 @@ public void open(Configuration parameters) throws IOException {
this.bucketToLoad = getBucketToLoad();
this.bucketIndex = new HashMap<>();
this.incBucketIndex = new HashSet<>();
this.isEmptyTable = !this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent();
}

@Override
Expand Down Expand Up @@ -162,16 +156,16 @@ private Set<Integer> getBucketToLoad() {
* This is a required operation for each restart to avoid having duplicate file ids for one bucket.
*/
private void bootstrapIndexIfNeed(String partition) {
if (isEmptyTable || bucketIndex.containsKey(partition)) {
if (bucketIndex.containsKey(partition)) {
return;
}
LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(),
this.metaClient.getBasePath() + "/" + partition));

// Load existing fileID belongs to this task
Map<Integer, String> bucketToFileIDMap = new HashMap<>();
this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
String fileID = fileSlice.getFileId();
this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> {
String fileID = fileGroup.getFileGroupId().getFileId();
int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
if (bucketToLoad.contains(bucketNumber)) {
LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));
Expand Down

0 comments on commit 28f1e20

Please sign in to comment.