-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-3453] Fix HoodieBackedTableMetadata concurrent reading issue #5091
Changes from 4 commits
f65da7c
dbea31f
da80525
4da0484
05447d8
6c272e7
d5c4d54
a753995
5076ea5
5694cdd
daefc01
13507fc
c0dc922
3e3caec
c711e86
0ac5e2c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,17 +58,26 @@ | |
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.EnumSource; | ||
import org.junit.jupiter.params.provider.ValueSource; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.apache.hudi.common.model.WriteOperationType.INSERT; | ||
import static org.apache.hudi.common.model.WriteOperationType.UPSERT; | ||
|
||
import static java.util.Arrays.asList; | ||
import static java.util.Collections.emptyList; | ||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; | ||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertFalse; | ||
|
@@ -92,6 +101,54 @@ public void testTableOperations() throws Exception { | |
verifyBaseMetadataTable(); | ||
} | ||
|
||
// Concurrency test for HoodieBackedTableMetadata: taskNumber threads share ONE metadata instance and all list the same partition at roughly the same time. | ||
// Runs twice, once with reader reuse enabled and once with it disabled (the 'reuse' flag passed to the constructor below). | ||
// Passes only if no task threw an Exception and the file counts returned across all tasks sum to taskNumber. | ||
@ParameterizedTest | ||
@ValueSource(booleans = {true, false}) | ||
public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse) throws Exception { | ||
final int taskNumber = 100; | ||
HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE; | ||
init(tableType); | ||
// Single INSERT commit into partition "p1"; presumably the trailing 1 is the number of files per partition, which the per-task assertion below relies on -- TODO confirm. | ||
testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1); | ||
HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), reuse); | ||
assertTrue(tableMetadata.enabled()); | ||
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths(); | ||
String partition = metadataPartitions.get(0); | ||
String finalPartition = basePath + "/" + partition; | ||
// The same partition path is repeated taskNumber times so that every task reads exactly the same partition. | ||
ArrayList<String> duplicatedPartitions = new ArrayList<>(taskNumber); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a comment explaining the intent of the test so that it's apparent and doesn't require deciphering the test to understand at least on the high levle There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added! |
||
for (int i = 0; i < taskNumber; i++) { | ||
duplicatedPartitions.add(finalPartition); | ||
} | ||
// The pool has one thread per task: the spin barrier inside run() only opens once all taskNumber tasks have started, so a smaller pool would never get past it. | ||
ExecutorService executors = Executors.newFixedThreadPool(taskNumber); | ||
// flag: set when any task throws an Exception; count: number of tasks that have reached the barrier; filesNumber: sum of files.length over all tasks. | ||
AtomicBoolean flag = new AtomicBoolean(false); | ||
AtomicInteger count = new AtomicInteger(0); | ||
AtomicInteger filesNumber = new AtomicInteger(0); | ||
|
||
for (String part : duplicatedPartitions) { | ||
executors.submit(new Runnable() { | ||
@Override | ||
public void run() { | ||
try { | ||
// Hand-rolled start barrier: register arrival, then busy-wait until every task has arrived so the reads below begin at about the same moment. | ||
count.incrementAndGet(); | ||
while (true) { | ||
if (count.get() == taskNumber) { | ||
break; | ||
} | ||
} | ||
should we add a countDownLatch here so that all threads could call tableMetadata.getAllFilesInPartition() around the same time. that way the test is deterministic. |
||
FileStatus[] files = tableMetadata.getAllFilesInPartition(new Path(part)); | ||
filesNumber.addAndGet(files.length); | ||
LOG.warn(Arrays.toString(files) + " : " + files.length); | ||
codope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// NOTE(review): a failing JUnit assertion raises an Error, not an Exception, so the catch below would not record it in 'flag'; it would only surface indirectly through the filesNumber total -- confirm this is intended. | ||
assertEquals(1, files.length); | ||
} catch (Exception e) { | ||
// The exception itself is dropped; only the fact that a task failed is kept. | ||
flag.set(true); | ||
} | ||
} | ||
}); | ||
} | ||
executors.shutdown(); | ||
// Waits for all submitted tasks to finish; the returned boolean (whether termination happened before the timeout) is not checked. | ||
executors.awaitTermination(24, TimeUnit.HOURS); | ||
Let's set meaningful timeout here (1 min should be more than enough to complete) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed! |
||
assertFalse(flag.get()); | ||
// NOTE(review): arguments are in (actual, expected) order here; JUnit's signature is assertEquals(expected, actual). The check is unaffected, only the failure message would read backwards. | ||
assertEquals(filesNumber.get(), taskNumber); | ||
} | ||
|
||
// Convenience overload: delegates to the four-argument doWriteInsertAndUpsert with the fixed values "0000001", "0000002" and false. | ||
// Presumably the two strings are the instant times of the insert and the upsert -- TODO confirm against the four-argument overload. | ||
private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception { | ||
doWriteInsertAndUpsert(testTable, "0000001", "0000002", false); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -229,7 +229,7 @@ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecord | |
throw new HoodieIOException("Error merging records from metadata table for " + sortedKeys.size() + " key : ", ioe); | ||
} finally { | ||
if (!reuse) { | ||
close(Pair.of(partitionFileSlicePair.getLeft(), partitionFileSlicePair.getRight().getFileId())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's also clean up There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Aha, It is also used in
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, we should rewrite it to use
|
||
closeReader(readers); | ||
} | ||
} | ||
}); | ||
|
@@ -397,7 +397,12 @@ private Map<Pair<String, FileSlice>, List<String>> getPartitionFileSliceToKeysMa | |
* @return File reader and the record scanner pair for the requested file slice | ||
*/ | ||
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) { | ||
// NOTE(review): this hunk is rendered without +/- markers; the unconditional return on the next row appears to be the pre-change body that the if/else below replaces. | ||
return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> openReaders(partitionName, slice)); | ||
// With reuse enabled, readers are cached in partitionReaders under the (partitionName, fileId) key and opened only when that key is absent. | ||
if (reuse) { | ||
return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> { | ||
return openReaders(partitionName, slice); }); | ||
} else { | ||
// Without reuse, partitionReaders is bypassed: every call opens its own readers. | ||
return openReaders(partitionName, slice); | ||
} | ||
} | ||
|
||
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhangyue19921010 @nsivabalan folks, we should revisit our approach and avoid writing non-deterministic tests. A test that fails only probabilistically, rather than deterministically, has low net positive value: if it doesn't fail consistently in the presence of the issue, it just becomes a noisy, flaky signal.