Avoid file index and use fs view cache in COW input format
codope committed Dec 17, 2022
1 parent 7b17e6f commit 081d692
Showing 2 changed files with 75 additions and 37 deletions.
@@ -18,21 +18,9 @@

package org.apache.hudi.hadoop;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -42,6 +30,8 @@
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -50,21 +40,43 @@
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.buildMetadataConfig;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getFileStatus;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.refreshFileStatus;

/**
* Base implementation of the Hive's {@link FileInputFormat} allowing for reading of Hudi's
@@ -223,6 +235,7 @@ private List<FileStatus> listStatusForSnapshotMode(JobConf job,

Map<HoodieTableMetaClient, List<Path>> groupedPaths =
HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();

for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
HoodieTableMetaClient tableMetaClient = entry.getKey();
@@ -236,28 +249,53 @@
boolean shouldIncludePendingCommits =
HoodieHiveUtils.shouldIncludePendingCommits(job, tableMetaClient.getTableConfig().getTableName());

HiveHoodieTableFileIndex fileIndex =
new HiveHoodieTableFileIndex(
engineContext,
tableMetaClient,
props,
HoodieTableQueryType.SNAPSHOT,
partitionPaths,
queryCommitInstant,
shouldIncludePendingCommits);

Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();

Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);

targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
.collect(Collectors.toList())
);
if (conf.getBoolean(ENABLE.key(), DEFAULT_METADATA_ENABLE_FOR_READERS) && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) {
HiveHoodieTableFileIndex fileIndex =
new HiveHoodieTableFileIndex(
engineContext,
tableMetaClient,
props,
HoodieTableQueryType.SNAPSHOT,
partitionPaths,
queryCommitInstant,
shouldIncludePendingCommits);

Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();

Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);

targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
.map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
.collect(Collectors.toList())
);
} else {
HoodieTimeline timeline = tableMetaClient.getActiveTimeline();

try {
HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(tableMetaClient, hoodietableMetaClient ->
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, hoodietableMetaClient, buildMetadataConfig(job), timeline));

List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();

for (Path p : entry.getValue()) {
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(tableMetaClient.getBasePath()), p);
List<HoodieBaseFile> matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList());
filteredBaseFiles.addAll(matched);
}

LOG.info("Total filtered paths " + filteredBaseFiles.size());
for (HoodieBaseFile filteredFile : filteredBaseFiles) {
filteredFile = refreshFileStatus(job, filteredFile);
targetFiles.add(getFileStatus(filteredFile));
}
} finally {
fsViewCache.forEach(((metaClient, fsView) -> fsView.close()));
}
}
}

return targetFiles;
@@ -285,7 +323,7 @@ private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileS
@Nonnull
protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
try {
return HoodieInputFormatUtils.getFileStatus(baseFile);
return getFileStatus(baseFile);
} catch (IOException ioe) {
throw new HoodieIOException("Failed to get file-status", ioe);
}
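
For readers tracing the new else-branch above: it runs only when the reader-side metadata table is disabled or its files partition is unavailable, and it swaps the HiveHoodieTableFileIndex for one in-memory HoodieTableFileSystemView per table, cached in fsViewCache and closed when listing is done. Below is a minimal, self-contained sketch of that pattern; the class name FsViewCacheListingSketch, the method listLatestBaseFiles, and the pre-grouped groupedPaths argument are assumptions made for illustration, not code from this commit.

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.buildMetadataConfig;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getFileStatus;
import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.refreshFileStatus;

/** Hypothetical helper illustrating the fs-view-cache fallback used above. */
class FsViewCacheListingSketch {

  static List<FileStatus> listLatestBaseFiles(JobConf job,
                                              HoodieLocalEngineContext engineContext,
                                              Map<HoodieTableMetaClient, List<Path>> groupedPaths) throws IOException {
    Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new HashMap<>();
    List<FileStatus> targetFiles = new ArrayList<>();
    try {
      for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
        HoodieTableMetaClient metaClient = entry.getKey();
        HoodieTimeline timeline = metaClient.getActiveTimeline();
        // Build one in-memory view per table and reuse it for every partition of that table.
        HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, mc ->
            FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(
                engineContext, mc, buildMetadataConfig(job), timeline));
        for (Path partitionPath : entry.getValue()) {
          String relativePartitionPath =
              FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), partitionPath);
          for (HoodieBaseFile baseFile
              : fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList())) {
            // Re-resolve zero-length cached statuses before handing them to Hive.
            targetFiles.add(getFileStatus(refreshFileStatus(job, baseFile)));
          }
        }
      }
    } finally {
      // The views hold file listings in memory; close all of them once listing is complete.
      fsViewCache.forEach((mc, view) -> view.close());
    }
    return targetFiles;
  }
}

Caching the view per HoodieTableMetaClient via computeIfAbsent, rather than rebuilding it per partition path, is what avoids repeated timeline and file-listing work for multi-partition queries; the commit applies the same idea inside listStatusForSnapshotMode.
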
@@ -449,7 +449,7 @@ public static HoodieMetadataConfig buildMetadataConfig(Configuration conf) {
* @param dataFile
* @return
*/
private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
public static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFile dataFile) {
Path dataPath = dataFile.getFileStatus().getPath();
try {
if (dataFile.getFileSize() == 0) {
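
The second file's change simply widens refreshFileStatus from private to public so callers outside the utility class, such as the fallback path added above, can re-check a base file whose cached status reports a zero length before converting it into a FileStatus. A hypothetical call site, assuming a Hadoop Configuration conf and a HoodieBaseFile baseFile are already in scope:

// Re-resolve the size if the cached status is empty, then materialize a FileStatus for Hive.
HoodieBaseFile refreshed = HoodieInputFormatUtils.refreshFileStatus(conf, baseFile);
FileStatus status = HoodieInputFormatUtils.getFileStatus(refreshed); // may throw IOException
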
