[HUDI-8079] Get rid of other count case for fg reader (apache#11774)
Co-authored-by: Jonathan Vexler <=>
jonvex authored Aug 15, 2024
1 parent 367ca8c commit 31eea3b
Showing 1 changed file with 28 additions and 36 deletions.
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.parquet
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieTableSchema, HoodieTableState, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
-import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
 import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, HoodieCDCFileGroupSplit}
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -114,8 +113,6 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
     val augmentedStorageConf = new HadoopStorageConfiguration(hadoopConf).getInline
     setSchemaEvolutionConfigs(augmentedStorageConf)
-    val baseFileReader = super.buildReaderWithPartitionValues(spark, dataSchema, partitionSchema, requiredSchema,
-      filters ++ requiredFilters, options, augmentedStorageConf.unwrapCopy())
     val cdcFileReader = if (isCDC) {
       super.buildReaderWithPartitionValues(
         spark,
@@ -144,39 +141,34 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
           val filegroupName = FSUtils.getFileIdFromFilePath(sparkAdapter
             .getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
           fileSliceMapping.getSlice(filegroupName) match {
-            case Some(fileSlice) if !isCount =>
-              if (requiredSchema.isEmpty && !fileSlice.getLogFiles.findAny().isPresent) {
-                val hoodieBaseFile = fileSlice.getBaseFile.get()
-                baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues, hoodieBaseFile.getStoragePath, 0, hoodieBaseFile.getFileLen))
-              } else {
-                val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, tableState.recordKeyField, filters, requiredFilters)
-                val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
-                  .builder().setConf(storageConf).setBasePath(tableState.tablePath).build
-                val props = metaClient.getTableConfig.getProps
-                options.foreach(kv => props.setProperty(kv._1, kv._2))
-                val reader = new HoodieFileGroupReader[InternalRow](
-                  readerContext,
-                  new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
-                  tableState.tablePath,
-                  tableState.latestCommitTimestamp.get,
-                  fileSlice,
-                  broadcastedDataSchema.value,
-                  broadcastedRequestedSchema.value,
-                  internalSchemaOpt,
-                  metaClient,
-                  props,
-                  file.start,
-                  file.length,
-                  shouldUseRecordPosition)
-                reader.initRecordIterators()
-                // Append partition values to rows and project to output schema
-                appendPartitionAndProject(
-                  reader.getClosableIterator,
-                  requiredSchema,
-                  partitionSchema,
-                  outputSchema,
-                  fileSliceMapping.getPartitionValues)
-              }
+            case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) =>
+              val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, tableState.recordKeyField, filters, requiredFilters)
+              val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
+                .builder().setConf(storageConf).setBasePath(tableState.tablePath).build
+              val props = metaClient.getTableConfig.getProps
+              options.foreach(kv => props.setProperty(kv._1, kv._2))
+              val reader = new HoodieFileGroupReader[InternalRow](
+                readerContext,
+                new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
+                tableState.tablePath,
+                tableState.latestCommitTimestamp.get,
+                fileSlice,
+                broadcastedDataSchema.value,
+                broadcastedRequestedSchema.value,
+                internalSchemaOpt,
+                metaClient,
+                props,
+                file.start,
+                file.length,
+                shouldUseRecordPosition)
+              reader.initRecordIterators()
+              // Append partition values to rows and project to output schema
+              appendPartitionAndProject(
+                reader.getClosableIterator,
+                requiredSchema,
+                partitionSchema,
+                outputSchema,
+                fileSliceMapping.getPartitionValues)

             case _ => parquetFileReader.value.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf)
           }
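
In effect, the dedicated count fast path (reading the base parquet file directly via baseFileReader when no columns are projected and the slice has no log files) is folded into the case guard, so such slices now fall through to the default parquetFileReader.value.read(...) branch, and the separate baseFileReader and the createPartitionedFile import are no longer needed. Below is a minimal sketch of the resulting dispatch condition; Slice, useFileGroupReader, and the other names are illustrative stand-ins, not the actual Hudi API.

// Minimal sketch of the dispatch decision after this change (illustrative names only).
object FgReaderDispatchSketch {
  // Simplified stand-in for a file slice: a base file plus zero or more log files.
  final case class Slice(baseFile: String, logFiles: List[String])

  // The file group reader path is taken only when the query projects columns or the
  // slice has log files to merge; otherwise (including the empty-projection "count"
  // case that previously had its own branch) the plain parquet reader handles the split.
  def useFileGroupReader(isCount: Boolean, requiredSchemaNonEmpty: Boolean, slice: Slice): Boolean =
    !isCount && (requiredSchemaNonEmpty || slice.logFiles.nonEmpty)

  def main(args: Array[String]): Unit = {
    val baseOnly = Slice("part-0.parquet", Nil)
    val withLogs = Slice("part-0.parquet", List("log.1"))
    println(useFileGroupReader(isCount = false, requiredSchemaNonEmpty = false, baseOnly)) // false -> parquet reader
    println(useFileGroupReader(isCount = false, requiredSchemaNonEmpty = true, baseOnly))  // true  -> file group reader
    println(useFileGroupReader(isCount = false, requiredSchemaNonEmpty = false, withLogs)) // true  -> file group reader
  }
}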