[HUDI-5353] Close file readers (apache#7412)
codope authored and Alexey Kudinkin committed Dec 14, 2022
1 parent c423e20 commit d5c3528
Showing 10 changed files with 72 additions and 131 deletions.
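
Every change in this commit applies the same fix: a HoodieFileReader obtained from HoodieFileReaderFactory is now closed once it has been read, either by opening it in a try-with-resources block or by closing it in an existing finally block. A minimal sketch of the before/after idiom follows; it only assumes that HoodieFileReader is AutoCloseable (which the try-with-resources usage in the diff itself implies), and the wrapper class and method names are illustrative rather than part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;

public class ReaderCloseSketch {

  // Before: the reader is opened but never closed, so the underlying file handle leaks.
  static long totalRecordsLeaky(Configuration conf, Path path) throws IOException {
    HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(conf, path);
    return reader.getTotalRecords();
  }

  // After: try-with-resources closes the reader even if reading throws.
  static long totalRecordsClosed(Configuration conf, Path path) throws IOException {
    try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(conf, path)) {
      return reader.getTotalRecords();
    }
  }
}

The first two files below are essentially this rewrite; the later files need a finally-block variant because the reader outlives a single try block or is opened conditionally.
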
@@ -18,8 +18,6 @@

package org.apache.hudi.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -35,6 +33,9 @@
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -149,11 +150,10 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
Configuration configuration) throws HoodieIndexException {
ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
List<String> foundRecordKeys = new ArrayList<>();
try {
try (HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath)) {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
@@ -447,8 +447,7 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {
}

long oldNumWrites = 0;
try {
HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
oldNumWrites = reader.getTotalRecords();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to check for merge data validation", e);
@@ -18,15 +18,10 @@

package org.apache.hudi.table.action.commit;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.generic.GenericDatumReader;
@@ -35,14 +30,10 @@
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Iterator;

/**
* Helper to read records from previous version of base file and run Merge.
@@ -83,29 +74,6 @@ protected GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader<Gener
}
}

/**
* Create Parquet record iterator that provides a stitched view of record read from skeleton and bootstrap file.
* Skeleton file is a representation of the bootstrap file inside the table, with just the bare bone fields needed
* for indexing, writing and other functionality.
*
*/
protected Iterator<GenericRecord> getMergingIterator(HoodieTable<T, I, K, O> table, HoodieMergeHandle<T, I, K, O> mergeHandle,
HoodieBaseFile baseFile, HoodieFileReader<GenericRecord> reader,
Schema readSchema, boolean externalSchemaTransformation) throws IOException {
Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
HoodieFileReader<GenericRecord> bootstrapReader = HoodieFileReaderFactory.<GenericRecord>getFileReader(bootstrapFileConfig, externalFilePath);
Schema bootstrapReadSchema;
if (externalSchemaTransformation) {
bootstrapReadSchema = bootstrapReader.getSchema();
} else {
bootstrapReadSchema = mergeHandle.getWriterSchema();
}

return new MergingIterator<>(reader.getRecordIterator(readSchema), bootstrapReader.getRecordIterator(bootstrapReadSchema),
(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
}

/**
* Consumer that dequeues records from queue and sends to Merge Handle.
*/
@@ -19,8 +19,10 @@
package org.apache.hudi.table.action.commit;

import org.apache.avro.SchemaCompatibility;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -78,11 +80,15 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();

Configuration hadoopConf = new Configuration(table.getHadoopConf());
HoodieFileReader<GenericRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
HoodieFileReader<GenericRecord> bootstrapFileReader = null;

final GenericDatumWriter<GenericRecord> gWriter;
final GenericDatumReader<GenericRecord> gReader;
Schema readSchema;
if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
readSchema = baseFileReader.getSchema();
gWriter = new GenericDatumWriter<>(readSchema);
gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
} else {
@@ -126,7 +132,13 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
try {
final Iterator<GenericRecord> readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
readerIterator = new MergingIterator<>(
baseFileReader.getRecordIterator(readSchema),
bootstrapFileReader.getRecordIterator(),
(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
} else {
if (needToReWriteRecord) {
readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
@@ -150,8 +162,9 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
} finally {
// HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
// and executor firstly and then close mergeHandle.
if (reader != null) {
reader.close();
baseFileReader.close();
if (bootstrapFileReader != null) {
bootstrapFileReader.close();
}
if (null != wrapper) {
wrapper.shutdownNow();
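
try-with-resources does not fit runMerge above: the bootstrap reader is opened only when the base file has a bootstrap source, and both readers have to stay open until the executor has drained the record queue, so the commit closes them in the finally block that already shuts the executor down. The sketch below shows just that shape; countRecords, the nullable bootstrapPath convention, and the record counting are illustrative stand-ins for the real merge logic.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;

public class ConditionalReaderCloseSketch {

  // The base reader is always opened; the bootstrap reader only when a bootstrap path exists.
  static long countRecords(Configuration conf, Path basePath, Path bootstrapPath) throws IOException {
    HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(conf, basePath);
    HoodieFileReader bootstrapFileReader = null;
    try {
      long total = baseFileReader.getTotalRecords();
      if (bootstrapPath != null) {
        bootstrapFileReader = HoodieFileReaderFactory.getFileReader(conf, bootstrapPath);
        total += bootstrapFileReader.getTotalRecords();
      }
      return total;
    } finally {
      // Mirror the diff: always close the base reader, and close the bootstrap reader
      // only if it was actually opened.
      baseFileReader.close();
      if (bootstrapFileReader != null) {
        bootstrapFileReader.close();
      }
    }
  }
}
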
@@ -177,9 +177,11 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
clusteringOps.forEach(clusteringOp -> {
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
Option<HoodieFileReader> baseFileReader = Option.empty();
HoodieMergedLogRecordScanner scanner = null;
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(table.getMetaClient().getFs())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(clusteringOp.getDeltaFilePaths())
@@ -195,7 +197,7 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();

Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
@@ -208,6 +210,13 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
} finally {
if (scanner != null) {
scanner.close();
}
if (baseFileReader.isPresent()) {
baseFileReader.get().close();
}
}
});
return records;
@@ -219,9 +228,8 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
List<HoodieRecord<T>> records = new ArrayList<>();
clusteringOps.forEach(clusteringOp -> {
try {
try (HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()))) {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
Iterator<IndexedRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
recordIterator.forEachRemaining(record -> records.add(transform(record)));
} catch (IOException e) {
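
The Java clustering executor above takes a third variant: the reader is wrapped in Hudi's Option and assigned inside the try block, so the commit hoists the Option (and the log scanner declaration) out of the try so that the finally block can close whatever was actually opened. A reduced sketch of the reader half of that pattern follows; readBaseFile and the null dataFilePath convention are assumptions for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;

public class OptionalReaderCloseSketch {

  // dataFilePath is null when the clustering operation only has log files to read.
  static long readBaseFile(Configuration conf, Path dataFilePath) {
    Option<HoodieFileReader> baseFileReader = Option.empty();
    try {
      baseFileReader = dataFilePath == null
          ? Option.empty()
          : Option.of(HoodieFileReaderFactory.getFileReader(conf, dataFilePath));
      return baseFileReader.isPresent() ? baseFileReader.get().getTotalRecords() : 0L;
    } catch (IOException e) {
      throw new HoodieClusteringException("Error reading input data for " + dataFilePath, e);
    } finally {
      // Declared before the try block so it is visible here, exactly as in the diff.
      if (baseFileReader.isPresent()) {
        baseFileReader.get().close();
      }
    }
  }
}
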
@@ -111,20 +111,20 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
// execute clustering for each group async and collect WriteStatus
Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
clusteringPlan.getInputGroups().stream()
.map(inputGroup -> {
if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
return runClusteringForGroupAsyncAsRow(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
shouldPreserveMetadata,
instantTime);
}
return runClusteringForGroupAsync(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
shouldPreserveMetadata,
instantTime);
})
.collect(Collectors.toList()))
clusteringPlan.getInputGroups().stream()
.map(inputGroup -> {
if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
return runClusteringForGroupAsyncAsRow(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
shouldPreserveMetadata,
instantTime);
}
return runClusteringForGroupAsync(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
shouldPreserveMetadata,
instantTime);
})
.collect(Collectors.toList()))
.join()
.stream();
JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
@@ -194,7 +194,7 @@ private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategy
Option<String[]> orderByColumnsOpt =
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(","));

return orderByColumnsOpt.map(orderByColumns -> {
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
switch (layoutOptStrategy) {
@@ -274,8 +274,8 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, Ho
* Read records from baseFiles, apply updates and convert to RDD.
*/
private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
List<ClusteringOperation> clusteringOps,
String instantTime) {
List<ClusteringOperation> clusteringOps,
String instantTime) {
HoodieWriteConfig config = getWriteConfig();
HoodieTable table = getHoodieTable();
return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
@@ -324,7 +324,7 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext
* Read records from baseFiles and convert to RDD.
*/
private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
List<ClusteringOperation> clusteringOps) {
List<ClusteringOperation> clusteringOps) {
SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
HoodieWriteConfig writeConfig = getWriteConfig();

@@ -111,7 +111,6 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
return writeMetadata;
}


/**
* Submit job to execute clustering for the group.
*/
@@ -132,7 +131,6 @@ private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo clustering
.flatMap(writeStatusList -> writeStatusList.stream());
}


/**
* Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
* The number of new file groups created is bounded by numOutputGroups.
@@ -22,14 +22,6 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

public class InputSplitUtils {

@@ -52,24 +44,4 @@ public static void writeBoolean(Boolean valueToWrite, DataOutput out) throws IOE
public static boolean readBoolean(DataInput in) throws IOException {
return in.readBoolean();
}

/**
* Return correct base-file schema based on split.
*
* @param split File Split
* @param conf Configuration
* @return
*/
public static Schema getBaseFileSchema(FileSplit split, Configuration conf) {
try {
if (split instanceof BootstrapBaseFileSplit) {
HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf,
((BootstrapBaseFileSplit)(split)).getBootstrapFileSplit().getPath());
return HoodieAvroUtils.addMetadataFields(storageReader.getSchema());
}
return HoodieRealtimeRecordReaderUtils.readSchema(conf, split.getPath());
} catch (IOException e) {
throw new HoodieIOException("Failed to read footer for parquet " + split.getPath(), e);
}
}
}
@@ -18,15 +18,17 @@

package org.apache.hudi.hadoop.utils;

import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -41,17 +43,11 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
@@ -67,18 +63,6 @@
public class HoodieRealtimeRecordReaderUtils {
private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);

/**
* Reads the schema from the base file.
*/
public static Schema readSchema(Configuration conf, Path filePath) {
try {
HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
return storageReader.getSchema();
} catch (IOException e) {
throw new HoodieIOException("Failed to read schema from " + filePath, e);
}
}

/**
* get the max compaction memory in bytes from JobConf.
*/