-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-3758] Fix duplicate fileId error in MOR table type with flink bucket hash Index #5185
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,10 +18,14 @@ | |
|
||
package org.apache.hudi.sink.bucket; | ||
|
||
import org.apache.hadoop.fs.FileStatus; | ||
import org.apache.hadoop.fs.Path; | ||
import org.apache.hudi.common.model.HoodieKey; | ||
import org.apache.hudi.common.model.HoodieLogFile; | ||
import org.apache.hudi.common.model.HoodieRecord; | ||
import org.apache.hudi.common.model.HoodieRecordLocation; | ||
import org.apache.hudi.configuration.FlinkOptions; | ||
import org.apache.hudi.configuration.OptionsResolver; | ||
import org.apache.hudi.index.bucket.BucketIdentifier; | ||
import org.apache.hudi.sink.StreamWriteFunction; | ||
|
||
|
@@ -32,11 +36,14 @@ | |
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.FileNotFoundException; | ||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.Arrays; | ||
import java.util.function.Predicate; | ||
|
||
/** | ||
* A stream write function with bucket hash index. | ||
|
@@ -68,6 +75,12 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> { | |
*/ | ||
private Map<String, Map<Integer, String>> bucketIndex; | ||
|
||
/** | ||
* BucketID to file group mapping in each partition loaded from fileSystem. | ||
* Map(partition -> Map(bucketId, fileID)). | ||
*/ | ||
private Map<String, Map<Integer, String>> fsBucketIndex; | ||
|
||
/** | ||
* Incremental bucket index of the current checkpoint interval, | ||
* it is needed because the bucket type('I' or 'U') should be decided based on the committed files view, | ||
|
@@ -80,6 +93,11 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> { | |
*/ | ||
private boolean isEmptyTable; | ||
|
||
/** | ||
* Returns whether the table is of COW (copy-on-write) type. | ||
*/ | ||
private boolean isCowTable; | ||
|
||
/** | ||
* Constructs a BucketStreamWriteFunction. | ||
* | ||
|
@@ -100,6 +118,8 @@ public void open(Configuration parameters) throws IOException { | |
this.bucketIndex = new HashMap<>(); | ||
this.incBucketIndex = new HashSet<>(); | ||
this.isEmptyTable = !this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent(); | ||
this.fsBucketIndex = new HashMap<>(); | ||
this.isCowTable = OptionsResolver.isCowTable(config); | ||
} | ||
|
||
@Override | ||
|
@@ -130,7 +150,11 @@ public void processElement(I i, ProcessFunction<I, Object>.Context context, Coll | |
} else if (bucketToFileId.containsKey(bucketNum)) { | ||
location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum)); | ||
} else { | ||
Map<Integer, String> fsBucketToFileId = fsBucketIndex.computeIfAbsent(partition, p -> new HashMap<>()); | ||
String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum); | ||
if (fsBucketToFileId.containsKey(bucketNum)) { | ||
newFileId = fsBucketToFileId.get(bucketNum); | ||
} | ||
location = new HoodieRecordLocation("I", newFileId); | ||
bucketToFileId.put(bucketNum, newFileId); | ||
incBucketIndex.add(bucketId); | ||
|
@@ -161,7 +185,7 @@ private Set<Integer> getBucketToLoad() { | |
* Get partition_bucket -> fileID mapping from the existing hudi table. | ||
* This is a required operation for each restart to avoid having duplicate file ids for one bucket. | ||
*/ | ||
private void bootstrapIndexIfNeed(String partition) { | ||
private void bootstrapIndexIfNeed(String partition) throws IOException { | ||
if (isEmptyTable || bucketIndex.containsKey(partition)) { | ||
return; | ||
} | ||
|
@@ -185,5 +209,30 @@ private void bootstrapIndexIfNeed(String partition) { | |
} | ||
}); | ||
bucketIndex.put(partition, bucketToFileIDMap); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I guess we only need to fix line 197 using: getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime, boolean includeFileSlicesInPendingCompaction) with includeFileSlicesInPendingCompaction as true. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It was empty after testing this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(group -> {
String fileID = group.getFileGroupId().getFileId(); that works. What do you think? |
||
// no need to load from file System | ||
boolean noNeedLoadFiles = bucketToFileIDMap.size() == bucketToLoad.size(); | ||
if (noNeedLoadFiles || isCowTable) { | ||
return; | ||
} | ||
// reuse uncommitted log file id | ||
try { | ||
Map<Integer, String> partitionFsBucketToFileIDMap = new HashMap<>(); | ||
Predicate<FileStatus> rtFilePredicate = fileStatus -> fileStatus.getPath().getName() | ||
.contains(metaClient.getTableConfig().getLogFileFormat().getFileExtension()); | ||
Arrays.stream(this.metaClient.getFs() | ||
.listStatus(new Path(this.metaClient.getBasePath() + "/" + partition))) | ||
.filter(rtFilePredicate) | ||
.map(HoodieLogFile::new) | ||
.forEach(s -> { | ||
int bucketNumber = BucketIdentifier.bucketIdFromFileId(s.getFileId()); | ||
if (bucketToLoad.contains(bucketNumber)) { | ||
partitionFsBucketToFileIDMap.put(bucketNumber, s.getFileId()); | ||
} | ||
}); | ||
fsBucketIndex.put(partition, partitionFsBucketToFileIDMap); | ||
} catch (FileNotFoundException fileNotFoundException) { | ||
LOG.warn("Maybe the table was not initialized"); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: fix the comment