jackierwzhang committed Dec 7, 2022
1 parent 8dff8cf commit f43699e
Showing 4 changed files with 19 additions and 11 deletions.
@@ -104,7 +104,7 @@ case class DeltaTableV2(
timeTravelSpec.map { spec =>
// By default, block using CDF + time-travel
if (CDCReader.isCDCRead(cdcOptions) &&
- !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CDF_ENABLE_TIME_TRAVEL)) {
+ !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL)) {
throw DeltaErrors.timeTravelNotSupportedException
}

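For context, the guard above rejects a batch change-data-feed read that also carries a time-travel reader option unless the renamed conf is enabled. A hedged sketch of such a read ('readChangeFeed', 'startingVersion', and 'versionAsOf' are standard Delta reader options; the table path is illustrative and 'spark' is an existing SparkSession):

// Illustrative only: a CDF read combined with a time-travel option.
// With DELTA_CDF_ALLOW_TIME_TRAVEL left at its default (false), the guard
// above throws DeltaErrors.timeTravelNotSupportedException for this read.
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "0")
  .option("versionAsOf", "3")    // time-travel option selecting the read schema
  .load("/tmp/delta/cdf-table")  // illustrative path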
@@ -220,6 +220,12 @@ trait CDCReaderImpl extends DeltaLogging {

/**
* Get a Relation that represents change data between two snapshots of the table.
+ *
+ * @param spark Spark session
+ * @param snapshotToUse Snapshot to use to provide read schema and version
+ * @param isTimeTravelQuery Whether this CDC scan is used in conjunction with time-travel args
+ * @param conf SQL conf
+ * @param options CDC specific options
*/
def getCDCRelation(
spark: SparkSession,
@@ -904,19 +904,21 @@ trait DeltaSQLConfBase {
val DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE =
buildConf("changeDataFeed.defaultSchemaModeForColumnMappingTable")
.doc(
"Reading batch CDF on column mapping enabled table requires schema mode to be set to " +
"`endVersion` so the ending version's schema will be used." +
"Set this to `latest` to use the schema of the latest available table version," +
"or to `legacy` to fallback to the non column-mapping default behavior, in which" +
"the time travel option can be used to select the version of the schema.")
"""Reading batch CDF on column mapping enabled table requires schema mode to be set to
|`endVersion` so the ending version's schema will be used.
|Set this to `latest` to use the schema of the latest available table version,
|or to `legacy` to fallback to the non column-mapping default behavior, in which
|the time travel option can be used to select the version of the schema.""".stripMargin)
.internal()
.stringConf
.createWithDefault("endVersion")

- val DELTA_CDF_ENABLE_TIME_TRAVEL =
- buildConf("changeDataFeed.timeTravel.enabled")
+ val DELTA_CDF_ALLOW_TIME_TRAVEL =
+ buildConf("changeDataFeed.allowTimeTravelOptionsForSchema")
.doc(
- "If enabled, user can specify time-travel reader options while reading change data feed.")
+ s"""If allowed, user can specify time-travel reader options such as
+ |'versionAsOf' or 'timestampAsOf' to specify the read schema while
+ |reading change data feed.""".stripMargin)
.internal()
.booleanConf
.createWithDefault(false)
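Taken together, the two confs above control which schema a batch CDF read uses on a column-mapping table. A hedged sketch of setting them at the session level, assuming the usual "spark.databricks.delta." key prefix that DeltaSQLConf.buildConf prepends:

// Sketch only: session-level settings for the confs defined above.
// Fully qualified keys assume buildConf's "spark.databricks.delta." prefix.
spark.conf.set(
  "spark.databricks.delta.changeDataFeed.defaultSchemaModeForColumnMappingTable",
  "latest")  // alternatives: "endVersion" (default) or "legacy"
spark.conf.set(
  "spark.databricks.delta.changeDataFeed.allowTimeTravelOptionsForSchema",
  "true")    // permit 'versionAsOf' / 'timestampAsOf' options on CDF reads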
@@ -583,7 +583,7 @@ trait DeltaCDCColumnMappingScalaSuiteBase extends DeltaCDCColumnMappingSuiteBase

// NOTE: we do not support time travel option with SQL API, so we will just test Scala API suite
test("cannot specify both time travel options and schema mode") {
- withSQLConf(DeltaSQLConf.DELTA_CDF_ENABLE_TIME_TRAVEL.key -> "true") {
+ withSQLConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL.key -> "true") {
withTempDir { dir =>
Seq(1).toDF("id").write.format("delta").save(dir.getCanonicalPath)
val e = intercept[DeltaIllegalArgumentException] {
@@ -601,7 +601,7 @@
}

test("time travel option is respected") {
- withSQLConf(DeltaSQLConf.DELTA_CDF_ENABLE_TIME_TRAVEL.key -> "true") {
+ withSQLConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL.key -> "true") {
withTempDir { dir =>
// Set up an initial table with 10 records in schema <id string, value string>
setupInitialDeltaTable(dir)
