diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
index 9242899ea67..d77ebbd1bfb 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -448,6 +448,34 @@ object DeltaOperations {
     override val operationMetrics: Set[String] = DeltaOperationMetrics.CLONE
   }
 
+  /**
+   * @param retentionCheckEnabled - whether retention check was enabled for this run of vacuum.
+   * @param specifiedRetentionMillis - specified retention interval
+   * @param defaultRetentionMillis - default retention period for the table
+   */
+  case class VacuumStart(
+      retentionCheckEnabled: Boolean,
+      specifiedRetentionMillis: Option[Long],
+      defaultRetentionMillis: Long) extends Operation("VACUUM START") {
+    override val parameters: Map[String, Any] = Map(
+      "retentionCheckEnabled" -> retentionCheckEnabled,
+      "defaultRetentionMillis" -> defaultRetentionMillis
+    ) ++ specifiedRetentionMillis.map("specifiedRetentionMillis" -> _)
+
+    override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_START
+  }
+
+  /**
+   * @param status - whether the vacuum operation was successful; either "COMPLETED" or "FAILED"
+   */
+  case class VacuumEnd(status: String) extends Operation(s"VACUUM END") {
+    override val parameters: Map[String, Any] = Map(
+      "status" -> status
+    )
+
+    override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_END
+  }
+
   private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
     Map(
@@ -660,4 +688,14 @@ private[delta] object DeltaOperationMetrics {
     "copiedFilesSize" // size of files copied - 0 for shallow tables
   )
 
+  val VACUUM_START = Set(
+    "numFilesToDelete", // number of files that will be deleted by vacuum
+    "sizeOfDataToDelete" // total size in bytes of files that will be deleted by vacuum
+  )
+
+  val VACUUM_END = Set(
+    "numDeletedFiles", // number of files deleted by vacuum
+    "numVacuumedDirectories" // number of directories vacuumed
+  )
+
 }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 7753f01cffc..875d38b1b5f 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -29,10 +29,13 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
 import org.apache.spark.sql.functions.{col, count, sum}
 import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
 
@@ -289,6 +292,42 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
 
 trait VacuumCommandImpl extends DeltaCommand {
 
+  private val supportedFsForLogging = Seq(
+    "wasbs", "wasbss", "abfs", "abfss", "adl", "gs", "file", "hdfs"
+  )
+
+  private def shouldLogVacuum(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      hadoopConf: Configuration,
+      path: Path): Boolean = {
+    val logVacuumConf = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_LOGGING_ENABLED)
+
+    if (logVacuumConf.nonEmpty) {
+      return logVacuumConf.get
+    }
+
+    val logStore = deltaLog.store
+
+    try {
+      val rawResolvedUri: URI = logStore.resolvePathOnPhysicalStorage(path, hadoopConf).toUri
+      val scheme = rawResolvedUri.getScheme
+      if (supportedFsForLogging.contains(scheme)) {
+        true
+      } else {
+        false
+      }
+    } catch {
+      case _: UnsupportedOperationException =>
+        logWarning("Vacuum event logging" +
+          " not enabled on this file system because we cannot detect your cloud storage type.")
+        false
+    }
+  }
+
+  /**
+   * Record Vacuum specific metrics in the commit log at the START of vacuum.
+   */
   protected def logVacuumStart(
       spark: SparkSession,
       deltaLog: DeltaLog,
@@ -299,14 +338,63 @@ trait VacuumCommandImpl extends DeltaCommand {
       defaultRetentionMillis: Long): Unit = {
     logInfo(s"Deleting untracked files and empty directories in $path. The amount of data to be " +
       s"deleted is $sizeOfDataToDelete (in bytes)")
+
+    // We perform an empty commit in order to record information about the Vacuum
+    if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) {
+      val checkEnabled =
+        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED)
+      val txn = deltaLog.startTransaction()
+      val metrics = Map[String, SQLMetric](
+        "numFilesToDelete" -> createMetric(spark.sparkContext, "number of files to delete"),
+        "sizeOfDataToDelete" -> createMetric(spark.sparkContext,
+          "The total amount of data to be deleted in bytes")
+      )
+      metrics("numFilesToDelete").set(diff.count())
+      metrics("sizeOfDataToDelete").set(sizeOfDataToDelete)
+      txn.registerSQLMetrics(spark, metrics)
+      txn.commit(actions = Seq(), DeltaOperations.VacuumStart(
+        checkEnabled,
+        specifiedRetentionMillis,
+        defaultRetentionMillis
+      ))
+    }
   }
 
+  /**
+   * Record Vacuum specific metrics in the commit log at the END of vacuum.
+   *
+   * @param deltaLog - DeltaLog of the table
+   * @param spark - spark session
+   * @param filesDeleted - if the vacuum completed this will contain the number of files deleted.
+   *                       if the vacuum failed, this will be None.
+   * @param dirCounts - if the vacuum completed this will contain the number of directories
+   *                    vacuumed. if the vacuum failed, this will be None.
+   */
   protected def logVacuumEnd(
       deltaLog: DeltaLog,
       spark: SparkSession,
       path: Path,
       filesDeleted: Option[Long] = None,
       dirCounts: Option[Long] = None): Unit = {
+    if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) {
+      val txn = deltaLog.startTransaction()
+      val status = if (filesDeleted.isEmpty && dirCounts.isEmpty) { "FAILED" } else { "COMPLETED" }
+      if (filesDeleted.nonEmpty && dirCounts.nonEmpty) {
+        val metrics = Map[String, SQLMetric](
+          "numDeletedFiles" -> createMetric(spark.sparkContext, "number of files deleted."),
+          "numVacuumedDirectories" ->
+            createMetric(spark.sparkContext, "number of directories vacuumed."),
+          "status" -> createMetric(spark.sparkContext, "status of vacuum")
+        )
+        metrics("numDeletedFiles").set(filesDeleted.get)
+        metrics("numVacuumedDirectories").set(dirCounts.get)
+        txn.registerSQLMetrics(spark, metrics)
+      }
+      txn.commit(actions = Seq(), DeltaOperations.VacuumEnd(
+        status
+      ))
+    }
+
     if (filesDeleted.nonEmpty) {
       logConsole(s"Deleted ${filesDeleted.get} files and directories in a total " +
         s"of ${dirCounts.get} directories.")
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index e6c2e1fb499..696ed46adeb 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -294,6 +294,15 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_VACUUM_LOGGING_ENABLED =
+    buildConf("vacuum.logging.enabled")
+      .doc("Whether to log vacuum information into the Delta transaction log." +
+        " 'spark.databricks.delta.commitInfo.enabled' should be enabled when using this config." +
+ + " Users should only set this config to 'true' when the underlying file system safely" + + " supports concurrent writes.") + .booleanConf + .createOptional + val DELTA_VACUUM_RETENTION_CHECK_ENABLED = buildConf("retentionDurationCheck.enabled") .doc("Adds a check preventing users from running vacuum with a very short retention " + diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala index 84f7270436c..e034db79e50 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.fs.Path import org.scalatest.GivenWhenThen import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.util.IntervalUtils import org.apache.spark.sql.execution.metric.SQLMetric @@ -772,6 +772,104 @@ trait DeltaVacuumSuiteBase extends QueryTest test("vacuum for cdc - delete tombstones") { testCDCVacuumForTombstones() } + + private def getFromHistory(history: DataFrame, key: String, pos: Integer): Map[String, String] = { + val op = history.select(key).take(pos + 1) + if (pos == 0) { + op.head.getMap(0).asInstanceOf[Map[String, String]] + } else { + op.tail.head.getMap(0).asInstanceOf[Map[String, String]] + } + } + + private def testEventLogging( + isDryRun: Boolean, + loggingEnabled: Boolean, + retentionHours: Long, + timeGapHours: Long): Unit = { + + test(s"vacuum event logging dryRun=$isDryRun loggingEnabled=$loggingEnabled" + + s" retentionHours=$retentionHours timeGap=$timeGapHours") { + withSQLConf(DeltaSQLConf.DELTA_VACUUM_LOGGING_ENABLED.key -> loggingEnabled.toString) { + + withEnvironment { (dir, clock) => + spark.range(2).write.format("delta").save(dir.getAbsolutePath) + val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath, clock) + val expectedReturn = if (isDryRun) { + // dry run returns files that will be deleted + Seq(new Path(dir.getAbsolutePath, "file1.txt").toString) + } else { + Seq(dir.getAbsolutePath) + } + + gcTest(deltaLog, clock)( + CreateFile("file1.txt", commitToActionLog = true), + CreateFile("file2.txt", commitToActionLog = true), + LogicallyDeleteFile("file1.txt"), + AdvanceClock(timeGapHours * 1000 * 60 * 60), + GC(dryRun = isDryRun, expectedReturn, Some(retentionHours)) + ) + val deltaTable = io.delta.tables.DeltaTable.forPath(deltaLog.dataPath.toString) + val history = deltaTable.history() + if (isDryRun || !loggingEnabled) { + // We do not record stats when logging is disabled or dryRun + assert(history.select("operation").head() == Row("DELETE")) + } else { + assert(history.select("operation").head() == Row("VACUUM END")) + assert(history.select("operation").collect()(1) == Row("VACUUM START")) + + val operationParamsBegin = getFromHistory(history, "operationParameters", 1) + val operationParamsEnd = getFromHistory(history, "operationParameters", 0) + val operationMetricsBegin = getFromHistory(history, "operationMetrics", 1) + val operationMetricsEnd = getFromHistory(history, "operationMetrics", 0) + + val filesDeleted = if (retentionHours > timeGapHours) { 0 } else { 1 } + assert(operationParamsBegin("retentionCheckEnabled") === "false") + assert(operationMetricsBegin("numFilesToDelete") === 
+            assert(operationMetricsBegin("sizeOfDataToDelete") === (filesDeleted * 9).toString)
+            assert(
+              operationParamsBegin("specifiedRetentionMillis") ===
+                (retentionHours * 60 * 60 * 1000).toString)
+            assert(
+              operationParamsBegin("defaultRetentionMillis") ===
+                DeltaLog.tombstoneRetentionMillis(deltaLog.snapshot.metadata).toString)
+
+            assert(operationParamsEnd === Map("status" -> "COMPLETED"))
+            assert(operationMetricsEnd === Map("numDeletedFiles" -> filesDeleted.toString,
+              "numVacuumedDirectories" -> "1"))
+          }
+        }
+      }
+    }
+  }
+
+  testEventLogging(
+    isDryRun = false,
+    loggingEnabled = true,
+    retentionHours = 5,
+    timeGapHours = 10
+  )
+
+  testEventLogging(
+    isDryRun = true, // dry run will not record the vacuum
+    loggingEnabled = true,
+    retentionHours = 5,
+    timeGapHours = 10
+  )
+
+  testEventLogging(
+    isDryRun = false,
+    loggingEnabled = false,
+    retentionHours = 5,
+    timeGapHours = 0
+  )
+
+  testEventLogging(
+    isDryRun = false,
+    loggingEnabled = true,
+    retentionHours = 20, // vacuum will not delete any files
+    timeGapHours = 10
+  )
 }
 
 class DeltaVacuumSuite
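
Usage sketch (not part of the patch): a minimal example of how the new logging surfaces to users, assuming a Delta-enabled SparkSession and an illustrative local table path (/tmp/events resolves to the "file" scheme, which is in supportedFsForLogging). With the new config enabled, a VACUUM first commits a "VACUUM START" entry carrying numFilesToDelete and sizeOfDataToDelete, then a "VACUUM END" entry carrying numDeletedFiles and numVacuumedDirectories, both visible in the table history.

import org.apache.spark.sql.SparkSession

// Illustrative session setup; the app name and table path are placeholders.
val spark = SparkSession.builder()
  .appName("vacuum-logging-sketch")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  // Opt in explicitly: the conf is createOptional, so it has no default value.
  .config("spark.databricks.delta.vacuum.logging.enabled", "true")
  .getOrCreate()

// With logging enabled, this records VACUUM START / VACUUM END commits in the Delta log.
spark.sql("VACUUM delta.`/tmp/events` RETAIN 168 HOURS")

// Both entries appear in the history along with their parameters and metrics.
spark.sql("DESCRIBE HISTORY delta.`/tmp/events`")
  .select("operation", "operationParameters", "operationMetrics")
  .show(truncate = false)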