
Commit

[HUDI-3453] Fix HoodieBackedTableMetadata concurrent reading issue (#5091)

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
3 people authored and yuzhaojing committed Sep 22, 2022
1 parent f0d85e5 commit 6bbb432
Showing 2 changed files with 70 additions and 5 deletions.
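
In outline, the change gates metadata reader caching on the reuse flag: readers are cached and shared only when reuse is enabled, and otherwise each lookup opens its own readers and closes them when it is done. Below is a minimal sketch of that pattern using hypothetical stand-in types (Readers, ReaderCache) rather than the real Hudi classes; the actual change applies this in getOrCreateReaders and in the finally block of getRecordsByKeys, shown in the diffs that follow.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Hudi's (file reader, log record reader) pair.
class Readers implements AutoCloseable {
  Readers(String partitionAndFileId) { }
  @Override
  public void close() { /* release underlying file handles */ }
}

class ReaderCache {
  private final boolean reuse;
  private final Map<String, Readers> partitionReaders = new ConcurrentHashMap<>();

  ReaderCache(boolean reuse) {
    this.reuse = reuse;
  }

  Readers getOrCreateReaders(String partitionAndFileId) {
    if (reuse) {
      // Shared path: one cached reader pair per (partition, fileId).
      return partitionReaders.computeIfAbsent(partitionAndFileId, Readers::new);
    }
    // Non-shared path: each lookup gets its own readers, so concurrent
    // callers never touch each other's reader state.
    return new Readers(partitionAndFileId);
  }

  void afterLookup(Readers readers) {
    if (!reuse) {
      // Mirrors closing the per-lookup readers in the finally block below.
      readers.close();
    }
  }
}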
@@ -66,10 +66,19 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -94,6 +103,56 @@ public void testTableOperations(boolean reuseReaders) throws Exception {
verifyBaseMetadataTable(reuseReaders);
}

/**
* Create a COW table and call the getAllFilesInPartition API in parallel, which reads data files from the metadata table (MDT).
* This UT guards that concurrent reads through the MDT getAllFilesInPartition API are safe.
*
* @param reuse whether to reuse metadata table readers across lookups
* @throws Exception
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse) throws Exception {
final int taskNumber = 20;
HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
init(tableType);
testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1);
HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), reuse);
assertTrue(tableMetadata.enabled());
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
String partition = metadataPartitions.get(0);
String finalPartition = basePath + "/" + partition;
ExecutorService executors = Executors.newFixedThreadPool(taskNumber);
AtomicBoolean flag = new AtomicBoolean(false);
CountDownLatch downLatch = new CountDownLatch(taskNumber);
AtomicInteger filesNumber = new AtomicInteger(0);

// Call the getAllFilesInPartition API on the metadata table in parallel
for (int i = 0; i < taskNumber; i++) {
executors.submit(new Runnable() {
@Override
public void run() {
try {
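// Every task counts down and then waits on the latch, so all reads start together and maximize contention on the metadata table readers.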
downLatch.countDown();
downLatch.await();
FileStatus[] files = tableMetadata.getAllFilesInPartition(new Path(finalPartition));
if (files.length != 1) {
LOG.warn("Miss match data file numbers.");
throw new RuntimeException("Miss match data file numbers.");
}
filesNumber.addAndGet(files.length);
} catch (Exception e) {
LOG.warn("Catch Exception while reading data files from MDT.", e);
flag.compareAndSet(false, true);
}
}
});
}
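// Wait for all tasks to finish before asserting; any reader failure above flips the flag.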
executors.shutdown();
executors.awaitTermination(5, TimeUnit.MINUTES);
assertFalse(flag.get());
assertEquals(taskNumber, filesNumber.get());
}

private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
doWriteInsertAndUpsert(testTable, "0000001", "0000002", false);
}
Expand Down
@@ -18,9 +18,6 @@

package org.apache.hudi.metadata;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -54,6 +51,10 @@
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -231,7 +232,7 @@ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecord
throw new HoodieIOException("Error merging records from metadata table for " + sortedKeys.size() + " key : ", ioe);
} finally {
if (!reuse) {
close(Pair.of(partitionFileSlicePair.getLeft(), partitionFileSlicePair.getRight().getFileId()));
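// With reuse disabled, close the readers opened for this lookup directly; they are no longer cached in partitionReaders, so closing by (partition, fileId) key no longer applies.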
closeReader(readers);
}
}
});
@@ -397,7 +398,12 @@ private Map<Pair<String, FileSlice>, List<String>> getPartitionFileSliceToKeysMa
* @return File reader and the record scanner pair for the requested file slice
*/
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> openReaders(partitionName, slice));
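// Cache and share readers only when reuse is enabled; otherwise every lookup opens its own readers, so concurrent callers never share reader state.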
if (reuse) {
return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> openReaders(partitionName, slice));
} else {
return openReaders(partitionName, slice);
}
}

private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {

