diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 61be856d3662c..e3c2651718fd9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.index;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
@@ -35,6 +33,9 @@
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -149,11 +150,10 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
                                                 Configuration configuration) throws HoodieIndexException {
     ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
     List<String> foundRecordKeys = new ArrayList<>();
-    try {
+    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath)) {
       // Load all rowKeys from the file, to double-confirm
       if (!candidateRecordKeys.isEmpty()) {
         HoodieTimer timer = new HoodieTimer().startTimer();
-        HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
         Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
         foundRecordKeys.addAll(fileRowKeys);
         LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 82c6de576149f..6e172d01a6520 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -447,8 +447,7 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {
     }
 
     long oldNumWrites = 0;
-    try {
-      HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+    try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
       oldNumWrites = reader.getTotalRecords();
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to check for merge data validation", e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 5ead348140aa3..8c34e3c3a74ca 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -18,15 +18,10 @@
 
 package org.apache.hudi.table.action.commit;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.generic.GenericDatumReader;
@@ -35,14 +30,10 @@
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * Helper to read records from previous version of base file and run Merge.
@@ -83,29 +74,6 @@ protected GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader
-  protected Iterator<GenericRecord> getMergingIterator(HoodieTable table, HoodieMergeHandle mergeHandle,
-                                                       HoodieBaseFile baseFile, HoodieFileReader reader,
-                                                       Schema readSchema, boolean externalSchemaTransformation) throws IOException {
-    Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
-    Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
-    HoodieFileReader bootstrapReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, externalFilePath);
-    Schema bootstrapReadSchema;
-    if (externalSchemaTransformation) {
-      bootstrapReadSchema = bootstrapReader.getSchema();
-    } else {
-      bootstrapReadSchema = mergeHandle.getWriterSchema();
-    }
-
-    return new MergingIterator<>(reader.getRecordIterator(readSchema), bootstrapReader.getRecordIterator(bootstrapReadSchema),
-        (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
-  }
-
   /**
    * Consumer that dequeues records from queue and sends to Merge Handle.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 5d1a55453d162..393ec976f2258 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -19,8 +19,10 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -78,11 +80,15 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hoo
+    HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
+    HoodieFileReader bootstrapFileReader = null;
+
     final GenericDatumWriter<GenericRecord> gWriter;
     final GenericDatumReader<GenericRecord> gReader;
     Schema readSchema;
     if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
+      readSchema = baseFileReader.getSchema();
       gWriter = new GenericDatumWriter<>(readSchema);
       gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
     } else {
@@ -126,7 +132,13 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hoo
       Iterator<GenericRecord> readerIterator;
       if (baseFile.getBootstrapBaseFile().isPresent()) {
-        readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
+        Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+        Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+        bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+        readerIterator = new MergingIterator<>(
+            baseFileReader.getRecordIterator(readSchema),
+            bootstrapFileReader.getRecordIterator(),
+            (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
       } else {
         if (needToReWriteRecord) {
           readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
@@ -150,8 +162,9 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hoo
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ ... @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperat
     clusteringOps.forEach(clusteringOp -> {
       long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
       LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
+      Option<HoodieFileReader> baseFileReader = Option.empty();
+      HoodieMergedLogRecordScanner scanner = null;
       try {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        scanner = HoodieMergedLogRecordScanner.newBuilder()
             .withFileSystem(table.getMetaClient().getFs())
             .withBasePath(table.getMetaClient().getBasePath())
             .withLogFilePaths(clusteringOp.getDeltaFilePaths())
@@ -195,7 +197,7 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperat
-        Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+        baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
             ? Option.empty()
             : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
         HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
@@ -208,6 +210,13 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperat
@@ ... @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperat
   private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
     List<HoodieRecord<T>> records = new ArrayList<>();
     clusteringOps.forEach(clusteringOp -> {
-      try {
+      try (HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()))) {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
-        HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
         Iterator<IndexedRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
         recordIterator.forEachRemaining(record -> records.add(transform(record)));
       } catch (IOException e) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index b2308bd20a5df..f26d3743c8fa6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -111,20 +111,20 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
     boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
     // execute clustering for each group async and collect WriteStatus
     Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
-        clusteringPlan.getInputGroups().stream()
-            .map(inputGroup -> {
-              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
-                return runClusteringForGroupAsyncAsRow(inputGroup,
-                    clusteringPlan.getStrategy().getStrategyParams(),
-                    shouldPreserveMetadata,
-                    instantTime);
-              }
-              return runClusteringForGroupAsync(inputGroup,
-                  clusteringPlan.getStrategy().getStrategyParams(),
-                  shouldPreserveMetadata,
-                  instantTime);
-            })
-            .collect(Collectors.toList()))
+            clusteringPlan.getInputGroups().stream()
+                .map(inputGroup -> {
+                  if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
+                    return runClusteringForGroupAsyncAsRow(inputGroup,
+                        clusteringPlan.getStrategy().getStrategyParams(),
+                        shouldPreserveMetadata,
+                        instantTime);
+                  }
+                  return runClusteringForGroupAsync(inputGroup,
+                      clusteringPlan.getStrategy().getStrategyParams(),
+                      shouldPreserveMetadata,
+                      instantTime);
+                })
+                .collect(Collectors.toList()))
         .join()
         .stream();
     JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
@@ -194,7 +194,7 @@ private BulkInsertPartitioner<Dataset<Row>> getPartitioner(Map<String, String> strategy
     Option<String[]> orderByColumnsOpt = Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
         .map(listStr -> listStr.split(","));
-    
+
     return orderByColumnsOpt.map(orderByColumns -> {
       HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
       switch (layoutOptStrategy) {
@@ -274,8 +274,8 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, Ho
    * Read records from baseFiles, apply updates and convert to RDD.
    */
   private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
-                                                                  List<ClusteringOperation> clusteringOps,
-                                                                  String instantTime) {
+                                                                   List<ClusteringOperation> clusteringOps,
+                                                                   String instantTime) {
     HoodieWriteConfig config = getWriteConfig();
     HoodieTable table = getHoodieTable();
     return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
@@ -324,7 +324,7 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext
    * Read records from baseFiles and convert to RDD.
    */
   private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
-                                                                   List<ClusteringOperation> clusteringOps) {
+                                                                    List<ClusteringOperation> clusteringOps) {
     SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
     HoodieWriteConfig writeConfig = getWriteConfig();
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index bb6d3df5f105e..e55bac0b172a4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -111,7 +111,6 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
     return writeMetadata;
   }
 
-
   /**
    * Submit job to execute clustering for the group.
    */
@@ -132,7 +131,6 @@ private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo clustering
         .flatMap(writeStatusList -> writeStatusList.stream());
   }
 
-
   /**
    * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
    * The number of new file groups created is bounded by numOutputGroups.
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
index e485e72c25755..5dcd66cd826d0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
@@ -22,14 +22,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 
 public class InputSplitUtils {
 
@@ -52,24 +44,4 @@ public static void writeBoolean(Boolean valueToWrite, DataOutput out) throws IOE
   public static boolean readBoolean(DataInput in) throws IOException {
     return in.readBoolean();
   }
-
-  /**
-   * Return correct base-file schema based on split.
-   *
-   * @param split File Split
-   * @param conf Configuration
-   * @return
-   */
-  public static Schema getBaseFileSchema(FileSplit split, Configuration conf) {
-    try {
-      if (split instanceof BootstrapBaseFileSplit) {
-        HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf,
-            ((BootstrapBaseFileSplit)(split)).getBootstrapFileSplit().getPath());
-        return HoodieAvroUtils.addMetadataFields(storageReader.getSchema());
-      }
-      return HoodieRealtimeRecordReaderUtils.readSchema(conf, split.getPath());
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read footer for parquet " + split.getPath(), e);
-    }
-  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index bf4cbff6665cf..ec8bb9034f122 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalTypes;
@@ -25,8 +29,6 @@
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -41,17 +43,11 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashSet;
@@ -67,18 +63,6 @@ public class HoodieRealtimeRecordReaderUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);
 
-  /**
-   * Reads the schema from the base file.
-   */
-  public static Schema readSchema(Configuration conf, Path filePath) {
-    try {
-      HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
-      return storageReader.getSchema();
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read schema from " + filePath, e);
-    }
-  }
-
   /**
    * get the max compaction memory in bytes from JobConf.
    */
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 9d1b0cbd8730d..d1d98475eae1f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -152,7 +152,7 @@ public class HoodieMetadataTableValidator implements Serializable {
   // Properties with source, hoodie client, key generator etc.
   private TypedProperties props;
 
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
 
   protected transient Option<AsyncMetadataTableValidateService> asyncMetadataTableValidateService;
 
@@ -940,10 +940,10 @@ public int compare(HoodieColumnRangeMetadata<Comparable> o1, HoodieColumnRangeMe
    * verified in the {@link HoodieMetadataTableValidator}.
    */
   private static class HoodieMetadataValidationContext implements Serializable {
-    private HoodieTableMetaClient metaClient;
-    private HoodieTableFileSystemView fileSystemView;
-    private HoodieTableMetadata tableMetadata;
-    private boolean enableMetadataTable;
+    private final HoodieTableMetaClient metaClient;
+    private final HoodieTableFileSystemView fileSystemView;
+    private final HoodieTableMetadata tableMetadata;
+    private final boolean enableMetadataTable;
     private List<String> allColumnNameList;
 
     public HoodieMetadataValidationContext(
@@ -1038,30 +1038,29 @@ private List<String> getAllColumnNames() {
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     try {
       return schemaResolver.getTableAvroSchema().getFields().stream()
-          .map(entry -> entry.name()).collect(Collectors.toList());
+          .map(Schema.Field::name).collect(Collectors.toList());
     } catch (Exception e) {
       throw new HoodieException("Failed to get all column names for " + metaClient.getBasePath());
     }
   }
 
   private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, String filename) {
-    Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath), filename);
-    HoodieFileReader fileReader;
-    try {
-      fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path);
+    Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPath), filename);
+    BloomFilter bloomFilter;
+    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path)) {
+      bloomFilter = fileReader.readBloomFilter();
+      if (bloomFilter == null) {
+        Log.error("Failed to read bloom filter for " + path);
+        return Option.empty();
+      }
     } catch (IOException e) {
       Log.error("Failed to get file reader for " + path + " " + e.getMessage());
       return Option.empty();
     }
-    final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-    if (fileBloomFilter == null) {
-      Log.error("Failed to read bloom filter for " + path);
-      return Option.empty();
-    }
     return Option.of(BloomFilterData.builder()
         .setPartitionPath(partitionPath)
         .setFilename(filename)
-        .setBloomFilter(ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()))
+        .setBloomFilter(ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()))
         .build());
   }
 }
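The common thread in the changes above is deterministic cleanup of HoodieFileReader instances obtained from HoodieFileReaderFactory: readers that are always opened move into try-with-resources, while readers that are only opened conditionally (such as the Option-wrapped reader in the clustering strategies) are declared outside the try block and closed in a finally block. Below is a minimal sketch of both patterns; the class and method names (FileReaderClosePatterns, countRecords, countRecordsIfPresent) are illustrative only and not part of the Hudi codebase, and the sketch assumes HoodieFileReader is AutoCloseable with close() throwing at most IOException, as the try-with-resources usage in this diff implies.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

// Illustrative sketch only; not part of the Hudi codebase.
public class FileReaderClosePatterns {

  // Pattern 1: the reader is always opened, so try-with-resources closes it on
  // both the success and the exception path (as in HoodieMergeHandle and
  // HoodieIndexUtils above).
  static long countRecords(Configuration conf, Path baseFilePath) throws IOException {
    try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(conf, baseFilePath)) {
      return reader.getTotalRecords();
    }
  }

  // Pattern 2: the reader may or may not be opened, so it is held in an Option
  // initialized before the try block and closed in finally (as in the
  // clustering execution strategies above).
  static long countRecordsIfPresent(Configuration conf, String dataFilePath) throws IOException {
    Option<HoodieFileReader> baseFileReader = Option.empty();
    try {
      baseFileReader = StringUtils.isNullOrEmpty(dataFilePath)
          ? Option.empty()
          : Option.of(HoodieFileReaderFactory.getFileReader(conf, new Path(dataFilePath)));
      return baseFileReader.isPresent() ? baseFileReader.get().getTotalRecords() : 0L;
    } finally {
      if (baseFileReader.isPresent()) {
        baseFileReader.get().close();
      }
    }
  }
}

The second pattern trades the brevity of try-with-resources for the ability to keep the reader optional; it mirrors the Option.empty() initialization plus finally-block close that the clustering strategy changes in this diff use when a base file may be absent.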