From fcca4a671b8f97f3a0b5e863c58d273eae36309e Mon Sep 17 00:00:00 2001 From: Thang Long Vu <107926660+longvu-db@users.noreply.github.com> Date: Sat, 20 Apr 2024 02:10:39 +0200 Subject: [PATCH] [Spark] Add Preserving Row Tracking in Delete (#2925) #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Add Preserving Row Tracking in Delete by reading the metadata column and writing it out to the physical column. ## How was this patch tested? Added UTs. ## Does this PR introduce _any_ user-facing changes? No. --- .../apache/spark/sql/delta/RowTracking.scala | 32 +- .../sql/delta/commands/DeleteCommand.scala | 8 +- .../sql/delta/rowid/RowIdTestUtils.scala | 95 +++++- .../delta/rowid/RowTrackingDeleteSuite.scala | 278 ++++++++++++++++++ 4 files changed, 408 insertions(+), 5 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingDeleteSuite.scala diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/RowTracking.scala b/spark/src/main/scala/org/apache/spark/sql/delta/RowTracking.scala index fe35c3394d8..adf4fe21fc7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/RowTracking.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/RowTracking.scala @@ -16,6 +16,7 @@ package org.apache.spark.sql.delta +import org.apache.spark.sql.delta.DeltaCommitTag.PreservedRowTrackingTag import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils} import org.apache.spark.sql.DataFrame @@ -75,8 +76,35 @@ object RowTracking { RowCommitVersion.createMetadataStructField(protocol, metadata, nullable) } - def preserveRowTrackingColumns(dataFrame: DataFrame, snapshot: SnapshotDescriptor): DataFrame = { - val dfWithRowIds = RowId.preserveRowIds(dataFrame, snapshot) + /** + * Return a copy of tagsMap with the [[DeltaCommitTag.PreservedRowTrackingTag.key]] tag added + * or replaced with the new value. + */ + private def addPreservedRowTrackingTag( + tagsMap: Map[String, String], + preserved: Boolean): Map[String, String] = { + tagsMap + (DeltaCommitTag.PreservedRowTrackingTag.key -> preserved.toString) + } + + /** + * Sets the [[DeltaCommitTag.PreservedRowTrackingTag.key]] tag to true if not set. We add the tag + * to every operation because we assume all operations preserve row tracking by default. The + * absence of the tag means that row tracking is not preserved. + * Operations can set the tag to mark row tracking as preserved/not preserved. 
+ */ + private[delta] def addPreservedRowTrackingTagIfNotSet( + snapshot: SnapshotDescriptor, + tagsMap: Map[String, String] = Map.empty): Map[String, String] = { + if (!isEnabled(snapshot.protocol, snapshot.metadata) || + tagsMap.contains(PreservedRowTrackingTag.key)) { + return tagsMap + } + addPreservedRowTrackingTag(tagsMap, preserved = true) + } + + def preserveRowTrackingColumns( + dfWithoutRowTrackingColumns: DataFrame, snapshot: SnapshotDescriptor): DataFrame = { + val dfWithRowIds = RowId.preserveRowIds(dfWithoutRowTrackingColumns, snapshot) RowCommitVersion.preserveRowCommitVersions(dfWithRowIds, snapshot) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala index 98c11f5a5c2..279c368f4b6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala @@ -126,7 +126,9 @@ case class DeleteCommand( } val deleteActions = performDelete(sparkSession, deltaLog, txn) - txn.commitIfNeeded(deleteActions, DeltaOperations.Delete(condition.toSeq)) + txn.commitIfNeeded(actions = deleteActions, + op = DeltaOperations.Delete(condition.toSeq), + tags = RowTracking.addPreservedRowTrackingTagIfNotSet(txn.snapshot)) } // Re-cache all cached plans(including this relation itself, if it's cached) that refer to // this data source relation. @@ -325,7 +327,9 @@ case class DeleteCommand( // Keep everything from the resolved target except a new TahoeFileIndex // that only involves the affected files instead of all files. val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location) - val targetDF = Dataset.ofRows(sparkSession, newTarget) + val targetDF = RowTracking.preserveRowTrackingColumns( + dfWithoutRowTrackingColumns = Dataset.ofRows(sparkSession, newTarget), + snapshot = txn.snapshot) val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral)) val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length) val (changeFiles, rewrittenFiles) = rewrittenActions diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdTestUtils.scala index 4e929053f64..577414d5046 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdTestUtils.scala @@ -21,7 +21,7 @@ import java.io.File import scala.collection.JavaConverters._ import org.apache.spark.sql.delta._ -import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.delta.actions.{AddFile, CommitInfo} import org.apache.spark.sql.delta.rowtracking.RowTrackingTestUtils import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.hadoop.fs.Path @@ -142,4 +142,97 @@ trait RowIdTestUtils extends RowTrackingTestUtils with DeltaSQLCommandTest with log.update().metadata.configuration .get(MaterializedRowCommitVersion.MATERIALIZED_COLUMN_NAME_PROP) } + + /** Returns a Map of file path to base row ID from the AddFiles in a Snapshot. 
*/ + private def getAddFilePathToBaseRowIdMap(snapshot: Snapshot): Map[String, Long] = { + val allAddFiles = snapshot.allFiles.collect() + allAddFiles.foreach(addFile => assert(addFile.baseRowId.isDefined, + "Every AddFile should have a base row ID")) + allAddFiles.map(a => a.path -> a.baseRowId.get).toMap + } + + /** Returns a Map of file path to base row ID from the RemoveFiles in a Snapshot. */ + private def getRemoveFilePathToBaseRowIdMap(snapshot: Snapshot): Map[String, Long] = { + val removeFiles = snapshot.tombstones.collect() + removeFiles.foreach(removeFile => assert(removeFile.baseRowId.isDefined, + "Every RemoveFile should have a base row ID")) + removeFiles.map(r => r.path -> r.baseRowId.get).toMap + } + + /** Check that the high watermark does not get updated if there aren't any new files */ + def checkHighWatermarkBeforeAndAfterOperation(log: DeltaLog)(operation: => Unit): Unit = { + val prevSnapshot = log.update() + val prevHighWatermark = RowId.extractHighWatermark(prevSnapshot) + val prevAddFiles = getAddFilePathToBaseRowIdMap(prevSnapshot).keySet + + operation + + val newAddFiles = getAddFilePathToBaseRowIdMap(log.update()).keySet + val newFilesAdded = newAddFiles.diff(prevAddFiles).nonEmpty + val newHighWatermark = RowId.extractHighWatermark(log.update()) + + if (newFilesAdded) { + assert(prevHighWatermark.get < newHighWatermark.get, + "The high watermark should have been updated after creating new files") + } else { + assert(prevHighWatermark === newHighWatermark, + "The high watermark should not be updated when there are no new file") + } + } + + /** + * Check that file actions do not violate Row ID invariants after an operation. + * More specifically: + * - We do not reassign the base row ID to the same AddFile. + * - RemoveFiles have the same base row ID as the corresponding AddFile + * with the same file path. + */ + def checkFileActionInvariantBeforeAndAfterOperation(log: DeltaLog)(operation: => Unit): Unit = { + val prevAddFilePathToBaseRowId = getAddFilePathToBaseRowIdMap(log.update()) + + operation + + val snapshot = log.update() + val newAddFileBaseRowIdsMap = getAddFilePathToBaseRowIdMap(snapshot) + val newRemoveFileBaseRowIds = getRemoveFilePathToBaseRowIdMap(snapshot) + + prevAddFilePathToBaseRowId.foreach { case (path, prevRowId) => + if (newAddFileBaseRowIdsMap.contains(path)) { + val currRowId = newAddFileBaseRowIdsMap(path) + assert(currRowId === prevRowId, + "We should not reassign base row IDs if it's the same AddFile") + } else if (newRemoveFileBaseRowIds.contains(path)) { + assert(newRemoveFileBaseRowIds(path) === prevRowId, + "No new base row ID should be assigned to RemoveFiles") + } + } + } + + /** + * Checks whether Row tracking is marked as preserved on the [[CommitInfo]] action + * committed during `operation`. 
+ */ + def rowTrackingMarkedAsPreservedForCommit(log: DeltaLog)(operation: => Unit): Boolean = { + val versionPriorToCommit = log.update().version + + operation + + val versionOfCommit = log.update().version + assert(versionPriorToCommit < versionOfCommit) + val commitInfos = log.getChanges(versionOfCommit).flatMap(_._2).flatMap { + case commitInfo: CommitInfo => Some(commitInfo) + case _ => None + }.toList + assert(commitInfos.size === 1) + commitInfos.forall { commitInfo => + commitInfo.tags + .getOrElse(Map.empty) + .getOrElse(DeltaCommitTag.PreservedRowTrackingTag.key, "false").toBoolean + } + } + + def checkRowTrackingMarkedAsPreservedForCommit(log: DeltaLog)(operation: => Unit): Unit = { + assert(rowTrackingMarkedAsPreservedForCommit(log)(operation)) + } + } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingDeleteSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingDeleteSuite.scala new file mode 100644 index 00000000000..5895f1d597b --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowTrackingDeleteSuite.scala @@ -0,0 +1,278 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.rowid + +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN +import org.apache.spark.sql.delta.sources.DeltaSQLConf + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.functions.col + +trait RowTrackingDeleteTestDimension + extends QueryTest + with RowIdTestUtils { + protected def deletionVectorEnabled: Boolean = false + protected def cdfEnabled: Boolean = false + val testTableName = "rowIdDeleteTable" + val initialNumRows = 5000 + + /** + * Create a table and validate that it has Row IDs and the expected number of files. + */ + def createTestTable( + tableName: String, + isPartitioned: Boolean, + multipleFilesPerPartition: Boolean): Unit = { + // We disable Optimize Write to ensure the right number of files are created. 
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED.key -> "false") { + val numFilesPerPartition = if (isPartitioned && multipleFilesPerPartition) 2 else 1 + val numRowsPerPartition = 100 + val expectedNumFiles = if (isPartitioned) { + numFilesPerPartition * (initialNumRows / numRowsPerPartition) + } else { + 10 + } + val partitionColumnValue = (col("id") / numRowsPerPartition).cast("int") + + val df = spark.range(0, initialNumRows, 1, expectedNumFiles) + .withColumn("part", partitionColumnValue) + if (isPartitioned) { + df.repartition(numFilesPerPartition) + .write + .format("delta") + .partitionBy("part") + .saveAsTable(tableName) + } else { + df.write + .format("delta") + .saveAsTable(tableName) + } + + val (log, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(tableName)) + assert(snapshot.allFiles.count() === expectedNumFiles) + } + } + + def withRowIdTestTable(isPartitioned: Boolean)(f: => Unit): Unit = { + withRowTrackingEnabled(enabled = true) { + withTable(testTableName) { + createTestTable(testTableName, isPartitioned, multipleFilesPerPartition = false) + f + } + } + } + + /** + * Read the stable row IDs before and after the DELETE operation. + * Validate the row IDs are the same. + */ + def deleteAndValidateStableRowId(whereCondition: Option[String]): Unit = { + val expectedRows: Array[Row] = spark.table(testTableName) + .select("id", RowId.QUALIFIED_COLUMN_NAME, RowCommitVersion.QUALIFIED_COLUMN_NAME) + .where(s"NOT (${whereCondition.getOrElse("true")})") + .collect() + + val log = DeltaLog.forTable(spark, TableIdentifier(testTableName)) + checkRowTrackingMarkedAsPreservedForCommit(log) { + checkFileActionInvariantBeforeAndAfterOperation(log) { + checkHighWatermarkBeforeAndAfterOperation(log) { + executeDelete(whereCondition) + } + } + } + + val actualDF = spark.table(testTableName) + .select("id", RowId.QUALIFIED_COLUMN_NAME, RowCommitVersion.QUALIFIED_COLUMN_NAME) + checkAnswer(actualDF, expectedRows) + } + + def executeDelete(whereCondition: Option[String]): Unit = { + val whereClause = whereCondition.map(cond => s" WHERE $cond").getOrElse("") + spark.sql(s"DELETE FROM $testTableName$whereClause") + } + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, cdfEnabled.toString) + .set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, + deletionVectorEnabled.toString) + } +} + +trait RowTrackingDeleteSuiteBase extends RowTrackingDeleteTestDimension { + val subqueryTableName = "subqueryTable" + + for { + isPartitioned <- BOOLEAN_DOMAIN + whereClause <- Seq( + "id IN (5, 7, 11, 57, 66, 77, 79, 88, 91, 95)", // 0.2%, 10 rows match + "part = 5", // 10%, 500 rows match + "id % 20 = 0", // 20%, 1000 rows match + "id >= 0" // 100%, 5000 rows match + ) + } { + test(s"DELETE preserves Row IDs, isPartitioned=$isPartitioned, whereClause=`$whereClause`") { + withRowIdTestTable(isPartitioned) { + deleteAndValidateStableRowId(Some(whereClause)) + } + } + } + + test("Preserving Row Tracking - Subqueries are not supported in DELETE") { + withRowIdTestTable(isPartitioned = false) { + withTable(subqueryTableName) { + createTestTable(subqueryTableName, isPartitioned = false, multipleFilesPerPartition = false) + val ex = intercept[AnalysisException] { + deleteAndValidateStableRowId(Some( + s"id in (SELECT id FROM $subqueryTableName WHERE id = 7 OR id = 11)")) + }.getMessage + assert(ex.contains("Subqueries are not supported in the DELETE")) + } + } + } + + for (isPartitioned <- 
BOOLEAN_DOMAIN) { + test(s"Multiple DELETEs preserve Row IDs, isPartitioned=$isPartitioned") { + withRowIdTestTable(isPartitioned) { + val whereClause1 = "id % 20 = 0" + deleteAndValidateStableRowId(Some(whereClause1)) + val whereClause2 = "id % 10 = 0" + deleteAndValidateStableRowId(Some(whereClause2)) + } + } + } + + for (isPartitioned <- BOOLEAN_DOMAIN) { + test(s"Insert after DELETE on whole table, isPartitioned=$isPartitioned") { + withRowIdTestTable(isPartitioned) { + // Delete whole table. + deleteAndValidateStableRowId(whereCondition = None) + + spark.sql(s"INSERT INTO $testTableName VALUES (1, 0), (2, 0), (3, 0), (4, 0)") + + // The new rows should have new row IDs. + val actualDF = spark.table(testTableName) + .select("id", RowId.QUALIFIED_COLUMN_NAME) + assert(actualDF.filter(s"row_id < $initialNumRows").count() <= 0) + } + } + } + + for { + isPartitioned <- BOOLEAN_DOMAIN + } { + test(s"DELETE with optimized writes preserves Row ID, isPartitioned=$isPartitioned") { + withRowTrackingEnabled(enabled = true) { + withTable(testTableName) { + createTestTable(testTableName, isPartitioned, multipleFilesPerPartition = true) + val whereClause = "id % 20 = 0" + withSQLConf( + DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED.key -> "true" + ) { + deleteAndValidateStableRowId(whereCondition = Some(whereClause)) + + val (log, snapshot) = + DeltaLog.forTableWithSnapshot(spark, TableIdentifier(testTableName)) + val currentNumFiles = snapshot.allFiles.count() + + val expectedNumFiles = if (deletionVectorEnabled) { + if (isPartitioned) 100 else 10 + } else { + if (isPartitioned) 53 else 1 + } + + assert(currentNumFiles === expectedNumFiles, + s"The current num files $currentNumFiles is unexpected for optimized writes") + } + } + } + } + } + + test("Row tracking marked as not preserved when row tracking disabled") { + withRowTrackingEnabled(enabled = false) { + withTable(testTableName) { + createTestTable(testTableName, isPartitioned = false, multipleFilesPerPartition = false) + val log = DeltaLog.forTable(spark, TableIdentifier(testTableName)) + assert( + !rowTrackingMarkedAsPreservedForCommit(log)( + executeDelete(whereCondition = Some("id = 5")))) + } + } + } +} + +trait RowTrackingDeleteDvBase + extends RowTrackingDeleteTestDimension + with DeletionVectorsTestUtils { + + override def beforeAll(): Unit = { + super.beforeAll() + enableDeletionVectorsInNewTables(spark.conf) + } + + override protected def deletionVectorEnabled = true + + for (isPartitioned <- BOOLEAN_DOMAIN) { + test(s"DELETE with persistent DVs disabled, isPartitioned=$isPartitioned") { + val whereClause = "id % 20 = 0" + withDeletionVectorsEnabled(enabled = false) { + withRowIdTestTable(isPartitioned) { + deleteAndValidateStableRowId(whereCondition = Some(whereClause)) + } + } + } + } +} + +trait RowTrackingDeleteCDCBase extends RowTrackingDeleteTestDimension { + override protected def cdfEnabled = true +} + +// No Column Mapping concrete test suites +class RowTrackingDeleteSuite extends RowTrackingDeleteSuiteBase + +class RowTrackingDeleteDvSuite extends RowTrackingDeleteSuiteBase + with RowTrackingDeleteDvBase + +class RowTrackingDeleteCDCSuite extends RowTrackingDeleteSuiteBase + with RowTrackingDeleteCDCBase + +class RowTrackingDeleteCDCDvSuite extends RowTrackingDeleteSuiteBase + with RowTrackingDeleteCDCBase + with RowTrackingDeleteDvBase + +// Name Column Mapping concrete test suites +class RowTrackingDeleteNameColumnMappingSuite extends RowTrackingDeleteSuiteBase + with DeltaColumnMappingEnableNameMode + +class 
RowTrackingDeleteCDCDvNameColumnMappingSuite extends RowTrackingDeleteSuiteBase + with RowTrackingDeleteCDCBase + with RowTrackingDeleteDvBase + with DeltaColumnMappingEnableNameMode + +// ID Column Mapping concrete test suites +class RowTrackingDeleteIdColumnMappingSuite extends RowTrackingDeleteSuiteBase + with DeltaColumnMappingEnableIdMode + +class RowTrackingDeleteCDCDvIdColumnMappingSuite extends RowTrackingDeleteSuiteBase + with RowTrackingDeleteCDCBase + with RowTrackingDeleteDvBase + with DeltaColumnMappingEnableIdMode
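
**Reviewer note (not part of the patch):** a minimal end-to-end sketch of the behavior this change enables, mirroring `deleteAndValidateStableRowId` in the new suite. It assumes a Spark session with Delta Lake configured, that row tracking is enabled via the `delta.enableRowTracking` table property, and that the row tracking metadata columns are exposed as `_metadata.row_id` / `_metadata.row_commit_version` (the values behind `RowId.QUALIFIED_COLUMN_NAME` and `RowCommitVersion.QUALIFIED_COLUMN_NAME` used in the tests). Treat those names as assumptions rather than confirmed API.

```scala
// Illustrative sketch only, not part of this PR. Property and column names as noted above.
spark.sql("""
  CREATE TABLE row_tracking_demo (id BIGINT) USING delta
  TBLPROPERTIES ('delta.enableRowTracking' = 'true')
""")
spark.range(0, 100).write.format("delta").mode("append").saveAsTable("row_tracking_demo")

// Capture the row IDs / row commit versions of the rows that will survive the DELETE.
val before = spark.table("row_tracking_demo")
  .selectExpr("id", "_metadata.row_id", "_metadata.row_commit_version")
  .where("id % 10 != 0")
  .collect()
  .toSet

spark.sql("DELETE FROM row_tracking_demo WHERE id % 10 = 0")

// With this patch, surviving rows keep the row tracking values they had before the
// DELETE instead of being reassigned fresh row IDs from the high watermark.
val after = spark.table("row_tracking_demo")
  .selectExpr("id", "_metadata.row_id", "_metadata.row_commit_version")
  .collect()
  .toSet

assert(before == after, "row tracking columns of surviving rows changed across DELETE")
```

The new suite asserts the same property through `checkAnswer`, and additionally checks the file-action and high-watermark invariants from `RowIdTestUtils`, plus the `PreservedRowTrackingTag` commit tag introduced in `RowTracking.addPreservedRowTrackingTagIfNotSet`.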