Delta metadata reads fail on missing/corrupt files
(Cherry-pick 0eb4c7e to branch-2.0)

GitOrigin-RevId: 92ef2532fe717da6368e6124c1ae3307133d134c
ryan-johnson-databricks authored and vkorukanti committed Jan 6, 2023
1 parent 955cff4 commit 07391cf
Showing 2 changed files with 32 additions and 15 deletions.
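For context (an editorial sketch, not part of the commit): Spark's file sources honor the two session confs below, and enabling either one lets a scan silently skip unreadable files. For _delta_log files that silence can corrupt the view of the whole table, which is what this change guards against by pinning the relation-level equivalents to "false" for all metadata reads.

// Hypothetical session settings that the new code deliberately overrides for metadata reads:
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")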
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -48,6 +48,7 @@ import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper
import org.apache.spark.sql.catalyst.util.FailFastMode
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types.{StructField, StructType}
@@ -192,6 +193,36 @@ class DeltaLog private(
}
}

/**
* Creates a [[LogicalRelation]] for a given [[DeltaLogFileIndex]], with all necessary file source
* options taken from the Delta Log. All reads of Delta metadata files should use this method.
*/
def indexToRelation(
index: DeltaLogFileIndex,
schema: StructType = Action.logSchema): LogicalRelation = {
val formatSpecificOptions: Map[String, String] = index.format match {
case DeltaLogFileIndex.COMMIT_FILE_FORMAT =>
// Don't tolerate malformed JSON when parsing Delta log actions (default is PERMISSIVE)
Map("mode" -> FailFastMode.name)
case _ => Map.empty
}
// Delta should NEVER ignore missing or corrupt metadata files, because doing so can render the
// entire table unusable. Hard-wire that into the file source options so the user can't override
// it by setting spark.sql.files.ignoreCorruptFiles or spark.sql.files.ignoreMissingFiles.
//
// NOTE: This should ideally be [[FileSourceOptions.IGNORE_CORRUPT_FILES]] etc., but those
// constants are only available since spark-3.4. By hard-coding the values here instead, we
// preserve backward compatibility when compiling Delta against older spark versions (tho
// obviously the desired protection would be missing in that case).
val allOptions = options ++ formatSpecificOptions ++ Map(
"ignoreCorruptFiles" -> "false",
"ignoreMissingFiles" -> "false"
)
val fsRelation = HadoopFsRelation(
index, index.partitionSchema, schema, None, index.format, allOptions)(spark)
LogicalRelation(fsRelation)
}

/* ------------------ *
| Delta Management |
* ------------------ */
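A note on the allOptions construction above (a standalone Scala sketch, not code from the commit): the right operand of ++ wins on duplicate keys, so the hard-coded "false" entries take precedence over anything already present in options or formatSpecificOptions.

// Minimal illustration of Map ++ precedence; userOptions is a hypothetical user-supplied map.
val userOptions = Map("ignoreCorruptFiles" -> "true")
val allOptions = userOptions ++ Map(
  "ignoreCorruptFiles" -> "false",
  "ignoreMissingFiles" -> "false")
assert(allOptions("ignoreCorruptFiles") == "false")  // the hard-wired value wins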
16 changes: 1 addition & 15 deletions core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -339,20 +339,6 @@ class Snapshot(
checkpointFileIndexOpt.toSeq ++ deltaFileIndexOpt.toSeq
}

/** Creates a LogicalRelation with the given schema from a DeltaLogFileIndex. */
protected def indexToRelation(
index: DeltaLogFileIndex,
schema: StructType = logSchema): LogicalRelation = {
val fsRelation = HadoopFsRelation(
index,
index.partitionSchema,
schema,
None,
index.format,
deltaLog.options)(spark)
LogicalRelation(fsRelation)
}

/**
* Loads the file indices into a DataFrame that can be used for LogReplay.
*
@@ -363,7 +349,7 @@
* config settings for delta.checkpoint.writeStatsAsJson and delta.checkpoint.writeStatsAsStruct).
*/
protected def loadActions: DataFrame = {
val dfs = fileIndices.map { index => Dataset.ofRows(spark, indexToRelation(index)) }
val dfs = fileIndices.map { index => Dataset.ofRows(spark, deltaLog.indexToRelation(index)) }
dfs.reduceOption(_.union(_)).getOrElse(emptyDF)
.withColumn(ACTION_SORT_COL_NAME, input_file_name())
.withColumn(ADD_STATS_TO_USE_COL_NAME, col("add.stats"))
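Because loadActions now builds its DataFrames through deltaLog.indexToRelation, commit JSON is parsed with FailFastMode rather than the default PERMISSIVE mode: a malformed JSON line raises an error immediately instead of quietly producing rows with null action fields. A standalone sketch of that behavior (the reader call and path are illustrative, not from the commit):

// Illustrative only; the path is made up.
spark.read
  .schema(Action.logSchema)
  .option("mode", "FAILFAST")  // same effect as Map("mode" -> FailFastMode.name) in the new helper
  .json("/tmp/my_table/_delta_log/00000000000000000001.json")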
