Auto compaction update
Signed-off-by: Eunjin Song <sezruby@gmail.com>

Co-authored-by: Sandip Raiyani <sandipraiyani1993@gmail.com>
sezruby and Sandip-Raiyani committed Jun 23, 2023
1 parent f052bbb commit cb51e65
Showing 8 changed files with 576 additions and 22 deletions.
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -471,6 +471,16 @@ trait DeltaConfigsBase extends DeltaLogging {
_ => true,
"needs to be a boolean.")

/**
* Enable auto compaction.
*/
val AUTO_COMPACT = buildConfig[Boolean](
"autoOptimize.autoCompact",
"false",
_.toBoolean,
_ => true,
"needs to be a boolean.")

/**
* The number of columns to collect stats on for data skipping. A value of -1 means collecting
* stats for all columns. Updating this conf does not trigger stats re-collection, but redefines
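Usage sketch (not part of the diff): assuming the new config name resolves, with the usual "delta." table-property prefix, to the key "delta.autoOptimize.autoCompact", the property could be enabled per table like this:

// Illustrative sketch only; the full property key is an assumption derived from
// the "autoOptimize.autoCompact" name above plus the standard "delta." prefix.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
spark.sql(
  """ALTER TABLE delta.`/tmp/events`
    |SET TBLPROPERTIES ('delta.autoOptimize.autoCompact' = 'true')""".stripMargin)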
core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -431,14 +431,18 @@ object DeltaOperations {
val OPTIMIZE_OPERATION_NAME = "OPTIMIZE"
/** parameter key to indicate which columns to z-order by */
val ZORDER_PARAMETER_KEY = "zOrderBy"
/** operation name for Auto Compaction */
val AUTOCOMPACTION_OPERATION_NAME = "auto"

/** Recorded when optimizing the table. */
case class Optimize(
predicate: Seq[Expression],
zOrderBy: Seq[String] = Seq.empty
zOrderBy: Seq[String] = Seq.empty,
auto: Boolean
) extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME, predicate) {
override val parameters: Map[String, Any] = super.parameters ++ Map(
ZORDER_PARAMETER_KEY -> JsonUtils.toJson(zOrderBy)
ZORDER_PARAMETER_KEY -> JsonUtils.toJson(zOrderBy),
AUTOCOMPACTION_OPERATION_NAME -> auto
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE
core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.DeletionVectorUtils
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, PostCommitHook}
import org.apache.spark.sql.delta.hooks.{CheckpointHook, DoAutoCompaction, GenerateSymlinkManifest, PostCommitHook}
import org.apache.spark.sql.delta.implicits.addFileEncoder
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
@@ -1030,6 +1030,16 @@ trait OptimisticTransactionImpl extends TransactionalWrite
registerPostCommitHook(GenerateSymlinkManifest)
}

// For autoCompact, the session config takes precedence over the table property.
lazy val autoCompactEnabled =
spark.sessionState.conf
.getConf(DeltaSQLConf.AUTO_COMPACT_ENABLED)
.getOrElse {
DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata)
}
if (!op.isInstanceOf[DeltaOperations.Optimize] && autoCompactEnabled && hasFileActions) {
registerPostCommitHook(DoAutoCompaction)
}

commitAttemptStartTime = clock.getTimeMillis()
if (preparedActions.isEmpty && canSkipEmptyCommits &&
skipRecordingEmptyCommitAllowed(isolationLevelToUse)) {
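Illustrative note (not part of the commit): because the session flag is read first and only falls back to the table property when unset, a session-level override might look like the sketch below. The "spark.databricks.delta." key prefix is an assumption about how DeltaSQLConf names are exposed; an active SparkSession named `spark` (as in spark-shell) is assumed.

// Hedged sketch: the session config, when set, overrides the table property.
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "false")
// Removing the session entry lets the table property
// (delta.autoOptimize.autoCompact) take effect again.
spark.conf.unset("spark.databricks.delta.autoCompact.enabled")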
core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -118,12 +118,11 @@ case class OptimizeTableCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val deltaLog = getDeltaLog(sparkSession, path, tableId, "OPTIMIZE", options)

val txn = deltaLog.startTransaction()
if (txn.readVersion == -1) {
if (!deltaLog.tableExists) {
throw DeltaErrors.notADeltaTableException(deltaLog.dataPath.toString)
}

val txn = deltaLog.startTransaction()
val partitionColumns = txn.snapshot.metadata.partitionColumns
// Parse the predicate expression into Catalyst expression and verify only simple filters
// on partition columns are present
@@ -163,7 +162,8 @@ case class OptimizeTableCommand(
case class DeltaOptimizeContext(
isPurge: Boolean = false,
minFileSize: Option[Long] = None,
maxDeletedRowsRatio: Option[Double] = None) {
maxDeletedRowsRatio: Option[Double] = None,
isAuto: Boolean = false) {
if (isPurge) {
require(
minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d),
@@ -192,18 +192,56 @@ class OptimizeExecutor(

private val isMultiDimClustering = zOrderByColumns.nonEmpty

def autoCompact(prevCommitAddFiles: Seq[AddFile]): Unit = {
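// Both thresholds below read autoCompact.maxFileSize, so any file smaller than
// the configured maximum size is a candidate for compaction.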
val minFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.AUTO_COMPACT_MAX_FILE_SIZE)
val maxFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.AUTO_COMPACT_MAX_FILE_SIZE)
require(minFileSize > 0, "minFileSize must be > 0")
require(maxFileSize > 0, "maxFileSize must be > 0")

val autoCompactTarget =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET)

// Filter the candidate files according to autoCompact.target config.
lazy val addedFiles = prevCommitAddFiles.collect { case a: AddFile => a }
val candidateFiles = autoCompactTarget match {
case "table" =>
txn.filterFiles()
case "commit" =>
addedFiles
case "partition" =>
val eligiblePartitions = addedFiles.map(_.partitionValues).toSet
txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues))
case _ =>
logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " +
s"Falling back to the default value 'table'.")
txn.filterFiles()
}

optimizeImpl(minFileSize, maxFileSize, candidateFiles)
}

def optimize(): Seq[Row] = {
val minFileSize = optimizeContext.minFileSize.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE))
val maxFileSize =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)

val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
optimizeImpl(minFileSize, maxFileSize, candidateFiles)
}

private def optimizeImpl(
minFileSize: Long,
maxFileSize: Long,
candidateFiles: Seq[AddFile]): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
val minFileSize = optimizeContext.minFileSize.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE))
val maxFileSize =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
val partitionSchema = txn.metadata.partitionSchema

val maxDeletedRowsRatio = optimizeContext.maxDeletedRowsRatio.getOrElse(
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO))

val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
val partitionSchema = txn.metadata.partitionSchema

val filesToProcess = pruneCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

@@ -338,11 +376,35 @@ class OptimizeExecutor(
bins += currentBin.toVector
}

bins.filter { bin =>
bin.size > 1 || // bin has more than one file or
(bin.size == 1 && bin(0).deletionVector != null) || // single file in the bin has a DV or
isMultiDimClustering // multi-clustering
}.map(b => (partition, b))
if (!optimizeContext.isAuto) {
bins.filter { bin =>
bin.size > 1 || // bin has more than one file or
// single file in the bin has a DV or
(bin.size == 1 && bin(0).deletionVector != null) ||
isMultiDimClustering // multi-clustering
}.map(b => (partition, b))
} else {
// for AutoCompaction
val autoCompactMinNumFiles = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.AUTO_COMPACT_MIN_NUM_FILES)
val filteredByBinSize = bins.filter { bin =>
// The bin has at least autoCompactMinNumFiles files, or
bin.size >= autoCompactMinNumFiles ||
// the file count plus the number of deletion vectors reaches that threshold.
bin.count(_.deletionVector != null) + bin.size >= autoCompactMinNumFiles
}.map(b => (partition, b))

var acc = 0L
val maxCompactBytes =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES)
// Bins with more files take priority; stop once the cumulative size exceeds maxCompactBytes.
filteredByBinSize
.sortBy { case (_, filesInBin) => -filesInBin.length }
.takeWhile { case (_, filesInBin) =>
acc += filesInBin.map(_.size).sum
acc <= maxCompactBytes
}
}
}
}

@@ -434,7 +496,7 @@ class OptimizeExecutor(
if (optimizeContext.isPurge) {
DeltaOperations.Reorg(partitionPredicate)
} else {
DeltaOperations.Optimize(partitionPredicate, zOrderByColumns)
DeltaOperations.Optimize(partitionPredicate, zOrderByColumns, optimizeContext.isAuto)
}
}

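To make the auto-compaction bin selection above concrete, here is a minimal, self-contained sketch of the "keep bins with enough files, prefer bins with more files, cap total bytes" policy. FileStub and selectBins are hypothetical names for illustration, not part of the Delta codebase.

// Standalone sketch of the selection policy used by autoCompact.
case class FileStub(path: String, size: Long)

def selectBins(
    bins: Seq[Vector[FileStub]],
    minNumFiles: Int,
    maxCompactBytes: Long): Seq[Vector[FileStub]] = {
  // Keep only bins that contain at least minNumFiles files.
  val eligible = bins.filter(_.size >= minNumFiles)
  // Compact bins with more files first; stop when the cumulative size
  // would exceed maxCompactBytes.
  var acc = 0L
  eligible.sortBy(bin => -bin.length).takeWhile { bin =>
    acc += bin.map(_.size).sum
    acc <= maxCompactBytes
  }
}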
core/src/main/scala/org/apache/spark/sql/delta/hooks/DoAutoCompaction.scala
@@ -0,0 +1,53 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.hooks

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.{DeltaOptimizeContext, OptimizeExecutor}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf

/**
* Post commit hook to trigger compaction.
*/
object DoAutoCompaction extends PostCommitHook
with DeltaLogging
with Serializable {

override val name: String = "Triggers compaction if necessary"

override def run(
spark: SparkSession,
txn: OptimisticTransactionImpl,
committedVersion: Long,
postCommitSnapshot: Snapshot,
committedActions: Seq[Action]): Unit = {

val newTxn = txn.deltaLog.startTransaction()
val addedFiles = committedActions.collect { case a: AddFile => a }
if (addedFiles.nonEmpty) {
new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, DeltaOptimizeContext(isAuto = true))
.autoCompact(addedFiles)
}
}

override def handleError(error: Throwable, version: Long): Unit = {
throw DeltaErrors.postCommitHookFailedException(this, version, name, error)
}
}
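End-to-end sketch (illustrative; config keys, their prefix, and the table path are assumptions, and an active SparkSession named `spark` is assumed): with the session flag enabled, each qualifying write commit registers this hook, which opens a fresh transaction and compacts the newly added small files.

import spark.implicits._

spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", "10")

// Many small appends produce many small files...
(1 to 20).foreach { i =>
  Seq(i).toDF("id").write.format("delta").mode("append").save("/tmp/ac_demo")
}
// ...and the post-commit hook should fold them into larger files; the table
// history would then contain OPTIMIZE commits carrying the "auto" -> true
// operation parameter added in DeltaOperations.Optimize above.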
core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -864,7 +864,53 @@ trait DeltaSQLConfBase {
"repartition(1) incurs a shuffle stage, but the job can be distributed."
)
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val AUTO_COMPACT_ENABLED =
buildConf("autoCompact.enabled")
.internal()
.doc("Enables auto compaction after table update.")
.booleanConf
.createOptional

val AUTO_COMPACT_MAX_FILE_SIZE =
buildConf("autoCompact.maxFileSize")
.internal()
.doc("Maximum file size for auto compaction.")
.longConf
.createWithDefault(128 * 1024 * 1024)

val AUTO_COMPACT_MIN_NUM_FILES =
buildConf("autoCompact.minNumFiles")
.internal()
.doc("Minimum number of files in a directory to trigger auto compaction.")
.longConf
.createWithDefault(50)

val AUTO_COMPACT_MAX_COMPACT_BYTES =
buildConf("autoCompact.maxCompactBytes")
.internal()
.doc("Maximum amount of data for auto compaction.")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("20GB")

val AUTO_COMPACT_TARGET =
buildConf("autoCompact.target")
.internal()
.doc(
"""
|Target files for auto compaction.
| Available options are "table", "commit", and "partition" (default: "partition").
| If "table", all files in the table are eligible for auto compaction.
| If "commit", only files added/updated by the commit are eligible.
| If "partition", all files in the partitions containing any files added/updated
| by the commit are eligible.
|""".stripMargin
)
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("table", "commit", "partition"))
.createWithDefault("partition")

val DELTA_ALTER_TABLE_CHANGE_COLUMN_CHECK_EXPRESSIONS =
buildConf("alterTable.changeColumn.checkExpressions")
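A short usage note on the new session configs (keys assumed to carry the standard "spark.databricks.delta." prefix; `spark` is an active SparkSession):

// Hedged sketch of tuning the new configs in a session. Pick one target value:
// "table", "commit", or "partition" ("partition" is the default).
spark.conf.set("spark.databricks.delta.autoCompact.target", "table")
spark.conf.set("spark.databricks.delta.autoCompact.minNumFiles", "30")
spark.conf.set("spark.databricks.delta.autoCompact.maxCompactBytes", "10g")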
(Diffs for the remaining changed files were not loaded.)
