[HUDI-3189] Fallback to full table scan with incremental query when files are cleaned up or archived for MOR table (apache#6141)

* Spark: support reading archived commits for incremental queries on MOR tables
boneanxs authored and fengjian committed Apr 5, 2023
1 parent b119f8f commit 7cebf3c
Showing 4 changed files with 311 additions and 149 deletions.
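For context, here is a minimal usage sketch of the behavior this commit enables, built from the DataSourceReadOptions keys exercised in the test changes below; the table path and instant times are placeholders:

// Minimal sketch (assumed setup): incremental read that opts into the
// full-table-scan fallback when the requested instants are archived or the
// underlying data files have already been cleaned.
import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-incremental-fallback").getOrCreate()
val basePath = "/tmp/hudi_table" // placeholder table path

val incrementalDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "20230401000000") // placeholder instants
  .option(DataSourceReadOptions.END_INSTANTTIME.key(), "20230405000000")
  // Without this option, a query over cleaned or archived instants fails with "Path does not exist"
  .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
  .load(basePath)
incrementalDF.count()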
@@ -71,6 +71,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
throw new HoodieException(s"Specify the begin instant time to pull from using " +
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}")
}

if (!metaClient.getTableConfig.populateMetaFields()) {
throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
}
@@ -188,71 +189,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
case HoodieFileFormat.ORC => "orc"
}
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")

// Fallback to full table scan if any of the following conditions matches:
// 1. the start commit is archived
// 2. the end commit is archived
// 3. files referenced in the commit metadata have been deleted
val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean

val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))

var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
val startInstantTime = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
val startInstantArchived = commitTimeline.isBeforeTimelineStarts(startInstantTime)
val endInstantTime = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp)
val endInstantArchived = commitTimeline.isBeforeTimelineStarts(endInstantTime)

val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
val scanDf = if (fallbackToFullTableScan && (startInstantArchived || endInstantArchived)) {
log.info(s"Falling back to full table scan as startInstantArchived: $startInstantArchived, endInstantArchived: $endInstantArchived")
fullTableScanDataFrame(startInstantTime, endInstantTime)
} else {
if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
} else {
log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))

var doFullTableScan = false
var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)

if (fallbackToFullTableScan) {
val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
val timer = new HoodieTimer().startTimer();
var doFullTableScan = false

val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
val timeTaken = timer.endTimer()
log.info("Checking if paths exists took " + timeTaken + "ms")
if (fallbackToFullTableScan) {
val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
val timer = new HoodieTimer().startTimer();

val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
val isInstantArchived = optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // True if optStartTs < activeTimeline.first
val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
val timeTaken = timer.endTimer()
log.info("Checking if paths exists took " + timeTaken + "ms")

if (isInstantArchived || firstNotFoundPath.isDefined) {
doFullTableScan = true
log.info("Falling back to full table scan")
if (firstNotFoundPath.isDefined) {
doFullTableScan = true
log.info("Falling back to full table scan as some files cannot be found.")
}
}
}

if (doFullTableScan) {
val hudiDF = sqlContext.read
.format("hudi_v1")
.schema(usedSchema)
.load(basePath.toString)
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because we are working with optParam instead of first commit > optParam
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp))
// schema enforcement does not happen in above spark.read with hudi. hence selecting explicitly w/ right column order
val fieldNames : Array[String] = df.schema.fields.map(field => field.name)
df = df.union(hudiDF.select(fieldNames.head, fieldNames.tail: _*))
} else {
if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
.format("hudi_v1")
.schema(usedSchema)
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
.load()
}
if (doFullTableScan) {
fullTableScanDataFrame(startInstantTime, endInstantTime)
} else {
if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
.format("hudi_v1")
.schema(usedSchema)
.option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
.load()
}

if (regularFileIdToFullPath.nonEmpty) {
df = df.union(sqlContext.read.options(sOpts)
.schema(usedSchema).format(formatClassName)
.load(filteredRegularFullPaths.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.head.getTimestamp))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp)))
if (regularFileIdToFullPath.nonEmpty) {
df = df.union(sqlContext.read.options(sOpts)
.schema(usedSchema).format(formatClassName)
.load(filteredRegularFullPaths.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.head.getTimestamp))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.last.getTimestamp)))
}
df
}
}

filters.foldLeft(df)((e, f) => e.filter(f)).rdd
}

filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
}
}

private def fullTableScanDataFrame(startInstantTime: String, endInstantTime: String): DataFrame = {
val hudiDF = sqlContext.read
.format("hudi_v1")
.schema(usedSchema)
.load(basePath.toString)
.filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, // Note: '>' rather than '>=' because the lower bound is the user-supplied begin instant, not the first commit after it
startInstantTime))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
endInstantTime))

// Schema enforcement does not happen in the spark.read above, so select the columns explicitly in the right order
val fieldNames = usedSchema.fieldNames
hudiDF.select(fieldNames.head, fieldNames.tail: _*)
}
}
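The new fullTableScanDataFrame helper above is, in effect, a snapshot read filtered on the commit-time meta column. A hedged, self-contained illustration of the same filtering semantics, using placeholder paths and instants and the public "hudi" format instead of the internal "hudi_v1" relation:

// Illustration only: commit-time range filtering expressed against a plain snapshot read.
// "_hoodie_commit_time" is the value of HoodieRecord.COMMIT_TIME_METADATA_FIELD.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()
val basePath = "/tmp/hudi_table"        // placeholder
val startInstantTime = "20230401000000" // placeholder begin instant (exclusive)
val endInstantTime = "20230405000000"   // placeholder end instant (inclusive)

val fallbackDF = spark.read.format("hudi")
  .load(basePath)
  .filter(col("_hoodie_commit_time") > startInstantTime)  // '>' not '>=': the bound is the user-supplied begin instant
  .filter(col("_hoodie_commit_time") <= endInstantTime)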
@@ -17,7 +17,7 @@

package org.apache.hudi

import org.apache.hadoop.fs.{GlobPattern, Path}
import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -53,9 +53,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
}

override protected def timeline: HoodieTimeline = {
val startTimestamp = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
val endTimestamp = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)
super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
if (fullTableScan) {
super.timeline
} else {
super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
}
}

protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
@@ -87,17 +89,19 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
if (includedCommits.isEmpty) {
List()
} else {
val latestCommit = includedCommits.last.getTimestamp
val commitsMetadata = includedCommits.map(getCommitMetadata(_, timeline)).asJava
val fileSlices = if (fullTableScan) {
listLatestFileSlices(Seq(), partitionFilters, dataFilters)
} else {
val latestCommit = includedCommits.last.getTimestamp

val modifiedFiles = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
val fsView = new HoodieTableFileSystemView(metaClient, timeline, modifiedFiles)
val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)

val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
val modifiedPartitions = getWritePartitionPaths(commitsMetadata)

val fileSlices = modifiedPartitions.asScala.flatMap { relativePartitionPath =>
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
}.toSeq
modifiedPartitions.asScala.flatMap { relativePartitionPath =>
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
}.toSeq
}

buildSplits(filterFileSlices(fileSlices, globPattern))
}
@@ -124,14 +128,48 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
// Validate this Incremental implementation is properly configured
validate()

protected lazy val includedCommits: immutable.Seq[HoodieInstant] = timeline.getInstants.iterator().asScala.toList
protected def startTimestamp: String = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
protected def endTimestamp: String = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)

protected def startInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(startTimestamp)
protected def endInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(endTimestamp)

// Fallback to full table scan if any of the following conditions matches:
// 1. the start commit is archived
// 2. the end commit is archived
// 3. files referenced in the commit metadata have been deleted
protected lazy val fullTableScan: Boolean = {
val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean

fallbackToFullTableScan && (startInstantArchived || endInstantArchived || affectedFilesInCommits.exists(fileStatus => !metaClient.getFs.exists(fileStatus.getPath)))
}

protected lazy val includedCommits: immutable.Seq[HoodieInstant] = {
if (!startInstantArchived || !endInstantArchived) {
// If the start or end instant is still in the active timeline, restrict to
// instants within the [startTimestamp, endTimestamp] range.
super.timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.iterator().asScala.toList
} else {
super.timeline.getInstants.iterator().asScala.toList
}
}

protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava

protected lazy val affectedFilesInCommits: Array[FileStatus] = {
listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
}

// Record filters making sure that only records w/in the requested bounds are being fetched as part of the
// scan collected by this relation
protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp)
val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.last.getTimestamp)

val largerThanFilter = GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)

val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp)

Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
}
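To make the changed lower-bound semantics above concrete (the span filter moves from GreaterThanOrEqual on the first included commit to GreaterThan on the supplied begin instant), here is a hedged illustration with placeholder instant times:

// Illustration of the pushed-down incremental span filters built in
// incrementalSpanRecordFilters. With startTimestamp = "C4" and endTimestamp = "C5"
// (placeholder values), the resulting predicate is:
//   _hoodie_commit_time IS NOT NULL AND _hoodie_commit_time > 'C4' AND _hoodie_commit_time <= 'C5'
// i.e. records written by the begin instant itself are excluded, records written by
// the end instant are included.
import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull, LessThanOrEqual}

val commitTimeField = "_hoodie_commit_time" // value of HoodieRecord.COMMIT_TIME_METADATA_FIELD
val exampleSpanFilters: Seq[Filter] = Seq(
  IsNotNull(commitTimeField),
  GreaterThan(commitTimeField, "C4"),
  LessThanOrEqual(commitTimeField, "C5")
)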
@@ -859,89 +859,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
}

@Test def testFailEarlyForIncrViewQueryForNonExistingFiles(): Unit = {
// Create 10 commits
for (i <- 1 to 10) {
val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
inputDF.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.cleaner.commits.retained", "3")
.option("hoodie.keep.min.commits", "4")
.option("hoodie.keep.max.commits", "5")
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
}

val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
/**
* State of timeline after 10 commits
* +------------------+--------------------------------------+
* | Archived | Active Timeline |
* +------------------+--------------+-----------------------+
* | C0 C1 C2 C3 | C4 C5 | C6 C7 C8 C9 |
* +------------------+--------------+-----------------------+
* | Data cleaned | Data exists in table |
* +---------------------------------+-----------------------+
*/

val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
//Anything less than 2 is a valid commit in the sense no cleanup has been done for those commit files
var startTs = completedCommits.nthInstant(0).get().getTimestamp //C4
var endTs = completedCommits.nthInstant(1).get().getTimestamp //C5

//Calling without the fallback should result in Path does not exist
var hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.load(basePath)

val msg = "Should fail with Path does not exist"
assertThrows(classOf[AnalysisException], new Executable {
override def execute(): Unit = {
hoodieIncViewDF.count()
}
}, msg)

//Should work with fallback enabled
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
.load(basePath)
assertEquals(100, hoodieIncViewDF.count())

//Test out for archived commits
val archivedInstants = hoodieMetaClient.getArchivedTimeline.getInstants.distinct().toArray
startTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0
endTs = completedCommits.nthInstant(1).get().getTimestamp //C5

//Calling without the fallback should result in Path does not exist
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.load(basePath)

assertThrows(classOf[AnalysisException], new Executable {
override def execute(): Unit = {
hoodieIncViewDF.count()
}
}, msg)

//Should work with fallback enabled
hoodieIncViewDF = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
.option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
.load(basePath)
assertEquals(500, hoodieIncViewDF.count())
}

@Test
def testWriteSmallPrecisionDecimalTable(): Unit = {
val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
