Rebased MergeOnReadIncrementalRelation
Alexey Kudinkin committed Apr 26, 2022
1 parent 69b9a30 commit e494e1f
Showing 1 changed file with 27 additions and 3 deletions.
@@ -81,7 +81,8 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
       hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), internalSchema, metaClient.getBasePath, validCommits)
     )
 
-    val requiredSchemaParquetReader = createBaseFileReader(
+    // TODO elaborate bifurcation
+    val requiredSchemaForMergingBaseFileReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
       dataSchema = dataSchema,
@@ -93,11 +94,34 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
       hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
     )
 
+    val requiredSchemaForNoMergingBaseFileReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      requiredSchema = pruneSchemaForMergeSkipping(requiredSchema),
+      filters = filters ++ incrementalSpanRecordFilters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
+    )
+
     val hoodieTableState = getTableState
     // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
     //  filtered, since file-reader might not be capable to perform filtering
-    new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader,
-      dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
+    new HoodieMergeOnReadRDD(
+      sqlContext.sparkContext,
+      config = jobConf,
+      fileReaders = MergeOnReadFileReaders(
+        fullSchemaFileReader = fullSchemaParquetReader,
+        requiredSchemaForMergingFileReader = requiredSchemaForMergingBaseFileReader,
+        requiredSchemaForNoMergingFileReader = requiredSchemaForNoMergingBaseFileReader
+      ),
+      dataSchema = dataSchema,
+      requiredSchema = requiredSchema,
+      tableState = hoodieTableState,
+      mergeType = mergeType,
+      fileSplits = fileSplits)
   }
 
   override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
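For context on the bifurcation the TODO refers to: the commit splits the single requiredSchemaParquetReader into two projected readers, one for file splits whose log records still have to be merged onto the base file, and one for splits that can skip merging altogether and therefore can prune the read schema further via pruneSchemaForMergeSkipping. Below is a minimal sketch of the container these readers travel in, assuming createBaseFileReader returns a PartitionedFile => Iterator[InternalRow] function as in Spark's file-source API; the field names follow the diff, everything else is illustrative, not the actual Hudi implementation.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.PartitionedFile

    object MergeOnReadReadersSketch {
      // Assumption: createBaseFileReader yields a function from a base-file
      // split to an iterator of rows, matching Spark's file-source reader shape
      type BaseFileReader = PartitionedFile => Iterator[InternalRow]

      // The three readers the rewritten HoodieMergeOnReadRDD constructor receives:
      //  - fullSchemaFileReader: reads the full table schema, for splits whose
      //    log records are merged with payload semantics that may touch any column
      //  - requiredSchemaForMergingFileReader: projected down to the query's
      //    required schema, for splits that still need log-file merging
      //  - requiredSchemaForNoMergingFileReader: projected further (the
      //    pruneSchemaForMergeSkipping call in the diff), for splits where
      //    merging, and whatever columns only merging needs, can be skipped
      case class MergeOnReadFileReaders(
          fullSchemaFileReader: BaseFileReader,
          requiredSchemaForMergingFileReader: BaseFileReader,
          requiredSchemaForNoMergingFileReader: BaseFileReader)
    }

Grouping the readers into a single case class keeps the RDD constructor at a fixed arity as reader variants are added, which is presumably why the commit introduces it rather than passing a third bare reader argument.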
