Skip to content

Commit

Permalink
[HUDI-4101] When BucketIndexPartitioner take partition path for dispe…
Browse files Browse the repository at this point in the history
…rsion may cause the fileID of the task to not be loaded correctly (#5763)

Co-authored-by: john.wick <john.wick@vipshop.com>
  • Loading branch information
LinMingQiang and LinMingQiang authored Jun 6, 2022
1 parent 21ab0ff commit 132c0aa
Showing 1 changed file with 6 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {

private String indexKeyFields;

/**
* BucketID should be loaded in this task.
*/
private Set<Integer> bucketToLoad;

/**
* BucketID to file group mapping in each partition.
* Map(partition -> Map(bucketId, fileID)).
Expand Down Expand Up @@ -91,7 +86,6 @@ public void open(Configuration parameters) throws IOException {
this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
this.bucketToLoad = getBucketToLoad();
this.bucketIndex = new HashMap<>();
this.incBucketIndex = new HashSet<>();
}
Expand Down Expand Up @@ -136,19 +130,12 @@ public void processElement(I i, ProcessFunction<I, Object>.Context context, Coll
}

/**
* Bootstrap bucket info from existing file system,
* bucketNum % totalParallelism == this taskID belongs to this task.
* Determine whether the current fileID belongs to the current task.
* (partition + curBucket) % numPartitions == this taskID belongs to this task.
*/
private Set<Integer> getBucketToLoad() {
Set<Integer> bucketToLoad = new HashSet<>();
for (int i = 0; i < bucketNum; i++) {
int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
if (partitionOfBucket == taskID) {
bucketToLoad.add(i);
}
}
LOG.info("Bucket number that belongs to task [{}/{}]: {}", taskID, parallelism, bucketToLoad);
return bucketToLoad;
public boolean isBucketToLoad(int bucketNumber, String partition) {
int globalHash = ((partition + bucketNumber).hashCode()) & Integer.MAX_VALUE;
return BucketIdentifier.mod(globalHash, parallelism) == taskID;
}

/**
Expand All @@ -167,7 +154,7 @@ private void bootstrapIndexIfNeed(String partition) {
this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> {
String fileID = fileGroup.getFileGroupId().getFileId();
int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
if (bucketToLoad.contains(bucketNumber)) {
if (isBucketToLoad(bucketNumber, partition)) {
LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));
if (bucketToFileIDMap.containsKey(bucketNumber)) {
throw new RuntimeException(String.format("Duplicate fileID %s from bucket %s of partition %s found "
Expand Down

0 comments on commit 132c0aa

Please sign in to comment.