[HUDI-3453] Fix HoodieBackedTableMetadata concurrent reading issue #5091

Merged · 16 commits · Sep 9, 2022
@@ -66,10 +66,19 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -94,6 +103,56 @@ public void testTableOperations(boolean reuseReaders) throws Exception {
verifyBaseMetadataTable(reuseReaders);
}

/**
 * Creates a COW table and calls the getAllFilesInPartition api in parallel, reading data files from the MDT.
 * This UT guards against concurrent-reader issues in MDT#getAllFilesInPartition.
 * @param reuse whether to reuse the metadata table readers across lookups
 * @throws Exception
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse) throws Exception {
Contributor:

@zhangyue19921010 @nsivabalan folks, we should revisit our approach and avoid writing non-deterministic tests. A test that doesn't fail deterministically, but only probabilistically, has low net positive value: if it doesn't fail consistently in the presence of the issue, it just becomes a noisy, flaky signal.

final int taskNumber = 20;
HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
init(tableType);
testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1);
HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), reuse);
assertTrue(tableMetadata.enabled());
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
String partition = metadataPartitions.get(0);
String finalPartition = basePath + "/" + partition;
ExecutorService executors = Executors.newFixedThreadPool(taskNumber);
AtomicBoolean flag = new AtomicBoolean(false);
CountDownLatch downLatch = new CountDownLatch(taskNumber);
AtomicInteger filesNumber = new AtomicInteger(0);

// call getAllFilesInPartition api from meta data table in parallel
for (int i = 0; i < taskNumber; i++) {
executors.submit(new Runnable() {
@Override
public void run() {
try {
downLatch.countDown();
downLatch.await();
FileStatus[] files = tableMetadata.getAllFilesInPartition(new Path(finalPartition));
if (files.length != 1) {
LOG.warn("Miss match data file numbers.");
throw new RuntimeException("Miss match data file numbers.");
}
Contributor:

Should we add a CountDownLatch here so that all threads call tableMetadata.getAllFilesInPartition() at around the same time? That way the test is deterministic.

filesNumber.addAndGet(files.length);
} catch (Exception e) {
LOG.warn("Catch Exception while reading data files from MDT.", e);
flag.compareAndSet(false, true);
}
}
});
}
executors.shutdown();
executors.awaitTermination(5, TimeUnit.MINUTES);
assertFalse(flag.get());
assertEquals(taskNumber, filesNumber.get());
}
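
The final version of the test already applies the reviewer's suggestion: every worker counts the latch down and then awaits it, so all twenty calls to getAllFilesInPartition() start at roughly the same instant. Below is a minimal, self-contained sketch of that latch-barrier pattern; doRead() is a hypothetical stand-in for the shared-reader call under test, not a Hudi API.

  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicInteger;

  public class LatchBarrierSketch {
    public static void main(String[] args) throws InterruptedException {
      final int tasks = 20;
      CountDownLatch barrier = new CountDownLatch(tasks);
      AtomicInteger successes = new AtomicInteger(0);
      ExecutorService pool = Executors.newFixedThreadPool(tasks);
      for (int i = 0; i < tasks; i++) {
        pool.submit(() -> {
          try {
            barrier.countDown();   // announce readiness
            barrier.await();       // block until all workers are ready
            doRead();              // hypothetical concurrent read under test
            successes.incrementAndGet();
          } catch (Exception e) {
            // a failure here leaves successes short, tripping the check below
          }
        });
      }
      pool.shutdown();
      pool.awaitTermination(5, TimeUnit.MINUTES);
      if (successes.get() != tasks) {
        throw new AssertionError("expected " + tasks + " successful reads, got " + successes.get());
      }
    }

    private static void doRead() {
      // stand-in for a call like tableMetadata.getAllFilesInPartition(...)
    }
  }

Note that the barrier maximizes overlap but still cannot guarantee the race manifests on any given run, which is the reviewer's point about probabilistic tests.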

private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
doWriteInsertAndUpsert(testTable, "0000001", "0000002", false);
}
@@ -18,9 +18,6 @@

package org.apache.hudi.metadata;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -54,6 +51,10 @@
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -231,7 +232,7 @@ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecord
throw new HoodieIOException("Error merging records from metadata table for " + sortedKeys.size() + " key : ", ioe);
} finally {
if (!reuse) {
close(Pair.of(partitionFileSlicePair.getLeft(), partitionFileSlicePair.getRight().getFileId()));
Contributor:

Let's also clean up the close method (it doesn't seem like we need it).

Contributor (Author):

Aha, it is also used in:

  private void closePartitionReaders() {
    for (Pair<String, String> partitionFileSlicePair : partitionReaders.keySet()) {
      close(partitionFileSlicePair);
    }
    partitionReaders.clear();
  }

Contributor:

Yeah, we should rewrite it to use closeReader instead:

private void closePartitionReaders() {
    for (Pair<...> pair : partitionReaders.values()) {
      closeReader(pair);
    }
    partitionReaders.clear();
  }

closeReader(readers);
}
}
});
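
For reference, a sketch of how the suggested rewrite could look, assuming partitionReaders maps Pair<partitionName, fileId> to Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> and that both reader types expose close(); the closeReader body is illustrative, not the actual Hudi implementation.

  // Sketch only: iterates values() directly, so no per-key lookup (and no
  // key-based close(...) method) is needed.
  private void closePartitionReaders() {
    for (Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers : partitionReaders.values()) {
      closeReader(readers);
    }
    partitionReaders.clear();
  }

  private void closeReader(Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers) {
    if (readers != null) {
      try {
        if (readers.getKey() != null) {
          readers.getKey().close();    // base file reader
        }
        if (readers.getValue() != null) {
          readers.getValue().close();  // merged log record reader
        }
      } catch (Exception e) {
        throw new HoodieException("Error closing metadata table readers", e);
      }
    }
  }

Iterating values() removes the last internal caller of the standalone close method, which is what makes it deletable.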
@@ -399,7 +400,12 @@ private Map<Pair<String, FileSlice>, List<String>> getPartitionFileSliceToKeysMa
* @return File reader and the record scanner pair for the requested file slice
*/
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> openReaders(partitionName, slice));
if (reuse) {
  return partitionReaders.computeIfAbsent(Pair.of(partitionName, slice.getFileId()), k -> openReaders(partitionName, slice));
} else {
  return openReaders(partitionName, slice);
}
}
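
The reuse branch leans on ConcurrentHashMap.computeIfAbsent, which invokes the mapping function at most once per key even under concurrent callers, so racing threads share one set of readers instead of each opening duplicates. A generic sketch of that caching guarantee, with hypothetical names (ReaderCacheSketch, ExpensiveReader, getOrCreate):

  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.atomic.AtomicInteger;

  class ReaderCacheSketch {
    private final Map<String, ExpensiveReader> cache = new ConcurrentHashMap<>();
    private final AtomicInteger opens = new AtomicInteger(0);

    ExpensiveReader getOrCreate(String key, boolean reuse) {
      if (reuse) {
        // atomic: at most one open() per key, even with concurrent callers
        return cache.computeIfAbsent(key, this::open);
      }
      // no sharing: each caller gets (and must close) its own reader
      return open(key);
    }

    private ExpensiveReader open(String key) {
      opens.incrementAndGet();
      return new ExpensiveReader(key);
    }

    static class ExpensiveReader {
      final String key;
      ExpensiveReader(String key) { this.key = key; }
    }
  }

When reuse is false, each caller opens private readers and the finally block shown earlier closes them via closeReader(readers) after the lookup, so nothing is cached.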

private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReaders(String partitionName, FileSlice slice) {