diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 1f52912d1cc00..6b64ec4897b7c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -594,11 +594,11 @@ public String getKeyGeneratorClassName() {
   }
 
   public String getHiveStylePartitioningEnable() {
-    return getString(HIVE_STYLE_PARTITIONING_ENABLE);
+    return getStringOrDefault(HIVE_STYLE_PARTITIONING_ENABLE);
   }
 
   public String getUrlEncodePartitioning() {
-    return getString(URL_ENCODE_PARTITIONING);
+    return getStringOrDefault(URL_ENCODE_PARTITIONING);
   }
 
   public Boolean shouldDropPartitionColumns() {
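The two getters above previously returned whatever was stored in `hoodie.properties`, i.e. `null` when the flag had never been written; `getStringOrDefault` falls back to the property's declared default instead. A minimal sketch of that difference, using stand-in classes rather than Hudi's real `HoodieConfig`/`ConfigProperty` (the key name and default shown are illustrative):

```scala
// Stand-in model of the two getters (not Hudi's actual classes).
final case class ConfigProperty[T](key: String, defaultValue: T)

class PropsView(props: Map[String, String]) {
  // Old behaviour: absent key => null
  def getString[T](p: ConfigProperty[T]): String = props.get(p.key).orNull
  // New behaviour: absent key => declared default
  def getStringOrDefault[T](p: ConfigProperty[T]): String =
    props.getOrElse(p.key, p.defaultValue.toString)
}

object ConfigDefaultDemo extends App {
  val HIVE_STYLE_PARTITIONING_ENABLE =
    ConfigProperty("hoodie.datasource.write.hive_style_partitioning", "false") // illustrative key/default
  val cfg = new PropsView(Map.empty) // flag never persisted to hoodie.properties
  println(cfg.getString(HIVE_STYLE_PARTITIONING_ENABLE))          // null
  println(cfg.getStringOrDefault(HIVE_STYLE_PARTITIONING_ENABLE)) // false
}
```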
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index db87f5dce0087..8115d50a78c12 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -235,9 +235,8 @@ private TestExecStartResultCallback executeCommandInDocker(String containerName,
     }
     int exitCode = dockerClient.inspectExecCmd(createCmdResponse.getId()).exec().getExitCode();
     LOG.info("Exit code for command : " + exitCode);
-    if (exitCode != 0) {
-      LOG.error("\n\n ###### Stdout #######\n" + callback.getStdout().toString());
-    }
+
+    LOG.error("\n\n ###### Stdout #######\n" + callback.getStdout().toString());
     LOG.error("\n\n ###### Stderr #######\n" + callback.getStderr().toString());
 
     if (checkIfSucceed) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 29b565712d6ac..b7033c3bfc31c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -60,9 +60,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = internalSchemaOpt.isEmpty
 
-  override lazy val mandatoryFields: Seq[String] =
-    // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
-    Seq(recordKeyField)
+  override lazy val mandatoryFields: Seq[String] = Seq.empty
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
     super.imbueConfigs(sqlContext)
@@ -73,6 +71,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
                             partitionSchema: StructType,
                             dataSchema: HoodieTableSchema,
                             requiredSchema: HoodieTableSchema,
+                            requestedColumns: Array[String],
                             filters: Array[Filter]): HoodieUnsafeRDD = {
 
     val baseFileReader = createBaseFileReader(
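With `mandatoryFields` now empty, the base-file-only (COW / read-optimized) relation no longer forces the record-key column into every scan, and `requestedColumns` is threaded into `composeRDD` so implementations can see exactly which columns the query asked for. A hedged usage sketch of the effect on a narrow projection (table path and column names are hypothetical, and `hudi-spark` is assumed to be on the classpath):

```scala
import org.apache.spark.sql.SparkSession

object NarrowProjectionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("narrow-projection").getOrCreate()

    // Only `fare` and `ts` need to reach the parquet reader; the record-key column is no longer
    // pulled into the scan just to satisfy the relation's mandatory fields.
    spark.read.format("hudi")
      .load("/tmp/hudi/trips")   // hypothetical COW table
      .select("fare", "ts")
      .show()

    spark.stop()
  }
}
```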
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f2d2c31f6794d..ff6515db325ac 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -204,6 +204,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
   }
 
+  /**
+   * NOTE: These fields are accessed by the [[NestedSchemaPruning]] component, which is only enabled
+   *       for Spark >= 3.1
+   */
   lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
     metaClient.getTableConfig.getBaseFileFormat match {
       case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
@@ -258,12 +262,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    *
    * Check scala-doc for [[shouldExtractPartitionValuesFromPartitionPath]] for more details
    */
-  def dataSchema: StructType =
-    if (shouldExtractPartitionValuesFromPartitionPath) {
-      prunePartitionColumns(tableStructSchema)
-    } else {
-      tableStructSchema
-    }
+  def dataSchema: StructType = if (shouldExtractPartitionValuesFromPartitionPath) {
+    prunePartitionColumns(tableStructSchema)
+  } else {
+    tableStructSchema
+  }
 
   /**
    * Determines whether relation's schema could be pruned by Spark's Optimizer
@@ -346,7 +349,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       if (fileSplits.isEmpty) {
         sparkSession.sparkContext.emptyRDD
       } else {
-        val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, filters)
+        val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, targetColumns, filters)
 
         // NOTE: In case when partition columns have been pruned from the required schema, we have to project
         //       the rows from the pruned schema back into the one expected by the caller
@@ -369,17 +372,19 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   /**
    * Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
    *
-   * @param fileSplits      file splits to be handled by the RDD
-   * @param partitionSchema target table's partition schema
-   * @param dataSchema      target table's data files' schema
-   * @param requiredSchema  projected schema required by the reader
-   * @param filters         data filters to be applied
+   * @param fileSplits       file splits to be handled by the RDD
+   * @param partitionSchema  target table's partition schema
+   * @param dataSchema       target table's data files' schema
+   * @param requiredSchema   projected schema required by the reader
+   * @param requestedColumns columns requested by the query
+   * @param filters          data filters to be applied
    * @return instance of RDD (implementing [[HoodieUnsafeRDD]])
    */
   protected def composeRDD(fileSplits: Seq[FileSplit],
                            partitionSchema: StructType,
                            dataSchema: HoodieTableSchema,
                            requiredSchema: HoodieTableSchema,
+                           requestedColumns: Array[String],
                            filters: Array[Filter]): HoodieUnsafeRDD
 
   /**
@@ -551,37 +556,48 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
                                      requiredSchema: HoodieTableSchema,
                                      filters: Seq[Filter],
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    val hfileReader = createHFileReader(
-      spark = spark,
-      dataSchema = dataSchema,
-      requiredSchema = requiredSchema,
-      filters = filters,
-      options = options,
-      hadoopConf = hadoopConf
-    )
-
-    val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = spark,
-      dataSchema = dataSchema.structTypeSchema,
-      partitionSchema = partitionSchema,
-      requiredSchema = requiredSchema.structTypeSchema,
-      filters = filters,
-      options = options,
-      hadoopConf = hadoopConf,
-      // We're delegating to Spark to append partition values to every row only in cases
-      // when these corresponding partition-values are not persisted w/in the data file itself
-      appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
-    )
+                                     hadoopConf: Configuration): BaseFileReader = {
+    val tableBaseFileFormat = tableConfig.getBaseFileFormat
+
+    // NOTE: PLEASE READ CAREFULLY
+    //       Lambda returned from this method is going to be invoked on the executor, and therefore
+    //       we have to eagerly initialize all of the readers even though only one specific to the type
+    //       of the file being read will be used. This is required to avoid serialization of the whole
+    //       relation (containing file-index for ex) and passing it to the executor
+    val reader = tableBaseFileFormat match {
+      case HoodieFileFormat.PARQUET =>
+        HoodieDataSourceHelper.buildHoodieParquetReader(
+          sparkSession = spark,
+          dataSchema = dataSchema.structTypeSchema,
+          partitionSchema = partitionSchema,
+          requiredSchema = requiredSchema.structTypeSchema,
+          filters = filters,
+          options = options,
+          hadoopConf = hadoopConf,
+          // We're delegating to Spark to append partition values to every row only in cases
+          // when these corresponding partition-values are not persisted w/in the data file itself
+          appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
+        )
+
+      case HoodieFileFormat.HFILE =>
+        createHFileReader(
+          spark = spark,
+          dataSchema = dataSchema,
+          requiredSchema = requiredSchema,
+          filters = filters,
+          options = options,
+          hadoopConf = hadoopConf
+        )
+
+      case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)")
+    }
 
     partitionedFile => {
       val extension = FSUtils.getFileExtension(partitionedFile.filePath)
-      if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
-        parquetReader.apply(partitionedFile)
-      } else if (HoodieFileFormat.HFILE.getFileExtension.equals(extension)) {
-        hfileReader.apply(partitionedFile)
+      if (tableBaseFileFormat.getFileExtension.equals(extension)) {
+        reader.apply(partitionedFile)
       } else {
-        throw new UnsupportedOperationException(s"Base file format not supported by Spark DataSource ($partitionedFile)")
+        throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)")
       }
     }
   }
@@ -629,6 +645,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
 object HoodieBaseRelation extends SparkAdapterSupport {
 
+  type BaseFileReader = PartitionedFile => Iterator[InternalRow]
+
   private def generateUnsafeProjection(from: StructType, to: StructType) =
     sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, to)
 
@@ -676,7 +694,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
                         requiredSchema: HoodieTableSchema,
                         filters: Seq[Filter],
                         options: Map[String, String],
-                        hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+                        hadoopConf: Configuration): BaseFileReader = {
     val hadoopConfBroadcast =
       spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
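The NOTE introduced above is the crux of this change: the format-specific readers are resolved eagerly on the driver so that the lambda handed to executors closes over the readers alone, not over the relation and its file index. A simplified sketch of that pattern, with plain stand-in types instead of Spark's `PartitionedFile => Iterator[InternalRow]`:

```scala
object EagerReaderDispatch {
  type Row = Map[String, Any]
  type Reader = String => Iterator[Row] // stand-in for PartitionedFile => Iterator[InternalRow]

  def createBaseFileReader(baseFileFormat: String,
                           parquetReader: Reader,
                           hfileReader: Reader): Reader = {
    // Resolved once, on the driver, before the lambda below is shipped to executors
    val reader: Reader = baseFileFormat match {
      case "parquet" => parquetReader
      case "hfile"   => hfileReader
      case other     => throw new UnsupportedOperationException(s"Base file format is not currently supported ($other)")
    }
    val expectedExtension = "." + baseFileFormat

    // The returned closure captures only `reader` and `expectedExtension`, never the enclosing
    // relation or its file index, so serializing it for the executors stays cheap.
    filePath =>
      if (filePath.endsWith(expectedExtension)) reader(filePath)
      else throw new UnsupportedOperationException(s"Invalid base-file format ($filePath), expected ($expectedExtension)")
  }
}
```

The same trade-off explains the new `BaseFileReader` type alias: it names the closure type once instead of repeating the function signature at every call site.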
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index c9080d021e33a..c4c70cb414e32 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,6 +23,7 @@ import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedReco
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
@@ -55,11 +56,14 @@ import scala.util.Try
 
 case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition
 
+case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader: BaseFileReader,
+                                            requiredSchemaFileReaderForMerging: BaseFileReader,
+                                            requiredSchemaFileReaderForNoMerging: BaseFileReader)
+
 class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                            @transient config: Configuration,
-                           fullSchemaFileReader: PartitionedFile => Iterator[InternalRow],
-                           requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow],
-                           tableSchema: HoodieTableSchema,
+                           fileReaders: HoodieMergeOnReadBaseFileReaders,
+                           dataSchema: HoodieTableSchema,
                            requiredSchema: HoodieTableSchema,
                            tableState: HoodieTableState,
                            mergeType: String,
@@ -86,13 +90,13 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
     val iter = mergeOnReadPartition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-        requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
+        fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get)
 
       case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
         new LogFileIterator(logFileOnlySplit, getConfig)
 
       case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
-        val baseFileIterator = requiredSchemaFileReader.apply(split.dataFile.get)
+        val baseFileIterator = fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get)
         new SkipMergeIterator(split, baseFileIterator, getConfig)
 
       case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
@@ -126,9 +130,9 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
      //       then we can avoid reading and parsing the records w/ _full_ schema, and instead only
      //       rely on projected one, nevertheless being able to perform merging correctly
      if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
-       (fullSchemaFileReader(split.dataFile.get), tableSchema)
+       (fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema)
      else
-       (requiredSchemaFileReader(split.dataFile.get), requiredSchema)
+       (fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get), requiredSchema)
    }
 
   override protected def getPartitions: Array[Partition] =
@@ -152,7 +156,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
     protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema
 
-    protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
+    protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(dataSchema.avroSchemaStr)
 
     protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema)
     protected var recordToLoad: InternalRow = _
@@ -167,7 +171,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
 
     private var logScanner = {
-      val internalSchema = tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
+      val internalSchema = dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
       HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema,
         tableState, maxCompactionMemoryInBytes, config, internalSchema)
     }
@@ -232,7 +236,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     override def hasNext: Boolean = {
       if (baseFileIterator.hasNext) {
         val curRow = baseFileIterator.next()
-        recordToLoad = unsafeProjection(curRow)
+        recordToLoad = curRow
         true
       } else {
         super[LogFileIterator].hasNext
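The new `HoodieMergeOnReadBaseFileReaders` bundle exists because different split shapes need different base-file readers, as the `compute` match above shows. The same selection logic, restated as a small standalone sketch (the split and payload modelling is simplified, not Hudi's actual types):

```scala
object MergeOnReadReaderSelection {
  sealed trait Split
  case object DataFileOnly extends Split                     // base file, no log files
  case object LogFilesOnly extends Split                     // log files, no base file
  final case class DataAndLogs(skipMerge: Boolean) extends Split

  final case class Readers[R](fullSchema: R,
                              requiredSchemaForMerging: R,
                              requiredSchemaForNoMerging: R)

  /** Returns the base-file reader to use for a split, or None when only log files are read. */
  def pick[R](split: Split, readers: Readers[R], payloadNeedsFullSchema: Boolean): Option[R] =
    split match {
      case DataFileOnly       => Some(readers.requiredSchemaForNoMerging)
      case LogFilesOnly       => None
      case DataAndLogs(true)  => Some(readers.requiredSchemaForNoMerging)  // skip-merge path
      case DataAndLogs(false) =>
        if (payloadNeedsFullSchema) Some(readers.fullSchema)               // payload not whitelisted
        else Some(readers.requiredSchemaForMerging)
    }
}
```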
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 38945cec9fc98..6fa130ac8caf8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -61,6 +61,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
                                      partitionSchema: StructType,
                                      dataSchema: HoodieTableSchema,
                                      requiredSchema: HoodieTableSchema,
+                                     requestedColumns: Array[String],
                                      filters: Array[Filter]): HoodieMergeOnReadRDD = {
     val fullSchemaParquetReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
@@ -81,23 +82,25 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
       hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt)
     )
 
-    val requiredSchemaParquetReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      partitionSchema = partitionSchema,
-      dataSchema = dataSchema,
-      requiredSchema = requiredSchema,
-      filters = filters ++ incrementalSpanRecordFilters,
-      options = optParams,
-      // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
-      //       to configure Parquet reader appropriately
-      hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
-    )
+    val (requiredSchemaBaseFileReaderMerging, requiredSchemaBaseFileReaderNoMerging) =
+      createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters)
 
     val hoodieTableState = getTableState
 
     // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
     //                 filtered, since file-reader might not be capable to perform filtering
-    new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
-      dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
+    new HoodieMergeOnReadRDD(
+      sqlContext.sparkContext,
+      config = jobConf,
+      fileReaders = HoodieMergeOnReadBaseFileReaders(
+        fullSchemaFileReader = fullSchemaParquetReader,
+        requiredSchemaFileReaderForMerging = requiredSchemaBaseFileReaderMerging,
+        requiredSchemaFileReaderForNoMerging = requiredSchemaBaseFileReaderNoMerging
+      ),
+      dataSchema = dataSchema,
+      requiredSchema = requiredSchema,
+      tableState = hoodieTableState,
+      mergeType = mergeType,
+      fileSplits = fileSplits)
   }
 
   override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
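The incremental relation now obtains its two required-schema readers from `createMergeOnReadBaseFileReaders`, whose definition falls outside the quoted hunks (presumably on the snapshot relation below); the call site only tells us it returns a (merging, no-merging) pair built from the same projected schema, requested columns, and filter set. A sketch of that implied shape only, not the actual helper:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

trait MergeOnReadReaderFactory {
  type BaseFileReader = PartitionedFile => Iterator[InternalRow]
  type HoodieTableSchema // opaque stand-in for Hudi's HoodieTableSchema

  /** Implied contract: builds (readerForMerging, readerForNoMerging) from one projection and filter set. */
  protected def createMergeOnReadBaseFileReaders(partitionSchema: StructType,
                                                 dataSchema: HoodieTableSchema,
                                                 requiredSchema: HoodieTableSchema,
                                                 requestedColumns: Array[String],
                                                 filters: Array[Filter]): (BaseFileReader, BaseFileReader)
}
```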
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 9c69190f98343..c6d4eafafc91d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -20,14 +20,17 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.sources.Filter
@@ -47,9 +50,27 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieMergeOnReadFileSplit
 
-  override lazy val mandatoryFields: Seq[String] =
+  /**
+   * NOTE: These are the fields that are required to properly fulfil Merge-on-Read (MOR)
+   *       semantic:
+   *
+   *