[HUDI-3979] Optimize out mandatory columns when no merging is performed #5430

Merged: 25 commits, merged Jul 22, 2022

Commits (25)
bb70f97  Injecting Spark Session extensions for `TestMORDataSource` (Apr 23, 2022)
f559585  Tidying up (Apr 26, 2022)
2de5e0e  Fixed `createBaseFileReader` seq to only create base-file readers bas… (Apr 26, 2022)
b3e8fff  Bifurcate cases when merging required and is not w/in `HoodieMergeOnR… (Apr 26, 2022)
853a075  Fixed `MergeOnReadSnapshotRelation` (Apr 26, 2022)
72e32c3  Extracted `MergeOnReadFileReaders` (Apr 26, 2022)
ba57347  Rebased `MergeOnReadIncrementalRelation` (Apr 26, 2022)
a540b19  Fixed MOR schema-pruning seq to prune only columns required for mergi… (Apr 26, 2022)
debaf46  Remove mandatory fields for `BaseFileOnlyRelation` (Apr 26, 2022)
2644e50  Fixing compilation (Apr 26, 2022)
57b85f9  Fixed required-for-merging columns filtering seq, to avoid filtering … (Apr 26, 2022)
e1ff7f5  Rebased `HoodieMergeOnReadRDD` to accept file readers as a tuple (Apr 26, 2022)
f2bfcf1  Abstracted `BaseFileReader` type def (Apr 26, 2022)
8506514  Avoid unneccessary unsafe projection (Apr 26, 2022)
42b241e  Fixed required schema pruning in cases when columns required for merg… (Apr 26, 2022)
c070075  Tidying up (Apr 26, 2022)
a00bd5d  Re-enabling tests (Apr 26, 2022)
976ca7d  Re-enable erroneously commented out fields (Apr 26, 2022)
961628b  Fixed `dataSchema` to only strip out partition columns in case when p… (Apr 27, 2022)
5468f2b  Fixing compilation (May 28, 2022)
0a1470f  Disabling test (Jul 20, 2022)
65b9bb3  Fixing typo; (Jul 21, 2022)
50c4247  Disabled test (Jul 21, 2022)
598c6ca  Fallback to default for hive-style partitioning, url-encoding configs (Jul 22, 2022)
0ed8c2e  Always dump stdout (since sometimes spark-shell wouldn't exit, and do… (Jul 22, 2022)
@@ -594,11 +594,11 @@ public String getKeyGeneratorClassName() {
}

public String getHiveStylePartitioningEnable() {
- return getString(HIVE_STYLE_PARTITIONING_ENABLE);
+ return getStringOrDefault(HIVE_STYLE_PARTITIONING_ENABLE);
}

public String getUrlEncodePartitioning() {
- return getString(URL_ENCODE_PARTITIONING);
+ return getStringOrDefault(URL_ENCODE_PARTITIONING);
}

public Boolean shouldDropPartitionColumns() {
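For context, a minimal, self-contained sketch of the getString vs getStringOrDefault contract this hunk relies on. These are plain-Scala stand-ins, not the actual HoodieConfig API; the key name and default below are illustrative:

case class ConfigProperty(key: String, defaultValue: String)

class MiniConfig(props: Map[String, String]) {
  // Mirrors the assumed contract: getString yields null for an unset key,
  // getStringOrDefault falls back to the property's declared default.
  def getString(p: ConfigProperty): String = props.get(p.key).orNull
  def getStringOrDefault(p: ConfigProperty): String = props.getOrElse(p.key, p.defaultValue)
}

object MiniConfigDemo extends App {
  // Key name and default are illustrative placeholders.
  val hiveStylePartitioning = ConfigProperty("hoodie.datasource.write.hive_style_partitioning", "false")
  val cfg = new MiniConfig(Map.empty) // key was never persisted into the table config

  assert(cfg.getString(hiveStylePartitioning) == null)             // old call: null, NPE risk downstream
  assert(cfg.getStringOrDefault(hiveStylePartitioning) == "false") // new call: safe, parseable default
}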
@@ -235,9 +235,8 @@ private TestExecStartResultCallback executeCommandInDocker(String containerName,
}
int exitCode = dockerClient.inspectExecCmd(createCmdResponse.getId()).exec().getExitCode();
LOG.info("Exit code for command : " + exitCode);
- if (exitCode != 0) {
- LOG.error("\n\n ###### Stdout #######\n" + callback.getStdout().toString());
- }
+
+ LOG.error("\n\n ###### Stdout #######\n" + callback.getStdout().toString());
Comment on lines +238 to +239

Contributor: Revert the logging changes?

Contributor (author): These are intentional: these changes are replicated from #6175.

LOG.error("\n\n ###### Stderr #######\n" + callback.getStderr().toString());

if (checkIfSucceed) {
@@ -60,9 +60,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean =
internalSchemaOpt.isEmpty

- override lazy val mandatoryFields: Seq[String] =
- // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
- Seq(recordKeyField)
+ override lazy val mandatoryFields: Seq[String] = Seq.empty

override def imbueConfigs(sqlContext: SQLContext): Unit = {
super.imbueConfigs(sqlContext)
@@ -73,6 +71,7 @@
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
+ requestedColumns: Array[String],
filters: Array[Filter]): HoodieUnsafeRDD = {

val baseFileReader = createBaseFileReader(
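The net effect of dropping the mandatory record-key field: a base-file-only scan can be pruned down to exactly the requested columns. A hedged usage sketch, assuming a Hudi table at an illustrative path, illustrative column names, and the hudi-spark bundle on the classpath:

import org.apache.spark.sql.SparkSession

object ProjectionPruningSketch extends App {
  val spark = SparkSession.builder()
    .appName("hudi-projection-pruning-sketch")
    .master("local[*]")
    .getOrCreate()

  // With mandatoryFields now empty, a narrow projection like this should no longer
  // force the record-key column into the Parquet scan when no merging is performed.
  spark.read
    .format("hudi")
    .load("/tmp/hudi/trips")   // illustrative table path
    .select("fare", "ts")      // illustrative column names
    .show()
}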
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
- import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
+ import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -204,6 +204,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
}

+ /**
+  * NOTE: This fields are accessed by [[NestedSchemaPruning]] component which is only enabled for
+  * Spark >= 3.1
+  */
lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
@@ -258,12 +262,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*
* Check scala-doc for [[shouldExtractPartitionValuesFromPartitionPath]] for more details
*/
- def dataSchema: StructType =
- if (shouldExtractPartitionValuesFromPartitionPath) {
- prunePartitionColumns(tableStructSchema)
- } else {
- tableStructSchema
- }
+ def dataSchema: StructType = if (shouldExtractPartitionValuesFromPartitionPath) {
+ prunePartitionColumns(tableStructSchema)
+ } else {
+ tableStructSchema
+ }

/**
* Determines whether relation's schema could be pruned by Spark's Optimizer
@@ -346,7 +349,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
- val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, filters)
+ val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, targetColumns, filters)

// NOTE: In case when partition columns have been pruned from the required schema, we have to project
// the rows from the pruned schema back into the one expected by the caller
@@ -369,17 +372,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
/**
* Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
*
- * @param fileSplits file splits to be handled by the RDD
- * @param partitionSchema target table's partition schema
- * @param dataSchema target table's data files' schema
- * @param requiredSchema projected schema required by the reader
- * @param filters data filters to be applied
+ * @param fileSplits file splits to be handled by the RDD
+ * @param partitionSchema target table's partition schema
+ * @param dataSchema target table's data files' schema
+ * @param requiredSchema projected schema required by the reader
+ * @param requestedColumns columns requested by the query
+ * @param filters data filters to be applied
* @return instance of RDD (implementing [[HoodieUnsafeRDD]])
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
+ requestedColumns: Array[String],
filters: Array[Filter]): HoodieUnsafeRDD

/**
@@ -551,37 +556,48 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
- hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
- val hfileReader = createHFileReader(
- spark = spark,
- dataSchema = dataSchema,
- requiredSchema = requiredSchema,
- filters = filters,
- options = options,
- hadoopConf = hadoopConf
- )
-
- val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = spark,
- dataSchema = dataSchema.structTypeSchema,
- partitionSchema = partitionSchema,
- requiredSchema = requiredSchema.structTypeSchema,
- filters = filters,
- options = options,
- hadoopConf = hadoopConf,
- // We're delegating to Spark to append partition values to every row only in cases
- // when these corresponding partition-values are not persisted w/in the data file itself
- appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
- )
+ hadoopConf: Configuration): BaseFileReader = {
+ val tableBaseFileFormat = tableConfig.getBaseFileFormat
+
+ // NOTE: PLEASE READ CAREFULLY
+ // Lambda returned from this method is going to be invoked on the executor, and therefore
+ // we have to eagerly initialize all of the readers even though only one specific to the type
+ // of the file being read will be used. This is required to avoid serialization of the whole
+ // relation (containing file-index for ex) and passing it to the executor
+ val reader = tableBaseFileFormat match {
+ case HoodieFileFormat.PARQUET =>
+ HoodieDataSourceHelper.buildHoodieParquetReader(
+ sparkSession = spark,
+ dataSchema = dataSchema.structTypeSchema,
+ partitionSchema = partitionSchema,
+ requiredSchema = requiredSchema.structTypeSchema,
+ filters = filters,
+ options = options,
+ hadoopConf = hadoopConf,
+ // We're delegating to Spark to append partition values to every row only in cases
+ // when these corresponding partition-values are not persisted w/in the data file itself
+ appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
+ )
+
+ case HoodieFileFormat.HFILE =>
+ createHFileReader(
+ spark = spark,
+ dataSchema = dataSchema,
+ requiredSchema = requiredSchema,
+ filters = filters,
+ options = options,
+ hadoopConf = hadoopConf
+ )
+
+ case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)")
+ }

partitionedFile => {
val extension = FSUtils.getFileExtension(partitionedFile.filePath)
- if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
- parquetReader.apply(partitionedFile)
- } else if (HoodieFileFormat.HFILE.getFileExtension.equals(extension)) {
- hfileReader.apply(partitionedFile)
+ if (tableBaseFileFormat.getFileExtension.equals(extension)) {
+ reader.apply(partitionedFile)
} else {
throw new UnsupportedOperationException(s"Base file format not supported by Spark DataSource ($partitionedFile)")
throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)")
}
}
}
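The NOTE in the hunk above is about closure serialization. A minimal sketch of the pattern using plain Spark, no Hudi classes, assuming the enclosing relation holds driver-side state that must not be shipped to executors:

import org.apache.spark.sql.SparkSession

// Not Hudi code: RelationLike stands in for the relation holding driver-side state.
class RelationLike(spark: SparkSession) {
  private val label = "relation-local-state"

  // Build the per-file function eagerly on the driver and capture only what it needs,
  // so the returned closure (not `this`) is what gets serialized to executors.
  def buildReader(): String => Int = {
    val captured = label
    (path: String) => captured.length + path.length
  }
}

object ClosureSerializationSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("closure-sketch").getOrCreate()
  val reader = new RelationLike(spark).buildReader() // created once on the driver
  val sizes = spark.sparkContext.parallelize(Seq("a.parquet", "b.hfile")).map(reader)
  println(sizes.collect().toSeq)
}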
@@ -629,6 +645,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

object HoodieBaseRelation extends SparkAdapterSupport {

+ type BaseFileReader = PartitionedFile => Iterator[InternalRow]

private def generateUnsafeProjection(from: StructType, to: StructType) =
sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, to)

@@ -676,7 +694,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
- hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ hadoopConf: Configuration): BaseFileReader = {
val hadoopConfBroadcast =
spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

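For readers unfamiliar with the new alias, `BaseFileReader` is just a function from a `PartitionedFile` to an iterator of rows. A small combinator sketch, illustrative rather than part of the Hudi API, showing how such readers compose:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

object ReaderCombinators {
  // Mirrors the alias introduced above.
  type BaseFileReader = PartitionedFile => Iterator[InternalRow]

  // Wraps a reader so that every produced row is passed through `project`
  // (e.g. a projection narrowing the row to the required schema).
  def projected(underlying: BaseFileReader)(project: InternalRow => InternalRow): BaseFileReader =
    file => underlying(file).map(project)
}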
@@ -23,6 +23,7 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
+ import org.apache.hudi.HoodieBaseRelation.BaseFileReader
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
@@ -55,11 +56,14 @@ import scala.util.Try

case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition

+ case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader: BaseFileReader,
+ requiredSchemaFileReaderForMerging: BaseFileReader,
+ requiredSchemaFileReaderForNoMerging: BaseFileReader)

class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@transient config: Configuration,
- fullSchemaFileReader: PartitionedFile => Iterator[InternalRow],
- requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow],
- tableSchema: HoodieTableSchema,
+ fileReaders: HoodieMergeOnReadBaseFileReaders,
+ dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
mergeType: String,
@@ -86,13 +90,13 @@
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
- requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
+ fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get)

case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
new LogFileIterator(logFileOnlySplit, getConfig)

case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
- val baseFileIterator = requiredSchemaFileReader.apply(split.dataFile.get)
+ val baseFileIterator = fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get)
new SkipMergeIterator(split, baseFileIterator, getConfig)

case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
@@ -126,9 +130,9 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// then we can avoid reading and parsing the records w/ _full_ schema, and instead only
// rely on projected one, nevertheless being able to perform merging correctly
if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
- (fullSchemaFileReader(split.dataFile.get), tableSchema)
+ (fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema)
else
- (requiredSchemaFileReader(split.dataFile.get), requiredSchema)
+ (fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get), requiredSchema)
}

override protected def getPartitions: Array[Partition] =
@@ -152,7 +156,7 @@
protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema

- protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
+ protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(dataSchema.avroSchemaStr)

protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema)
protected var recordToLoad: InternalRow = _
@@ -167,7 +171,7 @@
private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)

private var logScanner = {
- val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
+ val internalSchema = dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
maxCompactionMemoryInBytes, config, internalSchema)
}
@@ -232,7 +236,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
val curRow = baseFileIterator.next()
- recordToLoad = unsafeProjection(curRow)
+ recordToLoad = curRow
true
} else {
super[LogFileIterator].hasNext
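A simplified model of the per-split reader selection the hunks above implement: pick the cheapest reader that still permits correct merging. Types and names here are stand-ins, not Hudi code:

sealed trait MergeType
case object SkipMerge extends MergeType
case object PayloadCombine extends MergeType

final case class Split(hasBaseFile: Boolean, hasLogFiles: Boolean)
final case class Readers[R](fullSchema: R, requiredForMerging: R, requiredNoMerging: R)

// Returns None when there is no base file to read (log-files-only split).
def selectBaseFileReader[R](split: Split,
                            mergeType: MergeType,
                            payloadMergesOnProjectedSchema: Boolean,
                            readers: Readers[R]): Option[R] =
  split match {
    case Split(true, false)          => Some(readers.requiredNoMerging) // no log files => no merging needed
    case Split(false, true)          => None                            // log files only
    case _ if mergeType == SkipMerge => Some(readers.requiredNoMerging) // merging explicitly skipped
    case _ if payloadMergesOnProjectedSchema => Some(readers.requiredForMerging)
    case _                           => Some(readers.fullSchema)        // conservative: full schema for custom payloads
  }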
@@ -61,6 +61,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
+ requestedColumns: Array[String],
filters: Array[Filter]): HoodieMergeOnReadRDD = {
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
@@ -81,23 +82,25 @@
hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt)
)

- val requiredSchemaParquetReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- requiredSchema = requiredSchema,
- filters = filters ++ incrementalSpanRecordFilters,
- options = optParams,
- // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
- // to configure Parquet reader appropriately
- hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
- )
+ val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) =
+ createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters)

val hoodieTableState = getTableState
// TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
// filtered, since file-reader might not be capable to perform filtering
- new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
- dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
+ new HoodieMergeOnReadRDD(
+ sqlContext.sparkContext,
+ config = jobConf,
+ fileReaders = HoodieMergeOnReadBaseFileReaders(
+ fullSchemaFileReader = fullSchemaParquetReader,
+ requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging,
+ requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging
+ ),
+ dataSchema = dataSchema,
+ requiredSchema = requiredSchema,
+ tableState = hoodieTableState,
+ mergeType = mergeType,
+ fileSplits = fileSplits)
}

override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
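Finally, a hedged end-to-end sketch of the incremental MOR read path this relation serves. Option keys follow the Hudi DataSource read options; the table path, instant time, and column names are illustrative:

import org.apache.spark.sql.SparkSession

object IncrementalReadSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("incremental-read-sketch").getOrCreate()

  val changed = spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20220722000000") // commits after this instant
    .load("/tmp/hudi/trips")      // illustrative table path
    .select("uuid", "fare", "ts") // narrow projection; merge-required columns are handled internally

  changed.show()
}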