diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 81b12dbcb6e4a..db48f224f2c98 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -71,6 +71,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
       throw new HoodieException(s"Specify the begin instant time to pull from using " +
         s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}")
     }
+
     if (!metaClient.getTableConfig.populateMetaFields()) {
       throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
     }
@@ -188,71 +189,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
         case HoodieFileFormat.ORC => "orc"
       }
       sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
+
+      // Fallback to full table scan if any of the following conditions is met:
+      // 1. the start commit is archived
+      // 2. the end commit is archived
+      // 3. files referenced in the commit metadata have been deleted
+      val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+        DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
       val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
-      if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
-        sqlContext.sparkContext.emptyRDD[Row]
-      } else {
-        log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))
-        var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+      val startInstantTime = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+      val startInstantArchived = commitTimeline.isBeforeTimelineStarts(startInstantTime)
+      val endInstantTime = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp)
+      val endInstantArchived = commitTimeline.isBeforeTimelineStarts(endInstantTime)

-        val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
-          DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+      val scanDf = if (fallbackToFullTableScan && (startInstantArchived || endInstantArchived)) {
+        log.info(s"Falling back to full table scan as startInstantArchived: $startInstantArchived, endInstantArchived: $endInstantArchived")
+        fullTableScanDataFrame(startInstantTime, endInstantTime)
+      } else {
+        if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
+          sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+        } else {
+          log.info("Additional Filters to be applied to incremental source are :" + filters.mkString("Array(", ", ", ")"))

-        var doFullTableScan = false
+          var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)

-        if (fallbackToFullTableScan) {
-          val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
-          val timer = new HoodieTimer().startTimer();
+          var doFullTableScan = false

-          val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
-          val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
-          val timeTaken = timer.endTimer()
-          log.info("Checking if paths exists took " + timeTaken + "ms")
+          if (fallbackToFullTableScan) {
+            val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration);
+            val timer = new HoodieTimer().startTimer();

-          val optStartTs = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
-          val isInstantArchived = optStartTs.compareTo(commitTimeline.firstInstant().get().getTimestamp) < 0 // True if optStartTs < activeTimeline.first
+            val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths
+            val firstNotFoundPath = allFilesToCheck.find(path => !fs.exists(new Path(path)))
+            val timeTaken = timer.endTimer()
+            log.info("Checking if paths exist took " + timeTaken + "ms")

-          if (isInstantArchived || firstNotFoundPath.isDefined) {
-            doFullTableScan = true
-            log.info("Falling back to full table scan")
+            if (firstNotFoundPath.isDefined) {
+              doFullTableScan = true
+              log.info("Falling back to full table scan as some files cannot be found.")
+            }
           }
-        }

-        if (doFullTableScan) {
-          val hudiDF = sqlContext.read
-            .format("hudi_v1")
-            .schema(usedSchema)
-            .load(basePath.toString)
-            .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because we are working with optParam instead of first commit > optParam
-              optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)))
-            .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-              commitsToReturn.last.getTimestamp))
-          // schema enforcement does not happen in above spark.read with hudi. hence selecting explicitly w/ right column order
-          val fieldNames : Array[String] = df.schema.fields.map(field => field.name)
-          df = df.union(hudiDF.select(fieldNames.head, fieldNames.tail: _*))
-        } else {
-          if (metaBootstrapFileIdToFullPath.nonEmpty) {
-            df = sqlContext.sparkSession.read
-              .format("hudi_v1")
-              .schema(usedSchema)
-              .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
-              .load()
-          }
+          if (doFullTableScan) {
+            fullTableScanDataFrame(startInstantTime, endInstantTime)
+          } else {
+            if (metaBootstrapFileIdToFullPath.nonEmpty) {
+              df = sqlContext.sparkSession.read
+                .format("hudi_v1")
+                .schema(usedSchema)
+                .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
+                .load()
+            }

-          if (regularFileIdToFullPath.nonEmpty) {
-            df = df.union(sqlContext.read.options(sOpts)
-              .schema(usedSchema).format(formatClassName)
-              .load(filteredRegularFullPaths.toList: _*)
-              .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                commitsToReturn.head.getTimestamp))
-              .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                commitsToReturn.last.getTimestamp)))
+            if (regularFileIdToFullPath.nonEmpty) {
+              df = df.union(sqlContext.read.options(sOpts)
+                .schema(usedSchema).format(formatClassName)
+                .load(filteredRegularFullPaths.toList: _*)
+                .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                  commitsToReturn.head.getTimestamp))
+                .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+                  commitsToReturn.last.getTimestamp)))
+            }
+            df
           }
         }
-
-      filters.foldLeft(df)((e, f) => e.filter(f)).rdd
       }
+
+      filters.foldLeft(scanDf)((e, f) => e.filter(f)).rdd
     }
   }
+
+  private def fullTableScanDataFrame(startInstantTime: String, endInstantTime: String): DataFrame = {
+    val hudiDF = sqlContext.read
+      .format("hudi_v1")
+      .schema(usedSchema)
+      .load(basePath.toString)
+      .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because we are working with optParam instead of first commit > optParam
+        startInstantTime))
+      .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+        endInstantTime))
+
+    // schema enforcement does not happen in above spark.read with hudi. hence selecting explicitly w/ right column order
+    val fieldNames = usedSchema.fieldNames
+    hudiDF.select(fieldNames.head, fieldNames.tail: _*)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 0fc6ef2f83aec..446c806b1804d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -17,7 +17,7 @@

 package org.apache.hudi

-import org.apache.hadoop.fs.{GlobPattern, Path}
+import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -53,9 +53,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
   }

   override protected def timeline: HoodieTimeline = {
-    val startTimestamp = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
-    val endTimestamp = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)
-    super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
+    if (fullTableScan) {
+      super.timeline
+    } else {
+      super.timeline.findInstantsInRange(startTimestamp, endTimestamp)
+    }
   }

   protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
@@ -87,17 +89,19 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     if (includedCommits.isEmpty) {
       List()
     } else {
-      val latestCommit = includedCommits.last.getTimestamp
-      val commitsMetadata = includedCommits.map(getCommitMetadata(_, timeline)).asJava
+      val fileSlices = if (fullTableScan) {
+        listLatestFileSlices(Seq(), partitionFilters, dataFilters)
+      } else {
+        val latestCommit = includedCommits.last.getTimestamp

-      val modifiedFiles = listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
-      val fsView = new HoodieTableFileSystemView(metaClient, timeline, modifiedFiles)
+        val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)

-      val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
+        val modifiedPartitions = getWritePartitionPaths(commitsMetadata)

-      val fileSlices = modifiedPartitions.asScala.flatMap { relativePartitionPath =>
-        fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
-      }.toSeq
+        modifiedPartitions.asScala.flatMap { relativePartitionPath =>
+          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
+        }.toSeq
+      }

       buildSplits(filterFileSlices(fileSlices, globPattern))
     }
@@ -124,14 +128,48 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
   // Validate this Incremental implementation is properly configured
   validate()

-  protected lazy val includedCommits: immutable.Seq[HoodieInstant] = timeline.getInstants.iterator().asScala.toList
+  protected def startTimestamp: String = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
+  protected def endTimestamp: String = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)
+
+  protected def startInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(startTimestamp)
+  protected def endInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(endTimestamp)
+
+  // Fallback to full table scan if any of the following conditions is met:
+  // 1. the start commit is archived
+  // 2. the end commit is archived
+  // 3. files referenced in the commit metadata have been deleted
+  protected lazy val fullTableScan: Boolean = {
+    val fallbackToFullTableScan = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key,
+      DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.defaultValue).toBoolean
+
+    fallbackToFullTableScan && (startInstantArchived || endInstantArchived || affectedFilesInCommits.exists(fileStatus => !metaClient.getFs.exists(fileStatus.getPath)))
+  }
+
+  protected lazy val includedCommits: immutable.Seq[HoodieInstant] = {
+    if (!startInstantArchived || !endInstantArchived) {
+      // If either the start or the end instant is still in the active timeline,
+      // restrict the scan to instants in the [startTimestamp, endTimestamp] range.
+      super.timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.iterator().asScala.toList
+    } else {
+      super.timeline.getInstants.iterator().asScala.toList
+    }
+  }
+
+  protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
+
+  protected lazy val affectedFilesInCommits: Array[FileStatus] = {
+    listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
+  }

   // Record filters making sure that only records w/in the requested bounds are being fetched as part of the
   // scan collected by this relation
   protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
     val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
-    val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp)
-    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.last.getTimestamp)
+
+    val largerThanFilter = GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)
+
+    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+      if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp)

     Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index c7eef5bce4270..6697ec1514cdd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -859,89 +859,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals(numRecords - numRecordsToDelete, snapshotDF2.count())
   }

-  @Test def testFailEarlyForIncrViewQueryForNonExistingFiles(): Unit = {
-    // Create 10 commits
-    for (i <- 1 to 10) {
-      val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 100)).toList
-      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
-      inputDF.write.format("org.apache.hudi")
-        .options(commonOpts)
.option("hoodie.cleaner.commits.retained", "3") - .option("hoodie.keep.min.commits", "4") - .option("hoodie.keep.max.commits", "5") - .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .mode(SaveMode.Append) - .save(basePath) - } - - val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build() - /** - * State of timeline after 10 commits - * +------------------+--------------------------------------+ - * | Archived | Active Timeline | - * +------------------+--------------+-----------------------+ - * | C0 C1 C2 C3 | C4 C5 | C6 C7 C8 C9 | - * +------------------+--------------+-----------------------+ - * | Data cleaned | Data exists in table | - * +---------------------------------+-----------------------+ - */ - - val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9 - //Anything less than 2 is a valid commit in the sense no cleanup has been done for those commit files - var startTs = completedCommits.nthInstant(0).get().getTimestamp //C4 - var endTs = completedCommits.nthInstant(1).get().getTimestamp //C5 - - //Calling without the fallback should result in Path does not exist - var hoodieIncViewDF = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs) - .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs) - .load(basePath) - - val msg = "Should fail with Path does not exist" - assertThrows(classOf[AnalysisException], new Executable { - override def execute(): Unit = { - hoodieIncViewDF.count() - } - }, msg) - - //Should work with fallback enabled - hoodieIncViewDF = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs) - .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs) - .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true") - .load(basePath) - assertEquals(100, hoodieIncViewDF.count()) - - //Test out for archived commits - val archivedInstants = hoodieMetaClient.getArchivedTimeline.getInstants.distinct().toArray - startTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0 - endTs = completedCommits.nthInstant(1).get().getTimestamp //C5 - - //Calling without the fallback should result in Path does not exist - hoodieIncViewDF = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs) - .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs) - .load(basePath) - - assertThrows(classOf[AnalysisException], new Executable { - override def execute(): Unit = { - hoodieIncViewDF.count() - } - }, msg) - - //Should work with fallback enabled - hoodieIncViewDF = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs) - .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs) - .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true") - .load(basePath) - 
-    assertEquals(500, hoodieIncViewDF.count())
-  }
-

   @Test def testWriteSmallPrecisionDecimalTable(): Unit = {
     val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
new file mode 100644
index 0000000000000..fb0cf5e179ded
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieInstantTimeGenerator, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.log4j.LogManager
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import scala.collection.JavaConversions.asScalaBuffer
+
+class TestIncrementalReadWithFullTableScan extends HoodieClientTestBase {
+
+  var spark: SparkSession = null
+  private val log = LogManager.getLogger(classOf[TestIncrementalReadWithFullTableScan])
+
+  private val perBatchSize = 100
+
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+  )
+
+
+  val verificationCol: String = "driver"
+  val updatedVerificationVal: String = "driver_update"
+
+  @BeforeEach override def setUp() {
+    setTableName("hoodie_test")
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach override def tearDown() = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testFailEarlyForIncrViewQueryForNonExistingFiles(tableType: HoodieTableType): Unit = {
+    // Create 10 commits
+    for (i <- 1 to 10) {
+      val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), perBatchSize)).toList
+      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+      inputDF.write.format("org.apache.hudi")
+        .options(commonOpts)
+        .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+        .option("hoodie.cleaner.commits.retained", "3")
+        .option("hoodie.keep.min.commits", "4")
+        .option("hoodie.keep.max.commits", "5")
+        .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+        .mode(SaveMode.Append)
+        .save(basePath)
+    }
+
+    val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build()
+    /**
+     * State of timeline after 10 commits
+     * +------------------+--------------------------------------+
+     * | Archived         | Active Timeline                      |
+     * +------------------+--------------+-----------------------+
+     * | C0 C1 C2 C3      | C4 C5        | C6 C7 C8 C9           |
+     * +------------------+--------------+-----------------------+
+     * | Data cleaned     | Data exists in table                 |
+     * +---------------------------------+-----------------------+
+     */
+
+    val completedCommits = hoodieMetaClient.getCommitsTimeline.filterCompletedInstants() // C4 to C9
+    val archivedInstants = hoodieMetaClient.getArchivedTimeline.filterCompletedInstants()
+      .getInstants.distinct().toArray // C0 to C3
+
+    //Anything less than 2 is a valid commit in the sense no cleanup has been done for those commit files
+    val startUnarchivedCommitTs = completedCommits.nthInstant(0).get().getTimestamp //C4
+    val endUnarchivedCommitTs = completedCommits.nthInstant(1).get().getTimestamp //C5
+
+    val startArchivedCommitTs = archivedInstants(0).asInstanceOf[HoodieInstant].getTimestamp //C0
+    val endArchivedCommitTs = archivedInstants(1).asInstanceOf[HoodieInstant].getTimestamp //C1
+
+    val startOutOfRangeCommitTs = HoodieInstantTimeGenerator.createNewInstantTime(0)
+    val endOutOfRangeCommitTs = HoodieInstantTimeGenerator.createNewInstantTime(0)
+
+    assertTrue(HoodieTimeline.compareTimestamps(startOutOfRangeCommitTs, GREATER_THAN, completedCommits.lastInstant().get().getTimestamp))
+    assertTrue(HoodieTimeline.compareTimestamps(endOutOfRangeCommitTs, GREATER_THAN, completedCommits.lastInstant().get().getTimestamp))
+
+    // Test both start and end commits are archived
+    runIncrementalQueryAndCompare(startArchivedCommitTs, endArchivedCommitTs, 1, true)
+
+    // Test start commit is archived, end commit is not archived
+    shouldThrowIfFallbackIsFalse(tableType,
+      () => runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, 5, false))
+    runIncrementalQueryAndCompare(startArchivedCommitTs, endUnarchivedCommitTs, 5, true)
+
+    // Test both start and end commits are not archived but got cleaned
+    shouldThrowIfFallbackIsFalse(tableType,
+      () => runIncrementalQueryAndCompare(startUnarchivedCommitTs, endUnarchivedCommitTs, 1, false))
+    runIncrementalQueryAndCompare(startUnarchivedCommitTs, endUnarchivedCommitTs, 1, true)
+
+    // Test start commit is not archived, end commit is beyond the latest commit on the timeline
+    runIncrementalQueryAndCompare(startUnarchivedCommitTs, endOutOfRangeCommitTs, 5, true)
+
+    // Test both start and end commits are beyond the latest commit on the timeline
+    runIncrementalQueryAndCompare(startOutOfRangeCommitTs, endOutOfRangeCommitTs, 0, false)
+    runIncrementalQueryAndCompare(startOutOfRangeCommitTs, endOutOfRangeCommitTs, 0, true)
+
+    // Test end commit is smaller than the start commit
+    runIncrementalQueryAndCompare(endUnarchivedCommitTs, startUnarchivedCommitTs, 0, false)
+    runIncrementalQueryAndCompare(endUnarchivedCommitTs, startUnarchivedCommitTs, 0, true)
+
+    // Test both start and end commits are not archived and not cleaned
+    val reversedCommits = completedCommits.getReverseOrderedInstants.toArray
+    val startUncleanedCommitTs = reversedCommits.apply(1).asInstanceOf[HoodieInstant].getTimestamp
+    val endUncleanedCommitTs = reversedCommits.apply(0).asInstanceOf[HoodieInstant].getTimestamp
+    runIncrementalQueryAndCompare(startUncleanedCommitTs, endUncleanedCommitTs, 1, true)
+    runIncrementalQueryAndCompare(startUncleanedCommitTs, endUncleanedCommitTs, 1, false)
+  }
+
+  private def runIncrementalQueryAndCompare(
+      startTs: String,
+      endTs: String,
+      batchNum: Int,
+      fallBackFullTableScan: Boolean): Unit = {
+    val hoodieIncViewDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), startTs)
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), endTs)
+      .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), fallBackFullTableScan)
+      .load(basePath)
+    assertEquals(perBatchSize * batchNum, hoodieIncViewDF.count())
+  }
+
+  private def shouldThrowIfFallbackIsFalse(tableType: HoodieTableType, fn: () => Unit): Unit = {
+    val msg = "Should fail with Path does not exist"
+    tableType match {
+      case HoodieTableType.COPY_ON_WRITE =>
+        assertThrows(classOf[AnalysisException], new Executable {
+          override def execute(): Unit = {
+            fn()
+          }
+        }, msg)
+      case HoodieTableType.MERGE_ON_READ =>
+        val exp = assertThrows(classOf[SparkException], new Executable {
+          override def execute(): Unit = {
+            fn()
+          }
+        }, msg)
+        assertTrue(exp.getMessage.contains("FileNotFoundException"))
+    }
+  }
+}
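Usage sketch (illustrative, outside the patch): the fallback is driven entirely by the read option
DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES, exactly as exercised by
runIncrementalQueryAndCompare in the test above. A minimal incremental read with the fallback enabled might look
like the following Scala snippet; the table path and instant timestamps are placeholders, and an active
SparkSession plus the org.apache.hudi.DataSourceReadOptions import are assumed.

  // Incremental read that falls back to a full table scan when the begin/end
  // instants are archived or the referenced data files have been cleaned.
  val incrementalDF = spark.read.format("org.apache.hudi")
    .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "20220101000000")  // placeholder begin instant, may be archived/cleaned
    .option(DataSourceReadOptions.END_INSTANTTIME.key(), "20220102000000")    // placeholder optional end instant
    .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES.key(), "true")
    .load("/path/to/hudi/table")                                              // placeholder base path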