Skip to content

Commit

Permalink
[Spark] CreateOrReplace table command should not follow dynamic parti…
Browse files Browse the repository at this point in the history
…tion overwrite option

CreateOrReplace table command accidentally followed DPO semantics when replacing an existing table.
This PR adds writeOptions to the write command to differentiate a replace command from other inserts.

Closes delta-io#2482

GitOrigin-RevId: f1085d3a34775d93949b55ec52f2f2e0af09bcc6
  • Loading branch information
sabir-akhadov authored and vkorukanti committed Jan 12, 2024
1 parent 68d56e9 commit 11cd832
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,15 @@ case class CreateDeltaTableCommand(
var actions = deltaWriter.write(
txn,
sparkSession,
ClusteredTableUtils.getClusterBySpecOptional(table)
ClusteredTableUtils.getClusterBySpecOptional(table),
// Pass this option to the writer so that it can differentiate between an INSERT and a
// REPLACE command. This is needed because the writer is shared between the two commands.
// But some options, such as dynamic partition overwrite, are only valid for INSERT.
// Only allow createOrReplace command which is not a V1 writer.
// saveAsTable() command uses this same code path and is marked as a V1 writer.
// We do not want saveAsTable() to be treated as a REPLACE command wrt dynamic partition
// overwrite.
isTableReplace = isReplace && !isV1Writer
)
// Metadata updates for creating table (with any writer) and replacing table
// (only with V1 writer) will be handled inside WriteIntoDelta.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ case class WriteIntoDelta(
override def write(
txn: OptimisticTransaction,
sparkSession: SparkSession,
clusterBySpecOpt: Option[ClusterBySpec] = None): Seq[Action] = {
clusterBySpecOpt: Option[ClusterBySpec] = None,
isTableReplace: Boolean = false): Seq[Action] = {
import org.apache.spark.sql.delta.implicits._
if (txn.readVersion > -1) {
// This table already exists, check if the insert is valid.
Expand Down Expand Up @@ -175,6 +176,9 @@ case class WriteIntoDelta(
if (txn.metadata.partitionColumns.isEmpty) {
// We ignore dynamic partition overwrite mode for non-partitioned tables
false
} else if (isTableReplace) {
// A replace table command should always replace the table, not just some partitions.
false
} else if (options.replaceWhere.nonEmpty) {
if (options.partitionOverwriteModeInOptions && options.isDynamicPartitionOverwriteMode) {
// replaceWhere and dynamic partition overwrite conflict because they both specify which
Expand All @@ -188,7 +192,9 @@ case class WriteIntoDelta(
// precedence over session configs
false
}
} else options.isDynamicPartitionOverwriteMode
} else {
options.isDynamicPartitionOverwriteMode
}
}

if (useDynamicPartitionOverwriteMode && canOverwriteSchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ trait WriteIntoDeltaLike {
def write(
txn: OptimisticTransaction,
sparkSession: SparkSession,
clusterBySpecOpt: Option[ClusterBySpec] = None): Seq[Action]
clusterBySpecOpt: Option[ClusterBySpec] = None,
isTableReplace: Boolean = false): Seq[Action]

val deltaLog: DeltaLog

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.commons.io.FileUtils
import org.apache.parquet.format.CompressionCodec

import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.SQLConf.PARTITION_OVERWRITE_MODE
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -306,6 +307,26 @@ class DeltaOptionSuite extends QueryTest
}
}

// Expects saveAsTable to reject a write that asks for both overwriteSchema=true and
// partitionOverwriteMode=dynamic, failing with the dedicated Delta error class.
test("overwriteSchema=true should be invalid with partitionOverwriteMode=dynamic, " +
    "saveAsTable") {
  withTable("temp") {
    val thrown = intercept[DeltaIllegalArgumentException] {
      withSQLConf(DeltaSQLConf.DYNAMIC_PARTITION_OVERWRITE_ENABLED.key -> "true") {
        // Three rows spread over the two values of `part`.
        val partitionedInput = Seq(1, 2, 3).toDF.withColumn("part", $"value" % 2)
        val conflictingWriter = partitionedInput.write
          .mode("overwrite")
          .format("delta")
          .partitionBy("part")
          .option(OVERWRITE_SCHEMA_OPTION, "true")
          .option(PARTITION_OVERWRITE_MODE_OPTION, "dynamic")
        conflictingWriter.saveAsTable("temp")
      }
    }
    assert(thrown.getErrorClass == "DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE")
  }
}

test("Prohibit spark.databricks.delta.dynamicPartitionOverwrite.enabled=false in " +
"dynamic partition overwrite mode") {
withTempDir { tempDir =>
Expand Down Expand Up @@ -334,4 +355,78 @@ class DeltaOptionSuite extends QueryTest
}
}
}

// Registers one test per SQL replace flavour. Each test checks that a
// `... REPLACE TABLE ... AS SELECT` statement rewrites the whole table even when the session's
// partition overwrite mode is set to dynamic.
Seq("CREATE OR REPLACE", "REPLACE").foreach { replaceClause =>
  test(s"$replaceClause table command should not respect " +
      "dynamic partition overwrite mode") {
    withTempDir { tempDir =>
      val tablePath = tempDir.getAbsolutePath

      // Seed a table partitioned by `part`, with one row in each of the two partitions.
      val seedRows = Seq(0, 1).toDF
        .withColumn("key", $"value" % 2)
        .withColumn("stringColumn", lit("string"))
        .withColumn("part", $"value" % 2)
      seedRows.write
        .format("delta")
        .partitionBy("part")
        .save(tablePath)

      withSQLConf(PARTITION_OVERWRITE_MODE.key -> "dynamic") {
        // The replacement selects a single row for partition 0 and gives stringColumn a
        // different type than the seeded table had.
        val replaceStatement =
          s"""
             |$replaceClause TABLE delta.`$tablePath`
             |USING delta
             |PARTITIONED BY (part)
             |LOCATION '$tablePath'
             |AS SELECT -1 as value, 0 as part, 0 as stringColumn
             |""".stripMargin
        sql(replaceStatement)

        val remainingRows = spark.read.format("delta").load(tablePath).count()
        assert(remainingRows == 1,
          "Table should be fully replaced even with DPO mode enabled")
      }
    }
  }
}

// DataFrameWriterV2 variant of the replace check: `writeTo(...).createOrReplace()` must
// replace the whole table even when the session's partition overwrite mode is dynamic.
test("create or replace table V2 should not respect dynamic partition overwrite mode") {
  withTable("temp") {
    // Seed the table with one row in each of the two `part` partitions.
    val seedRows = Seq(0, 1).toDF.withColumn("part", $"value" % 2)
    seedRows.write
      .format("delta")
      .partitionBy("part")
      .saveAsTable("temp")

    withSQLConf(PARTITION_OVERWRITE_MODE.key -> "dynamic") {
      // The replacement data only contains a row for partition 0.
      val replacementRows = Seq(0).toDF.withColumn("part", $"value" % 2)
      replacementRows
        .writeTo("temp")
        .using("delta")
        .createOrReplace()

      val remainingRows = spark.read.format("delta").table("temp").count()
      assert(remainingRows == 1,
        "Table should be fully replaced even with DPO mode enabled")
    }
  }
}

// Counterpart of the replace tests: a saveAsTable overwrite that explicitly requests dynamic
// partition overwrite must only replace the partition it writes to and keep the other one.
test("saveAsTable with overwrite should respect dynamic partition overwrite mode") {
  withTable("temp") {
    // Seed the table with one row in each of the two `part` partitions.
    val seedRows = Seq(0, 1).toDF.withColumn("part", $"value" % 2)
    seedRows.write
      .format("delta")
      .partitionBy("part")
      .saveAsTable("temp")

    // Overwrite with data for partition 0 only, opting into dynamic partition overwrite
    // through the writer option.
    val singlePartitionRows = Seq(0).toDF.withColumn("part", $"value" % 2)
    val dynamicOverwriteWriter = singlePartitionRows.write
      .mode("overwrite")
      .option(PARTITION_OVERWRITE_MODE_OPTION, "dynamic")
      .partitionBy("part")
      .format("delta")
    dynamicOverwriteWriter.saveAsTable("temp")

    val remainingRows = spark.read.format("delta").table("temp").count()
    assert(remainingRows == 2,
      "Table should keep the original partition with DPO mode enabled.")
  }
}
}

0 comments on commit 11cd832

Please sign in to comment.