Support multiple where calls in DeltaTable.optimize

## Description

Resolves #1338
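
For illustration, a minimal sketch of the usage this enables (the path and column names here are hypothetical):

```scala
import io.delta.tables.DeltaTable

// Both predicates now take effect; previously a second where() call
// replaced the first one.
DeltaTable.forPath(spark, "/tmp/delta/events")
  .optimize()
  .where("date = '2017-10-10'")
  .where("part = 3")
  .executeCompaction()
```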

## How was this patch tested?

Tested via the existing test suite and by checking the metrics of the OPTIMIZE operation in the history table. One new test replicates the preceding test but issues multiple `where` calls.

## Does this PR introduce _any_ user-facing changes?

Yes. `DeltaTable.optimize` now supports multiple chained `where` calls. Closes #1353

Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
GitOrigin-RevId: 17a4ed28cb5fa28a11f12426b8c869799b4d38a8
sherlockbeard authored and allisonport-db committed Sep 6, 2022
1 parent 8938463 commit ddc3691
Showing 5 changed files with 61 additions and 30 deletions.
@@ -199,7 +199,7 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
OptimizeTableCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
-      Option(ctx.partitionPredicate).map(extractRawText(_)), Map.empty)(interleaveBy)
+      Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq, Map.empty)(interleaveBy)
}

override def visitDescribeDeltaDetail(
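
Because the SQL grammar allows at most one `WHERE` clause, `Option#toSeq` is enough to adapt the parsed predicate to the new `Seq`-typed parameter. A minimal sketch of that standard-library behavior:

```scala
// None contributes no predicate; Some contributes exactly one.
val noPredicate: Option[String] = None
val onePredicate: Option[String] = Some("part = 1")
assert(noPredicate.toSeq == Seq.empty)
assert(onePredicate.toSeq == Seq("part = 1"))
```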
@@ -39,7 +39,7 @@ class DeltaOptimizeBuilder private(
sparkSession: SparkSession,
tableIdentifier: String,
options: Map[String, String]) extends AnalysisHelper {
-  @volatile private var partitionFilter: Option[String] = None
+  private var partitionFilter: Seq[String] = Seq.empty

/**
* Apply partition filter on this optimize command builder to limit
@@ -49,7 +49,7 @@ class DeltaOptimizeBuilder private(
* @since 2.0.0
*/
def where(partitionFilter: String): DeltaOptimizeBuilder = {
-    this.partitionFilter = Some(partitionFilter)
+    this.partitionFilter = this.partitionFilter :+ partitionFilter
this
}
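
The builder now accumulates predicates across `where` calls instead of keeping only the last one. A sketch of the intended equivalence, assuming the collected predicates are applied conjunctively (as the new compaction test exercises; `deltaTable` is a placeholder):

```scala
// These two invocations should compact the same set of files:
deltaTable.optimize()
  .where("date = '2017-10-10'")
  .where("part = 3")
  .executeCompaction()

deltaTable.optimize()
  .where("date = '2017-10-10' AND part = 3")
  .executeCompaction()
```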

@@ -108,7 +108,7 @@ abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaCommand
case class OptimizeTableCommand(
path: Option[String],
tableId: Option[TableIdentifier],
-    partitionPredicate: Option[String],
+    userPartitionPredicates: Seq[String],
options: Map[String, String])(val zOrderBy: Seq[UnresolvedAttribute])
extends OptimizeTableCommandBase with LeafRunnableCommand {

@@ -125,14 +125,15 @@ case class OptimizeTableCommand(
val partitionColumns = txn.snapshot.metadata.partitionColumns
// Parse the predicate expression into Catalyst expression and verify only simple filters
// on partition columns are present
-    val partitionPredicates = partitionPredicate.map(predicate => {
-      val predicates = parsePredicates(sparkSession, predicate)
-      verifyPartitionPredicates(
-        sparkSession,
-        partitionColumns,
-        predicates)
-      predicates
-    }).getOrElse(Seq.empty)

+    val partitionPredicates = userPartitionPredicates.flatMap { predicate =>
+      val predicates = parsePredicates(sparkSession, predicate)
+      verifyPartitionPredicates(
+        sparkSession,
+        partitionColumns,
+        predicates)
+      predicates
+    }

validateZorderByColumns(sparkSession, txn, zOrderBy)
val zOrderByColumns = zOrderBy.map(_.name).toSeq
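
A small sketch of why `flatMap` fits here: each user-supplied string may parse into one or more Catalyst predicates, and the per-string results are flattened into a single sequence. Illustrated below with plain strings and a hypothetical `fakeParse` standing in for `parsePredicates`:

```scala
// Hypothetical stand-in: splitting on AND mirrors how one predicate
// string can yield multiple parsed expressions.
def fakeParse(predicate: String): Seq[String] =
  predicate.split("(?i) AND ").map(_.trim).toSeq

val collected = Seq("date = '2017-10-10' AND part = 3", "id < 100").flatMap(fakeParse)
assert(collected == Seq("date = '2017-10-10'", "part = 3", "id < 100"))
```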
38 changes: 20 additions & 18 deletions core/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -47,49 +47,50 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
test("OPTIMIZE command is parsed as expected") {
val parser = new DeltaSqlParser(null)
assert(parser.parsePlan("OPTIMIZE tbl") ===
OptimizeTableCommand(None, Some(tblId("tbl")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("tbl")), Seq.empty, Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE db.tbl") ===
OptimizeTableCommand(None, Some(tblId("tbl", "db")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("tbl", "db")), Seq.empty, Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE tbl_${system:spark.testing}") ===
OptimizeTableCommand(None, Some(tblId("tbl_true")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("tbl_true")), Seq.empty, Map.empty)(Seq()))

withSQLConf("tbl_var" -> "tbl") {
assert(parser.parsePlan("OPTIMIZE ${tbl_var}") ===
OptimizeTableCommand(None, Some(tblId("tbl")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("tbl")), Seq.empty, Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE ${spark:tbl_var}") ===
OptimizeTableCommand(None, Some(tblId("tbl")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("tbl")), Seq.empty, Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE ${sparkconf:tbl_var}") ===
OptimizeTableCommand(None, Some(tblId("tbl")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("tbl")), Seq.empty, Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE ${hiveconf:tbl_var}") ===
OptimizeTableCommand(None, Some(tblId("tbl")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("tbl")), Seq.empty, Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE ${hivevar:tbl_var}") ===
OptimizeTableCommand(None, Some(tblId("tbl")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("tbl")), Seq.empty, Map.empty)(Seq()))
}

assert(parser.parsePlan("OPTIMIZE '/path/to/tbl'") ===
OptimizeTableCommand(Some("/path/to/tbl"), None, None, Map.empty)(Seq()))
OptimizeTableCommand(Some("/path/to/tbl"), None, Seq.empty, Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE delta.`/path/to/tbl`") ===
OptimizeTableCommand(None, Some(tblId("/path/to/tbl", "delta")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("/path/to/tbl", "delta")), Seq.empty, Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Some("part = 1"), Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"), Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (col1)") ===
OptimizeTableCommand(None, Some(tblId("tbl")), None, Map.empty)(Seq(unresolvedAttr("col1"))))
OptimizeTableCommand(None, Some(tblId("tbl")), Seq.empty, Map.empty)
(Seq(unresolvedAttr("col1"))))

assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1 ZORDER BY col1, col2.subcol") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Some("part = 1"), Map.empty)(
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"), Map.empty)(
Seq(unresolvedAttr("col1"), unresolvedAttr("col2", "subcol"))))

assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1 ZORDER BY (col1, col2.subcol)") ===
OptimizeTableCommand(None, Some(tblId("tbl")), Some("part = 1"), Map.empty)(
OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"), Map.empty)(
Seq(unresolvedAttr("col1"), unresolvedAttr("col2", "subcol"))))
}
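
Note that the SQL surface still takes a single `WHERE` clause, which the parser wraps in a one-element `Seq`; only the builder API accumulates multiple predicates. In SQL the equivalent is a conjunction (hypothetical path):

```scala
spark.sql("OPTIMIZE delta.`/tmp/delta/events` WHERE date = '2017-10-10' AND part = 3")
```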

@@ -99,18 +100,19 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {

// Use the new keywords in table name
assert(parser.parsePlan("OPTIMIZE optimize") ===
OptimizeTableCommand(None, Some(tblId("optimize")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("optimize")), Seq.empty, Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE zorder") ===
OptimizeTableCommand(None, Some(tblId("zorder")), None, Map.empty)(Seq()))
OptimizeTableCommand(None, Some(tblId("zorder")), Seq.empty, Map.empty)(Seq()))

// Use the new keywords in column name
assert(parser.parsePlan("OPTIMIZE tbl WHERE zorder = 1 and optimize = 2") ===
OptimizeTableCommand(None,
Some(tblId("tbl")), Some("zorder = 1 and optimize = 2"), Map.empty)(Seq()))
Some(tblId("tbl"))
, Seq("zorder = 1 and optimize = 2"), Map.empty)(Seq()))

assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (optimize, zorder)") ===
OptimizeTableCommand(None, Some(tblId("tbl")), None, Map.empty)(
OptimizeTableCommand(None, Some(tblId("tbl")), Seq.empty, Map.empty)(
Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"))))
}

@@ -437,6 +437,34 @@ trait OptimizeCompactionSuiteBase extends QueryTest
}
}

test("optimize command with multiple partition predicates with multiple where") {
withTempDir { tempDir =>
def writeData(count: Int): Unit = {
spark.range(count).select('id, lit("2017-10-10").cast("date") as 'date, 'id % 5 as 'part)
.write
.partitionBy("date", "part")
.format("delta")
.mode("append")
.save(tempDir.getAbsolutePath)
}

writeData(10)
writeData(100)

DeltaTable.forPath(tempDir.getAbsolutePath).optimize()
.where("part = 3")
.where("date = '2017-10-10'")
.executeCompaction()

val df = spark.read.format("delta").load(tempDir.getAbsolutePath)
val deltaLog = loadDeltaLog(tempDir.getAbsolutePath)
val part = "part".phy(deltaLog)
val files = groupInputFilesByPartition(df.inputFiles, deltaLog)
assert(files.filter(_._1._1 == part).minBy(_._2.length)._1 === (part, "3"),
"part 3 should have been optimized and have least amount of files")
}
}
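
The description also mentions verifying the metrics of the OPTIMIZE operation in the history table; a hedged sketch of one way to inspect them (hypothetical path):

```scala
// The most recent history entry for an OPTIMIZE records the predicates used
// (operationParameters) and compaction metrics (operationMetrics).
val lastOp = DeltaTable.forPath(spark, "/tmp/delta/events").history(1)
lastOp.select("operation", "operationParameters", "operationMetrics").show(false)
```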

/**
* Utility method to append the given data to the Delta table located at the given path.
* Optionally partitions the data.
