From e1520a5afd6042422e390f3510bbd868e6224f12 Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 9 Mar 2023 22:30:46 -0800 Subject: [PATCH 01/17] Rebased auto-compaction. Still bug: Compaction is not routed through GpuParquetFileFormat. --- .../rapids/delta/RapidsDeltaSQLConf.scala | 28 ++ .../spark/rapids/delta/RapidsDeltaUtils.scala | 11 - .../tahoe/rapids/GpuDoAutoCompaction.scala | 48 +++ .../rapids/GpuOptimisticTransaction.scala | 23 +- .../tahoe/rapids/GpuOptimizeExecutor.scala | 405 ++++++++++++++++++ .../python/delta_lake_auto_compact_test.py | 69 +++ 6 files changed, 571 insertions(+), 13 deletions(-) create mode 100644 delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala create mode 100644 delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala create mode 100644 integration_tests/src/main/python/delta_lake_auto_compact_test.py diff --git a/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala b/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala index 55fe00df830..92e5c171301 100644 --- a/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala +++ b/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaSQLConf.scala @@ -22,8 +22,12 @@ package com.nvidia.spark.rapids.delta +import java.util.Locale + import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf +import org.apache.spark.network.util.ByteUnit + /** Delta Lake related configs that are not yet provided by Delta Lake. */ trait RapidsDeltaSQLConf { val OPTIMIZE_WRITE_SMALL_PARTITION_FACTOR = @@ -39,6 +43,30 @@ trait RapidsDeltaSQLConf { .doc("Factor used to rebalance partitions for optimize write.") .doubleConf .createWithDefault(1.2) + + val AUTO_COMPACT_TARGET = + DeltaSQLConf.buildConf("autoCompact.target") + .internal() + .doc( + """ + |Target files for auto compaction. + | "table", "commit", "partition" options are available. (default: partition) + | If "table", all files in table are eligible for auto compaction. + | If "commit", added/updated files by the commit are eligible. + | If "partition", all files in partitions containing any added/updated files + | by the commit are eligible. 
+ |""".stripMargin + ) + .stringConf + .transform(_.toLowerCase(Locale.ROOT)) + .createWithDefault("partition") + + val AUTO_COMPACT_MAX_COMPACT_BYTES = + DeltaSQLConf.buildConf("autoCompact.maxCompactBytes") + .internal() + .doc("Maximum amount of data for auto compaction.") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("20GB") } object RapidsDeltaSQLConf extends RapidsDeltaSQLConf diff --git a/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaUtils.scala b/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaUtils.scala index 2dfc7d671e3..99f110acb88 100644 --- a/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaUtils.scala +++ b/delta-lake/common/src/main/databricks/scala/com/nvidia/spark/rapids/delta/RapidsDeltaUtils.scala @@ -90,16 +90,5 @@ object RapidsDeltaUtils { } } } - - val autoCompactEnabled = - getSQLConf("spark.databricks.delta.autoCompact.enabled").orElse { - val metadata = deltaLog.snapshot.metadata - metadata.configuration.get("delta.autoOptimize.autoCompact").orElse { - getSQLConf("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact") - } - }.exists(_.toBoolean) - if (autoCompactEnabled) { - meta.willNotWorkOnGpu("automatic compaction of Delta Lake tables is not supported") - } } } diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala new file mode 100644 index 00000000000..663c7917f48 --- /dev/null +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * This file was derived from DoAutoCompaction.scala + * from https://github.com/delta-io/delta/pull/1156 + * in the Delta Lake project at https://github.com/delta-io/delta. + * + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.databricks.sql.transaction.tahoe.rapids + +import com.databricks.sql.transaction.tahoe.actions.Action +import com.databricks.sql.transaction.tahoe.hooks.PostCommitHook +import com.databricks.sql.transaction.tahoe.metering.DeltaLogging +import com.databricks.sql.transaction.tahoe._ + +import org.apache.spark.sql.SparkSession + +object GpuDoAutoCompaction extends PostCommitHook + with DeltaLogging + with Serializable { + override val name: String = "GpuDoAutoCompaction" + + override def run(spark: SparkSession, + txn: OptimisticTransactionImpl, + committedVersion: Long, + postCommitSnapshot: Snapshot, + committedActions: Seq[Action]): Unit = { + val newTxn = txn.deltaLog.startTransaction() + new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize() + } + + override def handleError(error: Throwable, version: Long): Unit = + throw DeltaErrors.postCommitHookFailedException(this, version, name, error) +} \ No newline at end of file diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index 1ce8378a1f6..dc80a866ae5 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -228,7 +228,9 @@ class GpuOptimisticTransaction( } val gpuFileFormat = deltaLog.fileFormat(metadata) match { - case _: DeltaParquetFileFormat => new GpuParquetFileFormat + case _: DeltaParquetFileFormat => + println("CALEB: GPU Writing Parquet!") + new GpuParquetFileFormat case f => throw new IllegalStateException(s"file format $f is not supported") } @@ -279,6 +281,23 @@ class GpuOptimisticTransaction( identityTracker.foreach { tracker => updatedIdentityHighWaterMarks.appendAll(tracker.highWaterMarks.toSeq) } - resultFiles.toSeq ++ committer.changeFiles + val fileActions = resultFiles.toSeq ++ committer.changeFiles + + // Check if auto-compaction is enabled. + // (Auto compaction checks are derived from the work in + // https://github.com/delta-io/delta/pull/1156). + lazy val autoCompactEnabled = + spark.sessionState.conf + .getConf[String](DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED) + .getOrElse { + // DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata) + "false" // TODO: Fix getting this from DeltaConfigs.AUTO_COMPACT. + }.toBoolean + + if (!isOptimize && autoCompactEnabled && fileActions.nonEmpty) { + registerPostCommitHook(GpuDoAutoCompaction) + } + + fileActions } } diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala new file mode 100644 index 00000000000..d1075837f6a --- /dev/null +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala @@ -0,0 +1,405 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * This file was derived from: + * 1. DoAutoCompaction.scala from PR#1156 at https://github.com/delta-io/delta/pull/1156, + * 2. OptimizeTableCommand.scala from the Delta Lake project at https://github.com/delta-io/delta. + * + * Copyright (2021) The Delta Lake Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.databricks.sql.transaction.tahoe.rapids + +import java.util.ConcurrentModificationException + +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer + +import com.databricks.sql.io.skipping.MultiDimClustering +import com.databricks.sql.transaction.tahoe._ +import com.databricks.sql.transaction.tahoe.DeltaOperations.Operation +import com.databricks.sql.transaction.tahoe.actions.{Action, AddFile, FileAction, RemoveFile} +import com.databricks.sql.transaction.tahoe.commands.DeltaCommand +import com.databricks.sql.transaction.tahoe.commands.optimize._ +import com.databricks.sql.transaction.tahoe.files.SQLMetricsReporting +import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf +import com.nvidia.spark.rapids.delta.RapidsDeltaSQLConf + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext.SPARK_JOB_GROUP_ID +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric +import org.apache.spark.util.{SystemClock, ThreadUtils} + +class GpuOptimizeExecutor( + sparkSession: SparkSession, + txn: OptimisticTransaction, + partitionPredicate: Seq[Expression], + zOrderByColumns: Seq[String], + prevCommitActions: Seq[Action]) + extends DeltaCommand with SQLMetricsReporting with Serializable { + + /** Timestamp to use in [[FileAction]] */ + private val operationTimestamp = new SystemClock().getTimeMillis() + + private val isMultiDimClustering = zOrderByColumns.nonEmpty + private val isAutoCompact = prevCommitActions.nonEmpty + private val optimizeType = OptimizeType(isMultiDimClustering, isAutoCompact) + + def optimize(): Seq[Row] = { + recordDeltaOperation(txn.deltaLog, "delta.optimize") { + val maxFileSize = optimizeType.maxFileSize + require(maxFileSize > 0, "maxFileSize must be > 0") + + val minNumFilesInDir = optimizeType.minNumFiles + val (candidateFiles, filesToProcess) = optimizeType.targetFiles + val partitionSchema = txn.metadata.partitionSchema + + // select all files in case of multi-dimensional clustering + val partitionsToCompact = filesToProcess + .groupBy(_.partitionValues) + .filter { case (_, filesInPartition) => filesInPartition.size >= minNumFilesInDir } + .toSeq + + val groupedJobs = groupFilesIntoBins(partitionsToCompact, maxFileSize) + val jobs = optimizeType.targetBins(groupedJobs) + + val maxThreads = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS) + val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup => + runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize) + }.flatten + + val addedFiles = updates.collect { case a: AddFile => a } + val removedFiles = updates.collect { case r: RemoveFile => r } + if (addedFiles.nonEmpty) { + val operation = 
DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns) + val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles) + commitAndRetry(txn, operation, updates, metrics) { newTxn => + val newPartitionSchema = newTxn.metadata.partitionSchema + val candidateSetOld = candidateFiles.map(_.path).toSet + val candidateSetNew = newTxn.filterFiles(partitionPredicate).map(_.path).toSet + + // As long as all of the files that we compacted are still part of the table, + // and the partitioning has not changed it is valid to continue to try + // and commit this checkpoint. + if (candidateSetOld.subsetOf(candidateSetNew) && partitionSchema == newPartitionSchema) { + true + } else { + val deleted = candidateSetOld -- candidateSetNew + logWarning(s"The following compacted files were delete " + + s"during checkpoint ${deleted.mkString(",")}. Aborting the compaction.") + false + } + } + } + + val optimizeStats = OptimizeStats() + optimizeStats.addedFilesSizeStats.merge(addedFiles) + optimizeStats.removedFilesSizeStats.merge(removedFiles) + optimizeStats.numPartitionsOptimized = jobs.map(j => j._1).distinct.size + optimizeStats.numBatches = jobs.size + optimizeStats.totalConsideredFiles = candidateFiles.size + optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size + optimizeStats.totalClusterParallelism = sparkSession.sparkContext.defaultParallelism + + if (isMultiDimClustering) { + val inputFileStats = + ZOrderFileStats(removedFiles.size, removedFiles.map(_.size.getOrElse(0L)).sum) + optimizeStats.zOrderStats = Some(ZOrderStats( + strategyName = "all", // means process all files in a partition + inputCubeFiles = ZOrderFileStats(0, 0), + inputOtherFiles = inputFileStats, + inputNumCubes = 0, + mergedFiles = inputFileStats, + // There will one z-cube for each partition + numOutputCubes = optimizeStats.numPartitionsOptimized)) + } + + return Seq(Row(txn.deltaLog.dataPath.toString, optimizeStats.toOptimizeMetrics)) + } + } + + /** + * Utility methods to group files into bins for optimize. + * + * @param partitionsToCompact List of files to compact group by partition. + * Partition is defined by the partition values (partCol -> partValue) + * @param maxTargetFileSize Max size (in bytes) of the compaction output file. + * @return Sequence of bins. Each bin contains one or more files from the same + * partition and targeted for one output file. + */ + private def groupFilesIntoBins( + partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])], + maxTargetFileSize: Long): Seq[(Map[String, String], Seq[AddFile])] = { + + partitionsToCompact.flatMap { + case (partition, files) => + val bins = new ArrayBuffer[Seq[AddFile]]() + + val currentBin = new ArrayBuffer[AddFile]() + var currentBinSize = 0L + + files.sortBy(_.size).foreach { file => + // Generally, a bin is a group of existing files, whose total size does not exceed the + // desired maxFileSize. They will be coalesced into a single output file. + // However, if isMultiDimClustering = true, all files in a partition will be read by the + // same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize + // will be produced. See below. 
+ if (file.size + currentBinSize > maxTargetFileSize && !isMultiDimClustering) { + bins += currentBin.toVector + currentBin.clear() + currentBin += file + currentBinSize = file.size + } else { + currentBin += file + currentBinSize += file.size + } + } + + if (currentBin.nonEmpty) { + bins += currentBin.toVector + } + + bins.map(b => (partition, b)) + // select bins that have at least two files or in case of multi-dim clustering + // select all bins + .filter(_._2.size > 1 || isMultiDimClustering) + } + } + + /** + * Utility method to run a Spark job to compact the files in given bin + * + * @param txn [[OptimisticTransaction]] instance in use to commit the changes to DeltaLog. + * @param partition Partition values of the partition that files in [[bin]] belongs to. + * @param bin List of files to compact into one large file. + * @param maxFileSize Targeted output file size in bytes + */ + private def runOptimizeBinJob( + txn: OptimisticTransaction, + partition: Map[String, String], + bin: Seq[AddFile], + maxFileSize: Long): Seq[FileAction] = { + val baseTablePath = txn.deltaLog.dataPath + + val input = txn.deltaLog.createDataFrame(txn.snapshot, bin, actionTypeOpt = Some("Optimize")) + val repartitionDF = if (isMultiDimClustering) { + val totalSize = bin.map(_.size).sum + val approxNumFiles = Math.max(1, totalSize / maxFileSize).toInt + MultiDimClustering.cluster( + input, + approxNumFiles, + zOrderByColumns) + } else { + val useRepartition = sparkSession.sessionState.conf.getConf( + DeltaSQLConf.DELTA_OPTIMIZE_REPARTITION_ENABLED) + if (useRepartition) { + input.repartition(numPartitions = 1) + } else { + input.coalesce(numPartitions = 1) + } + } + + val partitionDesc = partition.toSeq.map(entry => entry._1 + "=" + entry._2).mkString(",") + + val partitionName = if (partition.isEmpty) "" else s" in partition ($partitionDesc)" + val description = s"$baseTablePath
Optimizing ${bin.size} files" + partitionName + sparkSession.sparkContext.setJobGroup( + sparkSession.sparkContext.getLocalProperty(SPARK_JOB_GROUP_ID), + description) + + val addFiles = txn.writeFiles(repartitionDF).collect { + case a: AddFile => + a.copy(dataChange = false) + case other => + throw new IllegalStateException( + s"Unexpected action $other with type ${other.getClass}. File compaction job output" + + s"should only have AddFiles") + } + val removeFiles = bin.map(f => f.removeWithTimestamp(operationTimestamp, dataChange = false)) + val updates = addFiles ++ removeFiles + updates + } + + type PartitionedBin = (Map[String, String], Seq[AddFile]) + + trait OptimizeType { + def minNumFiles: Long + + def maxFileSize: Long = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE) + + def targetFiles: (Seq[AddFile], Seq[AddFile]) + + def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs + } + + case class Compaction() extends OptimizeType { + def minNumFiles: Long = 2 + + def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + val minFileSize = sparkSession.sessionState.conf.getConf( + DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE) + require(minFileSize > 0, "minFileSize must be > 0") + val candidateFiles = txn.filterFiles(partitionPredicate) + val filesToProcess = candidateFiles.filter(_.size < minFileSize) + (candidateFiles, filesToProcess) + } + } + + case class MultiDimOrdering() extends OptimizeType { + def minNumFiles: Long = 1 + + def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + // select all files in case of multi-dimensional clustering + val candidateFiles = txn.filterFiles(partitionPredicate) + (candidateFiles, candidateFiles) + } + } + + case class AutoCompaction() extends OptimizeType { + def minNumFiles: Long = { + val minNumFiles = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES) + require(minNumFiles > 0, "minNumFiles must be > 0") + minNumFiles + } + + override def maxFileSize: Long = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE) + .getOrElse(128 * 1024 * 1024) + + override def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + val autoCompactTarget = + sparkSession.sessionState.conf.getConf(RapidsDeltaSQLConf.AUTO_COMPACT_TARGET) + // Filter the candidate files according to autoCompact.target config. + lazy val addedFiles = prevCommitActions.collect { case a: AddFile => a } + val candidateFiles = autoCompactTarget match { + case "table" => + txn.filterFiles() + case "commit" => + addedFiles + case "partition" => + val eligiblePartitions = addedFiles.map(_.partitionValues).toSet + txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues)) + case _ => + logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " + + s"Falling back to the default value 'table'.") + txn.filterFiles() + } + val filesToProcess = candidateFiles.filter(_.size < maxFileSize) + (candidateFiles, filesToProcess) + } + + override def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = { + var acc = 0L + val maxCompactBytes = + sparkSession.sessionState.conf.getConf(RapidsDeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES) + // bins with more files are prior to less files. 
+ jobs + .sortBy { case (_, filesInBin) => -filesInBin.length } + .takeWhile { case (_, filesInBin) => + acc += filesInBin.map(_.size).sum + acc <= maxCompactBytes + } + } + } + + object OptimizeType { + + def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): OptimizeType = { + if (isMultiDimClustering) { + MultiDimOrdering() + } else if (isAutoCompact) { + AutoCompaction() + } else { + Compaction() + } + } + } + + /** + * Attempts to commit the given actions to the log. In the case of a concurrent update, + * the given function will be invoked with a new transaction to allow custom conflict + * detection logic to indicate it is safe to try again, by returning `true`. + * + * This function will continue to try to commit to the log as long as `f` returns `true`, + * otherwise throws a subclass of [[ConcurrentModificationException]]. + */ + @tailrec + private def commitAndRetry( + txn: OptimisticTransaction, + optimizeOperation: Operation, + actions: Seq[Action], + metrics: Map[String, SQLMetric])(f: OptimisticTransaction => Boolean) + : Unit = { + try { + txn.registerSQLMetrics(sparkSession, metrics) + txn.commit(actions, optimizeOperation) + } catch { + case e: ConcurrentModificationException => + val newTxn = txn.deltaLog.startTransaction() + if (f(newTxn)) { + logInfo("Retrying commit after checking for semantic conflicts with concurrent updates.") + commitAndRetry(newTxn, optimizeOperation, actions, metrics)(f) + } else { + logWarning("Semantic conflicts detected. Aborting operation.") + throw e + } + } + } + + /** Create a map of SQL metrics for adding to the commit history. */ + private def createMetrics( + sparkContext: SparkContext, + addedFiles: Seq[AddFile], + removedFiles: Seq[RemoveFile]): Map[String, SQLMetric] = { + + def setAndReturnMetric(description: String, value: Long) = { + val metric = createMetric(sparkContext, description) + metric.set(value) + metric + } + + def totalSize(actions: Seq[FileAction]): Long = { + var totalSize = 0L + actions.foreach { file => + val fileSize = file match { + case addFile: AddFile => addFile.size + case removeFile: RemoveFile => removeFile.size.getOrElse(0L) + case default => + throw new IllegalArgumentException(s"Unknown FileAction type: ${default.getClass}") + } + totalSize += fileSize + } + totalSize + } + + val sizeStats = FileSizeStatsWithHistogram.create(addedFiles.map(_.size).sorted) + Map[String, SQLMetric]( + "minFileSize" -> setAndReturnMetric("minimum file size", sizeStats.get.min), + "p25FileSize" -> setAndReturnMetric("25th percentile file size", sizeStats.get.p25), + "p50FileSize" -> setAndReturnMetric("50th percentile file size", sizeStats.get.p50), + "p75FileSize" -> setAndReturnMetric("75th percentile file size", sizeStats.get.p75), + "maxFileSize" -> setAndReturnMetric("maximum file size", sizeStats.get.max), + "numAddedFiles" -> setAndReturnMetric("total number of files added.", addedFiles.size), + "numRemovedFiles" -> setAndReturnMetric("total number of files removed.", removedFiles.size), + "numAddedBytes" -> setAndReturnMetric("total number of bytes added", totalSize(addedFiles)), + "numRemovedBytes" -> + setAndReturnMetric("total number of bytes removed", totalSize(removedFiles))) + } +} \ No newline at end of file diff --git a/integration_tests/src/main/python/delta_lake_auto_compact_test.py b/integration_tests/src/main/python/delta_lake_auto_compact_test.py new file mode 100644 index 00000000000..2cdfb9fa987 --- /dev/null +++ b/integration_tests/src/main/python/delta_lake_auto_compact_test.py @@ -0,0 
+1,69 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from asserts import assert_gpu_and_cpu_writes_are_equal_collect +from delta.tables import DeltaTable +from delta_lake_write_test import delta_meta_allow +from marks import allow_non_gpu, delta_lake +from pyspark.sql.functions import * +from spark_session import is_databricks104_or_later + +_conf = {'spark.rapids.sql.explain': 'ALL', + 'spark.databricks.delta.autoCompact.enabled': 'true', # Enable auto compaction. + 'spark.databricks.delta.autoCompact.minNumFiles': 3} # Num files before compaction. + + +@delta_lake +@allow_non_gpu(*delta_meta_allow) +@pytest.mark.skipif(not is_databricks104_or_later(), + reason="Auto compaction of Delta Lake tables is only supported " + "on Databricks 10.4+") +def test_auto_compact(spark_tmp_path): + + data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_DATA" + + # Write to Delta table. Ensure reads with CPU/GPU produce the same results. + def write_to_delta(spark, table_path): + input_data = spark.range(3).repartition(1) + writer = input_data.write.format("delta").mode("append") + writer.save(table_path) # <-- Wait for it. + writer.save(table_path) # <-- Wait for it. + writer.save(table_path) # <-- Auto compact on 3. + + def read_data(spark, table_path): + return spark.read.format("delta").load(table_path) + + def read_metadata(spark, table_path): + input_table = DeltaTable.forPath(spark, table_path) + table_history = input_table.history() + return table_history.select( + "version", + "operation", + expr("operationMetrics[\"numFiles\"]"), + expr("operationMetrics[\"numRemovedFiles\"]"), + expr("operationMetrics[\"numAddedFiles\"]") + ) + + assert_gpu_and_cpu_writes_are_equal_collect( + write_func=write_to_delta, + read_func=read_data, + base_path=data_path, + conf=_conf) + + assert_gpu_and_cpu_writes_are_equal_collect( + write_func=write_to_delta, + read_func=read_metadata, + base_path=data_path, + conf=_conf) From fbf60e7198fd1ba7d6330f112ddb287f9ad3d38d Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 10 Mar 2023 13:08:31 -0800 Subject: [PATCH 02/17] Use GpuOptimisticTransaction for auto-optimized write. 
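
This addresses the bug noted in PATCH 01: compaction was not routed through
GpuParquetFileFormat because the post-commit hook started a plain CPU
transaction. The hook now casts the committing transaction to
GpuOptimisticTransaction and opens the follow-up transaction through
GpuDeltaLog, so the compaction rewrite goes through the GPU write path.
Roughly, as a sketch of the hook change (names taken from the diff below):

    val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction]
    val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction()

To make this possible, rapidsConf is exposed as a val on
GpuOptimisticTransaction.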
--- .../sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala | 3 ++- .../transaction/tahoe/rapids/GpuOptimisticTransaction.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala index 663c7917f48..9afd7992b8c 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -39,7 +39,8 @@ object GpuDoAutoCompaction extends PostCommitHook committedVersion: Long, postCommitSnapshot: Snapshot, committedActions: Seq[Action]): Unit = { - val newTxn = txn.deltaLog.startTransaction() + val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction] + val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction() new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize() } diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index dc80a866ae5..45e0e1be32c 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -64,7 +64,7 @@ import org.apache.spark.util.{Clock, SerializableConfiguration} class GpuOptimisticTransaction( deltaLog: DeltaLog, snapshot: Snapshot, - rapidsConf: RapidsConf)(implicit clock: Clock) + val rapidsConf: RapidsConf)(implicit clock: Clock) extends GpuOptimisticTransactionBase(deltaLog, snapshot, rapidsConf)(clock) { /** Creates a new OptimisticTransaction. From 72fd27dcb0fa210e5555798ad428ede2e6de6f6f Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 14 Mar 2023 11:37:13 -0700 Subject: [PATCH 03/17] Support for Databricks 10.4. --- .../rapids/GpuOptimisticTransactionBase.scala | 2 +- .../tahoe/rapids/GpuDoAutoCompaction.scala | 47 ++ .../rapids/GpuOptimisticTransaction.scala | 19 +- .../tahoe/rapids/GpuOptimizeExecutor.scala | 401 ++++++++++++++++++ .../rapids/GpuOptimisticTransaction.scala | 6 +- 5 files changed, 469 insertions(+), 6 deletions(-) create mode 100644 delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala create mode 100644 delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala diff --git a/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala b/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala index 9bce5d68344..d41c73c5535 100644 --- a/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala +++ b/delta-lake/common/src/main/databricks/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransactionBase.scala @@ -53,7 +53,7 @@ import org.apache.spark.util.Clock * @param rapidsConf RAPIDS Accelerator config settings. 
*/ abstract class GpuOptimisticTransactionBase - (deltaLog: DeltaLog, snapshot: Snapshot, rapidsConf: RapidsConf) + (deltaLog: DeltaLog, snapshot: Snapshot, val rapidsConf: RapidsConf) (implicit clock: Clock) extends OptimisticTransaction(deltaLog, snapshot)(clock) with DeltaLogging { diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala new file mode 100644 index 00000000000..fe690da3ddd --- /dev/null +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * This file was derived from DoAutoCompaction.scala + * from https://github.com/delta-io/delta/pull/1156 + * in the Delta Lake project at https://github.com/delta-io/delta. + * + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.sql.transaction.tahoe.rapids + +import com.databricks.sql.transaction.tahoe.actions.Action +import com.databricks.sql.transaction.tahoe.hooks.PostCommitHook +import com.databricks.sql.transaction.tahoe.metering.DeltaLogging +import com.databricks.sql.transaction.tahoe._ + +import org.apache.spark.sql.SparkSession + +object GpuDoAutoCompaction extends PostCommitHook + with DeltaLogging + with Serializable { + override val name: String = "GpuDoAutoCompaction" + + override def run(spark: SparkSession, + txn: OptimisticTransactionImpl, + committedActions: Seq[Action]): Unit = { + val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction] + val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction() + new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize() + } + + override def handleError(error: Throwable, version: Long): Unit = + throw DeltaErrors.postCommitHookFailedException(this, version, name, error) +} \ No newline at end of file diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index bde3e40203a..0e37ce7c124 100644 --- a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -252,6 +252,23 @@ class GpuOptimisticTransaction( identityTracker.foreach { tracker => updatedIdentityHighWaterMarks.appendAll(tracker.highWaterMarks.toSeq) } - resultFiles.toSeq ++ committer.changeFiles + val fileActions = resultFiles.toSeq ++ committer.changeFiles + + // Check if auto-compaction is enabled. 
+ // (Auto compaction checks are derived from the work in + // https://github.com/delta-io/delta/pull/1156). + lazy val autoCompactEnabled = + spark.sessionState.conf + .getConf[String](DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED) + .getOrElse { + DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata) + "false" // TODO: Fix getting this from DeltaConfigs.AUTO_COMPACT. + }.toBoolean + + if (!isOptimize && autoCompactEnabled && fileActions.nonEmpty) { + registerPostCommitHook(GpuDoAutoCompaction) + } + + fileActions } } diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala new file mode 100644 index 00000000000..dc67165b29e --- /dev/null +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala @@ -0,0 +1,401 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * This file was derived from: + * 1. DoAutoCompaction.scala from PR#1156 at https://github.com/delta-io/delta/pull/1156, + * 2. OptimizeTableCommand.scala from the Delta Lake project at https://github.com/delta-io/delta. + * + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.databricks.sql.transaction.tahoe.rapids + +import java.util.ConcurrentModificationException + +import scala.annotation.tailrec +import scala.collection.mutable.ArrayBuffer + +import com.databricks.sql.transaction.tahoe._ +import com.databricks.sql.transaction.tahoe.DeltaOperations.Operation +import com.databricks.sql.transaction.tahoe.actions.{Action, AddFile, FileAction, RemoveFile} +import com.databricks.sql.transaction.tahoe.commands.DeltaCommand +import com.databricks.sql.transaction.tahoe.commands.optimize._ +import com.databricks.sql.transaction.tahoe.files.SQLMetricsReporting +import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf +import com.nvidia.spark.rapids.delta.RapidsDeltaSQLConf + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext.SPARK_JOB_GROUP_ID +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric +import org.apache.spark.util.{SystemClock, ThreadUtils} + +class GpuOptimizeExecutor( + sparkSession: SparkSession, + txn: OptimisticTransaction, + partitionPredicate: Seq[Expression], + zOrderByColumns: Seq[String], + prevCommitActions: Seq[Action]) + extends DeltaCommand with SQLMetricsReporting with Serializable { + + /** Timestamp to use in [[FileAction]] */ + private val operationTimestamp = new SystemClock().getTimeMillis() + + private val isMultiDimClustering = zOrderByColumns.nonEmpty + private val isAutoCompact = prevCommitActions.nonEmpty + private val optimizeType = OptimizeType(isMultiDimClustering, isAutoCompact) + + def optimize(): Seq[Row] = { + recordDeltaOperation(txn.deltaLog, "delta.optimize") { + val maxFileSize = optimizeType.maxFileSize + require(maxFileSize > 0, "maxFileSize must be > 0") + + val minNumFilesInDir = optimizeType.minNumFiles + val (candidateFiles, filesToProcess) = optimizeType.targetFiles + val partitionSchema = txn.metadata.partitionSchema + + // select all files in case of multi-dimensional clustering + val partitionsToCompact = filesToProcess + .groupBy(_.partitionValues) + .filter { case (_, filesInPartition) => filesInPartition.size >= minNumFilesInDir } + .toSeq + + val groupedJobs = groupFilesIntoBins(partitionsToCompact, maxFileSize) + val jobs = optimizeType.targetBins(groupedJobs) + + val maxThreads = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS) + val updates = ThreadUtils.parmap(jobs, "OptimizeJob", maxThreads) { partitionBinGroup => + runOptimizeBinJob(txn, partitionBinGroup._1, partitionBinGroup._2, maxFileSize) + }.flatten + + val addedFiles = updates.collect { case a: AddFile => a } + val removedFiles = updates.collect { case r: RemoveFile => r } + if (addedFiles.nonEmpty) { + val operation = DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns) + val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles) + commitAndRetry(txn, operation, updates, metrics) { newTxn => + val newPartitionSchema = newTxn.metadata.partitionSchema + val candidateSetOld = candidateFiles.map(_.path).toSet + val candidateSetNew = newTxn.filterFiles(partitionPredicate).map(_.path).toSet + + // As long as all of the files that we compacted are still part of the table, + // and the partitioning has not changed it is valid to continue to try + // and commit this checkpoint. 
+ if (candidateSetOld.subsetOf(candidateSetNew) && partitionSchema == newPartitionSchema) { + true + } else { + val deleted = candidateSetOld -- candidateSetNew + logWarning(s"The following compacted files were delete " + + s"during checkpoint ${deleted.mkString(",")}. Aborting the compaction.") + false + } + } + } + + val optimizeStats = OptimizeStats() + optimizeStats.addedFilesSizeStats.merge(addedFiles) + optimizeStats.removedFilesSizeStats.merge(removedFiles) + optimizeStats.numPartitionsOptimized = jobs.map(j => j._1).distinct.size + optimizeStats.numBatches = jobs.size + optimizeStats.totalConsideredFiles = candidateFiles.size + optimizeStats.totalFilesSkipped = optimizeStats.totalConsideredFiles - removedFiles.size + + if (isMultiDimClustering) { + val inputFileStats = + ZOrderFileStats(removedFiles.size, removedFiles.map(_.size.getOrElse(0L)).sum) + optimizeStats.zOrderStats = Some(ZOrderStats( + strategyName = "all", // means process all files in a partition + inputCubeFiles = ZOrderFileStats(0, 0), + inputOtherFiles = inputFileStats, + inputNumCubes = 0, + mergedFiles = inputFileStats, + // There will one z-cube for each partition + numOutputCubes = optimizeStats.numPartitionsOptimized)) + } + + return Seq(Row(txn.deltaLog.dataPath.toString, optimizeStats.toOptimizeMetrics)) + } + } + + /** + * Utility methods to group files into bins for optimize. + * + * @param partitionsToCompact List of files to compact group by partition. + * Partition is defined by the partition values (partCol -> partValue) + * @param maxTargetFileSize Max size (in bytes) of the compaction output file. + * @return Sequence of bins. Each bin contains one or more files from the same + * partition and targeted for one output file. + */ + private def groupFilesIntoBins( + partitionsToCompact: Seq[(Map[String, String], Seq[AddFile])], + maxTargetFileSize: Long): Seq[(Map[String, String], Seq[AddFile])] = { + + partitionsToCompact.flatMap { + case (partition, files) => + val bins = new ArrayBuffer[Seq[AddFile]]() + + val currentBin = new ArrayBuffer[AddFile]() + var currentBinSize = 0L + + files.sortBy(_.size).foreach { file => + // Generally, a bin is a group of existing files, whose total size does not exceed the + // desired maxFileSize. They will be coalesced into a single output file. + // However, if isMultiDimClustering = true, all files in a partition will be read by the + // same job, the data will be range-partitioned and numFiles = totalFileSize / maxFileSize + // will be produced. See below. + if (file.size + currentBinSize > maxTargetFileSize && !isMultiDimClustering) { + bins += currentBin.toVector + currentBin.clear() + currentBin += file + currentBinSize = file.size + } else { + currentBin += file + currentBinSize += file.size + } + } + + if (currentBin.nonEmpty) { + bins += currentBin.toVector + } + + bins.map(b => (partition, b)) + // select bins that have at least two files or in case of multi-dim clustering + // select all bins + .filter(_._2.size > 1 || isMultiDimClustering) + } + } + + /** + * Utility method to run a Spark job to compact the files in given bin + * + * @param txn [[OptimisticTransaction]] instance in use to commit the changes to DeltaLog. + * @param partition Partition values of the partition that files in [[bin]] belongs to. + * @param bin List of files to compact into one large file. 
+ * @param maxFileSize Targeted output file size in bytes + */ + private def runOptimizeBinJob( + txn: OptimisticTransaction, + partition: Map[String, String], + bin: Seq[AddFile], + maxFileSize: Long): Seq[FileAction] = { + val baseTablePath = txn.deltaLog.dataPath + + val input = txn.deltaLog.createDataFrame(txn.snapshot, bin, actionTypeOpt = Some("Optimize")) + val repartitionDF = if (isMultiDimClustering) { + // TODO: MultiDimClustering is not currently supported on Databricks 10.4. + // val totalSize = bin.map(_.size).sum + // val approxNumFiles = Math.max(1, totalSize / maxFileSize).toInt + // MultiDimClustering.cluster( + // txn.deltaLog, + // input, + // approxNumFiles, + // zOrderByColumns) + throw new UnsupportedOperationException("MultiDimClustering not supported on compaction") + } else { + // Re-partition is not available in Databricks 11.3 (spark321db) + input.coalesce(numPartitions = 1) + } + + val partitionDesc = partition.toSeq.map(entry => entry._1 + "=" + entry._2).mkString(",") + + val partitionName = if (partition.isEmpty) "" else s" in partition ($partitionDesc)" + val description = s"$baseTablePath
Optimizing ${bin.size} files" + partitionName + sparkSession.sparkContext.setJobGroup( + sparkSession.sparkContext.getLocalProperty(SPARK_JOB_GROUP_ID), + description) + + val addFiles = txn.writeFiles(repartitionDF).collect { + case a: AddFile => + a.copy(dataChange = false) + case other => + throw new IllegalStateException( + s"Unexpected action $other with type ${other.getClass}. File compaction job output" + + s"should only have AddFiles") + } + val removeFiles = bin.map(f => f.removeWithTimestamp(operationTimestamp, dataChange = false)) + val updates = addFiles ++ removeFiles + updates + } + + type PartitionedBin = (Map[String, String], Seq[AddFile]) + + trait OptimizeType { + def minNumFiles: Long + + def maxFileSize: Long = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE) + + def targetFiles: (Seq[AddFile], Seq[AddFile]) + + def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs + } + + case class Compaction() extends OptimizeType { + def minNumFiles: Long = 2 + + def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + val minFileSize = sparkSession.sessionState.conf.getConf( + DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE) + require(minFileSize > 0, "minFileSize must be > 0") + val candidateFiles = txn.filterFiles(partitionPredicate) + val filesToProcess = candidateFiles.filter(_.size < minFileSize) + (candidateFiles, filesToProcess) + } + } + + case class MultiDimOrdering() extends OptimizeType { + def minNumFiles: Long = 1 + + def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + // select all files in case of multi-dimensional clustering + val candidateFiles = txn.filterFiles(partitionPredicate) + (candidateFiles, candidateFiles) + } + } + + case class AutoCompaction() extends OptimizeType { + def minNumFiles: Long = { + val minNumFiles = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES) + require(minNumFiles > 0, "minNumFiles must be > 0") + minNumFiles + } + + override def maxFileSize: Long = + sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE) + .getOrElse(128 * 1024 * 1024) + + override def targetFiles: (Seq[AddFile], Seq[AddFile]) = { + val autoCompactTarget = + sparkSession.sessionState.conf.getConf(RapidsDeltaSQLConf.AUTO_COMPACT_TARGET) + // Filter the candidate files according to autoCompact.target config. + lazy val addedFiles = prevCommitActions.collect { case a: AddFile => a } + val candidateFiles = autoCompactTarget match { + case "table" => + txn.filterFiles() + case "commit" => + addedFiles + case "partition" => + val eligiblePartitions = addedFiles.map(_.partitionValues).toSet + txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues)) + case _ => + logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " + + s"Falling back to the default value 'table'.") + txn.filterFiles() + } + val filesToProcess = candidateFiles.filter(_.size < maxFileSize) + (candidateFiles, filesToProcess) + } + + override def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = { + var acc = 0L + val maxCompactBytes = + sparkSession.sessionState.conf.getConf(RapidsDeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES) + // bins with more files are prior to less files. 
+ jobs + .sortBy { case (_, filesInBin) => -filesInBin.length } + .takeWhile { case (_, filesInBin) => + acc += filesInBin.map(_.size).sum + acc <= maxCompactBytes + } + } + } + + object OptimizeType { + + def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): OptimizeType = { + if (isMultiDimClustering) { + MultiDimOrdering() + } else if (isAutoCompact) { + AutoCompaction() + } else { + Compaction() + } + } + } + + /** + * Attempts to commit the given actions to the log. In the case of a concurrent update, + * the given function will be invoked with a new transaction to allow custom conflict + * detection logic to indicate it is safe to try again, by returning `true`. + * + * This function will continue to try to commit to the log as long as `f` returns `true`, + * otherwise throws a subclass of [[ConcurrentModificationException]]. + */ + @tailrec + private def commitAndRetry( + txn: OptimisticTransaction, + optimizeOperation: Operation, + actions: Seq[Action], + metrics: Map[String, SQLMetric])(f: OptimisticTransaction => Boolean) + : Unit = { + try { + txn.registerSQLMetrics(sparkSession, metrics) + txn.commit(actions, optimizeOperation) + } catch { + case e: ConcurrentModificationException => + val newTxn = txn.deltaLog.startTransaction() + if (f(newTxn)) { + logInfo("Retrying commit after checking for semantic conflicts with concurrent updates.") + commitAndRetry(newTxn, optimizeOperation, actions, metrics)(f) + } else { + logWarning("Semantic conflicts detected. Aborting operation.") + throw e + } + } + } + + /** Create a map of SQL metrics for adding to the commit history. */ + private def createMetrics( + sparkContext: SparkContext, + addedFiles: Seq[AddFile], + removedFiles: Seq[RemoveFile]): Map[String, SQLMetric] = { + + def setAndReturnMetric(description: String, value: Long) = { + val metric = createMetric(sparkContext, description) + metric.set(value) + metric + } + + def totalSize(actions: Seq[FileAction]): Long = { + var totalSize = 0L + actions.foreach { file => + val fileSize = file match { + case addFile: AddFile => addFile.size + case removeFile: RemoveFile => removeFile.size.getOrElse(0L) + case default => + throw new IllegalArgumentException(s"Unknown FileAction type: ${default.getClass}") + } + totalSize += fileSize + } + totalSize + } + + val sizeStats = FileSizeStatsWithHistogram.create(addedFiles.map(_.size).sorted) + Map[String, SQLMetric]( + "minFileSize" -> setAndReturnMetric("minimum file size", sizeStats.get.min), + "p25FileSize" -> setAndReturnMetric("25th percentile file size", sizeStats.get.p25), + "p50FileSize" -> setAndReturnMetric("50th percentile file size", sizeStats.get.p50), + "p75FileSize" -> setAndReturnMetric("75th percentile file size", sizeStats.get.p75), + "maxFileSize" -> setAndReturnMetric("maximum file size", sizeStats.get.max), + "numAddedFiles" -> setAndReturnMetric("total number of files added.", addedFiles.size), + "numRemovedFiles" -> setAndReturnMetric("total number of files removed.", removedFiles.size), + "numAddedBytes" -> setAndReturnMetric("total number of bytes added", totalSize(addedFiles)), + "numRemovedBytes" -> + setAndReturnMetric("total number of bytes removed", totalSize(removedFiles))) + } +} \ No newline at end of file diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index 45e0e1be32c..d3b861230d3 100644 --- 
a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -64,7 +64,7 @@ import org.apache.spark.util.{Clock, SerializableConfiguration} class GpuOptimisticTransaction( deltaLog: DeltaLog, snapshot: Snapshot, - val rapidsConf: RapidsConf)(implicit clock: Clock) + rapidsConf: RapidsConf)(implicit clock: Clock) extends GpuOptimisticTransactionBase(deltaLog, snapshot, rapidsConf)(clock) { /** Creates a new OptimisticTransaction. @@ -228,9 +228,7 @@ class GpuOptimisticTransaction( } val gpuFileFormat = deltaLog.fileFormat(metadata) match { - case _: DeltaParquetFileFormat => - println("CALEB: GPU Writing Parquet!") - new GpuParquetFileFormat + case _: DeltaParquetFileFormat => new GpuParquetFileFormat case f => throw new IllegalStateException(s"file format $f is not supported") } From 9923a5bf879614055f7813b4077a719f8a403b79 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 14 Mar 2023 15:54:02 -0700 Subject: [PATCH 04/17] Test for partitioned case. Signed-off-by: MithunR --- .../python/delta_lake_auto_compact_test.py | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/integration_tests/src/main/python/delta_lake_auto_compact_test.py b/integration_tests/src/main/python/delta_lake_auto_compact_test.py index 2cdfb9fa987..93a486ad76b 100644 --- a/integration_tests/src/main/python/delta_lake_auto_compact_test.py +++ b/integration_tests/src/main/python/delta_lake_auto_compact_test.py @@ -67,3 +67,60 @@ def read_metadata(spark, table_path): read_func=read_metadata, base_path=data_path, conf=_conf) + + +@delta_lake +@allow_non_gpu(*delta_meta_allow) +@pytest.mark.skipif(not is_databricks104_or_later(), + reason="Auto compaction of Delta Lake tables is only supported " + "on Databricks 10.4+") +def test_auto_compact_partitioned(spark_tmp_path): + + """ + This test checks whether the results of auto compaction on a partitioned table + match, when written via CPU and GPU. + Note: The behaviour of compaction itself differs from Databricks, in that + the plugin enforces `minFiles` restriction uniformly across all partitions. + Databricks' Delta implementation appears not to. + """ + data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_DATA_PARTITIONED" + + # Write to Delta table. Ensure reads with CPU/GPU produce the same results. + def write_to_delta(spark, table_path): + input_data = spark.range(3).withColumn("part", expr("id % 3")) + writer = input_data.write.partitionBy("part").format("delta").mode("append") + writer.save(table_path) # <-- Wait for it. + writer.save(table_path) # <-- Wait for it. + writer.save(table_path) # <-- Auto compact on 3. + + def read_data(spark, table_path): + return spark.read.format("delta").load(table_path).orderBy("id", "part") + + assert_gpu_and_cpu_writes_are_equal_collect( + write_func=write_to_delta, + read_func=read_data, + base_path=data_path, + conf=_conf) + + def read_metadata(spark, table_path): + """ + The snapshots might not look alike, in the partitioned case. + Ensure that auto compaction has occurred, even if it's not identical. 
+ """ + input_table = DeltaTable.forPath(spark, table_path) + table_history = input_table.history() + return table_history.select( + "version", + "operation", + expr("operationMetrics[\"numFiles\"] > 0"), + expr("operationMetrics[\"numRemovedFiles\"] > 0"), + expr("operationMetrics[\"numAddedFiles\"] > 0") + ) + + assert_gpu_and_cpu_writes_are_equal_collect( + write_func=write_to_delta, + read_func=read_metadata, + base_path=data_path, + conf=_conf) + + From 176f4a6cc9a39168f8476a199d2cd73a79a80841 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 14 Mar 2023 16:32:16 -0700 Subject: [PATCH 05/17] Fixed import order. Signed-off-by: MithunR --- .../sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala | 2 +- .../sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala index fe690da3ddd..e137e15c6f5 100644 --- a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -22,10 +22,10 @@ package com.databricks.sql.transaction.tahoe.rapids +import com.databricks.sql.transaction.tahoe._ import com.databricks.sql.transaction.tahoe.actions.Action import com.databricks.sql.transaction.tahoe.hooks.PostCommitHook import com.databricks.sql.transaction.tahoe.metering.DeltaLogging -import com.databricks.sql.transaction.tahoe._ import org.apache.spark.sql.SparkSession diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala index 9afd7992b8c..14fdaa8b6c0 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -22,10 +22,10 @@ package com.databricks.sql.transaction.tahoe.rapids +import com.databricks.sql.transaction.tahoe._ import com.databricks.sql.transaction.tahoe.actions.Action import com.databricks.sql.transaction.tahoe.hooks.PostCommitHook import com.databricks.sql.transaction.tahoe.metering.DeltaLogging -import com.databricks.sql.transaction.tahoe._ import org.apache.spark.sql.SparkSession From 155eedb605cf167f7069eca20db446f5d3f1ae00 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 14 Mar 2023 17:18:33 -0700 Subject: [PATCH 06/17] Fix test for 10.4. 
--- .../python/delta_lake_auto_compact_test.py | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/integration_tests/src/main/python/delta_lake_auto_compact_test.py b/integration_tests/src/main/python/delta_lake_auto_compact_test.py index 93a486ad76b..a818285f435 100644 --- a/integration_tests/src/main/python/delta_lake_auto_compact_test.py +++ b/integration_tests/src/main/python/delta_lake_auto_compact_test.py @@ -45,25 +45,25 @@ def write_to_delta(spark, table_path): def read_data(spark, table_path): return spark.read.format("delta").load(table_path) + assert_gpu_and_cpu_writes_are_equal_collect( + write_func=write_to_delta, + read_func=read_data, + base_path=data_path, + conf=_conf) + def read_metadata(spark, table_path): input_table = DeltaTable.forPath(spark, table_path) table_history = input_table.history() return table_history.select( "version", "operation", - expr("operationMetrics[\"numFiles\"]"), - expr("operationMetrics[\"numRemovedFiles\"]"), - expr("operationMetrics[\"numAddedFiles\"]") + expr("operationMetrics[\"numFiles\"]").alias("numFiles"), + expr("operationMetrics[\"numRemovedFiles\"]").alias("numRemoved"), + expr("operationMetrics[\"numAddedFiles\"]").alias("numAdded") ) assert_gpu_and_cpu_writes_are_equal_collect( - write_func=write_to_delta, - read_func=read_data, - base_path=data_path, - conf=_conf) - - assert_gpu_and_cpu_writes_are_equal_collect( - write_func=write_to_delta, + write_func=lambda spark, table_path: None, # Already written. read_func=read_metadata, base_path=data_path, conf=_conf) @@ -112,13 +112,13 @@ def read_metadata(spark, table_path): return table_history.select( "version", "operation", - expr("operationMetrics[\"numFiles\"] > 0"), - expr("operationMetrics[\"numRemovedFiles\"] > 0"), - expr("operationMetrics[\"numAddedFiles\"] > 0") + expr("operationMetrics[\"numFiles\"] > 0").alias("numFiles_gt_0"), + expr("operationMetrics[\"numRemovedFiles\"] > 0").alias("numRemoved_gt_0"), + expr("operationMetrics[\"numAddedFiles\"] > 0").alias("numAdded_gt_0") ) assert_gpu_and_cpu_writes_are_equal_collect( - write_func=write_to_delta, + write_func=lambda spark, table_path: None, # Already written. read_func=read_metadata, base_path=data_path, conf=_conf) From a2b2a2c64649f7d86050e1491bb9d8fb74764278 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 15 Mar 2023 10:24:54 -0700 Subject: [PATCH 07/17] Review comment: Use System.currentTimeMillis. 
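
Both copies of GpuOptimizeExecutor (spark321db and spark330db) now take the
RemoveFile timestamp from System.currentTimeMillis instead of
new SystemClock().getTimeMillis(), and the unused SystemClock import is
dropped. No behavior change is intended.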
--- .../sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala | 4 ++-- .../sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala index dc67165b29e..57d0d8d2487 100644 --- a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric -import org.apache.spark.util.{SystemClock, ThreadUtils} +import org.apache.spark.util.ThreadUtils class GpuOptimizeExecutor( sparkSession: SparkSession, @@ -52,7 +52,7 @@ class GpuOptimizeExecutor( extends DeltaCommand with SQLMetricsReporting with Serializable { /** Timestamp to use in [[FileAction]] */ - private val operationTimestamp = new SystemClock().getTimeMillis() + private val operationTimestamp = System.currentTimeMillis private val isMultiDimClustering = zOrderByColumns.nonEmpty private val isAutoCompact = prevCommitActions.nonEmpty diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala index d1075837f6a..41d3878d425 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric -import org.apache.spark.util.{SystemClock, ThreadUtils} +import org.apache.spark.util.ThreadUtils class GpuOptimizeExecutor( sparkSession: SparkSession, @@ -53,7 +53,7 @@ class GpuOptimizeExecutor( extends DeltaCommand with SQLMetricsReporting with Serializable { /** Timestamp to use in [[FileAction]] */ - private val operationTimestamp = new SystemClock().getTimeMillis() + private val operationTimestamp = System.currentTimeMillis private val isMultiDimClustering = zOrderByColumns.nonEmpty private val isAutoCompact = prevCommitActions.nonEmpty From 5979720d53153ede29b81d90d7982854a2a1b6f8 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 15 Mar 2023 14:40:37 -0700 Subject: [PATCH 08/17] Test for disabled auto-compact. Also, minor test refactor. --- .../python/delta_lake_auto_compact_test.py | 75 +++++++++++++------ 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/integration_tests/src/main/python/delta_lake_auto_compact_test.py b/integration_tests/src/main/python/delta_lake_auto_compact_test.py index a818285f435..c5787642f94 100644 --- a/integration_tests/src/main/python/delta_lake_auto_compact_test.py +++ b/integration_tests/src/main/python/delta_lake_auto_compact_test.py @@ -13,7 +13,7 @@ # limitations under the License. 
import pytest -from asserts import assert_gpu_and_cpu_writes_are_equal_collect +from asserts import assert_gpu_and_cpu_writes_are_equal_collect, with_cpu_session, with_gpu_session from delta.tables import DeltaTable from delta_lake_write_test import delta_meta_allow from marks import allow_non_gpu, delta_lake @@ -22,7 +22,23 @@ _conf = {'spark.rapids.sql.explain': 'ALL', 'spark.databricks.delta.autoCompact.enabled': 'true', # Enable auto compaction. - 'spark.databricks.delta.autoCompact.minNumFiles': 3} # Num files before compaction. + 'spark.databricks.delta.autoCompact.minNumFiles': 3} # Num files before compaction. + + +def write_to_delta(num_rows=30, is_partitioned=False, num_writes=3): + """ + Returns bound function that writes to a delta table. + """ + + def write(spark, table_path): + input_data = spark.range(num_rows) + input_data = input_data.withColumn("part", expr("id % 3")) if is_partitioned \ + else input_data.repartition(1) + writer = input_data.write.format("delta").mode("append") + for _ in range(num_writes): + writer.save(table_path) + + return write @delta_lake @@ -31,22 +47,13 @@ reason="Auto compaction of Delta Lake tables is only supported " "on Databricks 10.4+") def test_auto_compact(spark_tmp_path): - data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_DATA" - # Write to Delta table. Ensure reads with CPU/GPU produce the same results. - def write_to_delta(spark, table_path): - input_data = spark.range(3).repartition(1) - writer = input_data.write.format("delta").mode("append") - writer.save(table_path) # <-- Wait for it. - writer.save(table_path) # <-- Wait for it. - writer.save(table_path) # <-- Auto compact on 3. - def read_data(spark, table_path): return spark.read.format("delta").load(table_path) assert_gpu_and_cpu_writes_are_equal_collect( - write_func=write_to_delta, + write_func=write_to_delta(is_partitioned=False), read_func=read_data, base_path=data_path, conf=_conf) @@ -75,7 +82,6 @@ def read_metadata(spark, table_path): reason="Auto compaction of Delta Lake tables is only supported " "on Databricks 10.4+") def test_auto_compact_partitioned(spark_tmp_path): - """ This test checks whether the results of auto compaction on a partitioned table match, when written via CPU and GPU. @@ -85,19 +91,11 @@ def test_auto_compact_partitioned(spark_tmp_path): """ data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_DATA_PARTITIONED" - # Write to Delta table. Ensure reads with CPU/GPU produce the same results. - def write_to_delta(spark, table_path): - input_data = spark.range(3).withColumn("part", expr("id % 3")) - writer = input_data.write.partitionBy("part").format("delta").mode("append") - writer.save(table_path) # <-- Wait for it. - writer.save(table_path) # <-- Wait for it. - writer.save(table_path) # <-- Auto compact on 3. - def read_data(spark, table_path): return spark.read.format("delta").load(table_path).orderBy("id", "part") assert_gpu_and_cpu_writes_are_equal_collect( - write_func=write_to_delta, + write_func=write_to_delta(is_partitioned=True), read_func=read_data, base_path=data_path, conf=_conf) @@ -124,3 +122,36 @@ def read_metadata(spark, table_path): conf=_conf) +@delta_lake +@allow_non_gpu(*delta_meta_allow) +@pytest.mark.skipif(not is_databricks104_or_later(), + reason="Auto compaction of Delta Lake tables is only supported " + "on Databricks 10.4+") +def test_auto_compact_disabled(spark_tmp_path): + """ + This test verifies that auto-compaction does not run if disabled. 
+ """ + data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_CHECK_DISABLED" + + disable_auto_compaction = { + 'spark.databricks.delta.autoCompact.enabled': 'false' # Enable auto compaction. + } + + writer = write_to_delta(num_writes=10) + with_gpu_session(func=lambda spark: writer(spark, data_path), + conf=disable_auto_compaction) + + # 10 writes should correspond to 10 commits. + # (i.e. there should be no OPTIMIZE commits.) + def verify_table_history(spark): + input_table = DeltaTable.forPath(spark, data_path) + table_history = input_table.history() + assert table_history.select("version", "operation").count() == 10, \ + "Expected 10 versions, 1 for each WRITE." + assert table_history.select("version")\ + .where("operation = 'OPTIMIZE'")\ + .count() == 0,\ + "Expected 0 OPTIMIZE operations." + + with_cpu_session(verify_table_history, {}) + From c5c11a685f7a5050d880781241b97366108507b5 Mon Sep 17 00:00:00 2001 From: MithunR Date: Wed, 15 Mar 2023 15:10:31 -0700 Subject: [PATCH 09/17] Added test for min-num-files. --- .../python/delta_lake_auto_compact_test.py | 49 ++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/delta_lake_auto_compact_test.py b/integration_tests/src/main/python/delta_lake_auto_compact_test.py index c5787642f94..9f51a60c314 100644 --- a/integration_tests/src/main/python/delta_lake_auto_compact_test.py +++ b/integration_tests/src/main/python/delta_lake_auto_compact_test.py @@ -134,7 +134,7 @@ def test_auto_compact_disabled(spark_tmp_path): data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_CHECK_DISABLED" disable_auto_compaction = { - 'spark.databricks.delta.autoCompact.enabled': 'false' # Enable auto compaction. + 'spark.databricks.delta.autoCompact.enabled': 'false' # Disable auto compaction. } writer = write_to_delta(num_writes=10) @@ -155,3 +155,50 @@ def verify_table_history(spark): with_cpu_session(verify_table_history, {}) + +@delta_lake +@allow_non_gpu(*delta_meta_allow) +@pytest.mark.skipif(not is_databricks104_or_later(), + reason="Auto compaction of Delta Lake tables is only supported " + "on Databricks 10.4+") +def test_auto_compact_min_num_files(spark_tmp_path): + """ + This test verifies that auto-compaction honours the minNumFiles setting. + """ + data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_MIN_FILES" + enable_auto_compaction_on_5 = { + 'spark.databricks.delta.autoCompact.enabled': 'true', # Enable auto compaction. + 'spark.databricks.delta.autoCompact.minNumFiles': 5 # Num files before compaction. + } + + # Minimum number of input files == 5. + # If 4 files are written, there should be no OPTIMIZE. + writer = write_to_delta(num_writes=4) + with_gpu_session(func=lambda spark: writer(spark, data_path), + conf=enable_auto_compaction_on_5) + + def verify_table_history_before_limit(spark): + input_table = DeltaTable.forPath(spark, data_path) + table_history = input_table.history() + assert table_history.select("version", "operation").count() == 4, \ + "Expected 4 versions, 1 for each WRITE." + assert table_history.select("version") \ + .where("operation = 'OPTIMIZE'") \ + .count() == 0, \ + "Expected 0 OPTIMIZE operations." + with_cpu_session(verify_table_history_before_limit, {}) + + # On the 5th file write, auto-OPTIMIZE should kick in. 
+ with_gpu_session(func=lambda spark: write_to_delta(num_writes=1)(spark, data_path), + conf=enable_auto_compaction_on_5) + + def verify_table_history_after_limit(spark): + input_table = DeltaTable.forPath(spark, data_path) + table_history = input_table.history() + assert table_history.select("version", "operation").count() == 6, \ + "Expected 6 versions, i.e. 5 WRITEs + 1 OPTIMIZE." + assert table_history.select("version") \ + .where("operation = 'OPTIMIZE'") \ + .count() == 1, \ + "Expected 1 OPTIMIZE operations." + with_cpu_session(verify_table_history_after_limit, {}) From 36d32efb84dbea626cd9b60115bb3251937b750d Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 16 Mar 2023 12:24:59 -0700 Subject: [PATCH 10/17] Fix skipping tests on non-Databricks systems. --- .../src/main/python/delta_lake_auto_compact_test.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/delta_lake_auto_compact_test.py b/integration_tests/src/main/python/delta_lake_auto_compact_test.py index 9f51a60c314..cbadc1fac7e 100644 --- a/integration_tests/src/main/python/delta_lake_auto_compact_test.py +++ b/integration_tests/src/main/python/delta_lake_auto_compact_test.py @@ -14,7 +14,6 @@ import pytest from asserts import assert_gpu_and_cpu_writes_are_equal_collect, with_cpu_session, with_gpu_session -from delta.tables import DeltaTable from delta_lake_write_test import delta_meta_allow from marks import allow_non_gpu, delta_lake from pyspark.sql.functions import * @@ -47,6 +46,13 @@ def write(spark, table_path): reason="Auto compaction of Delta Lake tables is only supported " "on Databricks 10.4+") def test_auto_compact(spark_tmp_path): + """ + This test checks whether the results of auto compactions on an un-partitioned table + match, when written via CPU and GPU. + It also checks that the snapshot metrics (number of files added/removed, etc.) + match. + """ + from delta.tables import DeltaTable data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_DATA" def read_data(spark, table_path): @@ -89,6 +95,7 @@ def test_auto_compact_partitioned(spark_tmp_path): the plugin enforces `minFiles` restriction uniformly across all partitions. Databricks' Delta implementation appears not to. """ + from delta.tables import DeltaTable data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_DATA_PARTITIONED" def read_data(spark, table_path): @@ -131,6 +138,7 @@ def test_auto_compact_disabled(spark_tmp_path): """ This test verifies that auto-compaction does not run if disabled. """ + from delta.tables import DeltaTable data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_CHECK_DISABLED" disable_auto_compaction = { @@ -165,6 +173,7 @@ def test_auto_compact_min_num_files(spark_tmp_path): """ This test verifies that auto-compaction honours the minNumFiles setting. """ + from delta.tables import DeltaTable data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_MIN_FILES" enable_auto_compaction_on_5 = { 'spark.databricks.delta.autoCompact.enabled': 'true', # Enable auto compaction. From 4424dd5ad8f168af47d45318b842c465862ddc4b Mon Sep 17 00:00:00 2001 From: MithunR Date: Thu, 16 Mar 2023 21:54:07 -0700 Subject: [PATCH 11/17] Disable GPU-fallback test for autoCompact. This test is not valid, now that auto-compaction works. 
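
Since auto-compaction now runs on the GPU, the `delta.autoOptimize.autoCompact` table property and the `spark.databricks.delta.properties.defaults.autoOptimize.autoCompact` session default no longer need to force a CPU fallback. Roughly, the behaviour the remaining tests rely on looks like the sketch below (hedged; the path and row counts are illustrative and mirror delta_lake_auto_compact_test.py):

```python
# Illustrative only: enable auto compaction through the session default that
# previously triggered a CPU fallback, then append enough small files to reach
# the compaction threshold.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")
spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", "3")

path = "/tmp/auto_compact_demo"  # hypothetical location
for _ in range(3):
    # Each append produces a single small file; the third append reaches
    # minNumFiles, so an automatic OPTIMIZE commit is expected afterwards.
    spark.range(30).repartition(1).write.format("delta").mode("append").save(path)
```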
--- integration_tests/src/main/python/delta_lake_write_test.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/integration_tests/src/main/python/delta_lake_write_test.py b/integration_tests/src/main/python/delta_lake_write_test.py index ab0817f92fc..5966d06bf52 100644 --- a/integration_tests/src/main/python/delta_lake_write_test.py +++ b/integration_tests/src/main/python/delta_lake_write_test.py @@ -663,8 +663,7 @@ def test_delta_write_auto_optimize_write_opts_fallback(confkey, spark_tmp_path): pytest.param("delta.autoOptimize", marks=pytest.mark.skipif( is_databricks_runtime(), reason="Optimize write is supported on Databricks")), pytest.param("delta.autoOptimize.optimizeWrite", marks=pytest.mark.skipif( - is_databricks_runtime(), reason="Optimize write is supported on Databricks")), - "delta.autoOptimize.autoCompact" ], ids=idfn) + is_databricks_runtime(), reason="Optimize write is supported on Databricks"))], ids=idfn) @pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") @pytest.mark.skipif(not is_databricks_runtime(), reason="Auto optimize only supported on Databricks") def test_delta_write_auto_optimize_table_props_fallback(confkey, spark_tmp_path): @@ -687,8 +686,7 @@ def setup_tables(spark): pytest.param("spark.databricks.delta.optimizeWrite.enabled", marks=pytest.mark.skipif( is_databricks_runtime(), reason="Optimize write is supported on Databricks")), pytest.param("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", marks=pytest.mark.skipif( - is_databricks_runtime(), reason="Optimize write is supported on Databricks")), - "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact" ], ids=idfn) + is_databricks_runtime(), reason="Optimize write is supported on Databricks"))], ids=idfn) @pytest.mark.skipif(is_before_spark_320(), reason="Delta Lake writes are not supported before Spark 3.2.x") def test_delta_write_auto_optimize_sql_conf_fallback(confkey, spark_tmp_path): data_path = spark_tmp_path + "/DELTA_DATA" From 147c2ff8876a8c08dbca4ccfb9eeb976a815bab3 Mon Sep 17 00:00:00 2001 From: Mithun RK Date: Wed, 22 Mar 2023 15:33:44 -0700 Subject: [PATCH 12/17] Review updates: 1. Attempted using databricks-specific post-commit hooks for auto compaction. Failed. 2. Changed support class names to use "Gpu" prefix. 3. Added/fixed code comments. 
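
On item 1: the Databricks AutoCompact hook appears to start its own transaction, which bypasses GpuOptimisticTransaction.writeFiles, so the plugin keeps its own GpuDoAutoCompaction hook driving GpuOptimizeExecutor. Either way, the outcome is observable from the table history; a hedged verification sketch (hypothetical path, mirroring the checks in the integration tests):

```python
# Hedged sketch: list the OPTIMIZE commits produced by auto compaction along
# with the file counts they removed/added, as recorded in operationMetrics.
from delta.tables import DeltaTable
from pyspark.sql.functions import expr

def optimize_commits(spark, table_path):
    history = DeltaTable.forPath(spark, table_path).history()
    return (history
            .where("operation = 'OPTIMIZE'")
            .select("version",
                    expr('operationMetrics["numRemovedFiles"]').alias("numRemoved"),
                    expr('operationMetrics["numAddedFiles"]').alias("numAdded")))

# e.g. optimize_commits(spark, "/tmp/auto_compact_demo").show()
```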
--- .../tahoe/rapids/GpuDoAutoCompaction.scala | 5 +++- .../rapids/GpuOptimisticTransaction.scala | 2 +- .../tahoe/rapids/GpuOptimizeExecutor.scala | 24 +++++++++---------- .../tahoe/rapids/GpuDoAutoCompaction.scala | 5 +++- .../rapids/GpuOptimisticTransaction.scala | 4 ++-- .../tahoe/rapids/GpuOptimizeExecutor.scala | 22 ++++++++--------- 6 files changed, 34 insertions(+), 28 deletions(-) diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala index e137e15c6f5..31d390043dd 100644 --- a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -32,13 +32,16 @@ import org.apache.spark.sql.SparkSession object GpuDoAutoCompaction extends PostCommitHook with DeltaLogging with Serializable { - override val name: String = "GpuDoAutoCompaction" + override val name: String = "Triggers compaction if necessary" override def run(spark: SparkSession, txn: OptimisticTransactionImpl, committedActions: Seq[Action]): Unit = { val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction] val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction() + // Note: The Databricks AutoCompact PostCommitHook cannot be used here + // (with a GpuOptimisticTransaction). It does not appear to use OptimisticTransaction.writeFiles + // to write the compacted file. new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize() } diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index 0e37ce7c124..4b4b1616527 100644 --- a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -262,7 +262,7 @@ class GpuOptimisticTransaction( .getConf[String](DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED) .getOrElse { DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata) - "false" // TODO: Fix getting this from DeltaConfigs.AUTO_COMPACT. 
+ .getOrElse("false") }.toBoolean if (!isOptimize && autoCompactEnabled && fileActions.nonEmpty) { diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala index 57d0d8d2487..590487d4c60 100644 --- a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala @@ -56,7 +56,7 @@ class GpuOptimizeExecutor( private val isMultiDimClustering = zOrderByColumns.nonEmpty private val isAutoCompact = prevCommitActions.nonEmpty - private val optimizeType = OptimizeType(isMultiDimClustering, isAutoCompact) + private val optimizeType = GpuOptimizeType(isMultiDimClustering, isAutoCompact) def optimize(): Seq[Row] = { recordDeltaOperation(txn.deltaLog, "delta.optimize") { @@ -206,7 +206,7 @@ class GpuOptimizeExecutor( // zOrderByColumns) throw new UnsupportedOperationException("MultiDimClustering not supported on compaction") } else { - // Re-partition is not available in Databricks 11.3 (spark321db) + // Re-partition is not available in Databricks 10.4 (spark321db) input.coalesce(numPartitions = 1) } @@ -231,9 +231,9 @@ class GpuOptimizeExecutor( updates } - type PartitionedBin = (Map[String, String], Seq[AddFile]) + private type PartitionedBin = (Map[String, String], Seq[AddFile]) - trait OptimizeType { + private trait GpuOptimizeType { def minNumFiles: Long def maxFileSize: Long = @@ -244,7 +244,7 @@ class GpuOptimizeExecutor( def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs } - case class Compaction() extends OptimizeType { + private case class GpuCompaction() extends GpuOptimizeType { def minNumFiles: Long = 2 def targetFiles: (Seq[AddFile], Seq[AddFile]) = { @@ -257,7 +257,7 @@ class GpuOptimizeExecutor( } } - case class MultiDimOrdering() extends OptimizeType { + private case class GpuMultiDimOrdering() extends GpuOptimizeType { def minNumFiles: Long = 1 def targetFiles: (Seq[AddFile], Seq[AddFile]) = { @@ -267,7 +267,7 @@ class GpuOptimizeExecutor( } } - case class AutoCompaction() extends OptimizeType { + private case class GpuAutoCompaction() extends GpuOptimizeType { def minNumFiles: Long = { val minNumFiles = sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES) @@ -315,15 +315,15 @@ class GpuOptimizeExecutor( } } - object OptimizeType { + private object GpuOptimizeType { - def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): OptimizeType = { + def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): GpuOptimizeType = { if (isMultiDimClustering) { - MultiDimOrdering() + GpuMultiDimOrdering() } else if (isAutoCompact) { - AutoCompaction() + GpuAutoCompaction() } else { - Compaction() + GpuCompaction() } } } diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala index 14fdaa8b6c0..5728348591e 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.SparkSession 
object GpuDoAutoCompaction extends PostCommitHook with DeltaLogging with Serializable { - override val name: String = "GpuDoAutoCompaction" + override val name: String = "Triggers compaction if necessary" override def run(spark: SparkSession, txn: OptimisticTransactionImpl, @@ -41,6 +41,9 @@ object GpuDoAutoCompaction extends PostCommitHook committedActions: Seq[Action]): Unit = { val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction] val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction() + // Note: The Databricks AutoCompact PostCommitHook cannot be used here + // (with a GpuOptimisticTransaction). It does not appear to use OptimisticTransaction.writeFiles + // to write the compacted file. new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize() } diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala index d3b861230d3..0e8c7f74fde 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimisticTransaction.scala @@ -288,8 +288,8 @@ class GpuOptimisticTransaction( spark.sessionState.conf .getConf[String](DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED) .getOrElse { - // DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata) - "false" // TODO: Fix getting this from DeltaConfigs.AUTO_COMPACT. + DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata) + .getOrElse("false") }.toBoolean if (!isOptimize && autoCompactEnabled && fileActions.nonEmpty) { diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala index 41d3878d425..cfa1468b7c9 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuOptimizeExecutor.scala @@ -57,7 +57,7 @@ class GpuOptimizeExecutor( private val isMultiDimClustering = zOrderByColumns.nonEmpty private val isAutoCompact = prevCommitActions.nonEmpty - private val optimizeType = OptimizeType(isMultiDimClustering, isAutoCompact) + private val optimizeType = GpuOptimizeType(isMultiDimClustering, isAutoCompact) def optimize(): Seq[Row] = { recordDeltaOperation(txn.deltaLog, "delta.optimize") { @@ -235,9 +235,9 @@ class GpuOptimizeExecutor( updates } - type PartitionedBin = (Map[String, String], Seq[AddFile]) + private type PartitionedBin = (Map[String, String], Seq[AddFile]) - trait OptimizeType { + private trait GpuOptimizeType { def minNumFiles: Long def maxFileSize: Long = @@ -248,7 +248,7 @@ class GpuOptimizeExecutor( def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs } - case class Compaction() extends OptimizeType { + private case class GpuCompaction() extends GpuOptimizeType { def minNumFiles: Long = 2 def targetFiles: (Seq[AddFile], Seq[AddFile]) = { @@ -261,7 +261,7 @@ class GpuOptimizeExecutor( } } - case class MultiDimOrdering() extends OptimizeType { + private case class GpuMultiDimOrdering() extends GpuOptimizeType { def minNumFiles: Long = 1 def targetFiles: (Seq[AddFile], Seq[AddFile]) = { @@ 
-271,7 +271,7 @@ class GpuOptimizeExecutor( } } - case class AutoCompaction() extends OptimizeType { + private case class GpuAutoCompaction() extends GpuOptimizeType { def minNumFiles: Long = { val minNumFiles = sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES) @@ -319,15 +319,15 @@ class GpuOptimizeExecutor( } } - object OptimizeType { + private object GpuOptimizeType { - def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): OptimizeType = { + def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): GpuOptimizeType = { if (isMultiDimClustering) { - MultiDimOrdering() + GpuMultiDimOrdering() } else if (isAutoCompact) { - AutoCompaction() + GpuAutoCompaction() } else { - Compaction() + GpuCompaction() } } } From bc2a68de715774cee83538bea94b971f3866d47d Mon Sep 17 00:00:00 2001 From: Mithun RK Date: Thu, 23 Mar 2023 14:04:04 -0700 Subject: [PATCH 13/17] Added test for auto-compact default. --- .../python/delta_lake_auto_compact_test.py | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/integration_tests/src/main/python/delta_lake_auto_compact_test.py b/integration_tests/src/main/python/delta_lake_auto_compact_test.py index cbadc1fac7e..937da312589 100644 --- a/integration_tests/src/main/python/delta_lake_auto_compact_test.py +++ b/integration_tests/src/main/python/delta_lake_auto_compact_test.py @@ -14,13 +14,13 @@ import pytest from asserts import assert_gpu_and_cpu_writes_are_equal_collect, with_cpu_session, with_gpu_session +from data_gen import copy_and_update from delta_lake_write_test import delta_meta_allow from marks import allow_non_gpu, delta_lake from pyspark.sql.functions import * from spark_session import is_databricks104_or_later _conf = {'spark.rapids.sql.explain': 'ALL', - 'spark.databricks.delta.autoCompact.enabled': 'true', # Enable auto compaction. 'spark.databricks.delta.autoCompact.minNumFiles': 3} # Num files before compaction. @@ -45,7 +45,10 @@ def write(spark, table_path): @pytest.mark.skipif(not is_databricks104_or_later(), reason="Auto compaction of Delta Lake tables is only supported " "on Databricks 10.4+") -def test_auto_compact(spark_tmp_path): +@pytest.mark.parametrize("auto_compact_conf", + ["spark.databricks.delta.autoCompact.enabled", + "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact"]) +def test_auto_compact_basic(spark_tmp_path, auto_compact_conf): """ This test checks whether the results of auto compactions on an un-partitioned table match, when written via CPU and GPU. @@ -75,11 +78,13 @@ def read_metadata(spark, table_path): expr("operationMetrics[\"numAddedFiles\"]").alias("numAdded") ) + conf_enable_auto_compact = copy_and_update(_conf, {auto_compact_conf: "true"}) + assert_gpu_and_cpu_writes_are_equal_collect( write_func=lambda spark, table_path: None, # Already written. 
read_func=read_metadata, base_path=data_path, - conf=_conf) + conf=conf_enable_auto_compact) @delta_lake @@ -87,7 +92,10 @@ def read_metadata(spark, table_path): @pytest.mark.skipif(not is_databricks104_or_later(), reason="Auto compaction of Delta Lake tables is only supported " "on Databricks 10.4+") -def test_auto_compact_partitioned(spark_tmp_path): +@pytest.mark.parametrize("auto_compact_conf", + ["spark.databricks.delta.autoCompact.enabled", + "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact"]) +def test_auto_compact_partitioned(spark_tmp_path, auto_compact_conf): """ This test checks whether the results of auto compaction on a partitioned table match, when written via CPU and GPU. @@ -122,11 +130,13 @@ def read_metadata(spark, table_path): expr("operationMetrics[\"numAddedFiles\"] > 0").alias("numAdded_gt_0") ) + conf_enable_auto_compact = copy_and_update(_conf, {auto_compact_conf: "true"}) + assert_gpu_and_cpu_writes_are_equal_collect( write_func=lambda spark, table_path: None, # Already written. read_func=read_metadata, base_path=data_path, - conf=_conf) + conf=conf_enable_auto_compact) @delta_lake @@ -134,16 +144,17 @@ def read_metadata(spark, table_path): @pytest.mark.skipif(not is_databricks104_or_later(), reason="Auto compaction of Delta Lake tables is only supported " "on Databricks 10.4+") -def test_auto_compact_disabled(spark_tmp_path): +@pytest.mark.parametrize("auto_compact_conf", + ["spark.databricks.delta.autoCompact.enabled", + "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact"]) +def test_auto_compact_disabled(spark_tmp_path, auto_compact_conf): """ This test verifies that auto-compaction does not run if disabled. """ from delta.tables import DeltaTable data_path = spark_tmp_path + "/AUTO_COMPACT_TEST_CHECK_DISABLED" - disable_auto_compaction = { - 'spark.databricks.delta.autoCompact.enabled': 'false' # Disable auto compaction. - } + disable_auto_compaction = copy_and_update(_conf, {auto_compact_conf: 'false'}) writer = write_to_delta(num_writes=10) with_gpu_session(func=lambda spark: writer(spark, data_path), From bf2e848f1adb6d1a092268c5a488116da300ca44 Mon Sep 17 00:00:00 2001 From: Mithun RK Date: Thu, 23 Mar 2023 14:11:23 -0700 Subject: [PATCH 14/17] Added clarifying code comment. --- .../sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala | 5 +++-- .../sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala index 31d390043dd..84a86807c15 100644 --- a/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala +++ b/delta-lake/delta-spark321db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -40,8 +40,9 @@ object GpuDoAutoCompaction extends PostCommitHook val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction] val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction() // Note: The Databricks AutoCompact PostCommitHook cannot be used here - // (with a GpuOptimisticTransaction). It does not appear to use OptimisticTransaction.writeFiles - // to write the compacted file. + // (with a GpuOptimisticTransaction). 
It appears that AutoCompact creates a new transaction, + // thereby circumventing GpuOptimisticTransaction (which intercepts Parquet writes + // to go through the GPU). new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize() } diff --git a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala index 5728348591e..9726511ad44 100644 --- a/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala +++ b/delta-lake/delta-spark330db/src/main/scala/com/databricks/sql/transaction/tahoe/rapids/GpuDoAutoCompaction.scala @@ -42,8 +42,9 @@ object GpuDoAutoCompaction extends PostCommitHook val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction] val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction() // Note: The Databricks AutoCompact PostCommitHook cannot be used here - // (with a GpuOptimisticTransaction). It does not appear to use OptimisticTransaction.writeFiles - // to write the compacted file. + // (with a GpuOptimisticTransaction). It appears that AutoCompact creates a new transaction, + // thereby circumventing GpuOptimisticTransaction (which intercepts Parquet writes + // to go through the GPU). new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize() } From 61e8e5904fe07e7d11aa41b929e882d223992903 Mon Sep 17 00:00:00 2001 From: Mithun RK Date: Thu, 23 Mar 2023 14:37:47 -0700 Subject: [PATCH 15/17] Updated delta-lake-support docs for autoCompact. --- docs/additional-functionality/delta-lake-support.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/additional-functionality/delta-lake-support.md b/docs/additional-functionality/delta-lake-support.md index 69620f4c3c3..cd7f631bd15 100644 --- a/docs/additional-functionality/delta-lake-support.md +++ b/docs/additional-functionality/delta-lake-support.md @@ -67,6 +67,16 @@ that control the operation of the optimized write. | spark.databricks.delta.optimizeWrite.smallPartitionFactor | 0.5 | Merge partitions smaller than this factor multiplied by the target partition size | | spark.databricks.delta.optimizeWrite.mergedPartitionFactor | 1.2 | Avoid combining partitions larger than this factor multiplied by the target partition size | +Automatic compaction is supported only on Databricks platforms. The algorithm is similar but +not identical to the Databricks version. The following table describes configuration settings +that control the operation of automatic compaction. 
+ +| Configuration | Default | Description | +|---------------------------------------------------------------------|---------|--------------------------------------------------------------------------------------------------------| +| spark.databricks.delta.autoCompact.enabled | false | Enable/disable auto compaction for writes to Delta directories | +| spark.databricks.delta.properties.defaults.autoOptimize.autoCompact | false | Whether to enable auto compaction by default, if spark.databricks.delta.autoCompact.enabled is not set | +| spark.databricks.delta.autoCompact.minNumFiles | 50 | Minimum number of files in the Delta directory before which auto optimize does not begin compaction | + ### RapidsDeltaWrite Node in Query Plans A side-effect of performing a GPU accelerated Delta Lake write is a new node will appear in the From 35dc58c6eec1bc70df2235a285247b1cd3dd3f03 Mon Sep 17 00:00:00 2001 From: Mithun RK Date: Thu, 23 Mar 2023 15:40:05 -0700 Subject: [PATCH 16/17] Fixed foul-up in auto-compaction documentation. --- docs/additional-functionality/delta-lake-support.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/docs/additional-functionality/delta-lake-support.md b/docs/additional-functionality/delta-lake-support.md index cd7f631bd15..0ea2483f518 100644 --- a/docs/additional-functionality/delta-lake-support.md +++ b/docs/additional-functionality/delta-lake-support.md @@ -54,8 +54,13 @@ GPU accelerated. These operations will fallback to the CPU. Delta Lake on Databricks has [automatic optimization](https://docs.databricks.com/optimizations/auto-optimize.html) -features for optimized writes and automatic compaction. Automatic compaction is not supported, -and writes configured for automatic compaction will fallback to the CPU. +features for optimized writes and automatic compaction. Automatic compaction is supported +when writing to directories via Scala/Python APIs, as follows: +```scala +my_dataframe.write.format("delta").mode("append").save("/path/to/delta-dir") +``` +Automatic compaction is not supported to Delta tables via SQL. Auto compaction for such tables +will automatically fall back to the CPU. Optimized writes are supported only on Databricks platforms. The algorithm used is similar but not identical to the Databricks version. The following table describes configuration settings From 74e6afea23145f06248b21d787bcbb6b2a185308 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 24 Mar 2023 09:56:49 -0700 Subject: [PATCH 17/17] Removed confusing distinction of SQL writes vs Scala/Python. --- docs/additional-functionality/delta-lake-support.md | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/docs/additional-functionality/delta-lake-support.md b/docs/additional-functionality/delta-lake-support.md index cf004192883..5ea871c9f72 100644 --- a/docs/additional-functionality/delta-lake-support.md +++ b/docs/additional-functionality/delta-lake-support.md @@ -54,13 +54,7 @@ GPU accelerated. These operations will fallback to the CPU. Delta Lake on Databricks has [automatic optimization](https://docs.databricks.com/optimizations/auto-optimize.html) -features for optimized writes and automatic compaction. Automatic compaction is supported -when writing to directories via Scala/Python APIs, as follows: -```scala -my_dataframe.write.format("delta").mode("append").save("/path/to/delta-dir") -``` -Automatic compaction is not supported to Delta tables via SQL. Auto compaction for such tables -will automatically fall back to the CPU. 
+features for optimized writes and automatic compaction. Optimized writes are supported only on Databricks platforms. The algorithm used is similar but not identical to the Databricks version. The following table describes configuration settings