diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
index a70c0d4c2da..c0eea4bc1a3 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -393,6 +393,16 @@ trait DeltaConfigsBase extends DeltaLogging {
     _ => true,
     "needs to be a boolean.")
 
+  /**
+   * Enable auto compaction.
+   */
+  val AUTO_COMPACT = buildConfig[Boolean](
+    "autoOptimize.autoCompact",
+    "false",
+    _.toBoolean,
+    _ => true,
+    "needs to be a boolean.")
+
   /**
    * The number of columns to collect stats on for data skipping. A value of -1 means collecting
    * stats for all columns. Updating this conf does not trigger stats re-collection, but redefines
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
index 457a7b8e0a8..d5725cf37c9 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -411,8 +411,9 @@ object DeltaOperations {
   /** Recorded when optimizing the table. */
   case class Optimize(
       predicate: Seq[String],
-      zOrderBy: Seq[String] = Seq.empty
-  ) extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME) {
+      zOrderBy: Seq[String] = Seq.empty,
+      auto: Boolean)
+    extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME) {
     override val parameters: Map[String, Any] = Map(
       "predicate" -> JsonUtils.toJson(predicate),
       "zOrderBy" -> JsonUtils.toJson(zOrderBy)
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index d0ad6f2ee42..081dac9ef5c 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.delta.DeltaOperations.Operation
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.files._
-import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, PostCommitHook}
+import org.apache.spark.sql.delta.hooks.{CheckpointHook, DoAutoCompaction, GenerateSymlinkManifest, PostCommitHook}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -746,6 +746,16 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       registerPostCommitHook(GenerateSymlinkManifest)
     }
 
+    // For auto compaction, the session config takes precedence over the table property.
+    lazy val autoCompactEnabled =
+      spark.sessionState.conf
+        .getConf(DeltaSQLConf.AUTO_COMPACT_ENABLED)
+        .getOrElse {
+          DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata)}
+    if (!op.isInstanceOf[DeltaOperations.Optimize] && autoCompactEnabled && hasFileActions) {
+      registerPostCommitHook(DoAutoCompaction)
+    }
+
     commitAttemptStartTime = clock.getTimeMillis()
     val (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo) =
       doCommitRetryIteratively(snapshot.version + 1, currentTransactionInfo, isolationLevelToUse)
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 3c3a1e8c6d1..8b830213232 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -118,7 +118,7 @@ case class OptimizeTableCommand(
     val deltaLog = getDeltaLog(sparkSession, path, tableId, "OPTIMIZE", options)
 
     val txn = deltaLog.startTransaction()
-    if (txn.readVersion == -1) {
+    if (!txn.deltaLog.tableExists) {
       throw DeltaErrors.notADeltaTableException(deltaLog.dataPath.toString)
     }
 
@@ -138,7 +138,7 @@ case class OptimizeTableCommand(
       validateZorderByColumns(sparkSession, txn, zOrderBy)
       val zOrderByColumns = zOrderBy.map(_.name).toSeq
 
-      new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns).optimize()
+      new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns, Nil).optimize()
     }
   }
 
@@ -154,31 +154,34 @@ class OptimizeExecutor(
     sparkSession: SparkSession,
     txn: OptimisticTransaction,
     partitionPredicate: Seq[Expression],
-    zOrderByColumns: Seq[String])
+    zOrderByColumns: Seq[String],
+    prevCommitActions: Seq[Action])
   extends DeltaCommand with SQLMetricsReporting with Serializable {
 
   /** Timestamp to use in [[FileAction]] */
   private val operationTimestamp = new SystemClock().getTimeMillis()
 
   private val isMultiDimClustering = zOrderByColumns.nonEmpty
+  private val isAutoCompact = prevCommitActions.nonEmpty
+  private val optimizeType = OptimizeType(isMultiDimClustering, isAutoCompact)
 
   def optimize(): Seq[Row] = {
     recordDeltaOperation(txn.deltaLog, "delta.optimize") {
-      val minFileSize = sparkSession.sessionState.conf.getConf(
-        DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
-      val maxFileSize = sparkSession.sessionState.conf.getConf(
-        DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
-      require(minFileSize > 0, "minFileSize must be > 0")
+      val maxFileSize = optimizeType.maxFileSize
       require(maxFileSize > 0, "maxFileSize must be > 0")
 
-      val candidateFiles = txn.filterFiles(partitionPredicate)
+      val minNumFilesInDir = optimizeType.minNumFiles
+      val (candidateFiles, filesToProcess) = optimizeType.targetFiles
+      val partitionSchema = txn.metadata.partitionSchema
 
-      // select all files in case of multi-dimensional clustering
-      val filesToProcess = candidateFiles.filter(_.size < minFileSize || isMultiDimClustering)
-      val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
+      val partitionsToCompact = filesToProcess
+        .groupBy(_.partitionValues)
+        .filter { case (_, filesInPartition) => filesInPartition.size >= minNumFilesInDir }
+        .toSeq
 
-      val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
+      val groupedJobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
+      val jobs = optimizeType.targetBins(groupedJobs)
 
       val maxThreads =
         sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_THREADS)
@@ -188,8 +191,9 @@ class OptimizeExecutor(
 
       val addedFiles = updates.collect { case a: AddFile => a }
       val removedFiles = updates.collect { case r: RemoveFile => r }
-      if (addedFiles.size > 0) {
-        val operation = DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns)
+      if (addedFiles.nonEmpty) {
+        val operation =
+          DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns, isAutoCompact)
         val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles)
         commitAndRetry(txn, operation, updates, metrics) { newTxn =>
           val newPartitionSchema = newTxn.metadata.partitionSchema
@@ -337,6 +341,96 @@ class OptimizeExecutor(
     updates
   }
 
+  type PartitionedBin = (Map[String, String], Seq[AddFile])
+  trait OptimizeType {
+    def minNumFiles: Long
+    def maxFileSize: Long =
+      sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
+    def targetFiles: (Seq[AddFile], Seq[AddFile])
+    def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs
+  }
+
+  case class Compaction() extends OptimizeType {
+    def minNumFiles: Long = 2
+    def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
+      val minFileSize = sparkSession.sessionState.conf.getConf(
+        DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
+      require(minFileSize > 0, "minFileSize must be > 0")
+      val candidateFiles = txn.filterFiles(partitionPredicate)
+      val filesToProcess = candidateFiles.filter(_.size < minFileSize)
+      (candidateFiles, filesToProcess)
+    }
+  }
+
+  case class MultiDimOrdering() extends OptimizeType {
+    def minNumFiles: Long = 1
+    def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
+      // select all files in case of multi-dimensional clustering
+      val candidateFiles = txn.filterFiles(partitionPredicate)
+      (candidateFiles, candidateFiles)
+    }
+  }
+
+  case class AutoCompaction() extends OptimizeType {
+    def minNumFiles: Long = {
+      val minNumFiles =
+        sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MIN_NUM_FILES)
+      require(minNumFiles > 0, "minNumFiles must be > 0")
+      minNumFiles
+    }
+
+    override def maxFileSize: Long =
+      sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_FILE_SIZE)
+
+    override def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
+      val autoCompactTarget =
+        sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET)
+      // Filter the candidate files according to the autoCompact.target config.
+      lazy val addedFiles = prevCommitActions.collect { case a: AddFile => a }
+      val candidateFiles = autoCompactTarget match {
+        case "table" =>
+          txn.filterFiles()
+        case "commit" =>
+          addedFiles
+        case "partition" =>
+          val eligiblePartitions = addedFiles.map(_.partitionValues).toSet
+          txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues))
+        case _ =>
+          logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " +
+            s"Falling back to the default value 'table'.")
+          txn.filterFiles()
+      }
+      val filesToProcess = candidateFiles.filter(_.size < maxFileSize)
+      (candidateFiles, filesToProcess)
+    }
+
+    override def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = {
+      var acc = 0L
+      val maxCompactBytes =
+        sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES)
+      // Bins with more files take precedence over bins with fewer files.
+      jobs
+        .sortBy { case (_, filesInBin) => -filesInBin.length }
+        .takeWhile { case (_, filesInBin) =>
+          acc += filesInBin.map(_.size).sum
+          acc <= maxCompactBytes
+        }
+    }
+  }
+
+  object OptimizeType {
+
+    def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): OptimizeType = {
+      if (isMultiDimClustering) {
+        MultiDimOrdering()
+      } else if (isAutoCompact) {
+        AutoCompaction()
+      } else {
+        Compaction()
+      }
+    }
+  }
+
   /**
    * Attempts to commit the given actions to the log. In the case of a concurrent update,
    * the given function will be invoked with a new transaction to allow custom conflict
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/hooks/DoAutoCompaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/hooks/DoAutoCompaction.scala
new file mode 100644
index 00000000000..632a7e2fc37
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/delta/hooks/DoAutoCompaction.scala
@@ -0,0 +1,48 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.hooks
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.actions._
+import org.apache.spark.sql.delta.commands.OptimizeExecutor
+import org.apache.spark.sql.delta.metering.DeltaLogging
+
+/**
+ * Post commit hook to trigger compaction.
+ */
+object DoAutoCompaction extends PostCommitHook
+    with DeltaLogging
+    with Serializable {
+
+  override val name: String = "Triggers compaction if necessary"
+
+  override def run(
+      spark: SparkSession,
+      txn: OptimisticTransactionImpl,
+      committedVersion: Long,
+      postCommitSnapshot: Snapshot,
+      committedActions: Seq[Action]): Unit = {
+
+    val newTxn = txn.deltaLog.startTransaction()
+    new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize()
+  }
+
+  override def handleError(error: Throwable, version: Long): Unit = {
+    throw DeltaErrors.postCommitHookFailedException(this, version, name, error)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index b915b626b79..42f792dbc91 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -18,8 +18,10 @@ package org.apache.spark.sql.delta.sources
 
 // scalastyle:off import.ordering.noEmptyLine
 import java.util.concurrent.TimeUnit
+import java.util.Locale
 
 import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -756,6 +758,52 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(false)
 
+  val AUTO_COMPACT_ENABLED =
+    buildConf("autoCompact.enabled")
+      .internal()
+      .doc("Enables auto compaction after a table update.")
+      .booleanConf
+      .createOptional
+
+  val AUTO_COMPACT_MAX_FILE_SIZE =
+    buildConf("autoCompact.maxFileSize")
+      .internal()
+      .doc("Maximum file size for auto compaction. Files of this size or larger are skipped.")
+      .longConf
+      .createWithDefault(128 * 1024 * 1024)
+
+  val AUTO_COMPACT_MIN_NUM_FILES =
+    buildConf("autoCompact.minNumFiles")
+      .internal()
+      .doc("Minimum number of files in a directory required to trigger auto compaction.")
+      .longConf
+      .createWithDefault(50)
+
+  val AUTO_COMPACT_MAX_COMPACT_BYTES =
+    buildConf("autoCompact.maxCompactBytes")
+      .internal()
+      .doc("Maximum amount of data that a single auto compaction run will compact.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("50GB")
+
+  val AUTO_COMPACT_TARGET =
+    buildConf("autoCompact.target")
+      .internal()
+      .doc(
+        """
+          |Target files for auto compaction.
+          | "table", "commit", "partition" options are available. (default: table)
+          | If "table", all files in the table are eligible for auto compaction.
+          | If "commit", files added/updated by the commit are eligible.
+          | If "partition", all files in partitions containing any files added/updated
+          | by the commit are eligible.
+          |""".stripMargin
+      )
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .createWithDefault("table")
+
+
   val DELTA_ALTER_TABLE_CHANGE_COLUMN_CHECK_EXPRESSIONS =
     buildConf("alterTable.changeColumn.checkExpressions")
       .internal()
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaAutoOptimizeSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaAutoOptimizeSuite.scala
new file mode 100644
index 00000000000..f361d0c30b9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaAutoOptimizeSuite.scala
@@ -0,0 +1,283 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.functions._
+
+class DeltaAutoOptimizeSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {
+  import testImplicits._
+
+  def writeDataToCheckAutoCompact(
+      numFiles: Int,
+      dataPath: String,
+      partitioned: Boolean = false,
+      mode: String = "overwrite"): Unit = {
+    val df = spark
+      .range(50000)
+      .withColumn("colA", rand() * 10000000 cast "long")
+      .withColumn("colB", rand() * 1000000000 cast "int")
+      .withColumn("colC", rand() * 2 cast "int")
+      .drop("id")
+      .repartition(numFiles)
+    if (partitioned) {
+      df.write
+        .partitionBy("colC")
+        .mode(mode)
+        .format("delta")
+        .save(dataPath)
+    } else {
+      df.write
+        .mode(mode)
+        .format("delta")
+        .save(dataPath)
+    }
+  }
+
+  def checkTableVersionAndNumFiles(
+      path: String,
+      expectedVer: Long,
+      expectedNumFiles: Long): Unit = {
+    val dt = DeltaLog.forTable(spark, path)
+    assert(dt.snapshot.allFiles.count() == expectedNumFiles)
+    assert(dt.snapshot.version == expectedVer)
+  }
+
+  test("test enabling autoCompact") {
+    val tableName = "autoCompactTestTable"
+    val tableName2 = s"${tableName}2"
+    withTable(tableName, tableName2) {
+      withTempDir { dir =>
+        val rootPath = dir.getCanonicalPath
+        val path = new Path(rootPath, "table1").toString
+        var expectedTableVersion = -1
+        spark.conf.unset(DeltaSQLConf.AUTO_COMPACT_ENABLED.key)
+        writeDataToCheckAutoCompact(100, path)
+        // No autoCompact triggered - version should be 0.
+        expectedTableVersion += 1
+        checkTableVersionAndNumFiles(path, expectedTableVersion, 100)
+
+        // Create table
+        spark.sql(s"CREATE TABLE $tableName USING DELTA LOCATION '$path'")
+        spark.sql(
+          s"ALTER TABLE $tableName SET TBLPROPERTIES (delta.autoOptimize.autoCompact = true)")
+        expectedTableVersion += 1 // version increased due to ALTER TABLE
+
+        writeDataToCheckAutoCompact(100, path)
+        expectedTableVersion += 2 // autoCompact should be triggered
+        checkTableVersionAndNumFiles(path, expectedTableVersion, 1)
+
+        withSQLConf(DeltaSQLConf.AUTO_COMPACT_ENABLED.key -> "false") {
+          // The session config takes precedence over the table property.
+          writeDataToCheckAutoCompact(100, path)
+          expectedTableVersion += 1 // autoCompact should not be triggered
+          checkTableVersionAndNumFiles(path, expectedTableVersion, 100)
+        }
+
+        spark.sql(
+          s"ALTER TABLE $tableName SET TBLPROPERTIES (delta.autoOptimize.autoCompact = false)")
+        expectedTableVersion += 1 // version increased due to SET TBLPROPERTIES
+
+        withSQLConf(DeltaSQLConf.AUTO_COMPACT_ENABLED.key -> "true") {
+          // The session config takes precedence over the table property.
+          writeDataToCheckAutoCompact(100, path)
+          expectedTableVersion += 2 // autoCompact should be triggered
+          checkTableVersionAndNumFiles(path, expectedTableVersion, 1)
+        }
+
+        spark.conf.unset(DeltaSQLConf.AUTO_COMPACT_ENABLED.key)
+
+        // Test the default table property config.
+        withSQLConf(
+          "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact" -> "true") {
+          val path2 = new Path(rootPath, "table2").toString
+          writeDataToCheckAutoCompact(100, path2)
+          // autoCompact should be triggered for path2.
+          checkTableVersionAndNumFiles(path2, 1, 1)
+        }
+      }
+    }
+  }
+
+  test("test autoCompact configs") {
+    val tableName = "autoCompactTestTable"
+    withTable(tableName) {
+      withTempDir { dir =>
+        val rootPath = dir.getCanonicalPath
+        val path = new Path(rootPath, "table1").toString
+        var expectedTableVersion = -1
+        withSQLConf(DeltaSQLConf.AUTO_COMPACT_ENABLED.key -> "true") {
+          writeDataToCheckAutoCompact(100, path, partitioned = true)
+          expectedTableVersion += 2 // autoCompact should be triggered
+          checkTableVersionAndNumFiles(path, expectedTableVersion, 2)
+
+          withSQLConf(DeltaSQLConf.AUTO_COMPACT_MIN_NUM_FILES.key -> "200") {
+            writeDataToCheckAutoCompact(100, path, partitioned = true)
+            expectedTableVersion += 1 // autoCompact should not be triggered
+            checkTableVersionAndNumFiles(path, expectedTableVersion, 200)
+          }
+
+          withSQLConf(DeltaSQLConf.AUTO_COMPACT_MAX_FILE_SIZE.key -> "1") {
+            writeDataToCheckAutoCompact(100, path, partitioned = true)
+            expectedTableVersion += 1 // autoCompact should not be triggered
+            checkTableVersionAndNumFiles(path, expectedTableVersion, 200)
+          }
+
+          withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE.key -> "101024",
+            DeltaSQLConf.AUTO_COMPACT_MIN_NUM_FILES.key -> "2") {
+            val dt = io.delta.tables.DeltaTable.forPath(path)
+            dt.optimize().executeCompaction()
+            expectedTableVersion += 1 // autoCompact should not be triggered by OPTIMIZE
+            checkTableVersionAndNumFiles(path, expectedTableVersion, 8)
+          }
+
+          withSQLConf(DeltaSQLConf.AUTO_COMPACT_MIN_NUM_FILES.key -> "100") {
+            writeDataToCheckAutoCompact(100, path, partitioned = true)
+            expectedTableVersion += 2 // autoCompact should be triggered
+            checkTableVersionAndNumFiles(path, expectedTableVersion, 2)
+          }
+        }
+      }
+    }
+  }
+
+  test("test max compact data size config") {
+    withTempDir { dir =>
+      val rootPath = dir.getCanonicalPath
+      val path = new Path(rootPath, "table1").toString
+      var expectedTableVersion = -1
+      writeDataToCheckAutoCompact(100, path, partitioned = true)
+      expectedTableVersion += 1
+      val dt = io.delta.tables.DeltaTable.forPath(path)
+      val dl = DeltaLog.forTable(spark, path)
+      val sizeLimit =
+        dl.snapshot.allFiles
+          .filter(col("path").contains("colC=1"))
+          .agg(sum(col("size")))
+          .head
+          .getLong(0) * 2
+
+      withSQLConf(DeltaSQLConf.AUTO_COMPACT_ENABLED.key -> "true",
+        DeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES.key -> sizeLimit.toString) {
+        dt.toDF
+          .filter("colC == 1")
+          .repartition(50)
+          .write
+          .format("delta")
+          .mode("append")
+          .save(path)
+        val dl = DeltaLog.forTable(spark, path)
+        // version 0: write, 1: append, 2: autoCompact
+        assert(dl.snapshot.version == 2)
+
+        {
+          val afterAutoCompact = dl.snapshot.allFiles.filter(col("path").contains("colC=1")).count
+          val beforeAutoCompact = dl
+            .getSnapshotAt(dl.snapshot.version - 1)
+            .allFiles
+            .filter(col("path").contains("colC=1"))
+            .count
+          assert(beforeAutoCompact == 150)
+          assert(afterAutoCompact == 1)
+        }
+
+        {
+          val afterAutoCompact = dl.snapshot.allFiles.filter(col("path").contains("colC=0")).count
+          val beforeAutoCompact = dl
+            .getSnapshotAt(dl.snapshot.version - 1)
+            .allFiles
+            .filter(col("path").contains("colC=0"))
+            .count
+          assert(beforeAutoCompact == 100)
+          assert(afterAutoCompact == 100)
+        }
+      }
+    }
+  }
+
+  test("test autoCompact.target config") {
+    withTempDir { dir =>
+      val rootPath = dir.getCanonicalPath
+      val path1 = new Path(rootPath, "table1").toString
+      val path2 = new Path(rootPath, "table2").toString
+      val path3 = new Path(rootPath, "table3").toString
+
+      def testAutoCompactTarget(path: String, target: String, expectedColC1Cnt: Long): Unit = {
+        writeDataToCheckAutoCompact(100, path, partitioned = true)
+        val dt = io.delta.tables.DeltaTable.forPath(path)
+
+        withSQLConf(
+          "spark.databricks.delta.autoCompact.enabled" -> "true",
+          "spark.databricks.delta.autoCompact.target" -> target) {
+          dt.toDF
+            .filter("colC == 1")
+            .repartition(50)
+            .write
+            .format("delta")
+            .mode("append")
+            .save(path)
+
+          val dl = DeltaLog.forTable(spark, path)
+          // version 0: write, 1: append, 2: autoCompact
+          assert(dl.snapshot.version == 2, target)
+
+          {
+            val afterAutoCompact =
+              dl.snapshot.allFiles.filter(col("path").contains("colC=1")).count
+            val beforeAutoCompact = dl
+              .getSnapshotAt(dl.snapshot.version - 1)
+              .allFiles
+              .filter(col("path").contains("colC=1"))
+              .count
+
+            assert(beforeAutoCompact == 150)
+            assert(afterAutoCompact == expectedColC1Cnt)
+          }
+
+          {
+            val afterAutoCompact =
+              dl.snapshot.allFiles.filter(col("path").contains("colC=0")).count
+            val beforeAutoCompact = dl
+              .getSnapshotAt(dl.snapshot.version - 1)
+              .allFiles
+              .filter(col("path").contains("colC=0"))
+              .count
+
+            assert(beforeAutoCompact == 100)
+            assert(afterAutoCompact == 100)
+          }
+        }
+      }
+      // Existing files are not re-optimized; only the 50 newly added files should be compacted:
+      // 100 files for colC=0, 101 files for colC=1.
+      testAutoCompactTarget(path1, "commit", 101)
+      // Only the modified partition should be compacted:
+      // 100 files for colC=0, 1 file for colC=1.
+      testAutoCompactTarget(path2, "partition", 1)
+
+      withSQLConf(
+        "spark.databricks.delta.autoCompact.enabled" -> "true",
+        "spark.databricks.delta.autoCompact.target" -> "partition") {
+        writeDataToCheckAutoCompact(100, path3)
+        // Non-partitioned tables should also work with the "partition" option.
+        checkTableVersionAndNumFiles(path3, 1, 1)
+      }
+    }
+  }
+}
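
For reference, below is a minimal usage sketch of the behavior this patch introduces. It assumes a local SparkSession with the usual Delta Lake extensions on the classpath; the config keys use the "spark.databricks.delta." prefix that DeltaSQLConf prepends and the "delta." prefix that DeltaConfigs prepends to table properties (both visible in the tests above). The table name "events" and the tuning values are illustrative only and not part of the patch.

import org.apache.spark.sql.SparkSession

object AutoCompactExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("auto-compact-example")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Session-level switch; it takes precedence over the table property.
    spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
    // Optional tuning knobs defined in DeltaSQLConf (illustrative values).
    spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", "50")
    spark.conf.set("spark.databricks.delta.autoCompact.target", "partition")

    // Alternatively, enable auto compaction per table via the new DeltaConfigs property.
    spark.sql("CREATE TABLE IF NOT EXISTS events (id BIGINT) USING DELTA")
    spark.sql("ALTER TABLE events SET TBLPROPERTIES (delta.autoOptimize.autoCompact = true)")

    // Any non-OPTIMIZE commit with file actions now registers the DoAutoCompaction
    // post-commit hook, which runs OptimizeExecutor over the committed actions.
    spark.range(1000L).toDF("id").write.format("delta").mode("append").saveAsTable("events")

    spark.stop()
  }
}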