Commit

Unify predicate strings in CommitInfo to record the information in a consistent way.

GitOrigin-RevId: 043a6a4181c112b9c9a45906c1275fbbdbbb1388
Lukas Rupprecht authored and allisonport-db committed May 11, 2023
1 parent ba7dc56 commit dcad4fd
Showing 9 changed files with 62 additions and 32 deletions.
@@ -24,8 +24,10 @@ import org.apache.spark.sql.delta.util.JsonUtils

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeIntoClause
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StructField, StructType}

@@ -40,7 +42,7 @@ object DeltaOperations {
* @param name The name of the operation.
*/
sealed abstract class Operation(val name: String) {
val parameters: Map[String, Any]
def parameters: Map[String, Any]

lazy val jsonEncodedValues: Map[String, String] =
parameters.mapValues(JsonUtils.toJson(_)).toMap
@@ -59,6 +61,12 @@ object DeltaOperations {
def changesData: Boolean = false
}

abstract class OperationWithPredicates(name: String, val predicates: Seq[Expression])
extends Operation(name) {
private val predicateString = JsonUtils.toJson(predicatesToString(predicates))
override def parameters: Map[String, Any] = Map("predicate" -> predicateString)
}

/** Recorded during batch inserts. Predicates can be provided for overwrites. */
case class Write(
mode: SaveMode,
@@ -123,8 +131,8 @@ object DeltaOperations {
override def changesData: Boolean = true
}
/** Recorded while deleting certain partitions. */
case class Delete(predicate: Seq[String]) extends Operation("DELETE") {
override val parameters: Map[String, Any] = Map("predicate" -> JsonUtils.toJson(predicate))
case class Delete(predicate: Seq[Expression])
extends OperationWithPredicates("DELETE", predicate) {
override val operationMetrics: Set[String] = DeltaOperationMetrics.DELETE

override def transformMetrics(metrics: Map[String, SQLMetric]): Map[String, String] = {
@@ -174,7 +182,7 @@ object DeltaOperations {
object MergePredicate {
def apply(mergeClause: DeltaMergeIntoClause): MergePredicate = {
MergePredicate(
predicate = mergeClause.condition.map(_.sql),
predicate = mergeClause.condition.map(_.simpleString(SQLConf.get.maxToStringFields)),
mergeClause.clauseType.toLowerCase())
}
}
@@ -188,16 +196,17 @@ object DeltaOperations {
*/
val OP_MERGE = "MERGE"
case class Merge(
predicate: Option[String],
predicate: Option[Expression],
updatePredicate: Option[String],
deletePredicate: Option[String],
insertPredicate: Option[String],
matchedPredicates: Seq[MergePredicate],
notMatchedPredicates: Seq[MergePredicate],
notMatchedBySourcePredicates: Seq[MergePredicate]) extends Operation(OP_MERGE) {
notMatchedBySourcePredicates: Seq[MergePredicate])
extends OperationWithPredicates(OP_MERGE, predicate.toSeq) {

override val parameters: Map[String, Any] = {
predicate.map("predicate" -> _).toMap ++
super.parameters ++
updatePredicate.map("updatePredicate" -> _).toMap ++
deletePredicate.map("deletePredicate" -> _).toMap ++
insertPredicate.map("insertPredicate" -> _).toMap +
@@ -230,7 +239,7 @@ object DeltaOperations {
object Merge {
/** constructor to provide default values for deprecated fields */
def apply(
predicate: Option[String],
predicate: Option[Expression],
matchedPredicates: Seq[MergePredicate],
notMatchedPredicates: Seq[MergePredicate],
notMatchedBySourcePredicates: Seq[MergePredicate]): Merge = Merge(
@@ -244,8 +253,8 @@ object DeltaOperations {
}

/** Recorded when an update operation is committed to the table. */
case class Update(predicate: Option[String]) extends Operation("UPDATE") {
override val parameters: Map[String, Any] = predicate.map("predicate" -> _).toMap
case class Update(predicate: Option[Expression])
extends OperationWithPredicates("UPDATE", predicate.toSeq) {
override val operationMetrics: Set[String] = DeltaOperationMetrics.UPDATE

override def changesData: Boolean = true
@@ -398,10 +407,8 @@ object DeltaOperations {
}

/** Recorded when recomputing stats on the table. */
case class ComputeStats(predicate: Seq[String]) extends Operation("COMPUTE STATS") {
override val parameters: Map[String, Any] = Map(
"predicate" -> JsonUtils.toJson(predicate))
}
case class ComputeStats(predicate: Seq[Expression])
extends OperationWithPredicates("COMPUTE STATS", predicate)

/** Recorded when restoring a Delta table to an older version. */
case class Restore(
@@ -415,7 +422,8 @@ object DeltaOperations {
override val operationMetrics: Set[String] = DeltaOperationMetrics.RESTORE
}

sealed abstract class OptimizeOrReorg(override val name: String) extends Operation(name)
sealed abstract class OptimizeOrReorg(override val name: String, predicates: Seq[Expression])
extends OperationWithPredicates(name, predicates)

/** operation name for OPTIMIZE command */
val OPTIMIZE_OPERATION_NAME = "OPTIMIZE"
@@ -424,11 +432,10 @@ object DeltaOperations {

/** Recorded when optimizing the table. */
case class Optimize(
predicate: Seq[String],
predicate: Seq[Expression],
zOrderBy: Seq[String] = Seq.empty
) extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME) {
override val parameters: Map[String, Any] = Map(
"predicate" -> JsonUtils.toJson(predicate),
) extends OptimizeOrReorg(OPTIMIZE_OPERATION_NAME, predicate) {
override val parameters: Map[String, Any] = super.parameters ++ Map(
ZORDER_PARAMETER_KEY -> JsonUtils.toJson(zOrderBy)
)

@@ -499,6 +506,15 @@ object DeltaOperations {
case class TestOperation(operationName: String = "TEST") extends Operation(operationName) {
override val parameters: Map[String, Any] = Map.empty
}

/**
* Helper method to convert a sequence of command predicates, given as
* [[Expression]]s, to a sequence of Strings to be stored in the commit info.
*/
def predicatesToString(predicates: Seq[Expression]): Seq[String] = {
val maxToStringFields = SQLConf.get.maxToStringFields
predicates.map(_.simpleString(maxToStringFields))
}
}

private[delta] object DeltaOperationMetrics {
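For reference, here is a minimal sketch (not part of the commit) of how an operation records its predicate after this change: operations now take Catalyst Expressions, and OperationWithPredicates renders them through predicatesToString (Expression.simpleString capped at SQLConf.get.maxToStringFields) before JSON-encoding them into the "predicate" parameter. The column name and value below are purely illustrative.

// Sketch only: assumes a Spark/Delta classpath; names and values are hypothetical.
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.delta.DeltaOperations
import org.apache.spark.sql.types.IntegerType

val predicate = EqualTo(AttributeReference("id", IntegerType)(), Literal(1))
val op = DeltaOperations.Delete(Seq(predicate))

// OperationWithPredicates JSON-encodes the rendered predicates, so this prints
// something like ["(id#0 = 1)"]; the #<n> suffix is the Catalyst expression ID.
println(op.parameters("predicate"))
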
@@ -117,7 +117,7 @@ case class DeleteCommand(
}

val deleteActions = performDelete(sparkSession, deltaLog, txn)
txn.commitIfNeeded(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq))
txn.commitIfNeeded(deleteActions, DeltaOperations.Delete(condition.toSeq))
}
// Re-cache all cached plans(including this relation itself, if it's cached) that refer to
// this data source relation.
@@ -258,7 +258,7 @@ case class MergeIntoCommand(
deltaTxn.commitIfNeeded(
finalActions,
DeltaOperations.Merge(
Option(condition.sql),
Option(condition),
matchedClauses.map(DeltaOperations.MergePredicate(_)),
notMatchedClauses.map(DeltaOperations.MergePredicate(_)),
notMatchedBySourceClauses.map(DeltaOperations.MergePredicate(_))))
@@ -191,7 +191,7 @@ class OptimizeExecutor(
val removedFiles = updates.collect { case r: RemoveFile => r }
val removedDVs = filesToProcess.filter(_.deletionVector != null).map(_.deletionVector).toSeq
if (addedFiles.size > 0) {
val operation = DeltaOperations.Optimize(partitionPredicate.map(_.sql), zOrderByColumns)
val operation = DeltaOperations.Optimize(partitionPredicate, zOrderByColumns)
val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
commitAndRetry(txn, operation, updates, metrics) { newTxn =>
val newPartitionSchema = newTxn.metadata.partitionSchema
@@ -219,7 +219,7 @@ case class UpdateCommand(
txn.registerSQLMetrics(sparkSession, metrics)

val finalActions = createSetTransaction(sparkSession, deltaLog).toSeq ++ totalActions
txn.commitIfNeeded(finalActions, DeltaOperations.Update(condition.map(_.toString)))
txn.commitIfNeeded(finalActions, DeltaOperations.Update(condition))
sendDriverMetrics(sparkSession, metrics)

recordDeltaEvent(
@@ -432,7 +432,7 @@ object StatisticsCollection extends DeltaCommand {
}
}

txn.commit(newAddFiles, ComputeStats(predicates.map(_.sql)))
txn.commit(newAddFiles, ComputeStats(predicates))
}

/**
@@ -38,6 +38,7 @@ import org.scalatest.GivenWhenThen
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
@@ -198,7 +199,8 @@ trait DeltaVacuumSuiteBase extends QueryTest
"numCopiedRows" -> createMetric(sparkContext, "total number of rows.")
)
txn.registerSQLMetrics(spark, metrics)
txn.commit(Seq(RemoveFile(path, Option(clock.getTimeMillis()))), Delete("true" :: Nil))
txn.commit(Seq(RemoveFile(path, Option(clock.getTimeMillis()))),
Delete(Seq(Literal.TrueLiteral)))
// scalastyle:on
case e: ExecuteVacuumInSQL =>
Given(s"*** Executing SQL: ${e.sql}")
@@ -65,11 +65,23 @@ trait DescribeDeltaHistorySuiteBase
protected def checkLastOperation(
basePath: String,
expected: Seq[String],
columns: Seq[Column] = Seq($"operation", $"operationParameters.mode")): Unit = {
val df = io.delta.tables.DeltaTable.forPath(spark, basePath).history(1)
checkAnswer(df.select(columns: _*), Seq(Row(expected: _*)))
val df2 = spark.sql(s"DESCRIBE HISTORY delta.`$basePath` LIMIT 1")
checkAnswer(df2.select(columns: _*), Seq(Row(expected: _*)))
columns: Seq[Column] = Seq($"operation", $"operationParameters.mode"),
removeExpressionId: Boolean = false): Unit = {
var df = io.delta.tables.DeltaTable.forPath(spark, basePath).history(1)
df = df.select(columns: _*)
if (removeExpressionId) {
// The expression ID is written as part of the column predicate (in the form of col#expId)
// but is non-deterministic, so we remove it here so that comparisons can go against
// just the column name.
df = df.withColumn("predicate", regexp_replace(col("predicate"), "#[0-9]+", ""))
}
checkAnswer(df, Seq(Row(expected: _*)))
df = spark.sql(s"DESCRIBE HISTORY delta.`$basePath` LIMIT 1")
df = df.select(columns: _*)
if (removeExpressionId) {
df = df.withColumn("predicate", regexp_replace(col("predicate"), "#[0-9]+", ""))
}
checkAnswer(df, Seq(Row(expected: _*)))
}

protected def checkOperationMetrics(
@@ -487,7 +499,7 @@ trait DescribeDeltaHistorySuiteBase
checkLastOperation(
tempDir,
Seq("DELETE", """["(id = 1)"]"""),
Seq($"operation", $"operationParameters.predicate"))
Seq($"operation", $"operationParameters.predicate"), removeExpressionId = true)
}

testWithFlag("old and new writers") {
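The removeExpressionId handling above exists because predicates are now rendered from Catalyst expressions, whose #<id> suffixes are non-deterministic. A rough read-back sketch (not from the commit; the table path is hypothetical and spark is an existing SparkSession) that strips the IDs the same way the helper does:

// Sketch only: strip non-deterministic expression IDs from the recorded predicate.
import org.apache.spark.sql.functions.{col, regexp_replace}

val lastOp = io.delta.tables.DeltaTable.forPath(spark, "/tmp/delta-table").history(1)
val predicate = lastOp
  .select(col("operationParameters.predicate").as("predicate"))
  // e.g. ["(id#42 = 1)"] becomes ["(id = 1)"], matching the expected values above
  .withColumn("predicate", regexp_replace(col("predicate"), "#[0-9]+", ""))
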
2 changes: 1 addition & 1 deletion python/delta/tests/test_deltatable.py
@@ -1073,7 +1073,7 @@ def test_optimize_w_partition_filter(self) -> None:
# assertions
self.assertEqual(1, res.first().metrics.numFilesAdded)
self.assertEqual(2, res.first().metrics.numFilesRemoved)
self.assertEqual('["(key = \'a\')"]', op_params['predicate'])
self.assertEqual('''["('key = a)"]''', op_params['predicate'])

# test non-partition column
def optimize() -> None:
