From 53e61d175b340253df061db78b4e228a2147deb5 Mon Sep 17 00:00:00 2001
From: Yann Byron
Date: Mon, 29 Aug 2022 10:32:26 +0800
Subject: [PATCH] [HUDI-4703] use the historical schema to response time travel query (#6499)

* [HUDI-4703] use the historical schema to response time travel query

(cherry picked from commit b6b515917c34c4111216375bc2639876ab50dc30)
---
 .../common/table/TableSchemaResolver.java     | 29 ++
 .../org/apache/hudi/HoodieBaseRelation.scala  | 11 ++-
 .../hudi/functional/TestTimeTravelQuery.scala | 96 +++++++++++++++++--
 .../spark/sql/hudi/TestTimeTravelTable.scala  | 59 ++++++++++++
 4 files changed, 184 insertions(+), 11 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 4ada97e35ce6d..f825fd6b99d3e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -23,10 +23,12 @@
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.IndexedRecord;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -53,8 +55,10 @@
 import org.apache.hudi.io.storage.HoodieHFileReader;
 import org.apache.hudi.io.storage.HoodieOrcReader;
 import org.apache.hudi.util.Lazy;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -138,6 +142,19 @@ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception
     return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
   }
 
+  /**
+   * Fetches the table's schema in Avro format as of the given instant.
+   *
+   * @param timestamp as of which the table's schema will be fetched
+   */
+  public Schema getTableAvroSchema(String timestamp) throws Exception {
+    Option<HoodieInstant> instant = metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .findInstantsBeforeOrEquals(timestamp)
+        .lastInstant();
+    return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant);
+  }
+
   /**
    * Fetches tables schema in Avro format as of the given instant
    *
@@ -496,6 +513,18 @@ public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
     return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
   }
 
+  /**
+   * Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the last completed instant at or before the given timestamp.
+   *
+   * @return InternalSchema for this table
+   */
+  public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(String timestamp) {
+    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .findInstantsBeforeOrEquals(timestamp);
+    return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
+  }
+
   /**
    * Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant.
    *
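For reference, a minimal Scala sketch of how the timestamp-aware resolver methods added above can be called. It is not part of the patch; the table location and instant value are placeholders, and the meta-client setup mirrors the tests further down.

    // Illustrative usage of the resolver APIs added above; assumes a Hudi table already
    // exists at basePath and that instantTime is a completed commit timestamp.
    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

    val basePath = "/tmp/hoodie_test"       // placeholder: existing Hudi table location
    val instantTime = "20220829103226000"   // placeholder: a completed commit timestamp

    val metaClient = HoodieTableMetaClient.builder()
      .setBasePath(basePath)
      .setConf(new Configuration())
      .build()
    val resolver = new TableSchemaResolver(metaClient)

    // Avro schema as of the last completed commit at or before instantTime
    val avroSchemaAsOf = resolver.getTableAvroSchema(instantTime)
    // InternalSchema (schema-on-read) as of the same instant, if one was committed
    val internalSchemaAsOf = resolver.getTableInternalSchemaFromCommitMetadata(instantTime)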
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index cc77033d94bdc..afc0781eb1b0e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -19,10 +19,12 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
+
 import org.apache.hudi.HoodieBaseRelation._
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.HoodieAvroUtils
@@ -41,6 +43,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
 import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.io.storage.HoodieHFileReader
+
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -59,6 +62,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 import java.net.URI
 import java.util.Locale
+
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
@@ -139,7 +143,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
       None
     } else {
-      Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
+      Try {
+        specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+          .getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
+      } match {
         case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
         case Failure(e) =>
           logWarning("Failed to fetch internal-schema from the table", e)
@@ -149,6 +156,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
     val avroSchema = internalSchemaOpt.map { is =>
       AvroInternalSchemaConverter.convert(is, "schema")
+    } orElse {
+      specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
     } orElse {
       schemaSpec.map(convertToAvroSchema)
     } getOrElse {
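The relation change above is what a time-travel DataFrame read exercises. As a hedged illustration, mirroring the tests that follow (spark, basePath, and firstCommit are assumed to be set up as in those tests):

    // Illustrative only: a time-travel read. With this change the relation resolves the
    // table schema as of the requested instant instead of always using the latest schema.
    import org.apache.hudi.DataSourceReadOptions

    val asOfDf = spark.read.format("hudi")
      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
      .load(basePath)
    asOfDf.printSchema()  // reflects the table schema as of firstCommit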
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index 55f90f0ddef45..890f8a9019a30 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -19,13 +19,16 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator}
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
@@ -39,9 +42,8 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     "hoodie.upsert.shuffle.parallelism" -> "4",
     "hoodie.bulkinsert.shuffle.parallelism" -> "2",
     "hoodie.delete.shuffle.parallelism" -> "1",
-    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "version",
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
   )
 
@@ -72,8 +74,6 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df1.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
       .option(PARTITIONPATH_FIELD.key, "")
       .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
       .mode(SaveMode.Overwrite)
@@ -86,8 +86,6 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df2.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
       .option(PARTITIONPATH_FIELD.key, "")
       .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
       .mode(SaveMode.Append)
@@ -100,8 +98,6 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df3.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(RECORDKEY_FIELD.key, "id")
-      .option(PRECOMBINE_FIELD.key, "version")
      .option(PARTITIONPATH_FIELD.key, "")
       .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
       .mode(SaveMode.Append)
@@ -228,4 +224,84 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     val format = new SimpleDateFormat("yyyy-MM-dd")
     format.format(date)
   }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testTimeTravelQueryWithSchemaEvolution(tableType: HoodieTableType): Unit = {
+    initMetaClient(tableType)
+    val _spark = spark
+    import _spark.implicits._
+
+    // First write
+    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
+    df1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .option(PARTITIONPATH_FIELD.key, "")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+    val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    // Second write
+    val df2 = Seq((1, "a1", 12, 1001, "2022")).toDF("id", "name", "value", "version", "year")
+    df2.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .option(PARTITIONPATH_FIELD.key, "")
+      .mode(SaveMode.Append)
+      .save(basePath)
+    metaClient.reloadActiveTimeline()
+    val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+    // Third write
+    val df3 = Seq((1, "a1", 13, 1002, "2022", "08")).toDF("id", "name", "value", "version", "year", "month")
"name", "value", "version", "year", "month") + df3.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name()) + .option(PARTITIONPATH_FIELD.key, "") + .mode(SaveMode.Append) + .save(basePath) + metaClient.reloadActiveTimeline() + val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp + + val tableSchemaResolver = new TableSchemaResolver(metaClient) + + // Query as of firstCommitTime + val result1 = spark.read.format("hudi") + .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit) + .load(basePath) + .select("id", "name", "value", "version") + .take(1)(0) + assertEquals(Row(1, "a1", 10, 1000), result1) + val schema1 = tableSchemaResolver.getTableAvroSchema(firstCommit) + assertNull(schema1.getField("year")) + assertNull(schema1.getField("month")) + + // Query as of secondCommitTime + val result2 = spark.read.format("hudi") + .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, secondCommit) + .load(basePath) + .select("id", "name", "value", "version", "year") + .take(1)(0) + assertEquals(Row(1, "a1", 12, 1001, "2022"), result2) + val schema2 = tableSchemaResolver.getTableAvroSchema(secondCommit) + assertNotNull(schema2.getField("year")) + assertNull(schema2.getField("month")) + + // Query as of thirdCommitTime + val result3 = spark.read.format("hudi") + .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, thirdCommit) + .load(basePath) + .select("id", "name", "value", "version", "year", "month") + .take(1)(0) + assertEquals(Row(1, "a1", 13, 1002, "2022", "08"), result3) + val schema3 = tableSchemaResolver.getTableAvroSchema(thirdCommit) + assertNotNull(schema3.getField("year")) + assertNotNull(schema3.getField("month")) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala index 2ddaa9f36b313..30d465f38d3c7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestTimeTravelTable.scala @@ -294,4 +294,63 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase { } } } + + test("Test Time Travel With Schema Evolution") { + if (HoodieSparkUtils.gteqSpark3_2) { + withTempDir { tmp => + spark.sql("set hoodie.schema.on.read.enable=true") + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + | location '${tmp.getCanonicalPath}/$tableName' + """.stripMargin) + + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(s"${tmp.getCanonicalPath}/$tableName") + .setConf(spark.sessionState.newHadoopConf()) + .build() + val instant1 = metaClient.reloadActiveTimeline().getAllCommitsTimeline + .lastInstant().get().getTimestamp + + // add column + spark.sql(s"alter table $tableName add columns (company string)") + spark.sql(s"insert into $tableName values(2, 'a2', 11, 1100, 'hudi')") + val instant2 = metaClient.reloadActiveTimeline().getAllCommitsTimeline + .lastInstant().get().getTimestamp + + // drop column + spark.sql(s"alter table $tableName drop column price") + + val result1 = spark.sql(s"select * from 
+        val result1 = spark.sql(s"select * from ${tableName} timestamp as of $instant1 order by id")
+          .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+        checkAnswer(result1)(Seq(1, "a1", 10.0, 1000))
+
+        val result2 = spark.sql(s"select * from ${tableName} timestamp as of $instant2 order by id")
+          .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+        checkAnswer(result2)(
+          Seq(1, "a1", 10.0, 1000, null),
+          Seq(2, "a2", 11.0, 1100, "hudi")
+        )
+
+        val result3 = spark.sql(s"select * from ${tableName} order by id")
+          .drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+        checkAnswer(result3)(
+          Seq(1, "a1", 1000, null),
+          Seq(2, "a2", 1100, "hudi")
+        )
+      }
+    }
+  }
 }