[#868] Register VACUUM operation in the _delta_log #1552

Closed
@@ -448,6 +448,34 @@ object DeltaOperations {
override val operationMetrics: Set[String] = DeltaOperationMetrics.CLONE
}

/**
* @param retentionCheckEnabled - whether retention check was enabled for this run of vacuum.
* @param specifiedRetentionMillis - specified retention interval
* @param defaultRetentionMillis - default retention period for the table
*/
case class VacuumStart(
retentionCheckEnabled: Boolean,
specifiedRetentionMillis: Option[Long],
defaultRetentionMillis: Long) extends Operation("VACUUM START") {
override val parameters: Map[String, Any] = Map(
"retentionCheckEnabled" -> retentionCheckEnabled,
"defaultRetentionMillis" -> defaultRetentionMillis
) ++ specifiedRetentionMillis.map("specifiedRetentionMillis" -> _)

override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_START
}

Review discussion on VacuumStart:
Contributor: Can we combine VacuumStart and Vacuum to one? I think this is enough.
Member: Do you mean VacuumStart and VacuumEnd? It's better to have two events so that people can quickly check whether the vacuum command completes or not.

/**
* @param status - whether the vacuum operation was successful; either "COMPLETED" or "FAILED"
*/
case class VacuumEnd(status: String) extends Operation(s"VACUUM END") {
override val parameters: Map[String, Any] = Map(
"status" -> status
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_END
}
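
As the reviewer reply above notes, having separate start and end events lets users check whether a vacuum completed. A minimal sketch of that check (assuming vacuum logging is enabled and a Delta table at the hypothetical path /tmp/delta-table):

// Sketch only: inspect the two most recent commits after running VACUUM.
// A completed run shows "VACUUM END" (newest) preceded by "VACUUM START".
import io.delta.tables.DeltaTable

val table = DeltaTable.forPath(spark, "/tmp/delta-table") // hypothetical path
table.history(2)
  .select("operation", "operationParameters", "operationMetrics")
  .show(truncate = false)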


private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
Map(
@@ -660,4 +688,14 @@ private[delta] object DeltaOperationMetrics {
"copiedFilesSize" // size of files copied - 0 for shallow tables
)

val VACUUM_START = Set(
"numFilesToDelete", // number of files that will be deleted by vacuum
"sizeOfDataToDelete" // total size in bytes of files that will be deleted by vacuum
)

val VACUUM_END = Set(
"numDeletedFiles", // number of files deleted by vacuum
"numVacuumedDirectories" // number of directories vacuumed
)

}
@@ -29,10 +29,13 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.functions.{col, count, sum}
import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}

@@ -289,6 +292,42 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {

trait VacuumCommandImpl extends DeltaCommand {

private val supportedFsForLogging = Seq(
"wasbs", "wasbss", "abfs", "abfss", "adl", "gs", "file", "hdfs"
)

private def shouldLogVacuum(
spark: SparkSession,
deltaLog: DeltaLog,
hadoopConf: Configuration,
path: Path): Boolean = {
val logVacuumConf = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_LOGGING_ENABLED)

if (logVacuumConf.nonEmpty) {
return logVacuumConf.get
}

val logStore = deltaLog.store

try {
val rawResolvedUri: URI = logStore.resolvePathOnPhysicalStorage(path, hadoopConf).toUri
val scheme = rawResolvedUri.getScheme
if (supportedFsForLogging.contains(scheme)) {
true
} else {
false
}
} catch {
case _: UnsupportedOperationException =>
logWarning("Vacuum event logging" +
" not enabled on this file system because we cannot detect your cloud storage type.")
false
}
}

/**
* Record Vacuum specific metrics in the commit log at the START of vacuum.
*/
protected def logVacuumStart(
spark: SparkSession,
deltaLog: DeltaLog,
@@ -299,14 +338,63 @@
defaultRetentionMillis: Long): Unit = {
logInfo(s"Deleting untracked files and empty directories in $path. The amount of data to be " +
s"deleted is $sizeOfDataToDelete (in bytes)")

// We perform an empty commit in order to record information about the Vacuum
if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) {
val checkEnabled =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED)
val txn = deltaLog.startTransaction()
val metrics = Map[String, SQLMetric](
"numFilesToDelete" -> createMetric(spark.sparkContext, "number of files to deleted"),
"sizeOfDataToDelete" -> createMetric(spark.sparkContext,
"The total amount of data to be deleted in bytes")
)
metrics("numFilesToDelete").set(diff.count())
metrics("sizeOfDataToDelete").set(sizeOfDataToDelete)
txn.registerSQLMetrics(spark, metrics)
txn.commit(actions = Seq(), DeltaOperations.VacuumStart(
checkEnabled,
specifiedRetentionMillis,
defaultRetentionMillis
))
}
}

/**
* Record Vacuum specific metrics in the commit log at the END of vacuum.
*
* @param deltaLog - DeltaLog of the table
* @param spark - spark session
* @param filesDeleted - if the vacuum completed this will contain the number of files deleted.
* if the vacuum failed, this will be None.
* @param dirCounts - if the vacuum completed this will contain the number of directories
* vacuumed. if the vacuum failed, this will be None.
*/
protected def logVacuumEnd(
deltaLog: DeltaLog,
spark: SparkSession,
path: Path,
filesDeleted: Option[Long] = None,
dirCounts: Option[Long] = None): Unit = {
if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) {
val txn = deltaLog.startTransaction()
val status = if (filesDeleted.isEmpty && dirCounts.isEmpty) { "FAILED" } else { "COMPLETED" }
if (filesDeleted.nonEmpty && dirCounts.nonEmpty) {
val metrics = Map[String, SQLMetric](
"numDeletedFiles" -> createMetric(spark.sparkContext, "number of files deleted."),
"numVacuumedDirectories" ->
createMetric(spark.sparkContext, "num of directories vacuumed."),
"status" -> createMetric(spark.sparkContext, "status of vacuum")
)
metrics("numDeletedFiles").set(filesDeleted.get)
metrics("numVacuumedDirectories").set(dirCounts.get)
txn.registerSQLMetrics(spark, metrics)
}
txn.commit(actions = Seq(), DeltaOperations.VacuumEnd(
status
))
}

if (filesDeleted.nonEmpty) {
logConsole(s"Deleted ${filesDeleted.get} files and directories in a total " +
s"of ${dirCounts.get} directories.")
@@ -294,6 +294,15 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val DELTA_VACUUM_LOGGING_ENABLED =
buildConf("vacuum.logging.enabled")
.doc("Whether to log vacuum information into the Delta transaction log." +
" 'spark.databricks.delta.commitInfo.enabled' should be enabled when using this config." +
" Users should only set this config to 'true' when the underlying file system safely" +
" supports concurrent writes.")
.booleanConf
.createOptional
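
For reference, a minimal sketch of enabling this for a session (assuming the standard "spark.databricks.delta." prefix that DeltaSQLConf adds to buildConf names, the same prefix as the commitInfo key cited in the doc string):

// Sketch only: enable commit info plus vacuum event logging for the current session.
spark.conf.set("spark.databricks.delta.commitInfo.enabled", "true")
spark.conf.set("spark.databricks.delta.vacuum.logging.enabled", "true")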

val DELTA_VACUUM_RETENTION_CHECK_ENABLED =
buildConf("retentionDurationCheck.enabled")
.doc("Adds a check preventing users from running vacuum with a very short retention " +
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.Path
import org.scalatest.GivenWhenThen

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -772,6 +772,104 @@ trait DeltaVacuumSuiteBase extends QueryTest
test("vacuum for cdc - delete tombstones") {
testCDCVacuumForTombstones()
}

private def getFromHistory(history: DataFrame, key: String, pos: Integer): Map[String, String] = {
val op = history.select(key).take(pos + 1)
if (pos == 0) {
op.head.getMap(0).asInstanceOf[Map[String, String]]
} else {
op.tail.head.getMap(0).asInstanceOf[Map[String, String]]
}
}

private def testEventLogging(
isDryRun: Boolean,
loggingEnabled: Boolean,
retentionHours: Long,
timeGapHours: Long): Unit = {

test(s"vacuum event logging dryRun=$isDryRun loggingEnabled=$loggingEnabled" +
s" retentionHours=$retentionHours timeGap=$timeGapHours") {
withSQLConf(DeltaSQLConf.DELTA_VACUUM_LOGGING_ENABLED.key -> loggingEnabled.toString) {

withEnvironment { (dir, clock) =>
spark.range(2).write.format("delta").save(dir.getAbsolutePath)
val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath, clock)
val expectedReturn = if (isDryRun) {
// dry run returns files that will be deleted
Seq(new Path(dir.getAbsolutePath, "file1.txt").toString)
} else {
Seq(dir.getAbsolutePath)
}

gcTest(deltaLog, clock)(
CreateFile("file1.txt", commitToActionLog = true),
CreateFile("file2.txt", commitToActionLog = true),
LogicallyDeleteFile("file1.txt"),
AdvanceClock(timeGapHours * 1000 * 60 * 60),
GC(dryRun = isDryRun, expectedReturn, Some(retentionHours))
)
val deltaTable = io.delta.tables.DeltaTable.forPath(deltaLog.dataPath.toString)
val history = deltaTable.history()
if (isDryRun || !loggingEnabled) {
// We do not record stats when logging is disabled or dryRun
assert(history.select("operation").head() == Row("DELETE"))
} else {
assert(history.select("operation").head() == Row("VACUUM END"))
assert(history.select("operation").collect()(1) == Row("VACUUM START"))

val operationParamsBegin = getFromHistory(history, "operationParameters", 1)
val operationParamsEnd = getFromHistory(history, "operationParameters", 0)
val operationMetricsBegin = getFromHistory(history, "operationMetrics", 1)
val operationMetricsEnd = getFromHistory(history, "operationMetrics", 0)

val filesDeleted = if (retentionHours > timeGapHours) { 0 } else { 1 }
assert(operationParamsBegin("retentionCheckEnabled") === "false")
assert(operationMetricsBegin("numFilesToDelete") === filesDeleted.toString)
assert(operationMetricsBegin("sizeOfDataToDelete") === (filesDeleted * 9).toString)
assert(
operationParamsBegin("specifiedRetentionMillis") ===
(retentionHours * 60 * 60 * 1000).toString)
assert(
operationParamsBegin("defaultRetentionMillis") ===
DeltaLog.tombstoneRetentionMillis(deltaLog.snapshot.metadata).toString)

assert(operationParamsEnd === Map("status" -> "COMPLETED"))
assert(operationMetricsEnd === Map("numDeletedFiles" -> filesDeleted.toString,
"numVacuumedDirectories" -> "1"))
}
}
}
}
}

testEventLogging(
isDryRun = false,
loggingEnabled = true,
retentionHours = 5,
timeGapHours = 10
)

testEventLogging(
isDryRun = true, // dry run will not record the vacuum
loggingEnabled = true,
retentionHours = 5,
timeGapHours = 10
)

testEventLogging(
isDryRun = false,
loggingEnabled = false,
retentionHours = 5,
timeGapHours = 0
)

testEventLogging(
isDryRun = false,
loggingEnabled = true,
retentionHours = 20, // vacuum will not delete any files
timeGapHours = 10
)
}

class DeltaVacuumSuite