diff --git a/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala b/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala index 9bce5d68344..d41c73c5535 100644 --- a/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala +++ b/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala @@ -53,7 +53,7 @@ import org.apache.spark.util.Clock * @param rapidsConf RAPIDS Accelerator config settings. */ abstract class GpuOptimisticTransactionBase - (deltaLog: DeltaLog, snapshot: Snapshot, rapidsConf: RapidsConf) + (deltaLog: DeltaLog, snapshot: Snapshot, val rapidsConf: RapidsConf) (implicit clock: Clock) extends OptimisticTransaction(deltaLog, snapshot)(clock) with DeltaLogging { diff --git a/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala b/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala index 55fe00df830..92e5c171301 100644 --- a/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala +++ b/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala @@ -22,8 +22,12 @@ package com.nvidia.spark.rapids.delta +import java.util.Locale + import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf +import org.apache.spark.network.util.ByteUnit + /** Delta Lake related configs that are not yet provided by Delta Lake. */ trait RapidsDeltaSQLConf { val OPTIMIZE_WRITE_SMALL_PARTITION_FACTOR = @@ -39,6 +43,30 @@ trait RapidsDeltaSQLConf { .doc("Factor used to rebalance partitions for optimize write.") .doubleConf .createWithDefault(1.2) + + val AUTO_COMPACT_TARGET = + DeltaSQLConf.buildConf("autoCompact.target") + .internal() + .doc( + """ + |Target files for auto compaction. + | "table", "commit", "partition" options are available. (default: partition) + | If "table", all files in table are eligible for auto compaction. + | If "commit", added/updated files by the commit are eligible. + | If "partition", all files in partitions containing any added/updated files + | by the commit are eligible. 
+ |""".stripMargin + ) + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .createWithDefault("partition") + + val AUTO_COMPACT_MAX_COMPACT_BYTES = + DeltaSQLConf.buildConf("autoCompact.maxCompactBytes") + .internal() + .doc("Maximum amount of data for auto compaction.") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("20GB") } object RapidsDeltaSQLConf extends RapidsDeltaSQLConf diff --git a/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaUtils.scala b/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaUtils.scala index 2dfc7d671e3..99f110acb88 100644 --- a/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaUtils.scala +++ b/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaUtils.scala @@ -90,16 +90,5 @@ object RapidsDeltaUtils { } } } - - val autoCompactEnabled = - getSQLConf("spark.databricks.delta.autoCompact.enabled").orElse { - val metadata = deltaLog.snapshot.metadata - metadata.configuration.get("delta.autoOptimize.autoCompact").orElse { - getSQLConf("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact") - } - }.exists(_.toBoolean) - if (autoCompactEnabled) { - meta.willNotWorkOnGpu("automatic compaction of Delta Lake tables is not supported") - } } } diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala new file mode 100644 index 00000000000..84a86807c15 --- /dev/null +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * This file was derived from DoAutoCompaction.scala + * from https://github.com/delta-io/delta/pull/1156 + * in the Delta Lake project at https://github.com/delta-io/delta. + * + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.sql.transaction.tahoe.rapids + +import com.databricks.sql.transaction.tahoe._ +import com.databricks.sql.transaction.tahoe.actions.Action +import com.databricks.sql.transaction.tahoe.hooks.PostCommitHook +import com.databricks.sql.transaction.tahoe.metering.DeltaLogging + +import org.apache.spark.sql.SparkSession + +object GpuDoAutoCompaction extends PostCommitHook + with DeltaLogging + with Serializable { + override val name: String = "Triggers compaction if necessary" + + override def run(spark: SparkSession, + txn: OptimisticTransactionImpl, + committedActions: Seq[Action]): Unit = { + val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction] + val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction() + // Note: The Databricks AutoCompact PostCommitHook cannot be used here + // (with a GpuOptimisticTransaction). 
It appears that AutoCompact creates a new transaction, + // thereby circumventing GpuOptimisticTransaction (which intercepts Parquet writes + // to go through the GPU). + new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize() + } + + override def handleError(error: Throwable, version: Long): Unit = + throw DeltaErrors.postCommitHookFailedException(this, version, name, error) +} \ No newline at end of file diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index bde3e40203a..4b4b1616527 100644 --- a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -252,6 +252,23 @@ class GpuOptimisticTransaction( identityTracker.foreach { tracker => updatedIdentityHighWaterMarks.appendAll(tracker.highWaterMarks.toSeq) } - resultFiles.toSeq ++ committer.changeFiles + val fileActions = resultFiles.toSeq ++ committer.changeFiles + + // Check if auto-compaction is enabled. + // (Auto compaction checks are derived from the work in + // https://github.com/delta-io/delta/pull/1156). + lazy val autoCompactEnabled = + spark.sessionState.conf + .getConf[String](DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED) + .getOrElse { + DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata) + .getOrElse("false") + }.toBoolean + + if (!isOptimize && autoCompactEnabled && fileActions.nonEmpty) { + registerPostCommitHook(GpuDoAutoCompaction) + } + + fileActions } } diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala new file mode 100644 index 00000000000..590487d4c60 --- /dev/null +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala @@ -0,0 +1,401 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * This file was derived from: + * 1. DoAutoCompaction.scala from PR#1156 at https://github.com/delta-io/delta/pull/1156, + * 2. OptimizeTableCommand.scala from the Delta Lake project at https://github.com/delta-io/delta. + * + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.databricks.sql.transaction.tahoe.rapids + +import java.util.ConcurrentModificationException + +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer + +import com.databricks.sql.transaction.tahoe._ +import com.databricks.sql.transaction.tahoe.DeltaOperations.Operation +import com.databricks.sql.transaction.tahoe.actions.{Action, AddFile, FileAction, RemoveFile} +import com.databricks.sql.transaction.tahoe.commands.DeltaCommand +import com.databricks.sql.transaction.tahoe.commands.optimize._ +import com.databricks.sql.transaction.tahoe.files.SQLMetricsReporting +import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf +import com.nvidia.spark.rapids.delta.RapidsDeltaSQLConf + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext.SPARK_JOB_GROUP_ID +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric +import org.apache.spark.util.ThreadUtils + +class GpuOptimizeExecutor( + sparkSession: SparkSession, + txn: OptimisticTransaction, + partitionPredicate: Seq[Expression], + zOrderByColumns: Seq[String], + prevCommitActions: Seq[Action]) + extends DeltaCommand with SQLMetricsReporting with Serializable { + + /** Timestamp to use in [[FileAction]] */ + private val operationTimestamp = System.currentTimeMillis + + private val isMultiDimClustering = zOrderByColumns.nonEmpty + private val isAutoCompact = prevCommitActions.nonEmpty + private val optimizeType = GpuOptimizeType(isMultiDimClustering, isAutoCompact) + + def optimize(): Seq[Row] = { + recordDeltaOperation(txn.deltaLog, "delta.optimize") { + val maxFileSize = optimizeType.maxFileSize + require(maxFileSize > 0, "maxFileSize must be > 0") + + val minNumFilesInDir = optimizeType.minNumFiles + val (candidateFiles, filesToProcess) = optimizeType.targetFiles + val partitionSchema = txn.metadata.partitionSchema + + // select all files in case of multi-dimensional clustering + val partitionsToCompact = filesToProcess + .groupBy(_.partitionValues) + .filter { case (_, filesInPartition) => filesInPartition.size >= minNumFilesInDir } + .toSeq + + val groupedJobs = groupFilesIntoBins(partitionsToCompact, maxFileSize) + val jobs = optimizeType.targetBins(groupedJobs) + + val maxThreads = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS) + val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup => + runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize) + }.flatten + + val addedFiles = updates.collect { case a: AddFile => a } + val removedFiles = updates.collect { case r: RemoveFile => r } + if (addedFiles.nonEmpty) { + val operation = DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns) + val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles) + commitAndRetry(txn, operation, updates, metrics) { newTxn => + val newPartitionSchema = newTxn.metadata.partitionSchema + val candidateSetOld = candidateFiles.map(_.path).toSet + val candidateSetNew = newTxn.filterFiles(partitionPredicate).map(_.path).toSet + + // As long as all of the files that we compacted are still part of the table, + // and the partitioning has not changed it is valid to continue to try + // and commit this checkpoint. 
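+          // Returning true lets commitAndRetry re-attempt the commit with the new
+          // transaction; returning false aborts the operation and rethrows the original
+          // conflict exception.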
+ if (candidateSetOld.subsetOf(candidateSetNew) && partitionSchema == newPartitionSchema) { + true + } else { + val deleted = candidateSetOld -- candidateSetNew + logWarning(s"The following compacted files were delete " + + s"during checkpoint ${deleted.mkString(",")}. Aborting the compaction.") + false + } + } + } + + val optimizeStats = OptimizeStats() + optimizeStats.addedFilesSizeStats.merge(addedFiles) + optimizeStats.removedFilesSizeStats.merge(removedFiles) + optimizeStats.numPartitionsOptimized = jobs.map(j => j._1).distinct.size + optimizeStats.numBatches = jobs.size + optimizeStats.totalConsideredFiles = candidateFiles.size + optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size + + if (isMultiDimClustering) { + val inputFileStats = + ZOrderFileStats(removedFiles.size, removedFiles.map(_.size.getOrElse(0L)).sum) + optimizeStats.zOrderStats = Some(ZOrderStats( + strategyName = "all", // means process all files in a partition + inputCubeFiles = ZOrderFileStats(0, 0), + inputOtherFiles = inputFileStats, + inputNumCubes = 0, + mergedFiles = inputFileStats, + // There will one z-cube for each partition + numOutputCubes = optimizeStats.numPartitionsOptimized)) + } + + return Seq(Row(txn.deltaLog.dataPath.toString, optimizeStats.toOptimizeMetrics)) + } + } + + /** + * Utility methods to group files into bins for optimize. + * + * @param partitionsToCompact List of files to compact group by partition. + * Partition is defined by the partition values (partCol -> partValue) + * @param maxTargetFileSize Max size (in bytes) of the compaction output file. + * @return Sequence of bins. Each bin contains one or more files from the same + * partition and targeted for one output file. + */ + private def groupFilesIntoBins( + partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])], + maxTargetFileSize: Long): Seq[(Map[String, String], Seq[AddFile])] = { + + partitionsToCompact.flatMap { + case (partition, files) => + val bins = new ArrayBuffer[Seq[AddFile]]() + + val currentBin = new ArrayBuffer[AddFile]() + var currentBinSize = 0L + + files.sortBy(_.size).foreach { file => + // Generally, a bin is a group of existing files, whose total size does not exceed the + // desired maxFileSize. They will be coalesced into a single output file. + // However, if isMultiDimClustering = true, all files in a partition will be read by the + // same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize + // will be produced. See below. + if (file.size + currentBinSize > maxTargetFileSize && !isMultiDimClustering) { + bins += currentBin.toVector + currentBin.clear() + currentBin += file + currentBinSize = file.size + } else { + currentBin += file + currentBinSize += file.size + } + } + + if (currentBin.nonEmpty) { + bins += currentBin.toVector + } + + bins.map(b => (partition, b)) + // select bins that have at least two files or in case of multi-dim clustering + // select all bins + .filter(_._2.size > 1 || isMultiDimClustering) + } + } + + /** + * Utility method to run a Spark job to compact the files in given bin + * + * @param txn [[OptimisticTransaction]] instance in use to commit the changes to DeltaLog. + * @param partition Partition values of the partition that files in [[bin]] belongs to. + * @param bin List of files to compact into one large file. 
+ * @param maxFileSize Targeted output file size in bytes + */ + private def runOptimizeBinJob( + txn: OptimisticTransaction, + partition: Map[String, String], + bin: Seq[AddFile], + maxFileSize: Long): Seq[FileAction] = { + val baseTablePath = txn.deltaLog.dataPath + + val input = txn.deltaLog.createDataFrame(txn.snapshot, bin, actionTypeOpt = Some("Optimize")) + val repartitionDF = if (isMultiDimClustering) { + // TODO: MultiDimClustering is not currently supported on Databricks 10.4. + // val totalSize = bin.map(_.size).sum + // val approxNumFiles = Math.max(1, totalSize / maxFileSize).toInt + // MultiDimClustering.cluster( + // txn.deltaLog, + // input, + // approxNumFiles, + // zOrderByColumns) + throw new UnsupportedOperationException("MultiDimClustering not supported on compaction") + } else { + // Re-partition is not available in Databricks 10.4 (spark321db) + input.coalesce(numPartitions = 1) + } + + val partitionDesc = partition.toSeq.map(entry => entry._1 + "=" + entry._2).mkString(",") + + val partitionName = if (partition.isEmpty) "" else s" in partition ($partitionDesc)" + val description = s"$baseTablePath
Optimizing ${bin.size} files" + partitionName + sparkSession.sparkContext.setJobGroup( + sparkSession.sparkContext.getLocalProperty(SPARK_JOB_GROUP_ID), + description) + + val addFiles = txn.writeFiles(repartitionDF).collect { + case a: AddFile => + a.copy(dataChange = false) + case other => + throw new IllegalStateException( + s"Unexpected action $other with type ${other.getClass}. File compaction job output" + + s"should only have AddFiles") + } + val removeFiles = bin.map(f => f.removeWithTimestamp(operationTimestamp, dataChange = false)) + val updates = addFiles ++ removeFiles + updates + } + + private type PartitionedBin = (Map[String, String], Seq[AddFile]) + + private trait GpuOptimizeType { + def minNumFiles: Long + + def maxFileSize: Long = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE) + + def targetFiles: (Seq[AddFile], Seq[AddFile]) + + def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs + } + + private case class GpuCompaction() extends GpuOptimizeType { + def minNumFiles: Long = 2 + + def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + val minFileSize = sparkSession.sessionState.conf.getConf( + DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE) + require(minFileSize > 0, "minFileSize must be > 0") + val candidateFiles = txn.filterFiles(partitionPredicate) + val filesToProcess = candidateFiles.filter(_.size < minFileSize) + (candidateFiles, filesToProcess) + } + } + + private case class GpuMultiDimOrdering() extends GpuOptimizeType { + def minNumFiles: Long = 1 + + def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + // select all files in case of multi-dimensional clustering + val candidateFiles = txn.filterFiles(partitionPredicate) + (candidateFiles, candidateFiles) + } + } + + private case class GpuAutoCompaction() extends GpuOptimizeType { + def minNumFiles: Long = { + val minNumFiles = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES) + require(minNumFiles > 0, "minNumFiles must be > 0") + minNumFiles + } + + override def maxFileSize: Long = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE) + .getOrElse(128 * 1024 * 1024) + + override def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + val autoCompactTarget = + sparkSession.sessionState.conf.getConf(RapidsDeltaSQLConf.AUTO_COMPACT_TARGET) + // Filter the candidate files according to autoCompact.target config. + lazy val addedFiles = prevCommitActions.collect { case a: AddFile => a } + val candidateFiles = autoCompactTarget match { + case "table" => + txn.filterFiles() + case "commit" => + addedFiles + case "partition" => + val eligiblePartitions = addedFiles.map(_.partitionValues).toSet + txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues)) + case _ => + logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " + + s"Falling back to the default value 'table'.") + txn.filterFiles() + } + val filesToProcess = candidateFiles.filter(_.size < maxFileSize) + (candidateFiles, filesToProcess) + } + + override def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = { + var acc = 0L + val maxCompactBytes = + sparkSession.sessionState.conf.getConf(RapidsDeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES) + // bins with more files are prior to less files. 
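+      // Bins holding more files are compacted first; accumulation stops once the running
+      // total of selected bytes would exceed autoCompact.maxCompactBytes.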
+ jobs + .sortBy { case (_, filesInBin) => -filesInBin.length } + .takeWhile { case (_, filesInBin) => + acc += filesInBin.map(_.size).sum + acc <= maxCompactBytes + } + } + } + + private object GpuOptimizeType { + + def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): GpuOptimizeType = { + if (isMultiDimClustering) { + GpuMultiDimOrdering() + } else if (isAutoCompact) { + GpuAutoCompaction() + } else { + GpuCompaction() + } + } + } + + /** + * Attempts to commit the given actions to the log. In the case of a concurrent update, + * the given function will be invoked with a new transaction to allow custom conflict + * detection logic to indicate it is safe to try again, by returning `true`. + * + * This function will continue to try to commit to the log as long as `f` returns `true`, + * otherwise throws a subclass of [[ConcurrentModificationException]]. + */ + @tailrec + private def commitAndRetry( + txn: OptimisticTransaction, + optimizeOperation: Operation, + actions: Seq[Action], + metrics: Map[String, SQLMetric])(f: OptimisticTransaction => Boolean) + : Unit = { + try { + txn.registerSQLMetrics(sparkSession, metrics) + txn.commit(actions, optimizeOperation) + } catch { + case e: ConcurrentModificationException => + val newTxn = txn.deltaLog.startTransaction() + if (f(newTxn)) { + logInfo("Retrying commit after checking for semantic conflicts with concurrent updates.") + commitAndRetry(newTxn, optimizeOperation, actions, metrics)(f) + } else { + logWarning("Semantic conflicts detected. Aborting operation.") + throw e + } + } + } + + /** Create a map of SQL metrics for adding to the commit history. */ + private def createMetrics( + sparkContext: SparkContext, + addedFiles: Seq[AddFile], + removedFiles: Seq[RemoveFile]): Map[String, SQLMetric] = { + + def setAndReturnMetric(description: String, value: Long) = { + val metric = createMetric(sparkContext, description) + metric.set(value) + metric + } + + def totalSize(actions: Seq[FileAction]): Long = { + var totalSize = 0L + actions.foreach { file => + val fileSize = file match { + case addFile: AddFile => addFile.size + case removeFile: RemoveFile => removeFile.size.getOrElse(0L) + case default => + throw new IllegalArgumentException(s"Unknown FileAction type: ${default.getClass}") + } + totalSize += fileSize + } + totalSize + } + + val sizeStats = FileSizeStatsWithHistogram.create(addedFiles.map(_.size).sorted) + Map[String, SQLMetric]( + "minFileSize" -> setAndReturnMetric("minimum file size", sizeStats.get.min), + "p25FileSize" -> setAndReturnMetric("25th percentile file size", sizeStats.get.p25), + "p50FileSize" -> setAndReturnMetric("50th percentile file size", sizeStats.get.p50), + "p75FileSize" -> setAndReturnMetric("75th percentile file size", sizeStats.get.p75), + "maxFileSize" -> setAndReturnMetric("maximum file size", sizeStats.get.max), + "numAddedFiles" -> setAndReturnMetric("total number of files added.", addedFiles.size), + "numRemovedFiles" -> setAndReturnMetric("total number of files removed.", removedFiles.size), + "numAddedBytes" -> setAndReturnMetric("total number of bytes added", totalSize(addedFiles)), + "numRemovedBytes" -> + setAndReturnMetric("total number of bytes removed", totalSize(removedFiles))) + } +} \ No newline at end of file diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala new file mode 100644 index 
00000000000..9726511ad44 --- /dev/null +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * This file was derived from DoAutoCompaction.scala + * from https://github.com/delta-io/delta/pull/1156 + * in the Delta Lake project at https://github.com/delta-io/delta. + * + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.sql.transaction.tahoe.rapids + +import com.databricks.sql.transaction.tahoe._ +import com.databricks.sql.transaction.tahoe.actions.Action +import com.databricks.sql.transaction.tahoe.hooks.PostCommitHook +import com.databricks.sql.transaction.tahoe.metering.DeltaLogging + +import org.apache.spark.sql.SparkSession + +object GpuDoAutoCompaction extends PostCommitHook + with DeltaLogging + with Serializable { + override val name: String = "Triggers compaction if necessary" + + override def run(spark: SparkSession, + txn: OptimisticTransactionImpl, + committedVersion: Long, + postCommitSnapshot: Snapshot, + committedActions: Seq[Action]): Unit = { + val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction] + val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction() + // Note: The Databricks AutoCompact PostCommitHook cannot be used here + // (with a GpuOptimisticTransaction). It appears that AutoCompact creates a new transaction, + // thereby circumventing GpuOptimisticTransaction (which intercepts Parquet writes + // to go through the GPU). + new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize() + } + + override def handleError(error: Throwable, version: Long): Unit = + throw DeltaErrors.postCommitHookFailedException(this, version, name, error) +} \ No newline at end of file diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index 1ce8378a1f6..0e8c7f74fde 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -279,6 +279,23 @@ class GpuOptimisticTransaction( identityTracker.foreach { tracker => updatedIdentityHighWaterMarks.appendAll(tracker.highWaterMarks.toSeq) } - resultFiles.toSeq ++ committer.changeFiles + val fileActions = resultFiles.toSeq ++ committer.changeFiles + + // Check if auto-compaction is enabled. + // (Auto compaction checks are derived from the work in + // https://github.com/delta-io/delta/pull/1156). 
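+    // The session configuration spark.databricks.delta.autoCompact.enabled, when set,
+    // takes precedence over the table property delta.autoOptimize.autoCompact; if
+    // neither is set, auto compaction stays disabled.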
+ lazy val autoCompactEnabled = + spark.sessionState.conf + .getConf[String](DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED) + .getOrElse { + DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata) + .getOrElse("false") + }.toBoolean + + if (!isOptimize && autoCompactEnabled && fileActions.nonEmpty) { + registerPostCommitHook(GpuDoAutoCompaction) + } + + fileActions } } diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala new file mode 100644 index 00000000000..cfa1468b7c9 --- /dev/null +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala @@ -0,0 +1,405 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * This file was derived from: + * 1. DoAutoCompaction.scala from PR#1156 at https://github.com/delta-io/delta/pull/1156, + * 2. OptimizeTableCommand.scala from the Delta Lake project at https://github.com/delta-io/delta. + * + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.databricks.sql.transaction.tahoe.rapids + +import java.util.ConcurrentModificationException + +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer + +import com.databricks.sql.io.skipping.MultiDimClustering +import com.databricks.sql.transaction.tahoe._ +import com.databricks.sql.transaction.tahoe.DeltaOperations.Operation +import com.databricks.sql.transaction.tahoe.actions.{Action, AddFile, FileAction, RemoveFile} +import com.databricks.sql.transaction.tahoe.commands.DeltaCommand +import com.databricks.sql.transaction.tahoe.commands.optimize._ +import com.databricks.sql.transaction.tahoe.files.SQLMetricsReporting +import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf +import com.nvidia.spark.rapids.delta.RapidsDeltaSQLConf + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext.SPARK_JOB_GROUP_ID +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric +import org.apache.spark.util.ThreadUtils + +class GpuOptimizeExecutor( + sparkSession: SparkSession, + txn: OptimisticTransaction, + partitionPredicate: Seq[Expression], + zOrderByColumns: Seq[String], + prevCommitActions: Seq[Action]) + extends DeltaCommand with SQLMetricsReporting with Serializable { + + /** Timestamp to use in [[FileAction]] */ + private val operationTimestamp = System.currentTimeMillis + + private val isMultiDimClustering = zOrderByColumns.nonEmpty + private val isAutoCompact = prevCommitActions.nonEmpty + private val optimizeType = GpuOptimizeType(isMultiDimClustering, isAutoCompact) + + def optimize(): Seq[Row] = { + recordDeltaOperation(txn.deltaLog, "delta.optimize") { + val maxFileSize = 
optimizeType.maxFileSize + require(maxFileSize > 0, "maxFileSize must be > 0") + + val minNumFilesInDir = optimizeType.minNumFiles + val (candidateFiles, filesToProcess) = optimizeType.targetFiles + val partitionSchema = txn.metadata.partitionSchema + + // select all files in case of multi-dimensional clustering + val partitionsToCompact = filesToProcess + .groupBy(_.partitionValues) + .filter { case (_, filesInPartition) => filesInPartition.size >= minNumFilesInDir } + .toSeq + + val groupedJobs = groupFilesIntoBins(partitionsToCompact, maxFileSize) + val jobs = optimizeType.targetBins(groupedJobs) + + val maxThreads = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS) + val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup => + runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize) + }.flatten + + val addedFiles = updates.collect { case a: AddFile => a } + val removedFiles = updates.collect { case r: RemoveFile => r } + if (addedFiles.nonEmpty) { + val operation = DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns) + val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles) + commitAndRetry(txn, operation, updates, metrics) { newTxn => + val newPartitionSchema = newTxn.metadata.partitionSchema + val candidateSetOld = candidateFiles.map(_.path).toSet + val candidateSetNew = newTxn.filterFiles(partitionPredicate).map(_.path).toSet + + // As long as all of the files that we compacted are still part of the table, + // and the partitioning has not changed it is valid to continue to try + // and commit this checkpoint. + if (candidateSetOld.subsetOf(candidateSetNew) && partitionSchema == newPartitionSchema) { + true + } else { + val deleted = candidateSetOld -- candidateSetNew + logWarning(s"The following compacted files were delete " + + s"during checkpoint ${deleted.mkString(",")}. Aborting the compaction.") + false + } + } + } + + val optimizeStats = OptimizeStats() + optimizeStats.addedFilesSizeStats.merge(addedFiles) + optimizeStats.removedFilesSizeStats.merge(removedFiles) + optimizeStats.numPartitionsOptimized = jobs.map(j => j._1).distinct.size + optimizeStats.numBatches = jobs.size + optimizeStats.totalConsideredFiles = candidateFiles.size + optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size + optimizeStats.totalClusterParallelism = sparkSession.sparkContext.defaultParallelism + + if (isMultiDimClustering) { + val inputFileStats = + ZOrderFileStats(removedFiles.size, removedFiles.map(_.size.getOrElse(0L)).sum) + optimizeStats.zOrderStats = Some(ZOrderStats( + strategyName = "all", // means process all files in a partition + inputCubeFiles = ZOrderFileStats(0, 0), + inputOtherFiles = inputFileStats, + inputNumCubes = 0, + mergedFiles = inputFileStats, + // There will one z-cube for each partition + numOutputCubes = optimizeStats.numPartitionsOptimized)) + } + + return Seq(Row(txn.deltaLog.dataPath.toString, optimizeStats.toOptimizeMetrics)) + } + } + + /** + * Utility methods to group files into bins for optimize. + * + * @param partitionsToCompact List of files to compact group by partition. + * Partition is defined by the partition values (partCol -> partValue) + * @param maxTargetFileSize Max size (in bytes) of the compaction output file. + * @return Sequence of bins. Each bin contains one or more files from the same + * partition and targeted for one output file. 
+ */ + private def groupFilesIntoBins( + partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])], + maxTargetFileSize: Long): Seq[(Map[String, String], Seq[AddFile])] = { + + partitionsToCompact.flatMap { + case (partition, files) => + val bins = new ArrayBuffer[Seq[AddFile]]() + + val currentBin = new ArrayBuffer[AddFile]() + var currentBinSize = 0L + + files.sortBy(_.size).foreach { file => + // Generally, a bin is a group of existing files, whose total size does not exceed the + // desired maxFileSize. They will be coalesced into a single output file. + // However, if isMultiDimClustering = true, all files in a partition will be read by the + // same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize + // will be produced. See below. + if (file.size + currentBinSize > maxTargetFileSize && !isMultiDimClustering) { + bins += currentBin.toVector + currentBin.clear() + currentBin += file + currentBinSize = file.size + } else { + currentBin += file + currentBinSize += file.size + } + } + + if (currentBin.nonEmpty) { + bins += currentBin.toVector + } + + bins.map(b => (partition, b)) + // select bins that have at least two files or in case of multi-dim clustering + // select all bins + .filter(_._2.size > 1 || isMultiDimClustering) + } + } + + /** + * Utility method to run a Spark job to compact the files in given bin + * + * @param txn [[OptimisticTransaction]] instance in use to commit the changes to DeltaLog. + * @param partition Partition values of the partition that files in [[bin]] belongs to. + * @param bin List of files to compact into one large file. + * @param maxFileSize Targeted output file size in bytes + */ + private def runOptimizeBinJob( + txn: OptimisticTransaction, + partition: Map[String, String], + bin: Seq[AddFile], + maxFileSize: Long): Seq[FileAction] = { + val baseTablePath = txn.deltaLog.dataPath + + val input = txn.deltaLog.createDataFrame(txn.snapshot, bin, actionTypeOpt = Some("Optimize")) + val repartitionDF = if (isMultiDimClustering) { + val totalSize = bin.map(_.size).sum + val approxNumFiles = Math.max(1, totalSize / maxFileSize).toInt + MultiDimClustering.cluster( + input, + approxNumFiles, + zOrderByColumns) + } else { + val useRepartition = sparkSession.sessionState.conf.getConf( + DeltaSQLConf.DELTA_OPTIMIZE_REPARTITION_ENABLED) + if (useRepartition) { + input.repartition(numPartitions = 1) + } else { + input.coalesce(numPartitions = 1) + } + } + + val partitionDesc = partition.toSeq.map(entry => entry._1 + "=" + entry._2).mkString(",") + + val partitionName = if (partition.isEmpty) "" else s" in partition ($partitionDesc)" + val description = s"$baseTablePath
Optimizing ${bin.size} files" + partitionName + sparkSession.sparkContext.setJobGroup( + sparkSession.sparkContext.getLocalProperty(SPARK_JOB_GROUP_ID), + description) + + val addFiles = txn.writeFiles(repartitionDF).collect { + case a: AddFile => + a.copy(dataChange = false) + case other => + throw new IllegalStateException( + s"Unexpected action $other with type ${other.getClass}. File compaction job output" + + s"should only have AddFiles") + } + val removeFiles = bin.map(f => f.removeWithTimestamp(operationTimestamp, dataChange = false)) + val updates = addFiles ++ removeFiles + updates + } + + private type PartitionedBin = (Map[String, String], Seq[AddFile]) + + private trait GpuOptimizeType { + def minNumFiles: Long + + def maxFileSize: Long = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE) + + def targetFiles: (Seq[AddFile], Seq[AddFile]) + + def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs + } + + private case class GpuCompaction() extends GpuOptimizeType { + def minNumFiles: Long = 2 + + def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + val minFileSize = sparkSession.sessionState.conf.getConf( + DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE) + require(minFileSize > 0, "minFileSize must be > 0") + val candidateFiles = txn.filterFiles(partitionPredicate) + val filesToProcess = candidateFiles.filter(_.size < minFileSize) + (candidateFiles, filesToProcess) + } + } + + private case class GpuMultiDimOrdering() extends GpuOptimizeType { + def minNumFiles: Long = 1 + + def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + // select all files in case of multi-dimensional clustering + val candidateFiles = txn.filterFiles(partitionPredicate) + (candidateFiles, candidateFiles) + } + } + + private case class GpuAutoCompaction() extends GpuOptimizeType { + def minNumFiles: Long = { + val minNumFiles = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES) + require(minNumFiles > 0, "minNumFiles must be > 0") + minNumFiles + } + + override def maxFileSize: Long = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE) + .getOrElse(128 * 1024 * 1024) + + override def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + val autoCompactTarget = + sparkSession.sessionState.conf.getConf(RapidsDeltaSQLConf.AUTO_COMPACT_TARGET) + // Filter the candidate files according to autoCompact.target config. + lazy val addedFiles = prevCommitActions.collect { case a: AddFile => a } + val candidateFiles = autoCompactTarget match { + case "table" => + txn.filterFiles() + case "commit" => + addedFiles + case "partition" => + val eligiblePartitions = addedFiles.map(_.partitionValues).toSet + txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues)) + case _ => + logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " + + s"Falling back to the default value 'table'.") + txn.filterFiles() + } + val filesToProcess = candidateFiles.filter(_.size < maxFileSize) + (candidateFiles, filesToProcess) + } + + override def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = { + var acc = 0L + val maxCompactBytes = + sparkSession.sessionState.conf.getConf(RapidsDeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES) + // bins with more files are prior to less files. 
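+      // Bins holding more files are compacted first; accumulation stops once the running
+      // total of selected bytes would exceed autoCompact.maxCompactBytes.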
+ jobs + .sortBy { case (_, filesInBin) => -filesInBin.length } + .takeWhile { case (_, filesInBin) => + acc += filesInBin.map(_.size).sum + acc <= maxCompactBytes + } + } + } + + private object GpuOptimizeType { + + def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): GpuOptimizeType = { + if (isMultiDimClustering) { + GpuMultiDimOrdering() + } else if (isAutoCompact) { + GpuAutoCompaction() + } else { + GpuCompaction() + } + } + } + + /** + * Attempts to commit the given actions to the log. In the case of a concurrent update, + * the given function will be invoked with a new transaction to allow custom conflict + * detection logic to indicate it is safe to try again, by returning `true`. + * + * This function will continue to try to commit to the log as long as `f` returns `true`, + * otherwise throws a subclass of [[ConcurrentModificationException]]. + */ + @tailrec + private def commitAndRetry( + txn: OptimisticTransaction, + optimizeOperation: Operation, + actions: Seq[Action], + metrics: Map[String, SQLMetric])(f: OptimisticTransaction => Boolean) + : Unit = { + try { + txn.registerSQLMetrics(sparkSession, metrics) + txn.commit(actions, optimizeOperation) + } catch { + case e: ConcurrentModificationException => + val newTxn = txn.deltaLog.startTransaction() + if (f(newTxn)) { + logInfo("Retrying commit after checking for semantic conflicts with concurrent updates.") + commitAndRetry(newTxn, optimizeOperation, actions, metrics)(f) + } else { + logWarning("Semantic conflicts detected. Aborting operation.") + throw e + } + } + } + + /** Create a map of SQL metrics for adding to the commit history. */ + private def createMetrics( + sparkContext: SparkContext, + addedFiles: Seq[AddFile], + removedFiles: Seq[RemoveFile]): Map[String, SQLMetric] = { + + def setAndReturnMetric(description: String, value: Long) = { + val metric = createMetric(sparkContext, description) + metric.set(value) + metric + } + + def totalSize(actions: Seq[FileAction]): Long = { + var totalSize = 0L + actions.foreach { file => + val fileSize = file match { + case addFile: AddFile => addFile.size + case removeFile: RemoveFile => removeFile.size.getOrElse(0L) + case default => + throw new IllegalArgumentException(s"Unknown FileAction type: ${default.getClass}") + } + totalSize += fileSize + } + totalSize + } + + val sizeStats = FileSizeStatsWithHistogram.create(addedFiles.map(_.size).sorted) + Map[String, SQLMetric]( + "minFileSize" -> setAndReturnMetric("minimum file size", sizeStats.get.min), + "p25FileSize" -> setAndReturnMetric("25th percentile file size", sizeStats.get.p25), + "p50FileSize" -> setAndReturnMetric("50th percentile file size", sizeStats.get.p50), + "p75FileSize" -> setAndReturnMetric("75th percentile file size", sizeStats.get.p75), + "maxFileSize" -> setAndReturnMetric("maximum file size", sizeStats.get.max), + "numAddedFiles" -> setAndReturnMetric("total number of files added.", addedFiles.size), + "numRemovedFiles" -> setAndReturnMetric("total number of files removed.", removedFiles.size), + "numAddedBytes" -> setAndReturnMetric("total number of bytes added", totalSize(addedFiles)), + "numRemovedBytes" -> + setAndReturnMetric("total number of bytes removed", totalSize(removedFiles))) + } +} \ No newline at end of file diff --git a/docs/additional-functionality/delta-lake-support.md b/docs/additional-functionality/delta-lake-support.md index 6bfdce172c6..5ea871c9f72 100644 --- a/docs/additional-functionality/delta-lake-support.md +++ b/docs/additional-functionality/delta-lake-support.md 
@@ -54,8 +54,7 @@ GPU accelerated. These operations will fallback to the CPU. Delta Lake on Databricks has [automatic optimization](https://docs.databricks.com/optimizations/auto-optimize.html) -features for optimized writes and automatic compaction. Automatic compaction is not supported, -and writes configured for automatic compaction will fallback to the CPU. +features for optimized writes and automatic compaction. Optimized writes are supported only on Databricks platforms. The algorithm used is similar but not identical to the Databricks version. The following table describes configuration settings @@ -67,6 +66,16 @@ that control the operation of the optimized write. | spark.databricks.delta.optimizeWrite.smallPartitionFactor | 0.5 | Merge partitions smaller than this factor multiplied by the target partition size | | spark.databricks.delta.optimizeWrite.mergedPartitionFactor | 1.2 | Avoid combining partitions larger than this factor multiplied by the target partition size | +Automatic compaction is supported only on Databricks platforms. The algorithm is similar but +not identical to the Databricks version. The following table describes configuration settings +that control the operation of automatic compaction. + +| Configuration | Default | Description | +|---------------------------------------------------------------------|---------|--------------------------------------------------------------------------------------------------------| +| spark.databricks.delta.autoCompact.enabled | false | Enable/disable auto compaction for writes to Delta directories | +| spark.databricks.delta.properties.defaults.autoOptimize.autoCompact | false | Whether to enable auto compaction by default, if spark.databricks.delta.autoCompact.enabled is not set | +| spark.databricks.delta.autoCompact.minNumFiles | 50 | Minimum number of files in the Delta directory before which auto optimize does not begin compaction | + Note that optimized write support requires round-robin partitioning of the data, and round-robin partitioning requires sorting across all columns for deterministic operation. If the GPU cannot support sorting a particular column type in order to support the round-robin partitioning, the diff --git a/integration_tests/src/main/python/delta_lake_auto_compact_test.py b/integration_tests/src/main/python/delta_lake_auto_compact_test.py new file mode 100644 index 00000000000..937da312589 --- /dev/null +++ b/integration_tests/src/main/python/delta_lake_auto_compact_test.py @@ -0,0 +1,224 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest +from asserts import assert_gpu_and_cpu_writes_are_equal_collect, with_cpu_session, with_gpu_session +from data_gen import copy_and_update +from delta_lake_write_test import delta_meta_allow +from marks import allow_non_gpu, delta_lake +from pyspark.sql.functions import * +from spark_session import is_databricks104_or_later + +_conf = {'spark.rapids.sql.explain': 'ALL', + 'spark.databricks.delta.autoCompact.minNumFiles': 3} # Num files before compaction. + + +def write_to_delta(num_rows=30, is_partitioned=False, num_writes=3): + """ + Returns bound function that writes to a delta table. + """ + + def write(spark, table_path): + input_data = spark.range(num_rows) + input_data = input_data.withColumn("part", expr("id % 3")) if is_partitioned \ + else input_data.repartition(1) + writer = input_data.write.format("delta").mode("append") + for _ in range(num_writes): + writer.save(table_path) + + return write + + +@delta_lake +@allow_non_gpu(*delta_meta_allow) +@pytest.mark.skipif(not is_databricks104_or_later(), + reason="Auto compaction of Delta Lake tables is only supported " + "on Databricks 10.4+") +@pytest.mark.parametrize("auto_compact_conf", + ["spark.databricks.delta.autoCompact.enabled", + "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact"]) +def test_auto_compact_basic(spark_tmp_path, auto_compact_conf): + """ + This test checks whether the results of auto compactions on an un-partitioned table + match, when written via CPU and GPU. + It also checks that the snapshot metrics (number of files added/removed, etc.) + match. + """ + from delta.tables import DeltaTable + data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_DATA" + + def read_data(spark, table_path): + return spark.read.format("delta").load(table_path) + + assert_gpu_and_cpu_writes_are_equal_collect( + write_func=write_to_delta(is_partitioned=False), + read_func=read_data, + base_path=data_path, + conf=_conf) + + def read_metadata(spark, table_path): + input_table = DeltaTable.forPath(spark, table_path) + table_history = input_table.history() + return table_history.select( + "version", + "operation", + expr("operationMetrics[\"numFiles\"]").alias("numFiles"), + expr("operationMetrics[\"numRemovedFiles\"]").alias("numRemoved"), + expr("operationMetrics[\"numAddedFiles\"]").alias("numAdded") + ) + + conf_enable_auto_compact = copy_and_update(_conf, {auto_compact_conf: "true"}) + + assert_gpu_and_cpu_writes_are_equal_collect( + write_func=lambda spark, table_path: None, # Already written. + read_func=read_metadata, + base_path=data_path, + conf=conf_enable_auto_compact) + + +@delta_lake +@allow_non_gpu(*delta_meta_allow) +@pytest.mark.skipif(not is_databricks104_or_later(), + reason="Auto compaction of Delta Lake tables is only supported " + "on Databricks 10.4+") +@pytest.mark.parametrize("auto_compact_conf", + ["spark.databricks.delta.autoCompact.enabled", + "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact"]) +def test_auto_compact_partitioned(spark_tmp_path, auto_compact_conf): + """ + This test checks whether the results of auto compaction on a partitioned table + match, when written via CPU and GPU. + Note: The behaviour of compaction itself differs from Databricks, in that + the plugin enforces `minFiles` restriction uniformly across all partitions. + Databricks' Delta implementation appears not to. 
+ """ + from delta.tables import DeltaTable + data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_DATA_PARTITIONED" + + def read_data(spark, table_path): + return spark.read.format("delta").load(table_path).orderBy("id", "part") + + assert_gpu_and_cpu_writes_are_equal_collect( + write_func=write_to_delta(is_partitioned=True), + read_func=read_data, + base_path=data_path, + conf=_conf) + + def read_metadata(spark, table_path): + """ + The snapshots might not look alike, in the partitioned case. + Ensure that auto compaction has occurred, even if it's not identical. + """ + input_table = DeltaTable.forPath(spark, table_path) + table_history = input_table.history() + return table_history.select( + "version", + "operation", + expr("operationMetrics[\"numFiles\"] > 0").alias("numFiles_gt_0"), + expr("operationMetrics[\"numRemovedFiles\"] > 0").alias("numRemoved_gt_0"), + expr("operationMetrics[\"numAddedFiles\"] > 0").alias("numAdded_gt_0") + ) + + conf_enable_auto_compact = copy_and_update(_conf, {auto_compact_conf: "true"}) + + assert_gpu_and_cpu_writes_are_equal_collect( + write_func=lambda spark, table_path: None, # Already written. + read_func=read_metadata, + base_path=data_path, + conf=conf_enable_auto_compact) + + +@delta_lake +@allow_non_gpu(*delta_meta_allow) +@pytest.mark.skipif(not is_databricks104_or_later(), + reason="Auto compaction of Delta Lake tables is only supported " + "on Databricks 10.4+") +@pytest.mark.parametrize("auto_compact_conf", + ["spark.databricks.delta.autoCompact.enabled", + "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact"]) +def test_auto_compact_disabled(spark_tmp_path, auto_compact_conf): + """ + This test verifies that auto-compaction does not run if disabled. + """ + from delta.tables import DeltaTable + data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_CHECK_DISABLED" + + disable_auto_compaction = copy_and_update(_conf, {auto_compact_conf: 'false'}) + + writer = write_to_delta(num_writes=10) + with_gpu_session(func=lambda spark: writer(spark, data_path), + conf=disable_auto_compaction) + + # 10 writes should correspond to 10 commits. + # (i.e. there should be no OPTIMIZE commits.) + def verify_table_history(spark): + input_table = DeltaTable.forPath(spark, data_path) + table_history = input_table.history() + assert table_history.select("version", "operation").count() == 10, \ + "Expected 10 versions, 1 for each WRITE." + assert table_history.select("version")\ + .where("operation = 'OPTIMIZE'")\ + .count() == 0,\ + "Expected 0 OPTIMIZE operations." + + with_cpu_session(verify_table_history, {}) + + +@delta_lake +@allow_non_gpu(*delta_meta_allow) +@pytest.mark.skipif(not is_databricks104_or_later(), + reason="Auto compaction of Delta Lake tables is only supported " + "on Databricks 10.4+") +def test_auto_compact_min_num_files(spark_tmp_path): + """ + This test verifies that auto-compaction honours the minNumFiles setting. + """ + from delta.tables import DeltaTable + data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_MIN_FILES" + enable_auto_compaction_on_5 = { + 'spark.databricks.delta.autoCompact.enabled': 'true', # Enable auto compaction. + 'spark.databricks.delta.autoCompact.minNumFiles': 5 # Num files before compaction. + } + + # Minimum number of input files == 5. + # If 4 files are written, there should be no OPTIMIZE. 
+ writer = write_to_delta(num_writes=4) + with_gpu_session(func=lambda spark: writer(spark, data_path), + conf=enable_auto_compaction_on_5) + + def verify_table_history_before_limit(spark): + input_table = DeltaTable.forPath(spark, data_path) + table_history = input_table.history() + assert table_history.select("version", "operation").count() == 4, \ + "Expected 4 versions, 1 for each WRITE." + assert table_history.select("version") \ + .where("operation = 'OPTIMIZE'") \ + .count() == 0, \ + "Expected 0 OPTIMIZE operations." + with_cpu_session(verify_table_history_before_limit, {}) + + # On the 5th file write, auto-OPTIMIZE should kick in. + with_gpu_session(func=lambda spark: write_to_delta(num_writes=1)(spark, data_path), + conf=enable_auto_compaction_on_5) + + def verify_table_history_after_limit(spark): + input_table = DeltaTable.forPath(spark, data_path) + table_history = input_table.history() + assert table_history.select("version", "operation").count() == 6, \ + "Expected 6 versions, i.e. 5 WRITEs + 1 OPTIMIZE." + assert table_history.select("version") \ + .where("operation = 'OPTIMIZE'") \ + .count() == 1, \ + "Expected 1 OPTIMIZE operations." + with_cpu_session(verify_table_history_after_limit, {}) diff --git a/integration_tests/src/main/python/delta_lake_write_test.py b/integration_tests/src/main/python/delta_lake_write_test.py index ab0817f92fc..5966d06bf52 100644 --- a/integration_tests/src/main/python/delta_lake_write_test.py +++ b/integration_tests/src/main/python/delta_lake_write_test.py @@ -663,8 +663,7 @@ def test_delta_write_auto_optimize_write_opts_fallback(confkey, spark_tmp_path): pytest.param("delta.autoOptimize", marks=pytest.mark.skipif( is_databricks_runtime(), reason="Optimize write is supported on Databricks")), pytest.param("delta.autoOptimize.optimizeWrite", marks=pytest.mark.skipif( - is_databricks_runtime(), reason="Optimize write is supported on Databricks")), - "delta.autoOptimize.autoCompact" ], ids=idfn) + is_databricks_runtime(), reason="Optimize write is supported on Databricks"))], ids=idfn) @pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.skipif(not is_databricks_runtime(), reason="Auto optimize only supported on Databricks") def test_delta_write_auto_optimize_table_props_fallback(confkey, spark_tmp_path): @@ -687,8 +686,7 @@ def setup_tables(spark): pytest.param("spark.databricks.delta.optimizeWrite.enabled", marks=pytest.mark.skipif( is_databricks_runtime(), reason="Optimize write is supported on Databricks")), pytest.param("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", marks=pytest.mark.skipif( - is_databricks_runtime(), reason="Optimize write is supported on Databricks")), - "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact" ], ids=idfn) + is_databricks_runtime(), reason="Optimize write is supported on Databricks"))], ids=idfn) @pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_auto_optimize_sql_conf_fallback(confkey, spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA"