[HUDI-3979] Optimize out mandatory columns when no merging is performed (#5430)

For MOR tables, when no merging is performed there is no point in reading either the primary-key or the pre-combine-key values (unless the query references them). Skipping these reads can save the substantial resources that would otherwise be wasted fetching them (a short illustrative sketch follows the changed-files summary below).
Alexey Kudinkin authored Jul 22, 2022
1 parent 6b84384 commit 39f2a06
Showing 8 changed files with 187 additions and 85 deletions.
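
To make the idea concrete: the read path only needs the record-key and pre-combine columns when it has to merge base-file records with log records; for a plain base-file read they add nothing. Below is a minimal, self-contained Scala sketch of that decision. The names (QuerySpec, mandatoryColumns) are hypothetical and are not Hudi APIs; the actual change wires this through mandatoryFields and the per-split readers shown in the diff.

// Hypothetical sketch, not Hudi code: which extra columns must be read on top of
// the columns the query asked for.
object MandatoryColumnsSketch {

  final case class QuerySpec(requestedColumns: Seq[String], mergingRequired: Boolean)

  def mandatoryColumns(spec: QuerySpec,
                       recordKeyField: String,
                       preCombineField: String): Seq[String] =
    if (spec.mergingRequired) {
      // Merging needs the key to match records and the pre-combine field to pick
      // the latest version, unless the query already requested them.
      Seq(recordKeyField, preCombineField).filterNot(spec.requestedColumns.contains)
    } else {
      // No merging: nothing beyond the requested columns has to be read.
      Seq.empty
    }

  def main(args: Array[String]): Unit = {
    val spec = QuerySpec(requestedColumns = Seq("fare", "driver"), mergingRequired = false)
    println(mandatoryColumns(spec, recordKeyField = "uuid", preCombineField = "ts")) // List()
  }
}
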
@@ -60,9 +60,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
internalSchemaOpt.isEmpty

override lazy val mandatoryFields: Seq[String] =
// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
Seq(recordKeyField)
override lazy val mandatoryFields: Seq[String] = Seq.empty

override def imbueConfigs(sqlContext: SQLContext): Unit = {
super.imbueConfigs(sqlContext)
@@ -73,6 +71,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
filters: Array[Filter]): HoodieUnsafeRDD = {

val baseFileReader = createBaseFileReader(
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -204,6 +204,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
}

/**
* NOTE: These fields are accessed by the [[NestedSchemaPruning]] component, which is only enabled for
* Spark >= 3.1
*/
lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
@@ -258,12 +262,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*
* Check scala-doc for [[shouldExtractPartitionValuesFromPartitionPath]] for more details
*/
def dataSchema: StructType =
if (shouldExtractPartitionValuesFromPartitionPath) {
prunePartitionColumns(tableStructSchema)
} else {
tableStructSchema
}
def dataSchema: StructType = if (shouldExtractPartitionValuesFromPartitionPath) {
prunePartitionColumns(tableStructSchema)
} else {
tableStructSchema
}

/**
* Determines whether relation's schema could be pruned by Spark's Optimizer
@@ -346,7 +349,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, filters)
val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, targetColumns, filters)

// NOTE: In case when partition columns have been pruned from the required schema, we have to project
// the rows from the pruned schema back into the one expected by the caller
@@ -369,17 +372,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
/**
* Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
*
* @param fileSplits file splits to be handled by the RDD
* @param partitionSchema target table's partition schema
* @param dataSchema target table's data files' schema
* @param requiredSchema projected schema required by the reader
* @param filters data filters to be applied
* @param fileSplits file splits to be handled by the RDD
* @param partitionSchema target table's partition schema
* @param dataSchema target table's data files' schema
* @param requiredSchema projected schema required by the reader
* @param requestedColumns columns requested by the query
* @param filters data filters to be applied
* @return instance of RDD (implementing [[HoodieUnsafeRDD]])
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
filters: Array[Filter]): HoodieUnsafeRDD

/**
@@ -551,37 +556,48 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
val hfileReader = createHFileReader(
spark = spark,
dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf
)

val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = spark,
dataSchema = dataSchema.structTypeSchema,
partitionSchema = partitionSchema,
requiredSchema = requiredSchema.structTypeSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf,
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
)
hadoopConf: Configuration): BaseFileReader = {
val tableBaseFileFormat = tableConfig.getBaseFileFormat

// NOTE: PLEASE READ CAREFULLY
// Lambda returned from this method is going to be invoked on the executor, and therefore
// we have to eagerly initialize all of the readers even though only one specific to the type
// of the file being read will be used. This is required to avoid serialization of the whole
// relation (containing file-index for ex) and passing it to the executor
val reader = tableBaseFileFormat match {
case HoodieFileFormat.PARQUET =>
HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = spark,
dataSchema = dataSchema.structTypeSchema,
partitionSchema = partitionSchema,
requiredSchema = requiredSchema.structTypeSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf,
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
)

case HoodieFileFormat.HFILE =>
createHFileReader(
spark = spark,
dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf
)

case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)")
}

partitionedFile => {
val extension = FSUtils.getFileExtension(partitionedFile.filePath)
if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
parquetReader.apply(partitionedFile)
} else if (HoodieFileFormat.HFILE.getFileExtension.equals(extension)) {
hfileReader.apply(partitionedFile)
if (tableBaseFileFormat.getFileExtension.equals(extension)) {
reader.apply(partitionedFile)
} else {
throw new UnsupportedOperationException(s"Base file format not supported by Spark DataSource ($partitionedFile)")
throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)")
}
}
}
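
The NOTE in the hunk above is about closure capture: the returned function is invoked on executors, so the format-specific reader is built eagerly on the driver and the lambda closes over that small function value rather than the whole relation (which holds, for example, the file index). A rough, self-contained sketch of the pattern, with hypothetical names (FileReader, buildReader) rather than the actual Hudi types:

object ClosureCaptureSketch {
  // A "reader" is modeled as a function from a file path to rows (here, rows of strings).
  type FileReader = String => Iterator[Seq[String]]

  def buildReader(tableFormatExtension: String, // e.g. ".parquet" or ".hfile"
                  readerForFormat: FileReader): FileReader = {
    // readerForFormat is fully constructed before the lambda below is created, so the
    // returned closure captures only that function value and the extension string,
    // not the object that built them.
    filePath =>
      if (filePath.endsWith(tableFormatExtension)) readerForFormat(filePath)
      else throw new UnsupportedOperationException(
        s"Invalid base-file format ($filePath), expected ($tableFormatExtension)")
  }
}
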
@@ -629,6 +645,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

object HoodieBaseRelation extends SparkAdapterSupport {

type BaseFileReader = PartitionedFile => Iterator[InternalRow]

private def generateUnsafeProjection(from: StructType, to: StructType) =
sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, to)

@@ -676,7 +694,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
hadoopConf: Configuration): BaseFileReader = {
val hadoopConfBroadcast =
spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

@@ -23,6 +23,7 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.BaseFileReader
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
@@ -55,11 +56,14 @@ import scala.util.Try

case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition

case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader: BaseFileReader,
requiredSchemaFileReaderForMerging: BaseFileReader,
requiredSchemaFileReaderForNoMerging: BaseFileReader)

class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@transient config: Configuration,
fullSchemaFileReader: PartitionedFile => Iterator[InternalRow],
requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow],
tableSchema: HoodieTableSchema,
fileReaders: HoodieMergeOnReadBaseFileReaders,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
mergeType: String,
@@ -86,13 +90,13 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get)

case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
new LogFileIterator(logFileOnlySplit, getConfig)

case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
val baseFileIterator = requiredSchemaFileReader.apply(split.dataFile.get)
val baseFileIterator = fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get)
new SkipMergeIterator(split, baseFileIterator, getConfig)

case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
@@ -126,9 +130,9 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// then we can avoid reading and parsing the records w/ _full_ schema, and instead only
// rely on projected one, nevertheless being able to perform merging correctly
if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
(fullSchemaFileReader(split.dataFile.get), tableSchema)
(fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema)
else
(requiredSchemaFileReader(split.dataFile.get), requiredSchema)
(fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get), requiredSchema)
}

override protected def getPartitions: Array[Partition] =
@@ -152,7 +156,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema

protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(dataSchema.avroSchemaStr)

protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema)
protected var recordToLoad: InternalRow = _
@@ -167,7 +171,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)

private var logScanner = {
val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
val internalSchema = dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
maxCompactionMemoryInBytes, config, internalSchema)
}
@@ -232,7 +236,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
val curRow = baseFileIterator.next()
recordToLoad = unsafeProjection(curRow)
recordToLoad = curRow
true
} else {
super[LogFileIterator].hasNext
@@ -61,6 +61,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
filters: Array[Filter]): HoodieMergeOnReadRDD = {
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
@@ -81,23 +82,25 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt)
)

val requiredSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
requiredSchema = requiredSchema,
filters = filters ++ incrementalSpanRecordFilters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
)
val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) =
createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters)

val hoodieTableState = getTableState
// TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
// filtered, since file-reader might not be capable to perform filtering
new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
new HoodieMergeOnReadRDD(
sqlContext.sparkContext,
config = jobConf,
fileReaders = HoodieMergeOnReadBaseFileReaders(
fullSchemaFileReader = fullSchemaParquetReader,
requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging,
requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging
),
dataSchema = dataSchema,
requiredSchema = requiredSchema,
tableState = hoodieTableState,
mergeType = mergeType,
fileSplits = fileSplits)
}

override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {