diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java index 1456e8882f024..11d5f36436b78 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java @@ -75,11 +75,6 @@ public class BucketStreamWriteFunction extends StreamWriteFunction { */ private Set incBucketIndex; - /** - * Returns whether this is an empty table. - */ - private boolean isEmptyTable; - /** * Constructs a BucketStreamWriteFunction. * @@ -99,7 +94,6 @@ public void open(Configuration parameters) throws IOException { this.bucketToLoad = getBucketToLoad(); this.bucketIndex = new HashMap<>(); this.incBucketIndex = new HashSet<>(); - this.isEmptyTable = !this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent(); } @Override @@ -162,7 +156,7 @@ private Set getBucketToLoad() { * This is a required operation for each restart to avoid having duplicate file ids for one bucket. */ private void bootstrapIndexIfNeed(String partition) { - if (isEmptyTable || bucketIndex.containsKey(partition)) { + if (bucketIndex.containsKey(partition)) { return; } LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(), @@ -170,8 +164,8 @@ private void bootstrapIndexIfNeed(String partition) { // Load existing fileID belongs to this task Map bucketToFileIDMap = new HashMap<>(); - this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> { - String fileID = fileSlice.getFileId(); + this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> { + String fileID = fileGroup.getFileGroupId().getFileId(); int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID); if (bucketToLoad.contains(bucketNumber)) { LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));