Commit 7479f46
Signed-off-by: Eunjin Song <sezruby@gmail.com>
sezruby committed Jul 28, 2022
1 parent 06c2384 commit 7479f46
Showing 6 changed files with 18 additions and 18 deletions.
@@ -396,7 +396,7 @@ trait DeltaConfigsBase extends DeltaLogging {
"needs to be a boolean.")

/**
- * Whether this table will automagically optimize the layout of files after update.
+ * Enable auto compaction.
*/
val AUTO_COMPACT = buildConfig[Boolean](
"autoOptimize.autoCompact",
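The property this doc comment describes can be set per table; a minimal usage sketch, assuming DeltaConfigs.buildConfig prepends the usual "delta." prefix so the full key is delta.autoOptimize.autoCompact (the table name events is a placeholder):

// Hypothetical: enable auto compaction for one table via its table property.
spark.sql(
  "ALTER TABLE events SET TBLPROPERTIES ('delta.autoOptimize.autoCompact' = 'true')")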
@@ -371,7 +371,8 @@ object DeltaOperations {
case class Optimize(
predicate: Seq[String],
zOrderBy: Seq[String] = Seq.empty,
- auto: Boolean) extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME) {
+ auto: Boolean)
+ extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME) {
override val parameters: Map[String, Any] = Map(
"predicate" -> JsonUtils.toJson(predicate),
"zOrderBy" -> JsonUtils.toJson(zOrderBy)
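The reflow separates the constructor parameters from the parent class. For illustration, a sketch of how an auto-compaction commit might instantiate the operation (this call site is assumed, not part of the diff):

// Hypothetical call site: tag a commit as an auto-triggered OPTIMIZE.
val op = DeltaOperations.Optimize(
  predicate = Seq.empty,  // no partition filter: the whole table is in scope
  zOrderBy = Seq.empty,   // auto compaction does not Z-order
  auto = true)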
@@ -731,7 +731,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
lazy val autoCompactEnabled =
spark.sessionState.conf
.getConf(DeltaSQLConf.AUTO_COMPACT_ENABLED)
- .getOrElse{
+ .getOrElse {
DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata)}
if (!op.isInstanceOf[DeltaOperations.Optimize] && autoCompactEnabled && hasFileActions) {
registerPostCommitHook(DoAutoCompaction)
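The hook registers only for non-OPTIMIZE commits that carry file actions, and the optional session conf takes precedence over the table property. A self-contained sketch of that getOrElse precedence, with plain stand-ins for the two lookups:

// A session-level setting, when present, overrides the table property.
def autoCompactEnabled(
    sessionConf: Option[Boolean],   // DeltaSQLConf.AUTO_COMPACT_ENABLED
    tableProperty: Boolean          // DeltaConfigs.AUTO_COMPACT.fromMetaData
): Boolean = sessionConf.getOrElse(tableProperty)

assert(!autoCompactEnabled(Some(false), tableProperty = true))  // session wins
assert(autoCompactEnabled(None, tableProperty = true))          // falls back to the property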
@@ -117,7 +117,7 @@ case class OptimizeTableCommand(
val deltaLog = getDeltaLog(sparkSession, path, tableId, "OPTIMIZE")

val txn = deltaLog.startTransaction()
- if (txn.readVersion == -1) {
+ if (!txn.deltaLog.tableExists) {
throw DeltaErrors.notADeltaTableException(deltaLog.dataPath.toString)
}

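tableExists states the intent of the old readVersion probe directly: a Delta table exists once a first version has been committed. A standalone sketch of the assumed predicate (not part of this diff):

// readVersion stays -1 until the first commit, so existence is version >= 0.
def tableExists(readVersion: Long): Boolean = readVersion >= 0

assert(!tableExists(-1L))  // the case the old check spelled out by hand
assert(tableExists(0L))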
@@ -178,16 +178,13 @@ class OptimizeExecutor(
require(minNumFiles > 0, "minNumFiles must be > 0")
minNumFiles
} else if (isMultiDimClustering) {
+ // Z-order optimize should reorder rows of all target files.
1
} else {
- // compaction
+ // Compaction should be triggered if more than 1 file.
2
}

- if (txn.readVersion == -1) {
-   throw DeltaErrors.notADeltaTableException(txn.deltaLog.dataPath.toString)
- }
-
val candidateFiles = if (!isAutoCompact) {
txn.filterFiles(partitionPredicate)
} else {
@@ -215,7 +212,7 @@
val filesToProcess = candidateFiles.filter(_.size < minFileSize || isMultiDimClustering)
val partitionsToCompact = filesToProcess
.groupBy(_.partitionValues)
- .filter(_._2.size >= minNumFilesInDir)
+ .filter { case (_, filesInPartition) => filesInPartition.size >= minNumFilesInDir }
.toSeq

val groupedJobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
@@ -226,9 +223,9 @@
sparkSession.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_MAX_COMPACT_BYTES)
// bins with more files are prior to less files.
groupedJobs
- .sortBy(-_._2.length)
- .takeWhile { job =>
-   acc += job._2.map(_.size).sum
+ .sortBy { case (_, filesInBin) => -filesInBin.length }
+ .takeWhile { case (_, filesInBin) =>
+   acc += filesInBin.map(_.size).sum
acc <= maxCompactBytes
}
} else {
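In the auto-compaction path the executor bounds each run: bins are sorted so those with the most files come first, then taken greedily while the running byte total stays within AUTO_COMPACT_MAX_COMPACT_BYTES. A self-contained sketch of that selection (the bin representation is a stand-in for the real file actions):

// Greedy selection mirroring the sortBy/takeWhile above. A "bin" is a
// partition key paired with the byte sizes of its candidate files.
def selectBins(
    bins: Seq[(Map[String, String], Seq[Long])],
    maxCompactBytes: Long): Seq[(Map[String, String], Seq[Long])] = {
  var acc = 0L
  bins
    .sortBy { case (_, fileSizes) => -fileSizes.length }  // busiest bins first
    .takeWhile { case (_, fileSizes) =>
      acc += fileSizes.sum
      acc <= maxCompactBytes
    }
}

// With a 200-byte budget only the three-file bin fits: the two-file bin
// would push the running total to 365 bytes, so takeWhile stops there.
val selected = selectBins(
  Seq(Map("p" -> "a") -> Seq(50L, 60L, 70L),
      Map("p" -> "b") -> Seq(90L, 95L)),
  maxCompactBytes = 200L)
assert(selected.map(_._1) == Seq(Map("p" -> "a")))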
@@ -26,17 +26,19 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
/**
* Post commit hook to trigger compaction.
*/
- object DoAutoCompaction extends PostCommitHook with DeltaLogging with Serializable {
+ object DoAutoCompaction extends PostCommitHook
+   with DeltaLogging
+   with Serializable {

- override val name: String = "Trigger compaction if necessary"
+ override val name: String = "Triggers compaction if necessary"

override def run(
spark: SparkSession,
txn: OptimisticTransactionImpl,
committedActions: Seq[Action]): Unit = {

val newTxn = txn.deltaLog.startTransaction()
- if (spark.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET).equals("table")) {
+ if (spark.sessionState.conf.getConf(DeltaSQLConf.AUTO_COMPACT_TARGET) == "table") {
new OptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty).optimize(isAutoCompact = true)
} else {
// commit or partition option.
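Swapping .equals for Scala's null-safe == is purely stylistic; the "table" target still compacts the whole table in the fresh transaction. A sketch of selecting that mode, assuming DeltaSQLConf applies the usual spark.databricks.delta. key prefix:

// Hypothetical session setup: have the post-commit hook compact the whole
// table rather than only what the commit touched.
spark.conf.set("spark.databricks.delta.autoCompact.target", "table")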
@@ -733,14 +733,14 @@ trait DeltaSQLConfBase {
val AUTO_COMPACT_ENABLED =
buildConf("autoCompact.enabled")
.internal()
.doc("Enable auto compaction to trigger a compaction job for small files after table update.")
.doc("Enables auto compaction to trigger a compaction job for small files after table update.")
.booleanConf
.createOptional

val AUTO_COMPACT_MAX_FILE_SIZE =
buildConf("autoCompact.maxFileSize")
.internal()
.doc("Maximum file size when for auto compaction.")
.doc("Maximum file size for auto compaction.")
.longConf
.createWithDefault(128 * 1024 * 1024)

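Both are internal session confs, one optional and one defaulting to 128 MiB. A sketch of setting them for a session, again assuming buildConf applies the spark.databricks.delta. prefix:

// Hypothetical: enable auto compaction session-wide and lower the maximum
// output file size from the 128 MiB default to 32 MiB.
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.maxFileSize", (32L * 1024 * 1024).toString)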
