[HUDI-4703] Use the historical schema to respond to time travel queries (apache#6499)

* [HUDI-4703] Use the historical schema to respond to time travel queries

(cherry picked from commit b6b5159)
YannByron authored and neverdizzy committed Dec 1, 2022
1 parent 291844e commit 53e61d1
Showing 4 changed files with 184 additions and 11 deletions.
TableSchemaResolver.java
@@ -23,10 +23,12 @@
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -53,8 +55,10 @@
import org.apache.hudi.io.storage.HoodieHFileReader;
import org.apache.hudi.io.storage.HoodieOrcReader;
import org.apache.hudi.util.Lazy;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -138,6 +142,19 @@ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception
return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
}

/**
* Fetches tables schema in Avro format as of the given instant
*
* @param timestamp as of which table's schema will be fetched
*/
public Schema getTableAvroSchema(String timestamp) throws Exception {
Option<HoodieInstant> instant = metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()
.findInstantsBeforeOrEquals(timestamp)
.lastInstant();
return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant);
}

/**
* Fetches tables schema in Avro format as of the given instant
*
@@ -496,6 +513,18 @@ public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
}

/**
* Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant.
*
* @return InternalSchema for this table
*/
public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(String timestamp) {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()
.findInstantsBeforeOrEquals(timestamp);
return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
}

/**
* Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant.
*
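
The two overloads added above resolve the latest completed commit at or before the requested instant and then reuse the existing schema lookups, so a caller can recover the schema that was current at that point in the timeline. Below is a minimal usage sketch in Scala; it only uses APIs that appear in this diff, while the base path, configuration, and instant string are hypothetical placeholders.

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

// Hypothetical location of an existing Hudi table.
val basePath = "/tmp/hudi/hoodie_test"
val hadoopConf = new Configuration()

val metaClient = HoodieTableMetaClient.builder()
  .setBasePath(basePath)
  .setConf(hadoopConf)
  .build()
val resolver = new TableSchemaResolver(metaClient)

// Hypothetical instant; both overloads use the last completed commit at or before it.
val asOfInstant = "20220801120000000"

// Avro schema as of that instant.
val avroSchemaAsOf = resolver.getTableAvroSchema(asOfInstant)

// InternalSchema (schema-on-read) committed at or before that instant, if one exists.
val internalSchemaAsOf = resolver.getTableInternalSchemaFromCommitMetadata(asOfInstant)

The tests further down use exactly this pattern to assert that columns added by later commits are absent from the historical schema.
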
HoodieBaseRelation.scala
@@ -19,10 +19,12 @@ package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf

import org.apache.hudi.HoodieBaseRelation._
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
@@ -41,6 +43,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.io.storage.HoodieHFileReader

import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -59,6 +62,7 @@ import org.apache.spark.unsafe.types.UTF8String

import java.net.URI
import java.util.Locale

import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
@@ -139,7 +143,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
None
} else {
Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
Try {
specifiedQueryTimestamp.map(schemaResolver.getTableInternalSchemaFromCommitMetadata)
.getOrElse(schemaResolver.getTableInternalSchemaFromCommitMetadata)
} match {
case Success(internalSchemaOpt) => toScalaOption(internalSchemaOpt)
case Failure(e) =>
logWarning("Failed to fetch internal-schema from the table", e)
@@ -149,6 +156,8 @@

val avroSchema = internalSchemaOpt.map { is =>
AvroInternalSchemaConverter.convert(is, "schema")
} orElse {
specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
} orElse {
schemaSpec.map(convertToAvroSchema)
} getOrElse {
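
With this change, HoodieBaseRelation resolves the read schema for a time-travel query in a fixed order: the InternalSchema committed at or before specifiedQueryTimestamp, then the historical Avro schema for that instant, then any caller-supplied schema spec, and only then the fallback branch that is truncated at the end of this hunk. A simplified sketch of that precedence follows; the names are illustrative and this is not the actual Hudi code.

// Illustrative only: mirrors the orElse chain above with hypothetical inputs.
def resolveReadSchema[S](internalSchemaAsOf: Option[S],
                         avroSchemaAsOf: => Option[S],
                         schemaSpec: => Option[S],
                         fallback: => S): S =
  internalSchemaAsOf              // 1) schema-on-read InternalSchema at the query instant
    .orElse(avroSchemaAsOf)       // 2) historical Avro schema as of that instant
    .orElse(schemaSpec)           // 3) schema supplied by the caller, if any
    .getOrElse(fallback)          // 4) last resort (the getOrElse branch truncated above)

Keeping the timestamp-driven lookups ahead of the caller-supplied schema is what lets a time-travel query see only the columns that existed at that instant, as the tests below verify.
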
TestTimeTravelQuery.scala
@@ -19,13 +19,16 @@ package org.apache.hudi.functional

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator}
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}

import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
@@ -39,9 +42,8 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "2",
"hoodie.delete.shuffle.parallelism" -> "1",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "version",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)

@@ -72,8 +74,6 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
df1.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "version")
.option(PARTITIONPATH_FIELD.key, "")
.option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.mode(SaveMode.Overwrite)
@@ -86,8 +86,6 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
df2.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "version")
.option(PARTITIONPATH_FIELD.key, "")
.option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.mode(SaveMode.Append)
@@ -100,8 +98,6 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
df3.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "version")
.option(PARTITIONPATH_FIELD.key, "")
.option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
.mode(SaveMode.Append)
@@ -228,4 +224,84 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
val format = new SimpleDateFormat("yyyy-MM-dd")
format.format(date)
}

@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType])
def testTimeTravelQueryWithSchemaEvolution(tableType: HoodieTableType): Unit = {
initMetaClient(tableType)
val _spark = spark
import _spark.implicits._

// First write
val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
df1.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(PARTITIONPATH_FIELD.key, "")
.mode(SaveMode.Overwrite)
.save(basePath)

metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(spark.sessionState.newHadoopConf)
.build()
val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp

// Second write
val df2 = Seq((1, "a1", 12, 1001, "2022")).toDF("id", "name", "value", "version", "year")
df2.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(PARTITIONPATH_FIELD.key, "")
.mode(SaveMode.Append)
.save(basePath)
metaClient.reloadActiveTimeline()
val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp

// Third write
val df3 = Seq((1, "a1", 13, 1002, "2022", "08")).toDF("id", "name", "value", "version", "year", "month")
df3.write.format("hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
.option(PARTITIONPATH_FIELD.key, "")
.mode(SaveMode.Append)
.save(basePath)
metaClient.reloadActiveTimeline()
val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp

val tableSchemaResolver = new TableSchemaResolver(metaClient)

// Query as of firstCommitTime
val result1 = spark.read.format("hudi")
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
.load(basePath)
.select("id", "name", "value", "version")
.take(1)(0)
assertEquals(Row(1, "a1", 10, 1000), result1)
val schema1 = tableSchemaResolver.getTableAvroSchema(firstCommit)
assertNull(schema1.getField("year"))
assertNull(schema1.getField("month"))

// Query as of secondCommitTime
val result2 = spark.read.format("hudi")
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, secondCommit)
.load(basePath)
.select("id", "name", "value", "version", "year")
.take(1)(0)
assertEquals(Row(1, "a1", 12, 1001, "2022"), result2)
val schema2 = tableSchemaResolver.getTableAvroSchema(secondCommit)
assertNotNull(schema2.getField("year"))
assertNull(schema2.getField("month"))

// Query as of thirdCommitTime
val result3 = spark.read.format("hudi")
.option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, thirdCommit)
.load(basePath)
.select("id", "name", "value", "version", "year", "month")
.take(1)(0)
assertEquals(Row(1, "a1", 13, 1002, "2022", "08"), result3)
val schema3 = tableSchemaResolver.getTableAvroSchema(thirdCommit)
assertNotNull(schema3.getField("year"))
assertNotNull(schema3.getField("month"))
}
}
TestTimeTravelTable.scala
@@ -294,4 +294,63 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
}
}
}

test("Test Time Travel With Schema Evolution") {
if (HoodieSparkUtils.gteqSpark3_2) {
withTempDir { tmp =>
spark.sql("set hoodie.schema.on.read.enable=true")
val tableName = generateTableName
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts'
| )
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)

spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")

val metaClient = HoodieTableMetaClient.builder()
.setBasePath(s"${tmp.getCanonicalPath}/$tableName")
.setConf(spark.sessionState.newHadoopConf())
.build()
val instant1 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
.lastInstant().get().getTimestamp

// add column
spark.sql(s"alter table $tableName add columns (company string)")
spark.sql(s"insert into $tableName values(2, 'a2', 11, 1100, 'hudi')")
val instant2 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
.lastInstant().get().getTimestamp

// drop column
spark.sql(s"alter table $tableName drop column price")

val result1 = spark.sql(s"select * from ${tableName} timestamp as of $instant1 order by id")
.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
checkAnswer(result1)(Seq(1, "a1", 10.0, 1000))

val result2 = spark.sql(s"select * from ${tableName} timestamp as of $instant2 order by id")
.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
checkAnswer(result2)(
Seq(1, "a1", 10.0, 1000, null),
Seq(2, "a2", 11.0, 1100, "hudi")
)

val result3 = spark.sql(s"select * from ${tableName} order by id")
.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
checkAnswer(result3)(
Seq(1, "a1", 1000, null),
Seq(2, "a2", 1100, "hudi")
)
}
}
}
}
