Support idempotent overwrite and append on saveAsTable
1. Add a unit test to ensure idempotent writes are supported in saveAsTable.
2. Add a unit test to ensure idempotent writes are supported in save.

GitOrigin-RevId: ece4875f8f254f58a7595c448534a01967179ca8
kamcheungting-db authored and vkorukanti committed Dec 21, 2022
1 parent b6a1c50 commit 574fe73
Showing 3 changed files with 160 additions and 60 deletions.
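
For orientation before the diff: Delta's idempotent DataFrame writes are driven by two writer options, txnAppId (DeltaOptions.TXN_APP_ID) and txnVersion (DeltaOptions.TXN_VERSION); a write whose version is not higher than the last version committed for that app id is silently skipped. Previously only save honored these options; this commit extends the check to saveAsTable. A minimal usage sketch, with the table name "events" and app id "myEtlJob" chosen for illustration:

// Retrying the same (txnAppId, txnVersion) pair is a no-op, so a failed and
// retried job cannot append its batch twice.
df.write.format("delta")
  .option("txnAppId", "myEtlJob") // stable identifier for the writing application
  .option("txnVersion", 42L)      // monotonically increasing batch version
  .mode("append")
  .saveAsTable("events")          // with this commit, saveAsTable honors the options too
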
core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.commands

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
@@ -116,43 +117,52 @@ case class CreateDeltaTableCommand(
assertPathEmpty(hadoopConf, tableWithLocation)
}
}

// Execute the write command for `deltaWriter` by
// - replacing the metadata of the new target table for the DataFrameWriterV2 writer
//   if it is a REPLACE or CREATE_OR_REPLACE command,
// - running the write procedure of the DataFrameWriter command and returning the
//   newly created actions,
// - returning the Delta operation type of this DataFrameWriter.
def doDeltaWrite(
deltaWriter: WriteIntoDelta,
schema: StructType): (Seq[Action], DeltaOperations.Operation) = {
// In the V2 Writer, methods like "replace" and "createOrReplace" implicitly mean that
// the metadata should be changed. This wasn't the behavior for DataFrameWriterV1.
if (!isV1Writer) {
replaceMetadataIfNecessary(
txn, tableWithLocation, options, schema)
}
val actions = deltaWriter.write(txn, sparkSession)
val op = getOperation(txn.metadata, isManagedTable, Some(options))
(actions, op)
}

// We are either appending/overwriting with saveAsTable, creating a new table with CTAS,
// or creating a table as part of a RunnableCommand.
query.get match {
case writer: WriteIntoDelta =>
// In the V2 Writer, methods like "replace" and "createOrReplace" implicitly mean that
// the metadata should be changed. This wasn't the behavior for DataFrameWriterV1.
if (!isV1Writer) {
replaceMetadataIfNecessary(
txn, tableWithLocation, options, writer.data.schema.asNullable)
}
val actions = writer.write(txn, sparkSession)
val op = getOperation(txn.metadata, isManagedTable, Some(options))
txn.commit(actions, op)
case deltaWriter: WriteIntoDelta =>
if (!deltaWriter.writeHasBeenExecuted(txn, sparkSession, Some(options))) {
val (actions, op) = doDeltaWrite(deltaWriter, deltaWriter.data.schema.asNullable)
txn.commit(actions, op)
}
case cmd: RunnableCommand =>
result = cmd.run(sparkSession)
case other =>
// When using V1 APIs, the `other` plan is not yet optimized; therefore, it is safe
// to once again go through analysis.
val data = Dataset.ofRows(sparkSession, other)

// In the V2 Writer, methods like "replace" and "createOrReplace" implicitly mean that
// the metadata should be changed. This wasn't the behavior for DataFrameWriterV1.
if (!isV1Writer) {
replaceMetadataIfNecessary(
txn, tableWithLocation, options, other.schema.asNullable)
}

val actions = WriteIntoDelta(
val deltaWriter = WriteIntoDelta(
deltaLog = deltaLog,
mode = mode,
options,
partitionColumns = table.partitionColumnNames,
configuration = tableWithLocation.properties + ("comment" -> table.comment.orNull),
data = data).write(txn, sparkSession)

val op = getOperation(txn.metadata, isManagedTable, Some(options))
txn.commit(actions, op)
data = data)
if (!deltaWriter.writeHasBeenExecuted(txn, sparkSession, Some(options))) {
val (actions, op) = doDeltaWrite(deltaWriter, other.schema.asNullable)
txn.commit(actions, op)
}
}
} else {
def createTransactionLogOrVerify(): Unit = {
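
The isV1Writer branches above encode a real difference between the two writer APIs. A hedged illustration, with the table name "t" made up for the example:

// DataFrameWriterV1: overwriting via saveAsTable keeps the table's existing metadata.
df.write.format("delta").mode("overwrite").saveAsTable("t")

// DataFrameWriterV2: replace and createOrReplace implicitly replace the metadata,
// which is why replaceMetadataIfNecessary runs only on the non-V1 path.
df.writeTo("t").using("delta").createOrReplace()
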
core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala
@@ -90,9 +90,7 @@ case class WriteIntoDelta(

override def run(sparkSession: SparkSession): Seq[Row] = {
deltaLog.withNewTransaction { txn =>
// If this batch has already been executed within this query, then return.
var skipExecution = hasBeenExecuted(txn)
if (skipExecution) {
if (writeHasBeenExecuted(txn, sparkSession, Some(options))) {
return Seq.empty
}

@@ -104,6 +102,21 @@
Seq.empty
}

/**
 * Determines whether this Delta write command has already been executed, by comparing the
 * transaction version last recorded in the table for this app id with the transaction
 * version carried in this command's options. If the recorded version is greater than or
 * equal to this command's version, the command has already been executed and can be skipped.
 */
def writeHasBeenExecuted(
txn: OptimisticTransaction,
sparkSession: SparkSession,
options: Option[DeltaOptions]): Boolean = {
// True if this batch has already been executed within this query.
hasBeenExecuted(txn)
}

// TODO: replace the method below with `CharVarcharUtils.replaceCharWithVarchar` when 3.3 is out.
import org.apache.spark.sql.types.{ArrayType, CharType, DataType, MapType, VarcharType}
private def replaceCharWithVarchar(dt: DataType): DataType = dt match {
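
As defined above, writeHasBeenExecuted simply delegates to hasBeenExecuted(txn). A minimal sketch of the comparison it stands for, assuming the usual Delta internals where txn.txnVersion(appId) returns the version recorded by the table's latest SetTransaction action for that app id (or -1 if none); the helper name alreadyExecuted is illustrative:

// Sketch only; per the tests below, the real path also rejects writes that set
// only one of txnAppId/txnVersion ("Invalid options for idempotent Dataframe writes").
def alreadyExecuted(txn: OptimisticTransaction, appId: String, version: Long): Boolean =
  txn.txnVersion(appId) >= version // -1 when this appId has never committed to the table
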
145 changes: 111 additions & 34 deletions core/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -2261,45 +2261,122 @@
}
}

test("idempotent Dataframe writes") {
withTempDir { dir =>
def idempotentWrite(
mode: String,
appId: String,
seq: DataFrame,
path: String,
name: String,
version: Long,
expectedCount: Long,
commitVersion: Int,
isSaveAsTable: Boolean = true): Unit = {
val df = seq.write.format("delta")
.option(DeltaOptions.TXN_VERSION, version)
.option(DeltaOptions.TXN_APP_ID, appId)
.mode(mode)
if (isSaveAsTable) {
df.option("path", path).saveAsTable(name)
} else {
df.save(path)
}
val i = spark.read.format("delta").load(path).count()
assert(i == expectedCount)
val snapshot = DeltaLog.forTable(spark, path).update()
assert(snapshot.version == (commitVersion - 1))
}

Seq((true, true), (true, false), (false, true), (false, false))
.foreach { case (isSaveAsTable, isLegacy) =>
val op = if (isSaveAsTable) "saveAsTable" else "save"
val version = if (isLegacy) "legacy" else "non-legacy"
val appId1 = "myAppId1"
val appId2 = "myAppId2"
def runQuery(appId: String, seq: Seq[Int], version: Long, expectedCount: Long): Unit = {
seq.toDF().write.format("delta")
.option(DeltaOptions.TXN_VERSION, version)
.option(DeltaOptions.TXN_APP_ID, appId)
.mode("append")
.save(dir.getCanonicalPath)
val i = spark.read.format("delta").load(dir.getCanonicalPath).count()
assert(i == expectedCount)
}
var s = Seq(1, 2, 3)
// The first 2 runs must succeed, increasing the expected count.
runQuery(appId1, s, 1, 3)
runQuery(appId1, s, 2, 6)

// Even if the version is not consecutive, higher versions should commit successfully.
runQuery(appId1, s, 5, 9)

// This run should be ignored because it reuses a version that was already committed.
runQuery(appId1, s, 5, 9)
val confs = if (isLegacy) Seq(SQLConf.USE_V1_SOURCE_LIST.key -> "tahoe,delta") else Seq.empty

// Use a different app ID, but same version. This should succeed.
runQuery(appId2, s, 5, 12)

// Verify that specifying only one of the options -- either appId or version -- fails.
val e1 = intercept[Exception] {
Seq(1, 2, 3).toDF().write.format("delta").option(DeltaOptions.TXN_APP_ID, 1)
.mode("append").save(dir.getCanonicalPath)
if (!(isSaveAsTable && isLegacy)) {
test(s"Idempotent $version Dataframe $op: append") {
withSQLConf(confs: _*) {
withTempDir { dir =>
val path = dir.getCanonicalPath
val name = "append_table_t1"
val mode = "append"
sql("DROP TABLE IF EXISTS append_table_t1")
val df = Seq((1, 2, 3), (4, 5, 6), (7, 8, 9)).toDF("a", "b", "c")
// The first 2 runs must succeed, increasing the expected count.
idempotentWrite(mode, appId1, df, path, name, 1, 3, 1, isSaveAsTable)
idempotentWrite(mode, appId1, df, path, name, 2, 6, 2, isSaveAsTable)

// Even if the version is not consecutive, higher versions should commit successfully.
idempotentWrite(mode, appId1, df, path, name, 5, 9, 3, isSaveAsTable)

// This run should be ignored because it reuses a version that was already committed.
idempotentWrite(mode, appId1, df, path, name, 5, 9, 3, isSaveAsTable)

// Use a different app ID with the same version. The first run succeeds; its retry is skipped.
idempotentWrite(mode, appId2, df, path, name, 5, 12, 4, isSaveAsTable)
idempotentWrite(mode, appId2, df, path, name, 5, 12, 4, isSaveAsTable)

// Verify that specifying only one of the options -- either appId or version -- fails.
val e1 = intercept[Exception] {
val stage = df.write.format("delta").option(DeltaOptions.TXN_APP_ID, 1).mode(mode)
if (isSaveAsTable) {
stage.option("path", path).saveAsTable(name)
} else {
stage.save(path)
}
}
assert(e1.getMessage.contains("Invalid options for idempotent Dataframe writes"))
val e2 = intercept[Exception] {
val stage = df.write.format("delta").option(DeltaOptions.TXN_VERSION, 1).mode(mode)
if (isSaveAsTable) {
stage.option("path", path).saveAsTable(name)
} else {
stage.save(path)
}
}
assert(e2.getMessage.contains("Invalid options for idempotent Dataframe writes"))
}
}
}
}
assert(e1.getMessage.contains("Invalid options for idempotent Dataframe writes"))
val e2 = intercept[Exception] {
Seq(1, 2, 3).toDF().write.format("delta").option(DeltaOptions.TXN_VERSION, 1)
.mode("append").save(dir.getCanonicalPath)

test(s"Idempotent $version Dataframe $op: overwrite") {
withSQLConf(confs: _*) {
withTempDir { dir =>
val path = dir.getCanonicalPath
val name = "overwrite_table_t1"
val mode = "overwrite"
sql("DROP TABLE IF EXISTS overwrite_table_t1")
val df = Seq((1, 2, 3), (4, 5, 6), (7, 8, 9)).toDF("a", "b", "c")
// The first 2 runs must succeed; each overwrite replaces the data, so the count stays at 3.
idempotentWrite(mode, appId1, df, path, name, 1, 3, 1, isSaveAsTable)
idempotentWrite(mode, appId1, df, path, name, 2, 3, 2, isSaveAsTable)

// Even if the version is not consecutive, higher versions should commit successfully.
idempotentWrite(mode, appId1, df, path, name, 5, 3, 3, isSaveAsTable)

// This run should be ignored because it reuses a version that was already committed.
idempotentWrite(mode, appId1, df, path, name, 5, 3, 3, isSaveAsTable)

// Use a different app ID with the same version. The first run succeeds; its retry is skipped.
idempotentWrite(mode, appId2, df, path, name, 5, 3, 4, isSaveAsTable)
idempotentWrite(mode, appId2, df, path, name, 5, 3, 4, isSaveAsTable)

// Verify that specifying only one of the options -- either appId or version -- fails.
val e1 = intercept[Exception] {
val stage = df.write.format("delta").option(DeltaOptions.TXN_APP_ID, 1).mode(mode)
if (isSaveAsTable) stage.option("path", path).saveAsTable(name) else stage.save(path)
}
assert(e1.getMessage.contains("Invalid options for idempotent Dataframe writes"))
val e2 = intercept[Exception] {
val stage = df.write.format("delta").option(DeltaOptions.TXN_VERSION, 1).mode(mode)
if (isSaveAsTable) stage.option("path", path).saveAsTable(name) else stage.save(path)
}
assert(e2.getMessage.contains("Invalid options for idempotent Dataframe writes"))
}
}
}
assert(e2.getMessage.contains("Invalid options for idempotent Dataframe writes"))
}
}
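
One detail worth noting in the idempotentWrite helper: Delta table versions are zero-based, so after commitVersion successful commits the latest snapshot version is commitVersion - 1, which is exactly what the helper asserts:

// After the first successful commit, the snapshot is at version 0.
assert(DeltaLog.forTable(spark, path).update().version == commitVersion - 1)
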

test("idempotent writes in streaming foreachBatch") {
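
The foreachBatch test above (truncated here) exercises the same options from Structured Streaming, where the batch id provides a natural monotonically increasing version. A typical hedged sketch; appId, path, and streamingDf are illustrative names:

streamingDf.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    batchDf.write.format("delta")
      .option("txnAppId", appId)      // stable per query
      .option("txnVersion", batchId)  // a replayed batch is skipped, not re-appended
      .mode("append")
      .save(path)
  }
  .start()
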
