From 28f1e2045a198b0a067100cff44e6d6c185a05c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?= <408317717@qq.com> Date: Fri, 29 Apr 2022 14:10:20 +0800 Subject: [PATCH] [HUDI-3758] Fix duplicate fileId error in MOR table type with flink bucket hash Index (#5185) * fix duplicate fileId with bucket Index * replace to load FileGroup from FileSystemView (cherry picked from commit e421d536eae236d0b2e29d9a4f59ed65822fd883) --- .../hudi/sink/bucket/BucketStreamWriteFunction.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java index 1456e8882f024..11d5f36436b78 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java @@ -75,11 +75,6 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { */ private Set incBucketIndex; - /** - * Returns whether this is an empty table. - */ - private boolean isEmptyTable; - /** * Constructs a BucketStreamWriteFunction. * @@ -99,7 +94,6 @@ public void open(Configuration parameters) throws IOException { this.bucketToLoad = getBucketToLoad(); this.bucketIndex = new HashMap<>(); this.incBucketIndex = new HashSet<>(); - this.isEmptyTable = !this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent(); } @Override @@ -162,7 +156,7 @@ private Set getBucketToLoad() { * This is a required operation for each restart to avoid having duplicate file ids for one bucket. */ private void bootstrapIndexIfNeed(String partition) { - if (isEmptyTable || bucketIndex.containsKey(partition)) { + if (bucketIndex.containsKey(partition)) { return; } LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(), @@ -170,8 +164,8 @@ private void bootstrapIndexIfNeed(String partition) { // Load existing fileID belongs to this task Map bucketToFileIDMap = new HashMap<>(); - this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> { - String fileID = fileSlice.getFileId(); + this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> { + String fileID = fileGroup.getFileGroupId().getFileId(); int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID); if (bucketToLoad.contains(bucketNumber)) { LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));