Skip to content

Commit

Permalink
Always validate state reconstruction
Browse files Browse the repository at this point in the history
Delta has configs that disable state reconstruction validation, and it sometimes exposes them to users in error messages. Such configs are dangerous because they act as backdoors to disaster. This PR makes Delta always validate the reconstructed state (for missing metadata and protocol) and always throw an exception if validation fails.

Tested by existing unit tests.

GitOrigin-RevId: 020d23774e7fda0d7b05bc68baf75cfe000c7746
  • Loading branch information
lzlfred authored and scottsand-db committed Oct 14, 2022
1 parent 7420cfc commit 6bca231
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 64 deletions.
4 changes: 1 addition & 3 deletions core/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -1371,9 +1371,7 @@
"DELTA_STATE_RECOVER_ERROR" : {
"message" : [
"The <operation> of your Delta table could not be recovered while Reconstructing",
"version: <version>. Did you manually delete files in the _delta_log directory?",
"Set <config> to \"false\"",
"to skip validation."
"version: <version>. Did you manually delete files in the _delta_log directory?"
],
"sqlState" : "22000"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,8 +912,7 @@ trait DeltaErrorsBase
def actionNotFoundException(action: String, version: Long): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_STATE_RECOVER_ERROR",
messageParameters = Array(action, version.toString,
DeltaSQLConf.DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED.key))
messageParameters = Array(action, version.toString))
}

def schemaChangedException(
Expand Down
22 changes: 2 additions & 20 deletions core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,6 @@ class Snapshot(
cachedState.getDF
}

/** Helper method to log missing actions when state reconstruction checks are not enabled */
protected def logMissingActionWarning(action: String): Unit = {
logWarning(
s"""
|Found no $action in computed state, setting it to defaults. State reconstruction
|validation was turned off. To turn it back on set
|${DeltaSQLConf.DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED.key} to "true"
""".stripMargin)
}

/**
* A Map of alias to aggregations which needs to be done to calculate the `computedState`
*/
Expand Down Expand Up @@ -213,29 +203,21 @@ class Snapshot(
val _computedState = recordFrameProfile("Delta", "snapshot.computedState.aggregations") {
stateDF.select(aggregations: _*).as[State].first()
}
val stateReconstructionCheck = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED)
if (_computedState.protocol == null) {
recordDeltaEvent(
deltaLog,
opType = "delta.assertions.missingAction",
data = Map(
"version" -> version.toString, "action" -> "Protocol", "source" -> "Snapshot"))
if (stateReconstructionCheck) {
throw DeltaErrors.actionNotFoundException("protocol", version)
}
throw DeltaErrors.actionNotFoundException("protocol", version)
}
if (_computedState.metadata == null) {
recordDeltaEvent(
deltaLog,
opType = "delta.assertions.missingAction",
data = Map(
"version" -> version.toString, "action" -> "Metadata", "source" -> "Metadata"))
if (stateReconstructionCheck) {
throw DeltaErrors.actionNotFoundException("metadata", version)
}
logMissingActionWarning("metadata")
_computedState.copy(metadata = Metadata())
throw DeltaErrors.actionNotFoundException("metadata", version)
} else {
_computedState
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,6 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED =
buildConf("stateReconstructionValidation.enabled")
.internal()
.doc("Whether to perform validation checks on the reconstructed state.")
.booleanConf
.createWithDefault(true)

val DELTA_COMMIT_VALIDATION_ENABLED =
buildConf("commitValidation.enabled")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,11 @@ class ActionSerializerSuite extends QueryTest with SharedSparkSession with Delta
// all cases.
val settings = Seq(
DeltaSQLConf.DELTA_COMMIT_VALIDATION_ENABLED.key -> "false",
DeltaSQLConf.DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED.key -> "false",
DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED.key -> "false") ++ extraSettings
withSQLConf(settings: _*) {

// Do one initial commit so that the protocol and metadata get committed.
deltaLog.startTransaction().commit(Seq(), ManualUpdate)
deltaLog.startTransaction().commit(Seq(Protocol(1, 2), Metadata()), ManualUpdate)

// Commit the actual action.
val version = deltaLog.startTransaction().commit(Seq(action), ManualUpdate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,9 +585,7 @@ trait DeltaErrorsSuiteBase
throw DeltaErrors.actionNotFoundException("action", 0)
}
val msg = s"""The action of your Delta table could not be recovered while Reconstructing
|version: 0. Did you manually delete files in the _delta_log directory?
|Set ${DeltaSQLConf.DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED.key} to "false"
|to skip validation.""".stripMargin
|version: 0. Did you manually delete files in the _delta_log directory?""".stripMargin
assert(e.getMessage == msg)
}
{
Expand Down
27 changes: 7 additions & 20 deletions core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -352,16 +352,10 @@ class DeltaLogSuite extends QueryTest
Iterator(selectedAction, file).map(a => JsonUtils.toJson(a.wrap)),
overwrite = false,
log.newDeltaHadoopConf())
withSQLConf(DeltaSQLConf.DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED.key -> "true") {
val e = intercept[IllegalStateException] {
log.update()
}
assert(e.getMessage === DeltaErrors.actionNotFoundException(action, 0).getMessage)
}
// Disable the validation check
withSQLConf(DeltaSQLConf.DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED.key -> "false") {
assert(log.update().version === 0L)
val e = intercept[IllegalStateException] {
log.update()
}
assert(e.getMessage === DeltaErrors.actionNotFoundException(action, 0).getMessage)
}
}
}
Expand Down Expand Up @@ -421,18 +415,11 @@ class DeltaLogSuite extends QueryTest
}

// Verify that the state reconstruction from the checkpoint fails.
withSQLConf(DeltaSQLConf.DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED.key -> "true") {
val e = intercept[IllegalStateException] {
staleLog.update()
}
assert(e.getMessage ===
DeltaErrors.actionNotFoundException(action, checkpointInterval).getMessage)
}

// Disable state reconstruction validation and try again
withSQLConf(DeltaSQLConf.DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED.key -> "false") {
assert(staleLog.update().version === checkpointInterval)
val e = intercept[IllegalStateException] {
staleLog.update()
}
assert(e.getMessage ===
DeltaErrors.actionNotFoundException(action, checkpointInterval).getMessage)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,6 @@ class OptimisticTransactionSuite
assert(e.getMessage == DeltaErrors.addFilePartitioningMismatchException(
Seq("col3"), Seq("col2")).getMessage)
}
// Try with commit validation turned off
withSQLConf(DeltaSQLConf.DELTA_STATE_RECONSTRUCTION_VALIDATION_ENABLED.key -> "false",
DeltaSQLConf.DELTA_COMMIT_VALIDATION_ENABLED.key -> "false") {
log.startTransaction().commit(Seq(AddFile(
log.dataPath.toString, Map("col3" -> "1"), 12322, 0L, true, null, null)), ManualUpdate)
assert(log.update().version === 1)
}
}
}

Expand Down

0 comments on commit 6bca231

Please sign in to comment.