Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support auto-compaction for Delta tables on [databricks] #7889

Merged
merged 18 commits on Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ import org.apache.spark.sql.SparkSession
object GpuDoAutoCompaction extends PostCommitHook
with DeltaLogging
with Serializable {
override val name: String = "GpuDoAutoCompaction"
override val name: String = "Triggers compaction if necessary"

override def run(spark: SparkSession,
txn: OptimisticTransactionImpl,
committedActions: Seq[Action]): Unit = {
val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction]
val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction()
// Note: The Databricks AutoCompact PostCommitHook cannot be used here
// (with a GpuOptimisticTransaction), because that hook does not appear to use
// OptimisticTransaction.writeFiles to write the compacted file.
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize()
jlowe marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ class GpuOptimisticTransaction(
.getConf[String](DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED)
.getOrElse {
DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata)
"false" // TODO: Fix getting this from DeltaConfigs.AUTO_COMPACT.
.getOrElse("false")
}.toBoolean

if (!isOptimize && autoCompactEnabled && fileActions.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class GpuOptimizeExecutor(

private val isMultiDimClustering = zOrderByColumns.nonEmpty
private val isAutoCompact = prevCommitActions.nonEmpty
private val optimizeType = OptimizeType(isMultiDimClustering, isAutoCompact)
private val optimizeType = GpuOptimizeType(isMultiDimClustering, isAutoCompact)

def optimize(): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
Expand Down Expand Up @@ -206,7 +206,7 @@ class GpuOptimizeExecutor(
// zOrderByColumns)
throw new UnsupportedOperationException("MultiDimClustering not supported on compaction")
} else {
// Re-partition is not available in Databricks 11.3 (spark321db)
// Re-partition is not available in Databricks 10.4 (spark321db)
input.coalesce(numPartitions = 1)
}

Expand All @@ -231,9 +231,9 @@ class GpuOptimizeExecutor(
updates
}

type PartitionedBin = (Map[String, String], Seq[AddFile])
private type PartitionedBin = (Map[String, String], Seq[AddFile])

trait OptimizeType {
private trait GpuOptimizeType {
def minNumFiles: Long

def maxFileSize: Long =
Expand All @@ -244,7 +244,7 @@ class GpuOptimizeExecutor(
def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs
}

case class Compaction() extends OptimizeType {
private case class GpuCompaction() extends GpuOptimizeType {
def minNumFiles: Long = 2

def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
Expand All @@ -257,7 +257,7 @@ class GpuOptimizeExecutor(
}
}

case class MultiDimOrdering() extends OptimizeType {
private case class GpuMultiDimOrdering() extends GpuOptimizeType {
def minNumFiles: Long = 1

def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
Expand All @@ -267,7 +267,7 @@ class GpuOptimizeExecutor(
}
}

case class AutoCompaction() extends OptimizeType {
private case class GpuAutoCompaction() extends GpuOptimizeType {
def minNumFiles: Long = {
val minNumFiles =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES)
Expand Down Expand Up @@ -315,15 +315,15 @@ class GpuOptimizeExecutor(
}
}

object OptimizeType {
private object GpuOptimizeType {

def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): OptimizeType = {
def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): GpuOptimizeType = {
if (isMultiDimClustering) {
MultiDimOrdering()
GpuMultiDimOrdering()
} else if (isAutoCompact) {
AutoCompaction()
GpuAutoCompaction()
} else {
Compaction()
GpuCompaction()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.SparkSession
object GpuDoAutoCompaction extends PostCommitHook
with DeltaLogging
with Serializable {
override val name: String = "GpuDoAutoCompaction"
override val name: String = "Triggers compaction if necessary"

override def run(spark: SparkSession,
txn: OptimisticTransactionImpl,
Expand All @@ -41,6 +41,9 @@ object GpuDoAutoCompaction extends PostCommitHook
committedActions: Seq[Action]): Unit = {
val gpuTxn = txn.asInstanceOf[GpuOptimisticTransaction]
val newTxn = new GpuDeltaLog(gpuTxn.deltaLog, gpuTxn.rapidsConf).startTransaction()
// Note: The Databricks AutoCompact PostCommitHook cannot be used here
// (with a GpuOptimisticTransaction), because that hook does not appear to use
// OptimisticTransaction.writeFiles to write the compacted file.
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
new GpuOptimizeExecutor(spark, newTxn, Seq.empty, Seq.empty, committedActions).optimize()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ class GpuOptimisticTransaction(
spark.sessionState.conf
.getConf[String](DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED)
.getOrElse {
// DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata)
"false" // TODO: Fix getting this from DeltaConfigs.AUTO_COMPACT.
DeltaConfigs.AUTO_COMPACT.fromMetaData(metadata)
.getOrElse("false")
}.toBoolean

if (!isOptimize && autoCompactEnabled && fileActions.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class GpuOptimizeExecutor(

private val isMultiDimClustering = zOrderByColumns.nonEmpty
private val isAutoCompact = prevCommitActions.nonEmpty
private val optimizeType = OptimizeType(isMultiDimClustering, isAutoCompact)
private val optimizeType = GpuOptimizeType(isMultiDimClustering, isAutoCompact)

def optimize(): Seq[Row] = {
recordDeltaOperation(txn.deltaLog, "delta.optimize") {
Expand Down Expand Up @@ -235,9 +235,9 @@ class GpuOptimizeExecutor(
updates
}

type PartitionedBin = (Map[String, String], Seq[AddFile])
private type PartitionedBin = (Map[String, String], Seq[AddFile])

trait OptimizeType {
private trait GpuOptimizeType {
def minNumFiles: Long

def maxFileSize: Long =
Expand All @@ -248,7 +248,7 @@ class GpuOptimizeExecutor(
def targetBins(jobs: Seq[PartitionedBin]): Seq[PartitionedBin] = jobs
}

case class Compaction() extends OptimizeType {
private case class GpuCompaction() extends GpuOptimizeType {
def minNumFiles: Long = 2

def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
Expand All @@ -261,7 +261,7 @@ class GpuOptimizeExecutor(
}
}

case class MultiDimOrdering() extends OptimizeType {
private case class GpuMultiDimOrdering() extends GpuOptimizeType {
def minNumFiles: Long = 1

def targetFiles: (Seq[AddFile], Seq[AddFile]) = {
Expand All @@ -271,7 +271,7 @@ class GpuOptimizeExecutor(
}
}

case class AutoCompaction() extends OptimizeType {
private case class GpuAutoCompaction() extends GpuOptimizeType {
def minNumFiles: Long = {
val minNumFiles =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES)
Expand Down Expand Up @@ -319,15 +319,15 @@ class GpuOptimizeExecutor(
}
}

object OptimizeType {
private object GpuOptimizeType {

def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): OptimizeType = {
def apply(isMultiDimClustering: Boolean, isAutoCompact: Boolean): GpuOptimizeType = {
if (isMultiDimClustering) {
MultiDimOrdering()
GpuMultiDimOrdering()
} else if (isAutoCompact) {
AutoCompaction()
GpuAutoCompaction()
} else {
Compaction()
GpuCompaction()
}
}
}
Expand Down