Register VACUUM operations in the delta log
This PR registers the start and end of VACUUM operations in the delta log. That is, we write a commit containing no `Add`/`Remove` actions, only a `CommitInfo` action that carries the Delta operation info.

The `VacuumStart` operation records the metrics `numFilesToDelete` and `sizeOfDataToDelete`.

The `VacuumEnd` operation records the metrics `numDeletedFiles` and `numVacuumedDirectories`.
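
For example, vacuum logging can be enabled per session before running VACUUM (a minimal sketch; the config key assumes the standard `spark.databricks.delta.` prefix applied to the new `vacuum.logging.enabled` conf added here, and the table path is a placeholder):

```scala
// Sketch: enable vacuum commit logging, then run VACUUM so that the
// VACUUM START / VACUUM END commits are written to the _delta_log.
spark.conf.set("spark.databricks.delta.vacuum.logging.enabled", "true")
spark.sql("VACUUM delta.`/tmp/delta-table` RETAIN 168 HOURS")
```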

New unit tests.

Exposes additional metrics and history in the `_delta_log` for the start and end of VACUUM operations.
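
The new entries can then be read back from the table history (again a sketch, using the same placeholder table path):

```scala
// Sketch: the VACUUM START / VACUUM END commits surface their parameters and
// metrics through DESCRIBE HISTORY alongside other table operations.
spark.sql("DESCRIBE HISTORY delta.`/tmp/delta-table`")
  .where("operation IN ('VACUUM START', 'VACUUM END')")
  .select("version", "operation", "operationParameters", "operationMetrics")
  .show(truncate = false)
```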

Closes delta-io#1552.
Resolves delta-io#868.

Co-authored-by: Yann Byron <biyan900116@gmail.com>
GitOrigin-RevId: 94805531d022bac4afafd0b672d17b8828d8aa2c
scottsand-db and YannByron committed Jan 12, 2023
1 parent 57d68b3 commit a03fda4
Showing 4 changed files with 285 additions and 19 deletions.
@@ -379,6 +379,34 @@ object DeltaOperations {
override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE
}

/**
* @param retentionCheckEnabled - whether retention check was enabled for this run of vacuum.
* @param specifiedRetentionMillis - specified retention interval
* @param defaultRetentionMillis - default retention period for the table
*/
case class VacuumStart(
retentionCheckEnabled: Boolean,
specifiedRetentionMillis: Option[Long],
defaultRetentionMillis: Long) extends Operation("VACUUM START") {
override val parameters: Map[String, Any] = Map(
"retentionCheckEnabled" -> retentionCheckEnabled,
"defaultRetentionMillis" -> defaultRetentionMillis
) ++ specifiedRetentionMillis.map("specifiedRetentionMillis" -> _)

override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_START
}

/**
* @param status - whether the vacuum operation was successful; either "COMPLETED" or "FAILED"
*/
case class VacuumEnd(status: String) extends Operation("VACUUM END") {
override val parameters: Map[String, Any] = Map(
"status" -> status
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_END
}


private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
Map(
@@ -497,4 +525,15 @@ private[delta] object DeltaOperationMetrics {
"removedFilesSize", // size in bytes of files removed by the restore
"restoredFilesSize" // size in bytes of files added by the restore
)

val VACUUM_START = Set(
"numFilesToDelete", // number of files that will be deleted by vacuum
"sizeOfDataToDelete" // total size in bytes of files that will be deleted by vacuum
)

val VACUUM_END = Set(
"numDeletedFiles", // number of files deleted by vacuum
"numVacuumedDirectories" // number of directories vacuumed
)

}
@@ -31,11 +31,14 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.functions.{col, count, sum}
import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}

/**
@@ -49,6 +52,8 @@ import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
*/
object VacuumCommand extends VacuumCommandImpl with Serializable {

case class FileNameAndSize(path: String, length: Long)

/**
* Additional check on retention duration to prevent people from shooting themselves in the foot.
*/
@@ -204,21 +209,32 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
fileStatusIterator.flatMap { fileStatus =>
if (fileStatus.isDir) {
Iterator.single(relativize(fileStatus.getPath, fs, reservoirBase, isDir = true))
Iterator.single(FileNameAndSize(
relativize(fileStatus.getPath, fs, reservoirBase, isDir = true), 0L))
} else {
val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
val dirsWithSlash = dirs.map { p =>
relativize(new Path(p), fs, reservoirBase, isDir = true)
val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
FileNameAndSize(relativizedPath, 0L)
}
dirsWithSlash ++ Iterator(
relativize(new Path(fileStatus.path), fs, reservoirBase, isDir = false))
FileNameAndSize(relativize(
fileStatus.getPath, fs, reservoirBase, isDir = false), fileStatus.length))
}
}
}.groupBy($"value" as 'path)
.count()
}.groupBy(col("path")).agg(count(new Column("*")).as("count"), sum("length").as("length"))
.join(validFiles, Seq("path"), "leftanti")
.where('count === 1)
.select('path)

val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first
val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
0L
} else {
sizeOfDataToDeleteRow.getLong(0)
}

val diffFiles = diff
.select(col("path"))
.as[String]
.map { relativePath =>
assert(!stringToPath(relativePath).isAbsolute,
@@ -227,31 +243,33 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
}

if (dryRun) {
val numFiles = diff.count()
val numFiles = diffFiles.count()
val stats = DeltaVacuumStats(
isDryRun = true,
specifiedRetentionMillis = retentionMillis,
defaultRetentionMillis = deltaLog.tombstoneRetentionMillis,
minRetainedTimestamp = deleteBeforeTimestamp,
dirsPresentBeforeDelete = dirCounts,
objectsDeleted = numFiles)
objectsDeleted = numFiles,
sizeOfDataToDelete = sizeOfDataToDelete)

recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
logConsole(s"Found $numFiles files and directories in a total of " +
s"$dirCounts directories that are safe to delete.")
logConsole(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
s"a total of $dirCounts directories that are safe to delete.")

return diff.map(f => stringToPath(f).toString).toDF("path")
return diffFiles.map(f => stringToPath(f).toString).toDF("path")
}
logVacuumStart(
spark,
deltaLog,
path,
diff,
diffFiles,
sizeOfDataToDelete,
retentionMillis,
deltaLog.tombstoneRetentionMillis)

val filesDeleted = try {
delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled,
delete(diffFiles, spark, basePath, hadoopConf, parallelDeleteEnabled,
parallelDeletePartitions)
} catch { case t: Throwable =>
logVacuumEnd(deltaLog, spark, path)
@@ -263,7 +281,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
defaultRetentionMillis = deltaLog.tombstoneRetentionMillis,
minRetainedTimestamp = deleteBeforeTimestamp,
dirsPresentBeforeDelete = dirCounts,
objectsDeleted = filesDeleted)
objectsDeleted = filesDeleted,
sizeOfDataToDelete = sizeOfDataToDelete)
recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))

@@ -277,22 +296,122 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {

trait VacuumCommandImpl extends DeltaCommand {

private val supportedFsForLogging = Seq(
"wasbs", "wasbss", "abfs", "abfss", "adl", "gs", "file", "hdfs"
)

/**
* Returns whether we should record vacuum metrics in the delta log.
*/
private def shouldLogVacuum(
spark: SparkSession,
deltaLog: DeltaLog,
hadoopConf: Configuration,
path: Path): Boolean = {
val logVacuumConf = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_LOGGING_ENABLED)

if (logVacuumConf.nonEmpty) {
return logVacuumConf.get
}

val logStore = deltaLog.store

try {
val rawResolvedUri: URI = logStore.resolvePathOnPhysicalStorage(path, hadoopConf).toUri
val scheme = rawResolvedUri.getScheme
supportedFsForLogging.contains(scheme)
} catch {
case _: UnsupportedOperationException =>
logWarning("Vacuum event logging" +
" not enabled on this file system because we cannot detect your cloud storage type.")
false
}
}

/**
* Record Vacuum specific metrics in the commit log at the START of vacuum.
*
* @param spark - spark session
* @param deltaLog - DeltaLog of the table
* @param path - the (data) path to the root of the table
* @param diff - the list of paths (files, directories) that are safe to delete
* @param sizeOfDataToDelete - the amount of data (bytes) to be deleted
* @param specifiedRetentionMillis - the optional override retention period (millis) to keep
* logically removed files before deleting them
* @param defaultRetentionMillis - the default retention period (millis)
*/
protected def logVacuumStart(
spark: SparkSession,
deltaLog: DeltaLog,
path: Path,
diff: Dataset[String],
sizeOfDataToDelete: Long,
specifiedRetentionMillis: Option[Long],
defaultRetentionMillis: Long): Unit = {
logInfo(s"Deleting untracked files and empty directories in $path")
logInfo(s"Deleting untracked files and empty directories in $path. The amount of data to be " +
s"deleted is $sizeOfDataToDelete (in bytes)")

// We perform an empty commit in order to record information about the Vacuum
if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) {
val checkEnabled =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED)
val txn = deltaLog.startTransaction()
val metrics = Map[String, SQLMetric](
"numFilesToDelete" -> createMetric(spark.sparkContext, "number of files to deleted"),
"sizeOfDataToDelete" -> createMetric(spark.sparkContext,
"The total amount of data to be deleted in bytes")
)
metrics("numFilesToDelete").set(diff.count())
metrics("sizeOfDataToDelete").set(sizeOfDataToDelete)
txn.registerSQLMetrics(spark, metrics)
txn.commit(actions = Seq(), DeltaOperations.VacuumStart(
checkEnabled,
specifiedRetentionMillis,
defaultRetentionMillis
))
}
}

/**
* Record Vacuum specific metrics in the commit log at the END of vacuum.
*
* @param deltaLog - DeltaLog of the table
* @param spark - spark session
* @param path - the (data) path to the root of the table
* @param filesDeleted - if the vacuum completed this will contain the number of files deleted.
* if the vacuum failed, this will be None.
* @param dirCounts - if the vacuum completed this will contain the number of directories
* vacuumed. if the vacuum failed, this will be None.
*/
protected def logVacuumEnd(
deltaLog: DeltaLog,
spark: SparkSession,
path: Path,
filesDeleted: Option[Long] = None,
dirCounts: Option[Long] = None): Unit = {
if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) {
val txn = deltaLog.startTransaction()
val status = if (filesDeleted.isEmpty && dirCounts.isEmpty) { "FAILED" } else { "COMPLETED" }
if (filesDeleted.nonEmpty && dirCounts.nonEmpty) {
val metrics = Map[String, SQLMetric](
"numDeletedFiles" -> createMetric(spark.sparkContext, "number of files deleted."),
"numVacuumedDirectories" ->
createMetric(spark.sparkContext, "num of directories vacuumed."),
"status" -> createMetric(spark.sparkContext, "status of vacuum")
)
metrics("numDeletedFiles").set(filesDeleted.get)
metrics("numVacuumedDirectories").set(dirCounts.get)
txn.registerSQLMetrics(spark, metrics)
}
txn.commit(actions = Seq(), DeltaOperations.VacuumEnd(
status
))
}

if (filesDeleted.nonEmpty) {
logConsole(s"Deleted ${filesDeleted.get} files and directories in a total " +
s"of ${dirCounts.get} directories.")
@@ -362,4 +481,5 @@ case class DeltaVacuumStats(
defaultRetentionMillis: Long,
minRetainedTimestamp: Long,
dirsPresentBeforeDelete: Long,
objectsDeleted: Long)
objectsDeleted: Long,
sizeOfDataToDelete: Long)
@@ -270,6 +270,15 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val DELTA_VACUUM_LOGGING_ENABLED =
buildConf("vacuum.logging.enabled")
.doc("Whether to log vacuum information into the Delta transaction log." +
" 'spark.databricks.delta.commitInfo.enabled' should be enabled when using this config." +
" Users should only set this config to 'true' when the underlying file system safely" +
" supports concurrent writes.")
.booleanConf
.createOptional

val DELTA_VACUUM_RETENTION_CHECK_ENABLED =
buildConf("retentionDurationCheck.enabled")
.doc("Adds a check preventing users from running vacuum with a very short retention " +