Robust schema usage and checks for batch CDF queries #1509

Closed
Changes from 3 commits
22 changes: 20 additions & 2 deletions core/src/main/resources/error/delta-error-classes.json
@@ -76,7 +76,9 @@
},
"DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION" : {
"message" : [
"<opName> is not supported on tables with column mapping schema changes (e.g. rename or drop). Read schema: <readSchema>. Incompatible schema: <incompatibleSchema>. You may force enable streaming read at your own risk by turning on <config>."
"<opName> is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>.",
"Although strongly not recommended, you may also force ignore the schema checks during <opName> at your own risk of potentially incorrect results by turning on the SQL conf <escapeConfig>."
],
"sqlState" : "0A000"
},
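For reference, the <escapeConfig> escape hatch corresponds to the SQL conf referenced elsewhere in this diff. A minimal, hedged sketch of force-enabling a blocked read, assuming a SparkSession `spark` in scope:

// Unsafe: skips the read-incompatibility checks and may produce incorrect results.
spark.conf.set(
  DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key, "true")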
@@ -260,6 +262,22 @@
],
"sqlState" : "0A000"
},
"DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_DATA_SCHEMA" : {
"message" : [
"Retrieving table changes between version <start> and <end> failed because of an incompatible data schema.",
"Your read schema is <readSchema> at version <readVersion>, but we found an incompatible data schema at version <incompatibleVersion>.",
"If possible, please retrieve the table changes using the end version's schema by setting <config> to `endVersion`, or contact support."
],
"sqlState" : "0A000"
},
"DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_SCHEMA_CHANGE" : {
"message" : [
"Retrieving table changes between version <start> and <end> failed because of an incompatible schema change.",
"Your read schema is <readSchema> at version <readVersion>, but we found an incompatible schema change at version <incompatibleVersion>.",
"If possible, please query table changes separately from version <start> to <incompatibleVersion> - 1, and from version <incompatibleVersion> to <end>."
],
"sqlState" : "0A000"
},
"DELTA_CHANGE_TABLE_FEED_DISABLED" : {
"message" : [
"Cannot write to table with delta.enableChangeDataFeed set. Change data feed from Delta is not available."
@@ -1846,7 +1864,7 @@
},
"DELTA_UNSUPPORTED_TIME_TRAVEL_VIEWS" : {
"message" : [
"Cannot time travel views, subqueries or streams."
"Cannot time travel views, subqueries, streams or change data feed queries."
],
"sqlState" : "0A000"
},
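For DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_SCHEMA_CHANGE, the suggested remedy amounts to two separate batch CDF reads. A hedged sketch with hypothetical values (table name `target_table`, incompatible change at version 25 inside a 10..40 range), using the CDF read options defined later in this diff:

// Changes before the incompatible schema change, read with the pre-change schema.
val before = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "10")
  .option("endingVersion", "24") // incompatibleVersion - 1
  .table("target_table")

// Changes from the incompatible schema change onward.
val after = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "25") // incompatibleVersion
  .option("endingVersion", "40")
  .table("target_table")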
@@ -569,6 +569,33 @@ trait DeltaColumnMappingBase extends DeltaLogging {
true
}
}

/**
* Check whether the metadata we are scanning is read-compatible with another, considering
* both column mapping changes (rename or drop) and other standard schema checks.
*/
def isMetadataSchemaReadCompatible(
curVersion: Long,
curMetadata: Metadata,
readVersion: Long,
readMetadata: Metadata): Boolean = {
val (oldMetadata, newMetadata) = if (curVersion < readVersion) {
// Snapshot version is newer, ensure there's no read-incompatible CM schema changes
// from current version to snapshot version.
(curMetadata, readMetadata)
} else {
// Current metadata action version is newer, ensure there's no read-incompatible CM
// schema changes from snapshot version to current version.
(readMetadata, curMetadata)
}
// For column-mapping checks we need to consider version order so we don't accidentally treat
// an ADD COLUMN (OK) as a reverse DROP COLUMN (not OK).
// For non-column-mapping checks version order usually doesn't matter, but in the CDC case the
// current semantics ignore ADD COLUMN schema changes past the analyzed schema version (to match
// non-CDC batch behavior), so considering the order helps there too.
DeltaColumnMapping.isColumnMappingReadCompatible(newMetadata, oldMetadata) &&
SchemaUtils.isReadCompatible(oldMetadata.schema, newMetadata.schema)
}
}

object DeltaColumnMapping extends DeltaColumnMappingBase
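The version-ordering concern above can be illustrated with a tiny, self-contained sketch (a hypothetical helper, not Delta's actual check):

// Columns present in the read schema but missing from the data schema are fine
// (ADD COLUMN: older files are null-filled); the reverse implies a DROP or RENAME.
def columnsCompatible(dataCols: Set[String], readCols: Set[String]): Boolean =
  dataCols.subsetOf(readCols)

columnsCompatible(Set("id"), Set("id", "name")) // true: ADD COLUMN is read-compatible
columnsCompatible(Set("id", "name"), Set("id")) // false: looks like a DROP COLUMN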
44 changes: 36 additions & 8 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2425,14 +2425,37 @@ trait DeltaErrorsBase
// scalastyle:on line.size.limit
}

def blockBatchCdfReadOnColumnMappingEnabledTable(
/**
 * If `isSchemaChange` is false, `incompatVersion` refers to an incompatible data schema rather
 * than a schema change. This happens when no read-incompatible schema change is found within the
 * queried range, yet the read schema is still NOT compatible with the data files being queried.
 * That can occur when the user falls back to `legacy` mode and reads past data using a diverged
 * latest schema or a time-travelled schema. In this uncommon case, we tell the user to set the
 * schema mode back to `endVersion`, or to contact support for the flag that force-unblocks the
 * read.
 */
def blockBatchCdfReadWithIncompatibleSchemaChange(
start: Long,
end: Long,
readSchema: StructType,
incompatibleSchema: StructType): Throwable = {
new DeltaColumnMappingUnsupportedSchemaIncompatibleException(
"Change Data Feed (CDF) read",
readSchema,
incompatibleSchema,
DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key
readVersion: Long,
incompatVersion: Long,
isSchemaChange: Boolean = true): Throwable = {
new DeltaUnsupportedOperationException(
if (isSchemaChange) {
"DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_SCHEMA_CHANGE"
} else {
"DELTA_CHANGE_DATA_FEED_INCOMPATIBLE_DATA_SCHEMA"
},
messageParameters = Array(
start.toString, end.toString,
readSchema.json, readVersion.toString, incompatVersion.toString) ++ {
if (isSchemaChange) {
Array(start.toString, incompatVersion.toString, incompatVersion.toString, end.toString)
} else {
Array(DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key)
}
}
)
}
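A hedged sketch of how a caller might raise these errors; the version numbers and `schemaAtV40` (a StructType) are hypothetical:

// Incompatible schema change found at version 25 inside the queried range 10..40.
throw DeltaErrors.blockBatchCdfReadWithIncompatibleSchemaChange(
  start = 10L,
  end = 40L,
  readSchema = schemaAtV40,
  readVersion = 40L,
  incompatVersion = 25L) // isSchemaChange defaults to true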

@@ -2762,5 +2785,10 @@ class DeltaColumnMappingUnsupportedSchemaIncompatibleException(
val additionalProperties: Map[String, String] = Map.empty)
extends DeltaUnsupportedOperationException(
errorClass = "DELTA_BLOCK_COLUMN_MAPPING_SCHEMA_INCOMPATIBLE_OPERATION",
messageParameters = Array(opName, readSchema.json, incompatibleSchema.json, escapeConfigName)
messageParameters = Array(
opName,
readSchema.json,
incompatibleSchema.json,
opName,
escapeConfigName)
)
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -139,7 +139,9 @@ class DeltaLog private(
* Tombstones before this timestamp will be dropped from the state and the files can be
* garbage collected.
*/
def minFileRetentionTimestamp: Long = clock.getTimeMillis() - tombstoneRetentionMillis
def minFileRetentionTimestamp: Long = {
clock.getTimeMillis() - tombstoneRetentionMillis
}

/**
* [[SetTransaction]]s before this timestamp will be considered expired and dropped from the
@@ -456,8 +458,8 @@
// data.
if (!cdcOptions.isEmpty) {
recordDeltaEvent(this, "delta.cdf.read", data = cdcOptions.asCaseSensitiveMap())
return CDCReader.getCDCRelation(spark,
this, snapshotToUse, partitionFilters, spark.sessionState.conf, cdcOptions)
return CDCReader.getCDCRelation(
spark, snapshotToUse, isTimeTravelQuery, spark.sessionState.conf, cdcOptions)
}

val fileIndex = TahoeLogFileIndex(
53 changes: 51 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala
@@ -250,6 +250,10 @@ object DeltaOptions extends DeltaLogging {
val CDC_END_TIMESTAMP = "endingTimestamp"
val CDC_READ_OPTION = "readChangeFeed"
val CDC_READ_OPTION_LEGACY = "readChangeData"

val VERSION_AS_OF = "versionAsOf"
val TIMESTAMP_AS_OF = "timestampAsOf"

val COMPRESSION = "compression"
val MAX_RECORDS_PER_FILE = "maxRecordsPerFile"
val TXN_APP_ID = "txnAppId"
@@ -284,8 +288,8 @@
"queryName",
"checkpointLocation",
"path",
"timestampAsOf",
"versionAsOf"
VERSION_AS_OF,
TIMESTAMP_AS_OF
)


@@ -302,6 +306,51 @@
}
}

/**
* Definitions for the batch read schema mode for CDF
*/
sealed trait DeltaBatchCDFSchemaMode {
def name: String
}

/**
* `latest` batch CDF schema mode specifies that the latest schema should be used when serving
* the CDF batch.
*/
case object BatchCDFSchemaLatest extends DeltaBatchCDFSchemaMode {
val name = "latest"
}

/**
* `endVersion` batch CDF schema mode specifies that the query range's end version's schema should
* be used for serving the CDF batch.
 * This is the current default for column-mapping-enabled tables, so that we read using the
 * exact schema at the versions being queried and reduce read-compatibility mismatches.
*/
case object BatchCDFSchemaEndVersion extends DeltaBatchCDFSchemaMode {
val name = "endversion"
}

/**
* `legacy` batch CDF schema mode specifies that neither latest nor end version's schema is
 * strictly used for serving the CDF batch, e.g. when the user combines time travel with batch
 * CDF and wants to respect the time-travelled schema.
 * This is the current default for tables without column mapping.
*/
case object BatchCDFSchemaLegacy extends DeltaBatchCDFSchemaMode {
val name = "legacy"
}

object DeltaBatchCDFSchemaMode {
def apply(name: String): DeltaBatchCDFSchemaMode = {
name.toLowerCase(Locale.ROOT) match {
case BatchCDFSchemaLatest.name => BatchCDFSchemaLatest
case BatchCDFSchemaEndVersion.name => BatchCDFSchemaEndVersion
      case BatchCDFSchemaLegacy.name => BatchCDFSchemaLegacy
      // Fail fast on unrecognized modes instead of surfacing a bare MatchError.
      case unknown =>
        throw new IllegalArgumentException(s"Unknown batch CDF schema mode: $unknown")
}
}
}
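A hedged usage sketch, assuming a SparkSession `spark` in scope (the conf key is referenced through its constant rather than a literal string):

// Serve batch CDF using the end version's schema.
spark.conf.set(
  DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key,
  BatchCDFSchemaEndVersion.name)

// Parsing is case-insensitive:
DeltaBatchCDFSchemaMode("endVersion") // returns BatchCDFSchemaEndVersion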

/**
* Definitions for the starting version of a Delta stream.
*/
@@ -25,7 +25,9 @@ class DeltaAnalysisException(
errorClass = Some(errorClass),
messageParameters = messageParameters,
cause = cause)
with DeltaThrowable
with DeltaThrowable {
def getMessageParametersArray: Array[String] = messageParameters
}

class DeltaIllegalArgumentException(
errorClass: String,
@@ -35,6 +37,7 @@ class DeltaIllegalArgumentException(
DeltaThrowableHelper.getMessage(errorClass, messageParameters), cause)
with DeltaThrowable {
override def getErrorClass: String = errorClass
def getMessageParametersArray: Array[String] = messageParameters
}

class DeltaUnsupportedOperationException(
@@ -44,4 +47,5 @@
DeltaThrowableHelper.getMessage(errorClass, messageParameters))
with DeltaThrowable {
override def getErrorClass: String = errorClass
def getMessageParametersArray: Array[String] = messageParameters
}
@@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
@@ -101,6 +102,12 @@ case class DeltaTableV2(

lazy val snapshot: Snapshot = {
timeTravelSpec.map { spec =>
      // By default, block combining CDF reads with time travel.
if (CDCReader.isCDCRead(cdcOptions) &&
!spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL_OPTIONS)) {
throw DeltaErrors.timeTravelNotSupportedException
}

val (version, accessType) = DeltaTableUtils.resolveTimeTravelVersion(
spark.sessionState.conf, deltaLog, spec)
val source = spec.creationSource.getOrElse("unknown")
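A hedged sketch of the user-facing effect (hypothetical table name; the conf is the one referenced above):

// Blocked by default: CDF options combined with time travel.
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "0")
  .option("versionAsOf", "5") // time travel
  .table("target_table") // throws DeltaErrors.timeTravelNotSupportedException

// Opt in explicitly, at your own risk:
spark.conf.set(DeltaSQLConf.DELTA_CDF_ALLOW_TIME_TRAVEL_OPTIONS.key, "true")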