Commit

[HUDI-5353] Close file readers (apache#7412)
codope authored and Alexey Kudinkin committed Dec 14, 2022
1 parent 68361fa · commit 6e6940f
Showing 12 changed files with 111 additions and 147 deletions.
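Every change in this commit follows the same pattern: a HoodieFileReader (or, in the clustering path, a merged log record scanner) that was previously opened and never closed is now either created in a try-with-resources block or closed explicitly in a finally block. Below is a minimal before/after sketch of that pattern, not taken verbatim from any one file; conf and oldFilePath are placeholder names for the configuration and path arguments used at each call site, and the catch clause mirrors the one in performMergeDataValidationCheck.

// Before: nothing closes the reader if an exception is thrown (or, at some call sites, at all)
HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(conf, oldFilePath);
long oldNumWrites = reader.getTotalRecords();

// After: try-with-resources closes the reader on every exit path
long oldNumWrites;
try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(conf, oldFilePath)) {
  oldNumWrites = reader.getTotalRecords();
} catch (IOException e) {
  throw new HoodieUpsertException("Failed to check for merge data validation", e);
}

In the diff hunks below, removed lines are prefixed with "-" and added lines with "+"; unprefixed lines are unchanged context.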
@@ -18,8 +18,6 @@

package org.apache.hudi.index;

-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -35,6 +33,9 @@
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -149,11 +150,10 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
Configuration configuration) throws HoodieIndexException {
ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
List<String> foundRecordKeys = new ArrayList<>();
-try {
+try (HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath)) {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
-HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
@@ -447,8 +447,7 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {
}

long oldNumWrites = 0;
-try {
-HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
oldNumWrites = reader.getTotalRecords();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to check for merge data validation", e);
@@ -18,15 +18,10 @@

package org.apache.hudi.table.action.commit;

-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.generic.GenericDatumReader;
@@ -35,14 +30,10 @@
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.util.Iterator;

/**
* Helper to read records from previous version of base file and run Merge.
@@ -83,29 +74,6 @@ protected GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader<Gener
}
}

-/**
- * Create Parquet record iterator that provides a stitched view of record read from skeleton and bootstrap file.
- * Skeleton file is a representation of the bootstrap file inside the table, with just the bare bone fields needed
- * for indexing, writing and other functionality.
- *
- */
-protected Iterator<GenericRecord> getMergingIterator(HoodieTable<T, I, K, O> table, HoodieMergeHandle<T, I, K, O> mergeHandle,
-HoodieBaseFile baseFile, HoodieFileReader<GenericRecord> reader,
-Schema readSchema, boolean externalSchemaTransformation) throws IOException {
-Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
-Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
-HoodieFileReader<GenericRecord> bootstrapReader = HoodieFileReaderFactory.<GenericRecord>getFileReader(bootstrapFileConfig, externalFilePath);
-Schema bootstrapReadSchema;
-if (externalSchemaTransformation) {
-bootstrapReadSchema = bootstrapReader.getSchema();
-} else {
-bootstrapReadSchema = mergeHandle.getWriterSchema();
-}
-
-return new MergingIterator<>(reader.getRecordIterator(readSchema), bootstrapReader.getRecordIterator(bootstrapReadSchema),
-(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
-}

/**
* Consumer that dequeues records from queue and sends to Merge Handle.
*/
@@ -19,8 +19,10 @@
package org.apache.hudi.table.action.commit;

import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -78,11 +80,15 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();

+Configuration hadoopConf = new Configuration(table.getHadoopConf());
+HoodieFileReader<GenericRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
+HoodieFileReader<GenericRecord> bootstrapFileReader = null;

final GenericDatumWriter<GenericRecord> gWriter;
final GenericDatumReader<GenericRecord> gReader;
Schema readSchema;
if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
-readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
+readSchema = baseFileReader.getSchema();
gWriter = new GenericDatumWriter<>(readSchema);
gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
} else {
@@ -92,7 +98,6 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
}

BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());

Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
boolean needToReWriteRecord = false;
@@ -126,12 +131,18 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
try {
final Iterator<GenericRecord> readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
-readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+readerIterator = new MergingIterator<>(
+baseFileReader.getRecordIterator(readSchema),
+bootstrapFileReader.getRecordIterator(),
+(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
} else {
if (needToReWriteRecord) {
-readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
+readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(baseFileReader.getRecordIterator(), readSchema, renameCols);
} else {
-readerIterator = reader.getRecordIterator(readSchema);
+readerIterator = baseFileReader.getRecordIterator(readSchema);
}
}

@@ -150,8 +161,9 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
} finally {
// HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
// and executor firstly and then close mergeHandle.
-if (reader != null) {
-reader.close();
+baseFileReader.close();
+if (bootstrapFileReader != null) {
+bootstrapFileReader.close();
}
if (null != wrapper) {
wrapper.shutdownNow();
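The hunks above (and the matching ones in the other engines' merge helpers below) also show why BaseMergeHelper.getMergingIterator was deleted: the helper opened a bootstrap file reader internally and never handed it back, so callers had no way to close it. Each runMerge now opens both readers itself and releases them in its finally block. The following is a condensed sketch of the new shape, assembled from the added lines; executor wiring and exception handling are elided, and surrounding variables (table, mergeHandle, baseFile, readSchema, hadoopConf) come from the enclosing method.

HoodieFileReader<GenericRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
HoodieFileReader<GenericRecord> bootstrapFileReader = null;
try {
  Iterator<GenericRecord> readerIterator;
  if (baseFile.getBootstrapBaseFile().isPresent()) {
    // Stitch skeleton records from the base file with the bootstrap source file's records.
    Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
    Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
    bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
    readerIterator = new MergingIterator<>(
        baseFileReader.getRecordIterator(readSchema),
        bootstrapFileReader.getRecordIterator(),
        pair -> HoodieAvroUtils.stitchRecords(pair.getLeft(), pair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
  } else {
    readerIterator = baseFileReader.getRecordIterator(readSchema);
  }
  // ... feed readerIterator into the BoundedInMemoryExecutor / merge handle ...
} finally {
  // Close both readers regardless of how the merge loop exits.
  baseFileReader.close();
  if (bootstrapFileReader != null) {
    bootstrapFileReader.close();
  }
}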
@@ -18,7 +18,10 @@

package org.apache.hudi.table.action.commit;

+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -68,10 +71,14 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
final GenericDatumReader<GenericRecord> gReader;
Schema readSchema;

+Configuration hadoopConf = new Configuration(table.getHadoopConf());
+HoodieFileReader<GenericRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
+HoodieFileReader<GenericRecord> bootstrapFileReader = null;

final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
-readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
+readSchema = baseFileReader.getSchema();
gWriter = new GenericDatumWriter<>(readSchema);
gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
} else {
@@ -81,14 +88,18 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
}

BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
-HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
try {
final Iterator<GenericRecord> readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
-readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+readerIterator = new MergingIterator<>(
+baseFileReader.getRecordIterator(readSchema),
+bootstrapFileReader.getRecordIterator(),
+(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
} else {
-readerIterator = reader.getRecordIterator(readSchema);
+readerIterator = baseFileReader.getRecordIterator(readSchema);
}

ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
@@ -106,8 +117,9 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
} finally {
// HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
// and executor firstly and then close mergeHandle.
-if (reader != null) {
-reader.close();
+baseFileReader.close();
+if (bootstrapFileReader != null) {
+bootstrapFileReader.close();
}
if (null != wrapper) {
wrapper.shutdownNow();
@@ -177,9 +177,11 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
clusteringOps.forEach(clusteringOp -> {
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
+Option<HoodieFileReader> baseFileReader = Option.empty();
+HoodieMergedLogRecordScanner scanner = null;
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(table.getMetaClient().getFs())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(clusteringOp.getDeltaFilePaths())
@@ -195,7 +197,7 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build();

-Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
@@ -208,6 +210,13 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
+} finally {
+if (scanner != null) {
+scanner.close();
+}
+if (baseFileReader.isPresent()) {
+baseFileReader.get().close();
+}
}
});
return records;
@@ -219,9 +228,8 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
List<HoodieRecord<T>> records = new ArrayList<>();
clusteringOps.forEach(clusteringOp -> {
-try {
+try (HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()))) {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
-HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
Iterator<IndexedRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
recordIterator.forEachRemaining(record -> records.add(transform(record)));
} catch (IOException e) {
@@ -18,7 +18,10 @@

package org.apache.hudi.table.action.commit;

+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -66,11 +69,15 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle = upsertHandle;
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();

+Configuration hadoopConf = new Configuration(table.getHadoopConf());
+HoodieFileReader<GenericRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
+HoodieFileReader<GenericRecord> bootstrapFileReader = null;

final GenericDatumWriter<GenericRecord> gWriter;
final GenericDatumReader<GenericRecord> gReader;
Schema readSchema;
if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
-readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
+readSchema = baseFileReader.getSchema();
gWriter = new GenericDatumWriter<>(readSchema);
gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
} else {
@@ -80,13 +87,18 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
}

BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
try {
final Iterator<GenericRecord> readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
-readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+readerIterator = new MergingIterator<>(
+baseFileReader.getRecordIterator(readSchema),
+bootstrapFileReader.getRecordIterator(),
+(inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
} else {
-readerIterator = reader.getRecordIterator(readSchema);
+readerIterator = baseFileReader.getRecordIterator(readSchema);
}

ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
@@ -104,8 +116,9 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
} finally {
// HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
// and executor firstly and then close mergeHandle.
-if (reader != null) {
-reader.close();
+baseFileReader.close();
+if (bootstrapFileReader != null) {
+bootstrapFileReader.close();
}
if (null != wrapper) {
wrapper.shutdownNow();
(Diff truncated: the remaining changed files in this commit are not shown here.)
