Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-3453] Fix HoodieBackedTableMetadata concurrent reading issue #5091

Merged
merged 16 commits into from
Sep 9, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,18 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -94,6 +102,53 @@ public void testTableOperations() throws Exception {
verifyBaseMetadataTable();
}

@Test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codope let's remove this test altogether, and as a rule of thumb we should avoid adding any non-deterministic tests

public void testMultiReaderForHoodieBackedTableMetadata() throws Exception {
final int taskNumber = 100;
HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
init(tableType);
testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1);
HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), false);
assertTrue(tableMetadata.enabled());
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
String partition = metadataPartitions.get(0);
String finalPartition = basePath + "/" + partition;
ArrayList<String> duplicatedPartitions = new ArrayList<>(taskNumber);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment explaining the intent of the test so that it's apparent and doesn't require deciphering the test to understand, at least at a high level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added!

for (int i = 0; i < taskNumber; i++) {
duplicatedPartitions.add(finalPartition);
}
ExecutorService executors = Executors.newFixedThreadPool(taskNumber);
AtomicBoolean flag = new AtomicBoolean(false);
AtomicInteger count = new AtomicInteger(0);
AtomicInteger filesNumber = new AtomicInteger(0);

for (String part : duplicatedPartitions) {
executors.submit(new Runnable() {
@Override
public void run() {
try {
count.incrementAndGet();
while (true) {
if (count.get() == taskNumber) {
break;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a countDownLatch here so that all threads could call tableMetadata.getAllFilesInPartition() around the same time. that way the test is deterministic.

FileStatus[] files = tableMetadata.getAllFilesInPartition(new Path(part));
filesNumber.addAndGet(files.length);
LOG.warn(Arrays.toString(files) + " : " + files.length);
codope marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(1, files.length);
} catch (Exception e) {
flag.set(true);
}
}
});
}
executors.shutdown();
executors.awaitTermination(24, TimeUnit.HOURS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's set meaningful timeout here (1 min should be more than enough to complete)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed!

assertFalse(flag.get());
assertEquals(filesNumber.get(), taskNumber);
}

private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
doWriteInsertAndUpsert(testTable, "0000001", "0000002", false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -280,29 +281,38 @@ private Map<Pair<String, FileSlice>, List<String>> getPartitionFileSliceToKeysMa
* @return File reader and the record scanner pair for the requested file slice
*/
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersIfNeeded(String partitionName, FileSlice slice) {
  // Without reuse, every call gets its own reader pair; the caller owns closing it.
  if (!reuse) {
    return getHoodieMetadataMergedLogRecordReaderPair(partitionName, slice);
  }
  // With reuse, cache one reader pair per (partition, fileId) so concurrent and
  // repeated lookups share the already-open base-file reader and log scanner.
  final Pair<String, String> readerCacheKey = Pair.of(partitionName, slice.getFileId());
  return partitionReaders.computeIfAbsent(readerCacheKey,
      ignored -> getHoodieMetadataMergedLogRecordReaderPair(partitionName, slice));
}

/**
 * Opens a fresh base-file reader and a merged log-record scanner for the latest
 * file slice of the given metadata partition.
 *
 * @param partitionName metadata partition whose slice is being opened
 * @param slice         latest file slice providing the base file and log files
 * @return pair of (base file reader, merged log record scanner); never null
 * @throws HoodieIOException if either reader fails to open
 */
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> getHoodieMetadataMergedLogRecordReaderPair(String partitionName, FileSlice slice) {
  try {
    HoodieTimer timer = new HoodieTimer().startTimer();

    // Open base file reader
    Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
    HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
    final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();

    // Open the log record scanner using the log files from the latest file slice
    List<HoodieLogFile> logFiles = slice.getLogFiles().collect(Collectors.toList());
    Pair<HoodieMetadataMergedLogRecordReader, Long> logRecordScannerOpenTimePair =
        getLogRecordScanner(logFiles, partitionName);
    HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey();
    final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();

    // Record the combined open time of both readers under the SCAN metric.
    metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,
        baseFileOpenMs + logScannerOpenMs));
    return Pair.of(baseFileReader, logRecordScanner);
  } catch (IOException e) {
    throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
  }
}

private Pair<HoodieFileReader, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
Expand Down