From 132c0aa8c7cc34f462f5ab44e8db8b75c2ffba38 Mon Sep 17 00:00:00 2001
From: HunterXHunter <1356469429@qq.com>
Date: Mon, 6 Jun 2022 21:53:55 +0800
Subject: [PATCH] [HUDI-4101] When BucketIndexPartitioner takes the partition
 path for dispersion, the fileIDs of a task may not be loaded correctly
 (#5763)

Co-authored-by: john.wick
---
 .../bucket/BucketStreamWriteFunction.java | 25 ++++++-------------------
 1 file changed, 6 insertions(+), 19 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 11d5f36436b7..1ccfe91dbc0a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -57,11 +57,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
 
   private String indexKeyFields;
 
-  /**
-   * BucketID should be loaded in this task.
-   */
-  private Set<Integer> bucketToLoad;
-
   /**
    * BucketID to file group mapping in each partition.
    * Map(partition -> Map(bucketId, fileID)).
@@ -91,7 +86,6 @@ public void open(Configuration parameters) throws IOException {
     this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
-    this.bucketToLoad = getBucketToLoad();
     this.bucketIndex = new HashMap<>();
     this.incBucketIndex = new HashSet<>();
   }
@@ -136,19 +130,12 @@ public void processElement(I i, ProcessFunction<I, Object>.Context context, Coll
   }
 
   /**
-   * Bootstrap bucket info from existing file system,
-   * bucketNum % totalParallelism == this taskID belongs to this task.
+   * Determine whether the given bucket of the given partition belongs to the current task:
+   * hash(partition + bucketNumber) % parallelism == taskID means it belongs to this task.
    */
-  private Set<Integer> getBucketToLoad() {
-    Set<Integer> bucketToLoad = new HashSet<>();
-    for (int i = 0; i < bucketNum; i++) {
-      int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
-      if (partitionOfBucket == taskID) {
-        bucketToLoad.add(i);
-      }
-    }
-    LOG.info("Bucket number that belongs to task [{}/{}]: {}", taskID, parallelism, bucketToLoad);
-    return bucketToLoad;
+  public boolean isBucketToLoad(int bucketNumber, String partition) {
+    int globalHash = ((partition + bucketNumber).hashCode()) & Integer.MAX_VALUE;
+    return BucketIdentifier.mod(globalHash, parallelism) == taskID;
   }
 
   /**
@@ -167,7 +154,7 @@ private void bootstrapIndexIfNeed(String partition) {
     this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> {
       String fileID = fileGroup.getFileGroupId().getFileId();
       int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
-      if (bucketToLoad.contains(bucketNumber)) {
+      if (isBucketToLoad(bucketNumber, partition)) {
         LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));
         if (bucketToFileIDMap.containsKey(bucketNumber)) {
           throw new RuntimeException(String.format("Duplicate fileID %s from bucket %s of partition %s found "
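
Note on the change: before this patch each subtask bootstrapped the buckets satisfying bucketId % parallelism == taskID, while BucketIndexPartitioner routes incoming records by hashing the partition path together with the bucket id, so a subtask could receive records for a bucket whose existing fileID had been loaded by a different subtask. Below is a minimal, self-contained sketch of how the new isBucketToLoad check is intended to line up with that routing; the class and method names (SketchedBucketRouting, routeToSubtask, the local mod helper) and the exact shape of the partitioner hash are assumptions for illustration, not code quoted from the Hudi repository.

    // Illustrative sketch: the subtask chosen by the partitioner-style hash for a
    // (partition, bucket) pair is exactly the subtask whose isBucketToLoad() accepts it.
    // All names here are made up for the example; only the hashing scheme mirrors the patch.
    public final class SketchedBucketRouting {

      // Non-negative modulo; the hash below is already masked to be non-negative.
      static int mod(int x, int y) {
        return x % y;
      }

      // Assumed routing of a record to a write subtask: hash(partitionPath + bucketId) % parallelism.
      static int routeToSubtask(String partition, int bucketNumber, int parallelism) {
        int globalHash = (partition + bucketNumber).hashCode() & Integer.MAX_VALUE;
        return mod(globalHash, parallelism);
      }

      // The check introduced by the patch, evaluated on subtask taskId.
      static boolean isBucketToLoad(int bucketNumber, String partition, int parallelism, int taskId) {
        int globalHash = (partition + bucketNumber).hashCode() & Integer.MAX_VALUE;
        return mod(globalHash, parallelism) == taskId;
      }

      public static void main(String[] args) {
        int parallelism = 4;
        String partition = "dt=2022-06-06";
        for (int bucket = 0; bucket < 8; bucket++) {
          int owner = routeToSubtask(partition, bucket, parallelism);
          // Only the owning subtask reports true, so it alone bootstraps the bucket's fileID.
          System.out.printf("bucket %d -> subtask %d, isBucketToLoad there: %b%n",
              bucket, owner, isBucketToLoad(bucket, partition, parallelism, owner));
        }
      }
    }

Running the sketch prints, for each example bucket, the subtask the hash routes it to, and shows that isBucketToLoad returns true on that subtask, which is why the per-task bucketToLoad precomputation could be dropped in favor of the per-partition check.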