From 8dff8cf16691bafa750cb22edea8da18539bbbc4 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Fri, 2 Dec 2022 09:41:25 -0800 Subject: [PATCH 1/4] commit --- .../resources/error/delta-error-classes.json | 22 +- .../spark/sql/delta/DeltaColumnMapping.scala | 27 + .../apache/spark/sql/delta/DeltaErrors.scala | 44 +- .../org/apache/spark/sql/delta/DeltaLog.scala | 11 +- .../apache/spark/sql/delta/DeltaOptions.scala | 53 +- .../sql/delta/DeltaSharedExceptions.scala | 6 +- .../sql/delta/catalog/DeltaTableV2.scala | 7 + .../sql/delta/commands/cdc/CDCReader.scala | 216 ++++-- .../sql/delta/sources/DeltaSQLConf.scala | 20 + .../delta/sources/DeltaSourceCDCSupport.scala | 4 +- .../delta/DeltaCDCColumnMappingSuite.scala | 706 ++++++++++++++++++ .../spark/sql/delta/DeltaCDCSuite.scala | 237 +----- .../spark/sql/delta/DeltaErrorsSuite.scala | 19 +- 13 files changed, 1083 insertions(+), 289 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json index 2fe9b694ec6..26b9c5fa24c 100644 --- a/core/src/main/resources/error/delta-error-classes.json +++ b/core/src/main/resources/error/delta-error-classes.json @@ -76,7 +76,9 @@ }, "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION" : { "message" : [ - " is not supported on tables with column mapping schema changes (e.g. rename or drop). Read schema: . Incompatible schema: . You may force enable streaming read at your own risk by turning on ." + " is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).", + "Read schema: . Incompatible data schema: .", + "Although strongly not recommended, you may also force ignore the schema checks during at your own risk of potentially incorrect results by turning on the SQL conf ." ], "sqlState" : "0A000" }, @@ -260,6 +262,22 @@ ], "sqlState" : "0A000" }, + "DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_DATA_SCHEMA" : { + "message" : [ + "Retrieving table changes between version and failed because of an incompatible data schema.", + "Your read schema is at version , but we found an incompatible data schema at version .", + "If possible, please retrieve the table changes using the end version's schema by setting to `endVersion`, or contact support." + ], + "sqlState" : "0A000" + }, + "DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_SCHEMA_CHANGE" : { + "message" : [ + "Retrieving table changes between version and failed because of an incompatible schema change.", + "Your read schema is at version , but we found an incompatible schema change at version .", + "If possible, please query table changes separately from version to - 1, and from version to ." + ], + "sqlState" : "0A000" + }, "DELTA_CHANGE_TABLE_FEED_DISABLED" : { "message" : [ "Cannot write to table with delta.enableChangeDataFeed set. Change data feed from Delta is not available." @@ -1846,7 +1864,7 @@ }, "DELTA_UNSUPPORTED_TIME_TRAVEL_VIEWS" : { "message" : [ - "Cannot time travel views, subqueries or streams." + "Cannot time travel views, subqueries, streams or change data feed queries." 
], "sqlState" : "0A000" }, diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala index 9b02e45eb10..7a406e0cc43 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -569,6 +569,33 @@ trait DeltaColumnMappingBase extends DeltaLogging { true } } + + /** + * Check if a metadata we are scanning is read compatible with another one, considering + * both column mapping changes (rename or drop) as well as other standard checks. + */ + def isMetadataSchemaReadCompatible( + curVersion: Long, + curMetadata: Metadata, + readVersion: Long, + readMetadata: Metadata): Boolean = { + val (oldMetadata, newMetadata) = if (curVersion < readVersion) { + // Snapshot version is newer, ensure there's no read-incompatible CM schema changes + // from current version to snapshot version. + (curMetadata, readMetadata) + } else { + // Current metadata action version is newer, ensure there's no read-incompatible CM + // schema changes from snapshot version to current version. + (readMetadata, curMetadata) + } + // For column-mapping checks, we need to consider version order so we don't accidentally treat + // ADD COLUMN (Ok) as a reverse DROP COLUMN (Not ok). + // For non column-mapping checks, usually we don't need to consider version order, but in CDC + // case, as the current semantics is to ignore ADD COLUMN schema change post the analyzed schema + // version to match non-CDC batch behavior, considering the order can help with that. + DeltaColumnMapping.isColumnMappingReadCompatible(newMetadata, oldMetadata) && + SchemaUtils.isReadCompatible(oldMetadata.schema, newMetadata.schema) + } } object DeltaColumnMapping extends DeltaColumnMappingBase diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 0a89ea847a3..9bd9799719b 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -2425,14 +2425,37 @@ trait DeltaErrorsBase // scalastyle:on line.size.limit } - def blockBatchCdfReadOnColumnMappingEnabledTable( + /** + * If `isSchemaChange` is false, this means the `incompatVersion` actually refers to a data schema + * instead of a schema change. This happens when we could not find any read-incompatible schema + * changes within the querying range, but the read schema is still NOT compatible with the data + * files being queried, which could happen if user falls back to `legacy` mode and read past data + * using some diverged latest schema or time-travelled schema. In this uncommon case, we should + * tell the user to try setting it back to endVersion, OR ask us to give them the flag to force + * unblock. 
+ */ + def blockBatchCdfReadWithIncompatibleSchemaChange( + start: Long, + end: Long, readSchema: StructType, - incompatibleSchema: StructType): Throwable = { - new DeltaColumnMappingUnsupportedSchemaIncompatibleException( - "Change Data Feed (CDF) read", - readSchema, - incompatibleSchema, - DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key + readVersion: Long, + incompatVersion: Long, + isSchemaChange: Boolean = true): Throwable = { + new DeltaUnsupportedOperationException( + if (isSchemaChange) { + "DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_SCHEMA_CHANGE" + } else { + "DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_DATA_SCHEMA" + }, + messageParameters = Array( + start.toString, end.toString, + readSchema.json, readVersion.toString, incompatVersion.toString) ++ { + if (isSchemaChange) { + Array(start.toString, incompatVersion.toString, incompatVersion.toString, end.toString) + } else { + Array(DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key) + } + } ) } @@ -2762,5 +2785,10 @@ class DeltaColumnMappingUnsupportedSchemaIncompatibleException( val additionalProperties: Map[String, String] = Map.empty) extends DeltaUnsupportedOperationException( errorClass = "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION", - messageParameters = Array(opName, readSchema.json, incompatibleSchema.json, escapeConfigName) + messageParameters = Array( + opName, + readSchema.json, + incompatibleSchema.json, + opName, + escapeConfigName) ) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index 394ac664202..f04b2a962a6 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -139,7 +139,12 @@ class DeltaLog private( * Tombstones before this timestamp will be dropped from the state and the files can be * garbage collected. */ - def minFileRetentionTimestamp: Long = clock.getTimeMillis() - tombstoneRetentionMillis + def minFileRetentionTimestamp: Long = { + // TODO (Fred): Get rid of this FrameProfiler record once SC-94033 is addressed + recordFrameProfile("Delta", "DeltaLog.minFileRetentionTimestamp") { + clock.getTimeMillis() - tombstoneRetentionMillis + } + } /** * [[SetTransaction]]s before this timestamp will be considered expired and dropped from the @@ -456,8 +461,8 @@ class DeltaLog private( // data. 
if (!cdcOptions.isEmpty) { recordDeltaEvent(this, "delta.cdf.read", data = cdcOptions.asCaseSensitiveMap()) - return CDCReader.getCDCRelation(spark, - this, snapshotToUse, partitionFilters, spark.sessionState.conf, cdcOptions) + return CDCReader.getCDCRelation( + spark, snapshotToUse, isTimeTravelQuery, spark.sessionState.conf, cdcOptions) } val fileIndex = TahoeLogFileIndex( diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala index 9cc4cd35068..0cf3da77856 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala @@ -250,6 +250,10 @@ object DeltaOptions extends DeltaLogging { val CDC_END_TIMESTAMP = "endingTimestamp" val CDC_READ_OPTION = "readChangeFeed" val CDC_READ_OPTION_LEGACY = "readChangeData" + + val VERSION_AS_OF = "versionAsOf" + val TIMESTAMP_AS_OF = "timestampAsOf" + val COMPRESSION = "compression" val MAX_RECORDS_PER_FILE = "maxRecordsPerFile" val TXN_APP_ID = "txnAppId" @@ -284,8 +288,8 @@ object DeltaOptions extends DeltaLogging { "queryName", "checkpointLocation", "path", - "timestampAsOf", - "versionAsOf" + VERSION_AS_OF, + TIMESTAMP_AS_OF ) @@ -302,6 +306,51 @@ object DeltaOptions extends DeltaLogging { } } +/** + * Definitions for the batch read schema mode for CDF + */ +sealed trait DeltaBatchCDFSchemaMode { + def name: String +} + +/** + * `latest` batch CDF schema mode specifies that the latest schema should be used when serving + * the CDF batch. + */ +case object BatchCDFSchemaLatest extends DeltaBatchCDFSchemaMode { + val name = "latest" +} + +/** + * `endVersion` batch CDF schema mode specifies that the query range's end version's schema should + * be used for serving the CDF batch. + * This is the current default for column mapping enabled tables so we could read using the exact + * schema at the versions being queried to reduce schema read compatibility mismatches. + */ +case object BatchCDFSchemaEndVersion extends DeltaBatchCDFSchemaMode { + val name = "endversion" +} + +/** + * `legacy` batch CDF schema mode specifies that neither latest nor end version's schema is + * strictly used for serving the CDF batch, e.g. when user uses TimeTravel with batch CDF and wants + * to respect the time travelled schema. + * This is the current default for non-column mapping tables. + */ +case object BatchCDFSchemaLegacy extends DeltaBatchCDFSchemaMode { + val name = "legacy" +} + +object DeltaBatchCDFSchemaMode { + def apply(name: String): DeltaBatchCDFSchemaMode = { + name.toLowerCase(Locale.ROOT) match { + case BatchCDFSchemaLatest.name => BatchCDFSchemaLatest + case BatchCDFSchemaEndVersion.name => BatchCDFSchemaEndVersion + case BatchCDFSchemaLegacy.name => BatchCDFSchemaLegacy + } + } +} + /** * Definitions for the starting version of a Delta stream. 
*/ diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaSharedExceptions.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaSharedExceptions.scala index 6e889d9d050..d282e3d2949 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaSharedExceptions.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaSharedExceptions.scala @@ -25,7 +25,9 @@ class DeltaAnalysisException( errorClass = Some(errorClass), messageParameters = messageParameters, cause = cause) - with DeltaThrowable + with DeltaThrowable { + def getMessageParametersArray: Array[String] = messageParameters +} class DeltaIllegalArgumentException( errorClass: String, @@ -35,6 +37,7 @@ class DeltaIllegalArgumentException( DeltaThrowableHelper.getMessage(errorClass, messageParameters), cause) with DeltaThrowable { override def getErrorClass: String = errorClass + def getMessageParametersArray: Array[String] = messageParameters } class DeltaUnsupportedOperationException( @@ -44,4 +47,5 @@ class DeltaUnsupportedOperationException( DeltaThrowableHelper.getMessage(errorClass, messageParameters)) with DeltaThrowable { override def getErrorClass: String = errorClass + def getMessageParametersArray: Array[String] = messageParameters } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index 580d383d020..ab1e3656400 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.commands.WriteIntoDelta import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils} +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} @@ -101,6 +102,12 @@ case class DeltaTableV2( lazy val snapshot: Snapshot = { timeTravelSpec.map { spec => + // By default, block using CDF + time-travel + if (CDCReader.isCDCRead(cdcOptions) && + !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CDF_ENABLE_TIME_TRAVEL)) { + throw DeltaErrors.timeTravelNotSupportedException + } + val (version, accessType) = DeltaTableUtils.resolveTimeTravelVersion( spark.sessionState.conf, deltaLog, spec) val source = spec.creationSource.getOrElse("unknown") diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala index 7f84e9df207..7ff287ca079 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala @@ -86,27 +86,63 @@ object CDCReader extends CDCReaderImpl // CDC specific columns in data written by operations val CDC_COLUMNS_IN_DATA = Seq(CDC_PARTITION_COL, CDC_TYPE_COLUMN_NAME) + // A snapshot coupled with a schema mode that user specified + case class SnapshotWithSchemaMode(snapshot: Snapshot, schemaMode: DeltaBatchCDFSchemaMode) + /** * A special BaseRelation wrapper for CDF reads. 
*/ case class DeltaCDFRelation( - schema: StructType, + snapshotWithSchemaMode: SnapshotWithSchemaMode, sqlContext: SQLContext, - deltaLog: DeltaLog, startingVersion: Option[Long], endingVersion: Option[Long]) extends BaseRelation with PrunedFilteredScan { + private val deltaLog = snapshotWithSchemaMode.snapshot.deltaLog + + private lazy val latestVersionOfTableDuringAnalysis: Long = deltaLog.update().version + + /** + * There may be a slight divergence here in terms of what schema is in the latest data vs what + * schema we have captured during analysis, but this is an inherent limitation of Spark. + * + * However, if there are schema changes between analysis and execution, since we froze this + * schema, our schema incompatibility checks will kick in during the scan so we will always + * be safe - Although it is a notable caveat that user should be aware of because the CDC query + * may break. + */ + private lazy val endingVersionForBatchSchema: Long = endingVersion.map { v => + // As defined in docs, if ending version is greater than the latest version, we will just use + // the latest version to find the schema. + latestVersionOfTableDuringAnalysis min v + }.getOrElse { + // Or if endingVersion is not specified, we just use the latest schema. + latestVersionOfTableDuringAnalysis + } + + // The final snapshot whose schema is going to be used as this CDF relation's schema + private val snapshotForBatchSchema: Snapshot = snapshotWithSchemaMode.schemaMode match { + case BatchCDFSchemaEndVersion => + // Fetch the ending version and its schema + deltaLog.getSnapshotAt(endingVersionForBatchSchema) + case _ => + // Apply the default, either latest generated by DeltaTableV2 or specified by Time-travel + // options. + snapshotWithSchemaMode.snapshot + } + + override val schema: StructType = cdcReadSchema(snapshotForBatchSchema.metadata.schema) + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { val df = changesToBatchDF( deltaLog, startingVersion.get, + // The actual ending version we should scan until during execution, as it might have changed endingVersion.getOrElse { - // If no ending version was specified, use the latest version as of scan building time. - // Note that this line won't be invoked (and thus we won't incur the update() cost) - // when endingVersion is present. deltaLog.update().version }, - sqlContext.sparkSession) + sqlContext.sparkSession, + Some(snapshotForBatchSchema)) df.select(requiredColumns.map(SchemaUtils.fieldNameToColumn): _*).rdd } @@ -161,20 +197,40 @@ trait CDCReaderImpl extends DeltaLogging { } } + /** + * Get the batch cdf schema mode for a table, considering whether it has column mapping enabled + * or not. + */ + def getBatchSchemaModeForTable( + spark: SparkSession, + snapshot: Snapshot): DeltaBatchCDFSchemaMode = { + if (snapshot.metadata.columnMappingMode != NoMapping) { + // Column-mapping table uses exact schema by default, but can be overridden by conf + DeltaBatchCDFSchemaMode(spark.sessionState.conf.getConf( + DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE + )) + } else { + // Non column-mapping table uses the current default, which is typically `legacy` - usually + // the latest schema is used, but it can depend on time-travel arguments as well. + // Using time-travel arguments with CDF is default blocked right now as it is an non-expected + // use case, users can unblock themselves with `DeltaSQLConf.DELTA_CDF_ENABLE_TIME_TRAVEL`. 
+ BatchCDFSchemaLegacy + } + } + /** * Get a Relation that represents change data between two snapshots of the table. */ def getCDCRelation( spark: SparkSession, - deltaLog: DeltaLog, snapshotToUse: Snapshot, - partitionFilters: Seq[Expression], + isTimeTravelQuery: Boolean, conf: SQLConf, options: CaseInsensitiveStringMap): BaseRelation = { val startingVersion = getVersionForCDC( spark, - deltaLog, + snapshotToUse.deltaLog, conf, options, DeltaDataSource.CDC_START_VERSION_KEY, @@ -185,6 +241,18 @@ trait CDCReaderImpl extends DeltaLogging { throw DeltaErrors.noStartVersionForCDC() } + val schemaMode = getBatchSchemaModeForTable(spark, snapshotToUse) + + // Non-legacy schema mode options cannot be used with time-travel because the schema to use + // will be confusing. + if (isTimeTravelQuery && schemaMode != BatchCDFSchemaLegacy) { + throw DeltaErrors.illegalDeltaOptionException( + DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key, + schemaMode.name, + s"${DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key} " + + s"cannot be used with time travel options.") + } + // add a version check here that is cheap instead of after trying to list a large version // that doesn't exist if (startingVersion.get > snapshotToUse.version) { @@ -192,12 +260,10 @@ trait CDCReaderImpl extends DeltaLogging { // LS-129: return an empty relation if start version passed in is beyond latest commit version if (allowOutOfRange) { return new DeltaCDFRelation( - cdcReadSchema(snapshotToUse.metadata.schema), + SnapshotWithSchemaMode(snapshotToUse, schemaMode), spark.sqlContext, - deltaLog, None, - None - ) { + None) { override def buildScan( requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { @@ -211,7 +277,7 @@ trait CDCReaderImpl extends DeltaLogging { val endingVersion = getVersionForCDC( spark, - deltaLog, + snapshotToUse.deltaLog, conf, options, DeltaDataSource.CDC_END_VERSION_KEY, @@ -225,12 +291,10 @@ trait CDCReaderImpl extends DeltaLogging { logInfo(s"startingVersion: $startingVersion, endingVersion: $endingVersion") DeltaCDFRelation( - cdcReadSchema(snapshotToUse.metadata.schema), + SnapshotWithSchemaMode(snapshotToUse, schemaMode), spark.sqlContext, - deltaLog, startingVersion, - endingVersion - ) + endingVersion) } /** @@ -273,7 +337,11 @@ trait CDCReaderImpl extends DeltaLogging { * given action type to read CDC data. These FileIndexes are then unioned to produce the final * DataFrame. * - * @param deltaLog - DeltaLog for the table for which we are creating a cdc dataFrame + * @param readSchemaSnapshot - Snapshot for the table for which we are creating a CDF + * Dataframe, the schema of the snapshot is expected to be + * the change DF's schema. We have already adjusted this + * snapshot with the schema mode if there's any. We don't use + * its data actually. * @param start - startingVersion of the changes * @param end - endingVersion of the changes * @param changes - changes is an iterator of all FileActions for a particular commit version. 
@@ -286,20 +354,19 @@ trait CDCReaderImpl extends DeltaLogging { * related to the changes */ def changesToDF( - deltaLog: DeltaLog, + readSchemaSnapshot: Snapshot, start: Long, end: Long, changes: Iterator[(Long, Seq[Action])], spark: SparkSession, isStreaming: Boolean = false, ignoreAddCDCFileActions: Boolean = false): CDCVersionDiffInfo = { + val deltaLog = readSchemaSnapshot.deltaLog if (end < start) { throw DeltaErrors.endBeforeStartVersionInCDC(start, end) } - val snapshot = deltaLog.unsafeVolatileSnapshot - // A map from change version to associated commit timestamp. val timestampsByVersion: Map[Long, Timestamp] = getTimestampsByVersion(deltaLog, start, end, spark) @@ -313,24 +380,44 @@ trait CDCReaderImpl extends DeltaLogging { throw DeltaErrors.changeDataNotRecordedException(start, start, end) } - /** - * TODO: Unblock this when we figure out the correct semantics. - * Currently batch CDC read on column mapping tables with Rename/Drop is blocked due to - * unclear semantics. - * Streaming CDF read is blocked on a separate code path in DeltaSource. - */ - val shouldCheckToBlockBatchReadOnColumnMappingTable = - !isStreaming && - snapshot.metadata.columnMappingMode != NoMapping && - !spark.sessionState.conf.getConf( + // Check schema read-compatibility + val forceEnableUnsafeBatchReadOnIncompatibleSchemaChanges = + spark.sessionState.conf.getConf( DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES) - // Compare with start snapshot's metadata schema to fail fast - if (shouldCheckToBlockBatchReadOnColumnMappingTable && - !DeltaColumnMapping.isColumnMappingReadCompatible( - snapshot.metadata, startVersionSnapshot.metadata)) { - throw DeltaErrors.blockBatchCdfReadOnColumnMappingEnabledTable( - snapshot.metadata.schema, startVersionSnapshot.metadata.schema) + /** + * Check any metadata (which may contain schema change) against a snapshot pointing to data + * schema of the querying range, it can be either: + * 1. The start version snapshot - this is used to detect if a metadata action actually contains + * a read-incompatible schema CHANGE. + * 2. The read schema's snapshot - typically 1. could suffice, but we have corner cases in which + * all schema changes are compatible but the read schema still diverges. So we need to run + * another check against the read schema as well. + */ + def checkBatchReadColumnMappingSchemaIncompatibility( + metadata: Metadata, + metadataVer: Long, + snapshotWithDataSchema: Snapshot, + isSchemaChange: Boolean): Unit = { + val shouldCheckToBlockBatchReadOnColumnMappingTable = + !isStreaming && !forceEnableUnsafeBatchReadOnIncompatibleSchemaChanges + + // We do not check for any incompatibility if the global "I don't care" flag is turned on + if (shouldCheckToBlockBatchReadOnColumnMappingTable) { + // If there's schema incompatibility ... 
+ if (!DeltaColumnMapping.isMetadataSchemaReadCompatible( + metadataVer, metadata, + snapshotWithDataSchema.version, snapshotWithDataSchema.metadata)) { + throw DeltaErrors.blockBatchCdfReadWithIncompatibleSchemaChange( + start, end, + // The consistent read schema + readSchemaSnapshot.metadata.schema, readSchemaSnapshot.version, + // The conflicting schema or schema change version + metadataVer, + isSchemaChange + ) + } + } } var totalBytes = 0L @@ -350,13 +437,21 @@ trait CDCReaderImpl extends DeltaLogging { throw DeltaErrors.changeDataNotRecordedException(v, start, end) } - // Check all intermediary metadata schema changes as well - if (shouldCheckToBlockBatchReadOnColumnMappingTable) { + // Check all intermediary metadata schema changes, this guarantees that there will be no + // read-incompatible schema changes across the querying range. + // Note that we don't have to check the schema change if it's at the start version, because: + // 1. If it's an initialization, e.g. CREATE AS SELECT, we don't have to consider this + // as a schema change and report weird error messages. + // 2. If it's indeed a schema change, as we won't be reading any data prior to it that + // falls back to the previous (possibly incorrect) schema, we will be safe. Also if there + // are any data file residing in the same commit, it will follow the new schema as well. + if (v > start) { actions.collect { case a: Metadata => a }.foreach { metadata => - if (!DeltaColumnMapping.isColumnMappingReadCompatible(snapshot.metadata, metadata)) { - throw DeltaErrors.blockBatchCdfReadOnColumnMappingEnabledTable( - snapshot.metadata.schema, metadata.schema) - } + // Verify with start snapshot to check for any read-incompatible changes + // This also detects the corner case in that there's only one schema change between + // start and end, which looks exactly like the end schema. + checkBatchReadColumnMappingSchemaIncompatibility( + metadata, v, startVersionSnapshot, isSchemaChange = true) } } @@ -405,23 +500,37 @@ trait CDCReaderImpl extends DeltaLogging { } } + // Verify the final read schema with the start snapshot version once again + // This is needed to: + // 1. Handle the case in that there are no read-incompatible schema change with the range, BUT + // the latest schema may still be incompatible as it COULD be arbitrary. + // 2. Similarly, handle the corner case when there are no read-incompatible schema change with + // the range, BUT time-travel is used so the read schema could also be arbitrary. + // It is sufficient to just verify with the start version schema because we have already + // verified that all data being queries is read-compatible with start schema. + checkBatchReadColumnMappingSchemaIncompatibility( + startVersionSnapshot.metadata, startVersionSnapshot.version, + readSchemaSnapshot, isSchemaChange = false + ) + val dfs = ListBuffer[DataFrame]() if (changeFiles.nonEmpty) { dfs.append(scanIndex( spark, - new TahoeChangeFileIndex(spark, changeFiles.toSeq, deltaLog, deltaLog.dataPath, snapshot), - snapshot.metadata, + new TahoeChangeFileIndex( + spark, changeFiles.toSeq, deltaLog, deltaLog.dataPath, readSchemaSnapshot), + readSchemaSnapshot.metadata, isStreaming)) } val deletedAndAddedRows = getDeletedAndAddedRows(addFiles.toSeq, removeFiles.toSeq, deltaLog, - snapshot, isStreaming, spark) + readSchemaSnapshot, isStreaming, spark) dfs.append(deletedAndAddedRows: _*) // build an empty DS. 
This DS retains the table schema and the isStreaming property val emptyDf = spark.sqlContext.internalCreateDataFrame( spark.sparkContext.emptyRDD[InternalRow], - cdcReadSchema(snapshot.metadata.schema), + cdcReadSchema(readSchemaSnapshot.metadata.schema), isStreaming) CDCVersionDiffInfo( @@ -489,19 +598,28 @@ trait CDCReaderImpl extends DeltaLogging { /** * Get the block of change data from start to end Delta log versions (both sides inclusive). * The returned DataFrame has isStreaming set to false. + * + * @param readSchemaSnapshot The snapshot with the desired schema that will be used to + * serve this CDF batch. It is usually passed upstream from + * e.g. DeltaTableV2 in an effort to stabilize the schema used for the + * batch DF. We don't actually use its data. + * If not set, it will fall back to the legacy behavior of using + * whatever deltaLog.unsafeVolatileSnapshot is. This should be + * avoided in production. */ def changesToBatchDF( deltaLog: DeltaLog, start: Long, end: Long, spark: SparkSession, + readSchemaSnapshot: Option[Snapshot] = None, ignoreAddCDCFileActions: Boolean = false): DataFrame = { val changesWithinRange = deltaLog.getChanges(start).takeWhile { case (version, _) => version <= end } changesToDF( - deltaLog, + readSchemaSnapshot.getOrElse(deltaLog.unsafeVolatileSnapshot), start, end, changesWithinRange, diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 79a9737b80c..6060bea58ad 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -901,6 +901,26 @@ trait DeltaSQLConfBase .booleanConf .createWithDefault(false) + val DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE = + buildConf("changeDataFeed.defaultSchemaModeForColumnMappingTable") + .doc( + "Reading batch CDF on a column mapping enabled table requires the schema mode to be set to " + + "`endVersion` so the ending version's schema will be used. " + + "Set this to `latest` to use the schema of the latest available table version, " + + "or to `legacy` to fall back to the non column-mapping default behavior, in which " + + "the time travel option can be used to select the version of the schema.") + .internal() + .stringConf + .createWithDefault("endVersion") + + val DELTA_CDF_ENABLE_TIME_TRAVEL = + buildConf("changeDataFeed.timeTravel.enabled") + .doc( + "If enabled, users can specify time-travel reader options while reading change data feed.") + .internal() + .booleanConf + .createWithDefault(false)
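For illustration only (this snippet is not part of the diff): a sketch of how the two confs above might be exercised from the reader API. The table path and version numbers are made up, and the full conf keys assume the usual spark.databricks.delta. prefix that DeltaSQLConf.buildConf prepends.

// Batch CDF on a column mapping enabled table: with the schema mode left at its
// default of `endVersion`, the batch is served with the end version's schema.
spark.conf.set(
  "spark.databricks.delta.changeDataFeed.defaultSchemaModeForColumnMappingTable",
  "endVersion")
val changes = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "0")
  .option("endingVersion", "5")
  .load("/tmp/delta/events")  // hypothetical table path

// Combining CDF with time-travel reader options stays blocked unless the flag below is
// enabled, and it is only accepted together with the `legacy` schema mode.
spark.conf.set("spark.databricks.delta.changeDataFeed.timeTravel.enabled", "true")
spark.conf.set(
  "spark.databricks.delta.changeDataFeed.defaultSchemaModeForColumnMappingTable",
  "legacy")
val changesWithOldSchema = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "0")
  .option("endingVersion", "5")
  .option("versionAsOf", "2")  // the time-travelled schema serves the batch
  .load("/tmp/delta/events")

The `endVersion` default narrows the gap between the read schema and the data files actually scanned, which is why the column mapping path picks it over `latest`.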
+ + "Set this to `latest` to use the schema of the latest available table version," + + "or to `legacy` to fallback to the non column-mapping default behavior, in which" + + "the time travel option can be used to select the version of the schema.") + .internal() + .stringConf + .createWithDefault("endVersion") + + val DELTA_CDF_ENABLE_TIME_TRAVEL = + buildConf("changeDataFeed.timeTravel.enabled") + .doc( + "If enabled, user can specify time-travel reader options while reading change data feed.") + .internal() + .booleanConf + .createWithDefault(false) + val DYNAMIC_PARTITION_OVERWRITE_ENABLED = buildConf("dynamicPartitionOverwrite.enabled") .doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " + diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala index 44d6eab7f58..434f406b575 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala @@ -16,10 +16,8 @@ package org.apache.spark.sql.delta.sources -import org.apache.spark.sql.delta.{DeltaErrors, DeltaOperations} import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, CommitInfo, FileAction, Metadata, Protocol, RemoveFile, SetTransaction} import org.apache.spark.sql.delta.commands.cdc.CDCReader -import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.DataFrame @@ -174,7 +172,7 @@ trait DeltaSourceCDCSupport { self: DeltaSource => } val cdcInfo = CDCReader.changesToDF( - deltaLog, + snapshotAtSourceInit, startVersion, endOffset.reservoirVersion, groupedFileActions, diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala new file mode 100644 index 00000000000..2062132e760 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala @@ -0,0 +1,706 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta + +import java.io.File + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate +import org.apache.spark.sql.delta.commands.cdc.CDCReader._ +import org.apache.spark.sql.delta.schema.SchemaMergingUtils +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin + +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ + +trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase + with DeltaColumnMappingTestUtils + with DeltaColumnMappingSelectedTestMixin { + + import testImplicits._ + + implicit class DataFrameDropCDCFields(df: DataFrame) { + def dropCDCFields: DataFrame = + df.drop(CDC_COMMIT_TIMESTAMP) + .drop(CDC_TYPE_COLUMN_NAME) + .drop(CDC_COMMIT_VERSION) + } + + test("add column batch cdc read not blocked") { + withTempDir { dir => + // Set up an initial table with 10 records in schema + setupInitialDeltaTable(dir) + implicit val deltaLog: DeltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + + // add column should not be blocked + sql( + s""" + |ALTER TABLE delta.`${dir.getCanonicalPath}` ADD COLUMN (name string) + |""".stripMargin) + + // write more data + writeDeltaData((10 until 15)) + + // None of the schema mode should block this use case + Seq(BatchCDFSchemaLegacy, BatchCDFSchemaLatest, BatchCDFSchemaEndVersion).foreach { mode => + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(deltaLog.update().version.toString), + Some(mode)).dropCDCFields, + (0 until 10).map(_.toString).toDF("id") + .withColumn("value", col("id")) + .withColumn("name", lit(null)) union + (10 until 15).map(_.toString).toDF("id") + .withColumn("value", col("id")) + .withColumn("name", col("id"))) + } + } + } + + test("data type and nullability change batch cdc read blocked") { + withTempDir { dir => + // Set up an initial table with 10 records in schema + setupInitialDeltaTable(dir) + implicit val deltaLog: DeltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + val s1 = deltaLog.update() + val v1 = s1.version + + // Change the data type of column + deltaLog.withNewTransaction { txn => + // id was string + val updatedSchema = + SchemaMergingUtils.transformColumns( + StructType.fromDDL("id INT, value STRING")) { (_, field, _) => + val refField = s1.metadata.schema(field.name) + field.copy(metadata = refField.metadata, nullable = false) + } + txn.commit(s1.metadata.copy(schemaString = updatedSchema.json) :: Nil, ManualUpdate) + } + val v2 = deltaLog.update().version + + // write more data in updated schema + Seq((10, "10")).toDF("id", "value") + .write.format("delta").mode("append").save(dir.getCanonicalPath) + val v3 = deltaLog.update().version + + // query all changes using latest schema blocked + assertBlocked(expectedIncompatSchemaVersion = v2, expectedReadSchemaVersion = v3) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v3.toString)).collect() + } + + // query using end version also blocked if cross schema change + assertBlocked( + expectedIncompatSchemaVersion = v2, + expectedReadSchemaVersion = v3, + schemaMode = BatchCDFSchemaEndVersion) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v3.toString), + Some(BatchCDFSchemaEndVersion)).collect() + } + + // query using end version NOT blocked if NOT cross schema change 
+ checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion(v3.toString), + EndingVersion(v3.toString), + Some(BatchCDFSchemaEndVersion)).dropCDCFields, + Row(10, "10") :: Nil + ) + + val s2 = deltaLog.update() + // Change nullability + deltaLog.withNewTransaction { txn => + // the schema was not nullable + val updatedSchema = + SchemaMergingUtils.transformColumns( + StructType.fromDDL("id INT, value string").asNullable) { (_, field, _) => + val refField = s1.metadata.schema(field.name) + field.copy(metadata = refField.metadata) + } + txn.commit(s2.metadata.copy(schemaString = updatedSchema.json) :: Nil, ManualUpdate) + } + val v4 = deltaLog.update().version + + // write more data in updated schema + Seq((11, "11")).toDF("id", "value") + .write.format("delta").mode("append").save(dir.getCanonicalPath) + + val v5 = deltaLog.update().version + + // query changes using latest schema blocked + assertBlocked(expectedIncompatSchemaVersion = v4, expectedReadSchemaVersion = v5) { + cdcRead( + new TablePath(dir.getCanonicalPath), + // v3 is the first post the data type schema change + StartingVersion(v3.toString), + EndingVersion(v5.toString)).collect() + } + + // query using end version also blocked if cross schema change + assertBlocked( + expectedIncompatSchemaVersion = v4, + expectedReadSchemaVersion = v5, + schemaMode = BatchCDFSchemaEndVersion) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion(v3.toString), + EndingVersion(v5.toString), + Some(BatchCDFSchemaEndVersion)).collect() + } + + // query using end version NOT blocked if NOT cross schema change + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion(v5.toString), + EndingVersion(v5.toString), + Some(BatchCDFSchemaEndVersion)).dropCDCFields, + Row(11, "11") :: Nil + ) + } + } + + test("drop column batch cdc read blocked") { + withTempDir { dir => + // Set up an initial table with 10 records in schema + setupInitialDeltaTable(dir) + implicit val deltaLog: DeltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + val v1 = deltaLog.update().version + + // drop column would cause CDC read to be blocked + sql( + s""" + |ALTER TABLE delta.`${dir.getCanonicalPath}` DROP COLUMN value + |""".stripMargin) + val v2 = deltaLog.update().version + + // write more data + writeDeltaData(Seq(10)) + val v3 = deltaLog.update().version + + // query all changes using latest schema blocked + assertBlocked(expectedIncompatSchemaVersion = v2, expectedReadSchemaVersion = v3) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v3.toString)).collect() + } + + // query just first two versions which have more columns than latest schema is also blocked + assertBlocked( + expectedIncompatSchemaVersion = 0, + expectedReadSchemaVersion = v3, + bySchemaChange = false) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion("1")).collect() + } + + // query unblocked if force enabled by user + withSQLConf( + DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key -> "true") { + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v3.toString)).dropCDCFields, + // Note id is dropped because we are using latest schema + (0 until 11).map(i => Row(i.toString)) + ) + } + + // querying changes using endVersion schema blocked if crossing schema boundary + assertBlocked( + expectedIncompatSchemaVersion = v2, + expectedReadSchemaVersion = v3, + schemaMode = 
BatchCDFSchemaEndVersion) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v3.toString), + Some(BatchCDFSchemaEndVersion)).collect() + } + + assertBlocked( + expectedIncompatSchemaVersion = v2, + expectedReadSchemaVersion = v3, + schemaMode = BatchCDFSchemaEndVersion) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion(v1.toString), + EndingVersion(v3.toString), + Some(BatchCDFSchemaEndVersion)).collect() + } + + // querying changes using endVersion schema NOT blocked if NOT crossing schema boundary + // with schema + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v1.toString), + Some(BatchCDFSchemaEndVersion)).dropCDCFields, + (0 until 10).map(_.toString).map(i => Row(i, i))) + + // with schema + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion(v3.toString), + EndingVersion(v3.toString), + Some(BatchCDFSchemaEndVersion)).dropCDCFields, + Row("10") :: Nil + ) + + // let's add the column back... + sql( + s""" + |ALTER TABLE delta.`${dir.getCanonicalPath}` ADD COLUMN (value string) + |""".stripMargin) + val v4 = deltaLog.update().version + + // write more data + writeDeltaData(Seq(11)) + val v5 = deltaLog.update().version + + // The read is still blocked, even schema @ v0 looks the "same" as the latest schema + // but the added column now maps to a different physical column. + assertBlocked(expectedIncompatSchemaVersion = v2, expectedReadSchemaVersion = v5) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v5.toString)).collect() + } + + // In this case, tho there aren't any read-incompat schema changes in the querying range, + // the latest schema is not read-compat with the data files @ v0, so we still block. + assertBlocked( + expectedIncompatSchemaVersion = 0, + expectedReadSchemaVersion = v5, + bySchemaChange = false) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion("1")).collect() + } + } + } + + test("rename column batch cdc read blocked") { + withTempDir { dir => + // Set up an initial table with 10 records in schema + setupInitialDeltaTable(dir) + implicit val deltaLog: DeltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + val v1 = deltaLog.update().version + + // Rename column + sql( + s""" + |ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id TO id2 + |""".stripMargin) + val v2 = deltaLog.update().version + + // write more data + writeDeltaData(Seq(10)) + val v3 = deltaLog.update().version + + // query all versions using latest schema blocked + assertBlocked(expectedIncompatSchemaVersion = v2, expectedReadSchemaVersion = v3) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v3.toString)).collect() + } + + // query unblocked if force enabled by user + withSQLConf( + DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key -> "true") { + val df = cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v3.toString)).dropCDCFields + checkAnswer(df, (0 until 11).map(i => Row(i.toString, i.toString))) + // Note we serve the batch using the renamed column in the latest schema. 
+ assert(df.schema.fieldNames.sameElements(Array("id2", "value"))) + } + + // query just the first few versions using latest schema also blocked + assertBlocked( + expectedIncompatSchemaVersion = 0, + expectedReadSchemaVersion = v3, + bySchemaChange = false) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion("1")).collect() + } + + // query using endVersion schema across schema boundary also blocked + assertBlocked( + expectedIncompatSchemaVersion = v2, + expectedReadSchemaVersion = v2, + schemaMode = BatchCDFSchemaEndVersion) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v2.toString), + Some(BatchCDFSchemaEndVersion)).collect() + } + + // query using endVersion schema NOT blocked if NOT crossing schema boundary + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v1.toString), + Some(BatchCDFSchemaEndVersion)).dropCDCFields, + (0 until 10).map(_.toString).map(i => Row(i, i)) + ) + + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion(v2.toString), + EndingVersion(v3.toString), + Some(BatchCDFSchemaEndVersion)).dropCDCFields, + Row("10", "10") :: Nil + ) + + // Let's rename the column back + sql( + s""" + |ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id2 TO id + |""".stripMargin) + val v4 = deltaLog.update().version + + // write more data + writeDeltaData(Seq(11)) + val v5 = deltaLog.update().version + + // query all changes using latest schema would still block because we crossed an + // intermediary action with a conflicting schema (the first rename). + assertBlocked(expectedIncompatSchemaVersion = v2, expectedReadSchemaVersion = v5) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v5.toString)).collect() + } + + // query all changes using LATEST schema would NOT block if we exclude the first + // rename back, because the data schemas before that are now consistent with the latest. + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v1.toString)).dropCDCFields, + (0 until 10).map(_.toString).map(i => Row(i, i))) + + // query using endVersion schema is blocked if we cross schema boundary + assertBlocked( + expectedIncompatSchemaVersion = v4, + expectedReadSchemaVersion = v5, + schemaMode = BatchCDFSchemaEndVersion) { + cdcRead( + new TablePath(dir.getCanonicalPath), + // v3 just pass the first schema change + StartingVersion(v3.toString), + EndingVersion(v5.toString), + Some(BatchCDFSchemaEndVersion)).collect() + } + + // Note how the conflictingVersion is v2 (the first rename), because v1 matches our end + // version schema due to renaming back. 
+ assertBlocked( + expectedIncompatSchemaVersion = v2, + expectedReadSchemaVersion = v5, + schemaMode = BatchCDFSchemaEndVersion) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion(v1.toString), + EndingVersion(v5.toString), + Some(BatchCDFSchemaEndVersion)).collect() + } + } + } + + override def runOnlyTests: Seq[String] = Seq( + "changes from table by name", + "changes from table by path", + "batch write: append, dynamic partition overwrite + CDF", + // incompatible schema changes & schema mode tests + "add column batch cdc read not blocked", + "data type and nullability change batch cdc read blocked", + "drop column batch cdc read blocked", + "rename column batch cdc read blocked" + ) + + protected def assertBlocked( + expectedIncompatSchemaVersion: Long, + expectedReadSchemaVersion: Long, + schemaMode: DeltaBatchCDFSchemaMode = BatchCDFSchemaLegacy, + timeTravel: Boolean = false, + bySchemaChange: Boolean = true)(f: => Unit)(implicit log: DeltaLog): Unit = { + val e = intercept[DeltaUnsupportedOperationException] { + f + } + val (end, readSchemaJson) = if (bySchemaChange) { + assert(e.getErrorClass == "DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_SCHEMA_CHANGE") + val Seq(_, end, readSchemaJson, readSchemaVersion, incompatibleVersion, _, _, _, _) = + e.getMessageParametersArray.toSeq + assert(incompatibleVersion.toLong == expectedIncompatSchemaVersion) + assert(readSchemaVersion.toLong == expectedReadSchemaVersion) + (end, readSchemaJson) + } else { + assert(e.getErrorClass == "DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_DATA_SCHEMA") + val Seq(_, end, readSchemaJson, readSchemaVersion, incompatibleVersion, config) = + e.getMessageParametersArray.toSeq + assert(incompatibleVersion.toLong == expectedIncompatSchemaVersion) + assert(readSchemaVersion.toLong == expectedReadSchemaVersion) + assert(config == DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key) + (end, readSchemaJson) + } + + val latestSnapshot = log.update() + schemaMode match { + case BatchCDFSchemaLegacy if timeTravel => + // Read using time travelled schema, it can be arbitrary so nothing to check here + case BatchCDFSchemaEndVersion => + // Read using end version schema + assert(expectedReadSchemaVersion == end.toLong && + log.getSnapshotAt(expectedReadSchemaVersion).schema.json == readSchemaJson) + case _ => + // non time-travel legacy mode and latest mode should both read latest schema + assert(expectedReadSchemaVersion == latestSnapshot.version && + latestSnapshot.schema.json == readSchemaJson) + } + } + + /** + * Write test delta data to test blocking column mapping for CDC batch queries, it takes a + * sequence and write out as a row of strings, assuming the delta log's schema are all strings. 
+ */ + protected def writeDeltaData( + data: Seq[Int], + userSpecifiedSchema: Option[StructType] = None)(implicit log: DeltaLog): Unit = { + val schema = userSpecifiedSchema.getOrElse(log.update().schema) + data.foreach { i => + val data = Seq(Row(schema.map(_ => i.toString): _*)) + spark.createDataFrame(data.asJava, schema) + .write.format("delta").mode("append").save(log.dataPath.toString) + } + } + + /** + * Set up initial table data, considering the current column mapping mode. + * + * The table contains 10 rows, with a schema in which both columns are strings. + */ + protected def setupInitialDeltaTable(dir: File): Unit = { + require(columnMappingModeString != NoMapping.name) + val tablePath = dir.getCanonicalPath + implicit val deltaLog: DeltaLog = DeltaLog.forTable(spark, tablePath) + + if (columnMappingModeString == NameMapping.name) { + // For name mode, we do an upgrade then write to test that behavior as well + // init table with 5 versions without column mapping + withColumnMappingConf("none") { + writeDeltaData((0 until 5), userSpecifiedSchema = Some( + new StructType().add("id", StringType, true).add("value", StringType, true) + )) + } + // upgrade to name mode + sql( + s""" + |ALTER TABLE delta.`${dir.getCanonicalPath}` + |SET TBLPROPERTIES ( + | ${DeltaConfigs.COLUMN_MAPPING_MODE.key} = "name", + | ${DeltaConfigs.MIN_READER_VERSION.key} = "2", + | ${DeltaConfigs.MIN_WRITER_VERSION.key} = "5")""".stripMargin) + // write more data + writeDeltaData((5 until 10)) + } + else if (columnMappingModeString == IdMapping.name) { + // For id mode, we can only create a table from scratch + withColumnMappingConf("id") { + writeDeltaData((0 until 10), userSpecifiedSchema = Some( + new StructType().add("id", StringType, true).add("value", StringType, true) + )) + } + } + + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(deltaLog.update().version.toString)).dropCDCFields, + (0 until 10).map(_.toString).toDF("id").withColumn("value", col("id"))) + } +} + +trait DeltaCDCColumnMappingScalaSuiteBase extends DeltaCDCColumnMappingSuiteBase { + + import testImplicits._ + + test("time travel with batch cdf is disabled by default") { + withTempDir { dir => + Seq(1).toDF("id").write.format("delta").save(dir.getCanonicalPath) + val e = intercept[DeltaAnalysisException] { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion("1"), + readerOptions = Map(DeltaOptions.VERSION_AS_OF -> "0")).collect() + } + assert(e.getErrorClass == "DELTA_UNSUPPORTED_TIME_TRAVEL_VIEWS") + } + } + + // NOTE: we do not support the time travel option with the SQL API, so we only test the Scala API suite + test("cannot specify both time travel options and schema mode") { + withSQLConf(DeltaSQLConf.DELTA_CDF_ENABLE_TIME_TRAVEL.key -> "true") { + withTempDir { dir => + Seq(1).toDF("id").write.format("delta").save(dir.getCanonicalPath) + val e = intercept[DeltaIllegalArgumentException] { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion("1"), + Some(BatchCDFSchemaEndVersion), + readerOptions = Map(DeltaOptions.VERSION_AS_OF -> "0")).collect() + } + assert(e.getMessage.contains( + DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key)) + } + } + } + + test("time travel option is respected") { + withSQLConf(DeltaSQLConf.DELTA_CDF_ENABLE_TIME_TRAVEL.key -> "true") { + withTempDir { dir => + // Set up an initial table with 10 records in schema + setupInitialDeltaTable(dir) + implicit val deltaLog: DeltaLog =
DeltaLog.forTable(spark, dir.getCanonicalPath) + val v1 = deltaLog.update().version + + // Add a column + sql( + s""" + |ALTER TABLE delta.`${dir.getCanonicalPath}` ADD COLUMN (prop string) + |""".stripMargin) + val v2 = deltaLog.update().version + + // write more data + writeDeltaData(Seq(10)) + val v3 = deltaLog.update().version + + // Rename a column + sql( + s""" + |ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id TO id2 + |""".stripMargin) + val v4 = deltaLog.update().version + + // write more data + writeDeltaData(Seq(11)) + val v5 = deltaLog.update().version + + // query changes between version 0 - v1, not crossing schema boundary + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v1.toString), + readerOptions = Map(DeltaOptions.VERSION_AS_OF -> v1.toString)).dropCDCFields, + (0 until 10).map(_.toString).map(i => Row(i, i))) + + // query across add column, but not cross the rename, not blocked + checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + EndingVersion(v3.toString), + // v2 is the add column schema change + readerOptions = Map(DeltaOptions.VERSION_AS_OF -> v2.toString)).dropCDCFields, + // Note how the first 10 records now misses a column, but it's fine + (0 until 10).map(_.toString).map(i => Row(i, i, null)) ++ + Seq(Row("10", "10", "10"))) + + // query across rename is blocked, if we are still specifying an old version + // note it failed at v4, because the initial schema does not conflict with schema @ v2 + assertBlocked( + expectedIncompatSchemaVersion = v4, + expectedReadSchemaVersion = v2, + timeTravel = true) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + // v5 cross the v4 rename column + EndingVersion(v5.toString), + // v2 is the add column schema change + readerOptions = Map(DeltaOptions.VERSION_AS_OF -> v2.toString)).collect() + } + + // Even the querying range has no schema change, the data files are still not + // compatible with the read schema due to arbitrary time travel. + assertBlocked( + expectedIncompatSchemaVersion = 0, + expectedReadSchemaVersion = v4, + timeTravel = true, + bySchemaChange = false) { + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion("0"), + // v1 still uses the schema prior to the rename + EndingVersion(v1.toString), + // v4 is the rename column change + readerOptions = Map(DeltaOptions.VERSION_AS_OF -> v4.toString)).collect() + } + + // But without crossing schema change boundary (v4 - v5) using v4's renamed schema, + // we can load the batch. 
+ checkAnswer( + cdcRead( + new TablePath(dir.getCanonicalPath), + StartingVersion(v4.toString), + EndingVersion(v5.toString), + readerOptions = Map(DeltaOptions.VERSION_AS_OF -> v4.toString)).dropCDCFields, + Seq(Row("11", "11", "11"))) + } + } + } +} + +class DeltaCDCIdColumnMappingSuite extends DeltaCDCScalaSuite + with DeltaCDCColumnMappingScalaSuiteBase + with DeltaColumnMappingEnableIdMode + +class DeltaCDCNameColumnMappingSuite extends DeltaCDCScalaSuite + with DeltaCDCColumnMappingScalaSuiteBase + with DeltaColumnMappingEnableNameMode + diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala index 9f9154ffbf3..22038a2845f 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala @@ -94,7 +94,14 @@ abstract class DeltaCDCSuiteBase } /** Single method to do all kinds of CDC reads */ - def cdcRead(tblId: TblId, start: Boundary, end: Boundary): DataFrame + // By default, we use the `legacy` batch CDF schema mode, in which either latest schema is used + // or the time-travelled schema is used. + def cdcRead( + tblId: TblId, + start: Boundary, + end: Boundary, + schemaMode: Option[DeltaBatchCDFSchemaMode] = Some(BatchCDFSchemaLegacy), + readerOptions: Map[String, String] = Map.empty): DataFrame /** Modify timestamp for a delta commit, used to test timestamp querying */ def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = { @@ -701,7 +708,19 @@ class DeltaCDCScalaSuite extends DeltaCDCSuiteBase { def cdcRead( tblId: TblId, start: Boundary, - end: Boundary): DataFrame = { + end: Boundary, + schemaMode: Option[DeltaBatchCDFSchemaMode] = Some(BatchCDFSchemaLegacy), + readerOptions: Map[String, String] = Map.empty): DataFrame = { + + // Set the batch CDF schema mode using SQL conf if we specified it + if (schemaMode.isDefined) { + var result: DataFrame = null + withSQLConf(DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key -> + schemaMode.get.name) { + result = cdcRead(tblId, start, end, None, readerOptions) + } + return result + } val startPrefix: (String, String) = start match { case startingVersion: StartingVersion => @@ -723,20 +742,22 @@ class DeltaCDCScalaSuite extends DeltaCDCSuiteBase { case Unbounded => ("", "") } + + var dfr = spark.read.format("delta") + .option(DeltaOptions.CDC_READ_OPTION, "true") + .option(startPrefix._1, startPrefix._2) + .option(endPrefix._1, endPrefix._2) + + readerOptions.foreach { case (k, v) => + dfr = dfr.option(k, v) + } + tblId match { case path: TablePath => - spark.read.format("delta") - .option(DeltaOptions.CDC_READ_OPTION, "true") - .option(startPrefix._1, startPrefix._2) - .option(endPrefix._1, endPrefix._2) - .load(path.id) + dfr.load(path.id) case tblName: TableName => - spark.read.format("delta") - .option(DeltaOptions.CDC_READ_OPTION, "true") - .option(startPrefix._1, startPrefix._2) - .option(endPrefix._1, endPrefix._2) - .table(tblName.id) + dfr.table(tblName.id) case _ => throw new IllegalArgumentException("No table name or path provided") @@ -795,195 +816,3 @@ class DeltaCDCScalaSuite extends DeltaCDCSuiteBase { } } - -abstract class DeltaCDCColumnMappingSuiteBase extends DeltaCDCScalaSuite - with DeltaColumnMappingTestUtils - with DeltaColumnMappingSelectedTestMixin { - - override def runOnlyTests: Seq[String] = Seq( - "changes from table by name", - "changes from table by path", - "batch write: append, 
dynamic partition overwrite + CDF", - "blocking batch cdc read" - ) - - private def assertBlocked(f: => Unit): Unit = { - val e = intercept[DeltaColumnMappingUnsupportedSchemaIncompatibleException] { - f - } - assert(e.getErrorClass == "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION" && - e.getMessage.contains( - DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key)) - } - - // Drop CDC fields because they are not useful for testing the blocking behavior - private def dropCDCFields(df: DataFrame): DataFrame = - df.drop(CDC_COMMIT_TIMESTAMP) - .drop(CDC_TYPE_COLUMN_NAME) - .drop(CDC_COMMIT_VERSION) - - import testImplicits._ - - /** - * Write test delta data to test blocking column mapping for CDC batch queries, it takes a - * sequence and write out as a row of strings, assuming the delta log's schema are all strings. - */ - private def writeDeltaData( - data: Seq[Int], - deltaLog: DeltaLog, - userSpecifiedSchema: Option[StructType] = None): Unit = { - val schema = userSpecifiedSchema.getOrElse(deltaLog.update().schema) - data.foreach { i => - val data = Seq(Row(schema.map(_ => i.toString): _*)) - spark.createDataFrame(data.asJava, schema) - .write.format("delta").mode("append").save(deltaLog.dataPath.toString) - } - } - - /** - * Set up initial table data, considering current column mapping mode - */ - protected def setupInitialDeltaTable(dir: File): Unit = { - require(columnMappingModeString != NoMapping.name) - val tablePath = dir.getCanonicalPath - val deltaLog = DeltaLog.forTable(spark, tablePath) - - if (columnMappingModeString == NameMapping.name) { - // For name mode, we do an upgrade then write to test that behavior as well - // init table with 5 versions without column mapping - withColumnMappingConf("none") { - writeDeltaData((0 until 5), deltaLog, userSpecifiedSchema = Some( - new StructType().add("id", StringType, true).add("value", StringType, true) - )) - } - // upgrade to name mode - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` - |SET TBLPROPERTIES ( - | ${DeltaConfigs.COLUMN_MAPPING_MODE.key} = "name", - | ${DeltaConfigs.MIN_READER_VERSION.key} = "2", - | ${DeltaConfigs.MIN_WRITER_VERSION.key} = "5")""".stripMargin) - // write more data - writeDeltaData((5 until 10), deltaLog) - } - else if (columnMappingModeString == IdMapping.name) { - // For id mode, we could only create a table from scratch - withColumnMappingConf("id") { - writeDeltaData((0 until 10), deltaLog, userSpecifiedSchema = Some( - new StructType().add("id", StringType, true).add("value", StringType, true) - )) - } - } - - checkAnswer( - dropCDCFields( - cdcRead( - new TablePath(dir.getCanonicalPath), - StartingVersion("0"), - EndingVersion(deltaLog.update().version.toString))), - (0 until 10).map(_.toString).toDF("id").withColumn("value", col("id"))) - } - - test(s"blocking batch cdc read") { - withTempDir { dir => - // Set up an initial table with 10 records in schema - setupInitialDeltaTable(dir) - val deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) - - // add column should not be blocked - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` - |ADD COLUMN (name string) - |""".stripMargin) - - // write more data - writeDeltaData((10 until 15), deltaLog) - - checkAnswer( - dropCDCFields( - cdcRead( - new TablePath(dir.getCanonicalPath), - StartingVersion("0"), - EndingVersion(deltaLog.update().version.toString))), - (0 until 10).map(_.toString).toDF("id") - .withColumn("value", col("id")) - .withColumn("name", lit(null)) union - (10 until 
15).map(_.toString).toDF("id") - .withColumn("value", col("id")) - .withColumn("name", col("id"))) - } - - withTempDir { dir => - // Set up an initial table with 10 records in schema - setupInitialDeltaTable(dir) - val deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) - - // drop column would cause CDC read to be blocked - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` DROP COLUMN value - |""".stripMargin) - - assertBlocked { - cdcRead( - new TablePath(dir.getCanonicalPath), - StartingVersion("0"), - EndingVersion(deltaLog.update().version.toString)).collect() - } - } - - withTempDir { dir => - // Set up an initial table with 10 records in schema - setupInitialDeltaTable(dir) - val deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) - - // rename column would cause CDC read to be blocked - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id TO id2 - |""".stripMargin) - - assertBlocked { - cdcRead( - new TablePath(dir.getCanonicalPath), - StartingVersion("0"), - EndingVersion(deltaLog.update().version.toString)).collect() - } - - // rename the column back - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id2 TO id - |""".stripMargin) - - // Case 1 - would still block because we detected an intermediary action with a conflicting - // schema (the first rename). - assertBlocked { - cdcRead( - new TablePath(dir.getCanonicalPath), - StartingVersion("0"), - EndingVersion(deltaLog.update().version.toString)).collect() - } - - // Case 2 - would NOT block if we exclude the second rename back, because the data schemas - // before that are now consistent with the latest. - checkAnswer( - dropCDCFields( - cdcRead( - new TablePath(dir.getCanonicalPath), - StartingVersion("0"), - // -2 to get rid of the last 2 schema change commits - EndingVersion((deltaLog.update().version - 2).toString))), - (0 until 10).map(_.toString).toDF("id").withColumn("value", col("id"))) - } - } -} - -class DeltaCDCIdColumnMappingSuite extends DeltaCDCColumnMappingSuiteBase - with DeltaColumnMappingEnableIdMode - -class DeltaCDCNameColumnMappingSuite extends DeltaCDCColumnMappingSuiteBase - with DeltaColumnMappingEnableNameMode { -} diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index 6fc946fbe5b..65bea633f97 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -1405,7 +1405,8 @@ trait DeltaErrorsSuiteBase } assert(e.getErrorClass == "DELTA_UNSUPPORTED_TIME_TRAVEL_VIEWS") assert(e.getSqlState == "0A000") - assert(e.getMessage == "Cannot time travel views, subqueries or streams.") + assert(e.getMessage == + "Cannot time travel views, subqueries, streams or change data feed queries.") } { val e = intercept[DeltaIllegalStateException] { @@ -2698,22 +2699,6 @@ trait DeltaErrorsSuiteBase DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key) assert(!e.additionalProperties("detectedDuringStreaming").toBoolean) } - { - val e = intercept[DeltaColumnMappingUnsupportedSchemaIncompatibleException] { - throw DeltaErrors.blockBatchCdfReadOnColumnMappingEnabledTable( - readSchema = StructType.fromDDL("id int"), - incompatibleSchema = StructType.fromDDL("id2 int")) - } - assert(e.getErrorClass == "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION") - assert(e.getSqlState == "0A000") - assert(e.opName == "Change Data Feed (CDF) 
read") - assert(e.readSchema == StructType.fromDDL("id int")) - assert(e.incompatibleSchema == StructType.fromDDL("id2 int")) - assert(e.escapeConfigName == - DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key) - assert(e.additionalProperties.isEmpty) - - } { val e = intercept[DeltaUnsupportedOperationException] { throw DeltaErrors.blockColumnMappingAndCdcOperation(DeltaOperations.ManualUpdate) From f43699ee89a389e881030202bf606dcde79c3ed3 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Tue, 6 Dec 2022 18:32:54 -0800 Subject: [PATCH 2/4] commit --- .../spark/sql/delta/catalog/DeltaTableV2.scala | 2 +- .../sql/delta/commands/cdc/CDCReader.scala | 6 ++++++ .../spark/sql/delta/sources/DeltaSQLConf.scala | 18 ++++++++++-------- .../sql/delta/DeltaCDCColumnMappingSuite.scala | 4 ++-- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index ab1e3656400..d1b90e66854 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -104,7 +104,7 @@ case class DeltaTableV2( timeTravelSpec.map { spec => // By default, block using CDF + time-travel if (CDCReader.isCDCRead(cdcOptions) && - !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CDF_ENABLE_TIME_TRAVEL)) { + !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL)) { throw DeltaErrors.timeTravelNotSupportedException } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala index 7ff287ca079..313afeba180 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala @@ -220,6 +220,12 @@ trait CDCReaderImpl extends DeltaLogging { /** * Get a Relation that represents change data between two snapshots of the table. + * + * @param spark Spark session + * @param snapshotToUse Snapshot to use to provide read schema and version + * @param isTimeTravelQuery Whether this CDC scan is used in conjunction with time-travel args + * @param conf SQL conf + * @param options CDC specific options */ def getCDCRelation( spark: SparkSession, diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 6060bea58ad..de95ed26a22 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -904,19 +904,21 @@ trait DeltaSQLConfBase { val DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE = buildConf("changeDataFeed.defaultSchemaModeForColumnMappingTable") .doc( - "Reading batch CDF on column mapping enabled table requires schema mode to be set to " + - "`endVersion` so the ending version's schema will be used." + - "Set this to `latest` to use the schema of the latest available table version," + - "or to `legacy` to fallback to the non column-mapping default behavior, in which" + - "the time travel option can be used to select the version of the schema.") + """Reading batch CDF on column mapping enabled table requires schema mode to be set to + |`endVersion` so the ending version's schema will be used. 
+ |Set this to `latest` to use the schema of the latest available table version, + |or to `legacy` to fallback to the non column-mapping default behavior, in which + |the time travel option can be used to select the version of the schema.""".stripMargin) .internal() .stringConf .createWithDefault("endVersion") - val DELTA_CDF_ENABLE_TIME_TRAVEL = - buildConf("changeDataFeed.timeTravel.enabled") + val DELTA_CDF_ALLOW_TIME_TRAVEL = + buildConf("changeDataFeed.allowTimeTravelOptionsForSchema") .doc( - "If enabled, user can specify time-travel reader options while reading change data feed.") + s"""If allowed, user can specify time-travel reader options such as + |'versionAsOf' or 'timestampAsOf' to specify the read schema while + |reading change data feed.""".stripMargin) .internal() .booleanConf .createWithDefault(false) diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala index 2062132e760..b2ab84f71fa 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala @@ -583,7 +583,7 @@ trait DeltaCDCColumnMappingScalaSuiteBase extends DeltaCDCColumnMappingSuiteBase // NOTE: we do not support time travel option with SQL API, so we will just test Scala API suite test("cannot specify both time travel options and schema mode") { - withSQLConf(DeltaSQLConf.DELTA_CDF_ENABLE_TIME_TRAVEL.key -> "true") { + withSQLConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL.key -> "true") { withTempDir { dir => Seq(1).toDF("id").write.format("delta").save(dir.getCanonicalPath) val e = intercept[DeltaIllegalArgumentException] { @@ -601,7 +601,7 @@ trait DeltaCDCColumnMappingScalaSuiteBase extends DeltaCDCColumnMappingSuiteBase } test("time travel option is respected") { - withSQLConf(DeltaSQLConf.DELTA_CDF_ENABLE_TIME_TRAVEL.key -> "true") { + withSQLConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL.key -> "true") { withTempDir { dir => // Set up an initial table with 10 records in schema setupInitialDeltaTable(dir) From f299dc20227b56b8fc1948a5b24620c77c763b34 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Thu, 8 Dec 2022 09:16:06 -0800 Subject: [PATCH 3/4] commit --- core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala | 3 --- .../org/apache/spark/sql/delta/catalog/DeltaTableV2.scala | 2 +- .../org/apache/spark/sql/delta/sources/DeltaSQLConf.scala | 2 +- .../apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala | 4 ++-- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index f04b2a962a6..e61a2a7e895 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -140,10 +140,7 @@ class DeltaLog private( * garbage collected. 
*/ def minFileRetentionTimestamp: Long = { - // TODO (Fred): Get rid of this FrameProfiler record once SC-94033 is addressed - recordFrameProfile("Delta", "DeltaLog.minFileRetentionTimestamp") { clock.getTimeMillis() - tombstoneRetentionMillis - } } /** diff --git a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index d1b90e66854..944b8640595 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -104,7 +104,7 @@ case class DeltaTableV2( timeTravelSpec.map { spec => // By default, block using CDF + time-travel if (CDCReader.isCDCRead(cdcOptions) && - !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL)) { + !spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL_OPTIONS)) { throw DeltaErrors.timeTravelNotSupportedException } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index de95ed26a22..de1688a72c2 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -913,7 +913,7 @@ trait DeltaSQLConfBase { .stringConf .createWithDefault("endVersion") - val DELTA_CDF_ALLOW_TIME_TRAVEL = + val DELTA_CDF_ALLOW_TIME_TRAVEL_OPTIONS = buildConf("changeDataFeed.allowTimeTravelOptionsForSchema") .doc( s"""If allowed, user can specify time-travel reader options such as diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala index b2ab84f71fa..7a565d03dab 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala @@ -583,7 +583,7 @@ trait DeltaCDCColumnMappingScalaSuiteBase extends DeltaCDCColumnMappingSuiteBase // NOTE: we do not support time travel option with SQL API, so we will just test Scala API suite test("cannot specify both time travel options and schema mode") { - withSQLConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL.key -> "true") { + withSQLConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL_OPTIONS.key -> "true") { withTempDir { dir => Seq(1).toDF("id").write.format("delta").save(dir.getCanonicalPath) val e = intercept[DeltaIllegalArgumentException] { @@ -601,7 +601,7 @@ trait DeltaCDCColumnMappingScalaSuiteBase extends DeltaCDCColumnMappingSuiteBase } test("time travel option is respected") { - withSQLConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL.key -> "true") { + withSQLConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL_OPTIONS.key -> "true") { withTempDir { dir => // Set up an initial table with 10 records in schema setupInitialDeltaTable(dir) From 8eb45d928cd071e1f4701f3af00ef334470892e4 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Thu, 8 Dec 2022 09:33:00 -0800 Subject: [PATCH 4/4] fix --- .../org/apache/spark/sql/delta/DeltaLog.scala | 4 +-- .../sql/delta/commands/cdc/CDCReader.scala | 4 +-- .../delta/DeltaCDCColumnMappingSuite.scala | 35 ++++--------------- 3 files changed, 10 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index e61a2a7e895..98438e1d840 100644 
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -139,9 +139,7 @@ class DeltaLog private( * Tombstones before this timestamp will be dropped from the state and the files can be * garbage collected. */ - def minFileRetentionTimestamp: Long = { - clock.getTimeMillis() - tombstoneRetentionMillis - } + def minFileRetentionTimestamp: Long = clock.getTimeMillis() - tombstoneRetentionMillis /** * [[SetTransaction]]s before this timestamp will be considered expired and dropped from the diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala index 313afeba180..56abcd5643c 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala @@ -112,8 +112,8 @@ object CDCReader extends CDCReaderImpl * may break. */ private lazy val endingVersionForBatchSchema: Long = endingVersion.map { v => - // As defined in docs, if ending version is greater than the latest version, we will just use - // the latest version to find the schema. + // As defined in the method doc, if ending version is greater than the latest version, we will + // just use the latest version to find the schema. latestVersionOfTableDuringAnalysis min v }.getOrElse { // Or if endingVersion is not specified, we just use the latest schema. diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala index 7a565d03dab..655d424f2e6 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCColumnMappingSuite.scala @@ -50,10 +50,7 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase implicit val deltaLog: DeltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) // add column should not be blocked - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` ADD COLUMN (name string) - |""".stripMargin) + sql(s"ALTER TABLE delta.`${dir.getCanonicalPath}` ADD COLUMN (name string)") // write more data writeDeltaData((10 until 15)) @@ -193,10 +190,7 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase val v1 = deltaLog.update().version // drop column would cause CDC read to be blocked - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` DROP COLUMN value - |""".stripMargin) + sql(s"ALTER TABLE delta.`${dir.getCanonicalPath}` DROP COLUMN value") val v2 = deltaLog.update().version // write more data @@ -279,10 +273,7 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase ) // let's add the column back... 
- sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` ADD COLUMN (value string) - |""".stripMargin) + sql(s"ALTER TABLE delta.`${dir.getCanonicalPath}` ADD COLUMN (value string)") val v4 = deltaLog.update().version // write more data @@ -320,10 +311,7 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase val v1 = deltaLog.update().version // Rename column - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id TO id2 - |""".stripMargin) + sql(s"ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id TO id2") val v2 = deltaLog.update().version // write more data @@ -393,10 +381,7 @@ trait DeltaCDCColumnMappingSuiteBase extends DeltaCDCSuiteBase ) // Let's rename the column back - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id2 TO id - |""".stripMargin) + sql(s"ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id2 TO id") val v4 = deltaLog.update().version // write more data @@ -609,10 +594,7 @@ trait DeltaCDCColumnMappingScalaSuiteBase extends DeltaCDCColumnMappingSuiteBase val v1 = deltaLog.update().version // Add a column - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` ADD COLUMN (prop string) - |""".stripMargin) + sql(s"ALTER TABLE delta.`${dir.getCanonicalPath}` ADD COLUMN (prop string)") val v2 = deltaLog.update().version // write more data @@ -620,10 +602,7 @@ trait DeltaCDCColumnMappingScalaSuiteBase extends DeltaCDCColumnMappingSuiteBase val v3 = deltaLog.update().version // Rename a column - sql( - s""" - |ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id TO id2 - |""".stripMargin) + sql(s"ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id TO id2") val v4 = deltaLog.update().version // write more data
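
A minimal sketch of the reader flow these tests exercise, assuming an active `spark` session and a placeholder table path and version numbers. The option strings correspond to DeltaOptions.CDC_READ_OPTION and the standard starting/ending version and versionAsOf reader options, and the conf key is taken from DeltaSQLConf rather than hard-coded, so this only illustrates the intended usage, not an exact API contract:

  import org.apache.spark.sql.delta.sources.DeltaSQLConf

  // Allow time-travel reader options to pick the CDF read schema (off by default).
  spark.conf.set(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL_OPTIONS.key, "true")

  // Batch CDF read between versions 0 and 5, pinning the read schema to version 5.
  // Without a time-travel option, the schema mode conf above (default `endVersion`)
  // decides which schema is used on a column mapping table.
  val changes = spark.read.format("delta")
    .option("readChangeFeed", "true")   // DeltaOptions.CDC_READ_OPTION
    .option("startingVersion", "0")
    .option("endingVersion", "5")
    .option("versionAsOf", "5")         // read schema is taken from version 5
    .load("/tmp/delta/table")           // placeholder path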