Review commit2
Signed-off-by: Eunjin Song <sezruby@gmail.com>
sezruby committed Aug 3, 2022
1 parent ab18631 commit b4895e8
Showing 3 changed files with 107 additions and 80 deletions.
@@ -136,7 +136,7 @@ case class OptimizeTableCommand(
validateZorderByColumns(sparkSession, txn, zOrderBy)
val zOrderByColumns = zOrderBy.map(_.name).toSeq

new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns).optimize()
new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns, Nil).optimize()
}
}
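
For reference, the two call sites that result from the new fifth parameter, taken from the hunks in this commit: a manual OPTIMIZE passes Nil because there is no triggering commit, while the auto-compaction hook passes the actions of the commit that triggered it.

// Manual OPTIMIZE (OptimizeTableCommand): no triggering commit, so prevCommitActions is Nil.
new OptimizeExecutor(sparkSession, txn, partitionPredicates, zOrderByColumns, Nil).optimize()

// Auto compaction (DoAutoCompaction post-commit hook): the just-committed actions are passed through.
new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize()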

@@ -152,85 +152,34 @@ class OptimizeExecutor(
sparkSession: SparkSession,
txn: OptimisticTransaction,
partitionPredicate: Seq[Expression],
zOrderByColumns: Seq[String])
zOrderByColumns: Seq[String],
prevCommitActions: Seq[Action])
extends DeltaCommand with SQLMetricsReporting with Serializable {

/** Timestamp to use in [[FileAction]] */
private val operationTimestamp = new SystemClock().getTimeMillis()

private val isMultiDimClustering = zOrderByColumns.nonEmpty
private val isAutoCompact = prevCommitActions.nonEmpty
private val optimizeType = OptimizeType(isMultiDimClustering, isAutoCompact)

def optimize(isAutoCompact: Boolean = false, targetFiles: Seq[AddFile] = Nil): Seq[Row] = {
def optimize(): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
val minFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
val maxFileSize = if (isAutoCompact) {
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_FILE_SIZE)
} else {
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
}
require(minFileSize > 0, "minFileSize must be > 0")
val maxFileSize = optimizeType.maxFileSize
require(maxFileSize > 0, "maxFileSize must be > 0")

val minNumFilesInDir = if (isAutoCompact) {
val minNumFiles =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MIN_NUM_FILES)
require(minNumFiles > 0, "minNumFiles must be > 0")
minNumFiles
} else if (isMultiDimClustering) {
// Z-order optimize should reorder rows of all target files.
1
} else {
// Compaction should be triggered if more than 1 file.
2
}

val candidateFiles = if (!isAutoCompact) {
txn.filterFiles(partitionPredicate)
} else {
val autoCompactTarget =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET)
// Filter the candidate files according to autoCompact.target config.
autoCompactTarget match {
case "table" =>
txn.filterFiles()
case "commit" =>
targetFiles
case "partition" =>
val eligiblePartitions = targetFiles.map(_.partitionValues).toSet
txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues))
case _ =>
logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " +
s"Use the default value 'table'.")
txn.filterFiles()
}
}
val minNumFilesInDir = optimizeType.minNumFiles
val (candidateFiles, filesToProcess) = optimizeType.targetFiles

val partitionSchema = txn.metadata.partitionSchema

// select all files in case of multi-dimensional clustering
val filesToProcess = candidateFiles.filter(_.size < minFileSize || isMultiDimClustering)
val partitionsToCompact = filesToProcess
.groupBy(_.partitionValues)
.filter { case (_, filesInPartition) => filesInPartition.size >= minNumFilesInDir }
.toSeq

val groupedJobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)

val jobs = if (isAutoCompact) {
var acc = 0L
val maxCompactBytes =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES)
// Bins with more files are processed before bins with fewer files.
groupedJobs
.sortBy { case (_, filesInBin) => -filesInBin.length }
.takeWhile { case (_, filesInBin) =>
acc += filesInBin.map(_.size).sum
acc <= maxCompactBytes
}
} else {
groupedJobs
}
val jobs = optimizeType.targetBins(groupedJobs)

val parallelJobCollection = new ParVector(jobs.toVector)

@@ -251,7 +200,7 @@ class OptimizeExecutor(

val addedFiles = updates.collect { case a: AddFile => a }
val removedFiles = updates.collect { case r: RemoveFile => r }
if (addedFiles.size > 0) {
if (addedFiles.nonEmpty) {
val operation =
DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns, isAutoCompact)
val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles)
@@ -401,6 +350,96 @@ class OptimizeExecutor(
updates
}

type PartitionedBin = (Map[String, String], Seq[AddFile])
trait OptimizeType {
def minNumFiles: Long
def maxFileSize: Long =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_MAX_FILE_SIZE)
def targetFiles: (Seq[AddFile], Seq[AddFile])
def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs
}

case class Compaction() extends OptimizeType {
def minNumFiles: Long = 2
def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
val minFileSize = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE)
require(minFileSize > 0, "minFileSize must be > 0")
val candidateFiles = txn.filterFiles(partitionPredicate)
val filesToProcess = candidateFiles.filter(_.size < minFileSize)
(candidateFiles, filesToProcess)
}
}

case class MultiDimOrdering() extends OptimizeType {
def minNumFiles: Long = 1
def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
// select all files in case of multi-dimensional clustering
val candidateFiles = txn.filterFiles(partitionPredicate)
(candidateFiles, candidateFiles)
}
}

case class AutoCompaction() extends OptimizeType {
def minNumFiles: Long = {
val minNumFiles =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MIN_NUM_FILES)
require(minNumFiles > 0, "minNumFiles must be > 0")
minNumFiles
}

override def maxFileSize: Long =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_FILE_SIZE)

override def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
val autoCompactTarget =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET)
// Filter the candidate files according to autoCompact.target config.
lazy val addedFiles = prevCommitActions.collect { case a: AddFile => a }
val candidateFiles = autoCompactTarget match {
case "table" =>
txn.filterFiles()
case "commit" =>
addedFiles
case "partition" =>
val eligiblePartitions = addedFiles.map(_.partitionValues).toSet
txn.filterFiles().filter(f => eligiblePartitions.contains(f.partitionValues))
case _ =>
logError(s"Invalid config for autoCompact.target: $autoCompactTarget. " +
s"Falling back to the default value 'table'.")
txn.filterFiles()
}
val filesToProcess = candidateFiles.filter(_.size < maxFileSize)
(candidateFiles, filesToProcess)
}
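
The table/commit/partition selection above can be modeled in isolation as the REPL-style sketch below; FileRef and the parameter names are illustrative stand-ins rather than Delta types, with partitionValues playing the role of AddFile.partitionValues.

// Illustrative stand-in for AddFile; only the partition values matter for this logic.
case class FileRef(path: String, partitionValues: Map[String, String])

def candidateFilesFor(
    target: String,
    allTableFiles: Seq[FileRef],   // what txn.filterFiles() would return
    committedAdds: Seq[FileRef]    // AddFile actions from the triggering commit
  ): Seq[FileRef] = target match {
  case "table"  => allTableFiles   // compact across the whole table
  case "commit" => committedAdds   // only the files added by the triggering commit
  case "partition" =>              // every file in a partition the commit touched
    val eligiblePartitions = committedAdds.map(_.partitionValues).toSet
    allTableFiles.filter(f => eligiblePartitions.contains(f.partitionValues))
  case _ => allTableFiles          // unknown value: fall back to "table"
}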

override def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = {
var acc = 0L
val maxCompactBytes =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES)
// Bins with more files are processed before bins with fewer files.
jobs
.sortBy { case (_, filesInBin) => -filesInBin.length }
.takeWhile { case (_, filesInBin) =>
acc += filesInBin.map(_.size).sum
acc <= maxCompactBytes
}
}
}
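
The byte-budget selection in targetBins can likewise be exercised on its own; in this REPL-style sketch SizedBin stands in for PartitionedBin, and the file sizes and the 300-byte budget are made-up example values.

// Partition values paired with the sizes of the files in that bin (stand-in for PartitionedBin).
type SizedBin = (Map[String, String], Seq[Long])

def binsWithinBudget(bins: Seq[SizedBin], maxCompactBytes: Long): Seq[SizedBin] = {
  var acc = 0L
  bins
    .sortBy { case (_, sizes) => -sizes.length }  // bins with more files are compacted first
    .takeWhile { case (_, sizes) =>               // stop once the running total exceeds the budget
      acc += sizes.sum
      acc <= maxCompactBytes
    }
}

// With a 300-byte budget only the two bins with the most files fit.
binsWithinBudget(
  Seq(
    (Map("date" -> "2022-08-01"), Seq(10L, 20L, 30L)),  // 3 files,  60 bytes
    (Map("date" -> "2022-08-02"), Seq(100L, 100L)),     // 2 files, 200 bytes
    (Map("date" -> "2022-08-03"), Seq(500L))),          // 1 file,  500 bytes
  maxCompactBytes = 300L)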

object OptimizeType {

def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): OptimizeType = {
if (isMultiDimClustering) {
MultiDimOrdering()
} else if (isAutoCompact) {
AutoCompaction()
} else {
Compaction()
}
}
}

/**
* Attempts to commit the given actions to the log. In the case of a concurrent update,
* the given function will be invoked with a new transaction to allow custom conflict
@@ -38,16 +38,7 @@ object DoAutoCompaction extends PostCommitHook
committedActions: Seq[Action]): Unit = {

val newTxn = txn.deltaLog.startTransaction()
if (spark.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET) == "table") {
new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty).optimize(isAutoCompact = true)
} else {
// commit or partition option.
val targetFiles = committedActions.collect {
case a: AddFile => a
}
new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty)
.optimize(isAutoCompact = true, targetFiles)
}
new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize()
}

override def handleError(error: Throwable, version: Long): Unit = {
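
Because the hunk above is flattened, here is the run body of DoAutoCompaction before and after this commit, reassembled from the lines shown: the branching on autoCompact.target leaves the hook and is now handled by AutoCompaction.targetFiles.

// Before: the hook itself branched on autoCompact.target.
if (spark.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET) == "table") {
  new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty).optimize(isAutoCompact = true)
} else {
  // commit or partition option.
  val targetFiles = committedActions.collect { case a: AddFile => a }
  new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty)
    .optimize(isAutoCompact = true, targetFiles)
}

// After: a single call; interpreting committedActions is left to AutoCompaction.
new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize()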
@@ -33,13 +33,10 @@ class DeltaAutoOptimizeSuite extends QueryTest with SharedSparkSession with Delt
mode: String = "overwrite"): Unit = {
val df = spark
.range(50000)
.map { _ =>
(
scala.util.Random.nextInt(10000000).toLong,
scala.util.Random.nextInt(1000000000),
scala.util.Random.nextInt(2))
}
.toDF("colA", "colB", "colC")
.withColumn("colA", rand() * 10000000 cast "long")
.withColumn("colB", rand() * 1000000000 cast "int")
.withColumn("colC", rand() * 2 cast "int")
.drop("id")
.repartition(numFiles)
if (partitioned) {
df.write
@@ -82,7 +79,7 @@ class DeltaAutoOptimizeSuite extends QueryTest with SharedSparkSession with Delt
spark.sql(s"CREATE TABLE $tableName USING DELTA LOCATION '$path'")
spark.sql(
s"ALTER TABLE $tableName SET TBLPROPERTIES (delta.autoOptimize.autoCompact = true)")
expectedTableVersion += 1 // version increased due to SET TBLPROPERTIES
expectedTableVersion += 1 // version increased due to ALTER TABLE

writeDataToCheckAutoCompact(100, path)
expectedTableVersion += 2 // autoCompact should be triggered
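
For completeness, a self-contained version of the new rand()-based generator in writeDataToCheckAutoCompact; the local SparkSession, the file count of 10, and the output path are assumptions for illustration, while the column expressions match the diff.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

// Requires the Delta Lake artifact on the classpath to write in the delta format.
val spark = SparkSession.builder().master("local[*]").appName("auto-compact-data-sketch").getOrCreate()

// Same shape as the test data: 50k rows, three random columns, a fixed number of input files.
val df = spark
  .range(50000)
  .withColumn("colA", (rand() * 10000000).cast("long"))
  .withColumn("colB", (rand() * 1000000000).cast("int"))
  .withColumn("colC", (rand() * 2).cast("int"))
  .drop("id")
  .repartition(10)  // stands in for numFiles

df.write.format("delta").mode("overwrite").save("/tmp/delta/auto-compact-sketch")  // illustrative path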
