# [Spark] Add Preserving Row Tracking in Delete (#2925)

#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Preserve row tracking in DELETE by reading the row tracking metadata columns (row ID and row commit version) and writing them out to the corresponding physical columns when files are rewritten.
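
Conceptually, this is a projection from metadata columns into materialized physical columns. A minimal sketch of the idea (not the exact code in this diff; the materialized column names are parameters here, while the real ones come from table metadata, and `_metadata.row_id` / `_metadata.row_commit_version` are the fields Delta exposes when row tracking is enabled):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch: rows read from the table carry their row ID and row commit version
// as metadata columns. Before rewriting files, project those values into the
// materialized physical columns so they survive the rewrite.
def preserveRowTrackingSketch(
    df: DataFrame,
    materializedRowIdColumn: String,
    materializedRowCommitVersionColumn: String): DataFrame = {
  df.withColumn(materializedRowIdColumn, col("_metadata.row_id"))
    .withColumn(
      materializedRowCommitVersionColumn, col("_metadata.row_commit_version"))
}
```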


## How was this patch tested?
Added unit tests.

## Does this PR introduce _any_ user-facing changes?
No.
longvu-db committed Apr 20, 2024
1 parent a1ddb8b commit fcca4a6
Showing 4 changed files with 408 additions and 5 deletions.
spark/src/main/scala/org/apache/spark/sql/delta/RowTracking.scala (30 additions, 2 deletions)
@@ -16,6 +16,7 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.DeltaCommitTag.PreservedRowTrackingTag
import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils}

import org.apache.spark.sql.DataFrame
@@ -75,8 +76,35 @@ object RowTracking {
RowCommitVersion.createMetadataStructField(protocol, metadata, nullable)
}

def preserveRowTrackingColumns(dataFrame: DataFrame, snapshot: SnapshotDescriptor): DataFrame = {
val dfWithRowIds = RowId.preserveRowIds(dataFrame, snapshot)
/**
* Return a copy of tagsMap with the [[DeltaCommitTag.PreservedRowTrackingTag.key]] tag added
* or replaced with the new value.
*/
private def addPreservedRowTrackingTag(
tagsMap: Map[String, String],
preserved: Boolean): Map[String, String] = {
tagsMap + (DeltaCommitTag.PreservedRowTrackingTag.key -> preserved.toString)
}

/**
* Sets the [[DeltaCommitTag.PreservedRowTrackingTag.key]] tag to true if not set. We add the tag
* to every operation because we assume all operations preserve row tracking by default. The
* absence of the tag means that row tracking is not preserved.
* Operations can set the tag to mark row tracking as preserved/not preserved.
*/
private[delta] def addPreservedRowTrackingTagIfNotSet(
snapshot: SnapshotDescriptor,
tagsMap: Map[String, String] = Map.empty): Map[String, String] = {
if (!isEnabled(snapshot.protocol, snapshot.metadata) ||
tagsMap.contains(PreservedRowTrackingTag.key)) {
return tagsMap
}
addPreservedRowTrackingTag(tagsMap, preserved = true)
}

def preserveRowTrackingColumns(
dfWithoutRowTrackingColumns: DataFrame, snapshot: SnapshotDescriptor): DataFrame = {
val dfWithRowIds = RowId.preserveRowIds(dfWithoutRowTrackingColumns, snapshot)
RowCommitVersion.preserveRowCommitVersions(dfWithRowIds, snapshot)
}
}
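
The tag semantics above boil down to an idempotent map update: default to `true`, but never overwrite a value an operation set itself. A self-contained illustration (the real helper additionally checks `isEnabled` on the snapshot's protocol and metadata; the literal tag key below is an assumption for readability, real code should use `DeltaCommitTag.PreservedRowTrackingTag.key`):

```scala
// Assumed literal for illustration only; real code uses
// DeltaCommitTag.PreservedRowTrackingTag.key.
val PreservedKey = "delta.rowTracking.preserved"

def addIfNotSet(tags: Map[String, String]): Map[String, String] =
  if (tags.contains(PreservedKey)) tags // respect an explicit value
  else tags + (PreservedKey -> "true")  // default: operations preserve row tracking

assert(addIfNotSet(Map.empty) == Map(PreservedKey -> "true"))
// An operation that already marked itself as not preserving keeps its value.
assert(addIfNotSet(Map(PreservedKey -> "false"))(PreservedKey) == "false")
```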
@@ -126,7 +126,9 @@ case class DeleteCommand(
}

val deleteActions = performDelete(sparkSession, deltaLog, txn)
txn.commitIfNeeded(deleteActions, DeltaOperations.Delete(condition.toSeq))
txn.commitIfNeeded(actions = deleteActions,
op = DeltaOperations.Delete(condition.toSeq),
tags = RowTracking.addPreservedRowTrackingTagIfNotSet(txn.snapshot))
}
// Re-cache all cached plans(including this relation itself, if it's cached) that refer to
// this data source relation.
@@ -325,7 +327,9 @@ case class DeleteCommand(
// Keep everything from the resolved target except a new TahoeFileIndex
// that only involves the affected files instead of all files.
val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
val targetDF = Dataset.ofRows(sparkSession, newTarget)
val targetDF = RowTracking.preserveRowTrackingColumns(
dfWithoutRowTrackingColumns = Dataset.ofRows(sparkSession, newTarget),
snapshot = txn.snapshot)
val filterCond = Not(EqualNullSafe(cond, Literal.TrueLiteral))
val rewrittenActions = rewriteFiles(txn, targetDF, filterCond, filesToRewrite.length)
val (changeFiles, rewrittenFiles) = rewrittenActions
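
The observable effect of wiring `preserveRowTrackingColumns` into the rewrite path: rows that survive a DELETE keep their row IDs even when their file is rewritten. A hedged end-to-end sketch (table name is illustrative; `_metadata.row_id` is the field Delta exposes when row tracking is enabled):

```scala
import org.apache.spark.sql.functions.col

spark.sql("""CREATE TABLE t (id INT) USING delta
             TBLPROPERTIES ('delta.enableRowTracking' = 'true')""")
spark.sql("INSERT INTO t VALUES (1), (2), (3)")

val before = spark.table("t")
  .select(col("id"), col("_metadata.row_id")).collect()

// Deleting one row rewrites the file containing it; the surviving rows
// in that file are copied into a new file.
spark.sql("DELETE FROM t WHERE id = 2")

val after = spark.table("t")
  .select(col("id"), col("_metadata.row_id")).collect()
// With this change, the surviving rows' row_id values match `before`.
```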
@@ -21,7 +21,7 @@ import java.io.File
import scala.collection.JavaConverters._

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.actions.{AddFile, CommitInfo}
import org.apache.spark.sql.delta.rowtracking.RowTrackingTestUtils
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.hadoop.fs.Path
@@ -142,4 +142,97 @@ trait RowIdTestUtils extends RowTrackingTestUtils with DeltaSQLCommandTest with
log.update().metadata.configuration
.get(MaterializedRowCommitVersion.MATERIALIZED_COLUMN_NAME_PROP)
}

/** Returns a Map of file path to base row ID from the AddFiles in a Snapshot. */
private def getAddFilePathToBaseRowIdMap(snapshot: Snapshot): Map[String, Long] = {
val allAddFiles = snapshot.allFiles.collect()
allAddFiles.foreach(addFile => assert(addFile.baseRowId.isDefined,
"Every AddFile should have a base row ID"))
allAddFiles.map(a => a.path -> a.baseRowId.get).toMap
}

/** Returns a Map of file path to base row ID from the RemoveFiles in a Snapshot. */
private def getRemoveFilePathToBaseRowIdMap(snapshot: Snapshot): Map[String, Long] = {
val removeFiles = snapshot.tombstones.collect()
removeFiles.foreach(removeFile => assert(removeFile.baseRowId.isDefined,
"Every RemoveFile should have a base row ID"))
removeFiles.map(r => r.path -> r.baseRowId.get).toMap
}

/** Check that the high watermark does not get updated if there aren't any new files */
def checkHighWatermarkBeforeAndAfterOperation(log: DeltaLog)(operation: => Unit): Unit = {
val prevSnapshot = log.update()
val prevHighWatermark = RowId.extractHighWatermark(prevSnapshot)
val prevAddFiles = getAddFilePathToBaseRowIdMap(prevSnapshot).keySet

operation

val newAddFiles = getAddFilePathToBaseRowIdMap(log.update()).keySet
val newFilesAdded = newAddFiles.diff(prevAddFiles).nonEmpty
val newHighWatermark = RowId.extractHighWatermark(log.update())

if (newFilesAdded) {
assert(prevHighWatermark.get < newHighWatermark.get,
"The high watermark should have been updated after creating new files")
} else {
assert(prevHighWatermark === newHighWatermark,
"The high watermark should not be updated when there are no new file")
}
}

/**
* Check that file actions do not violate Row ID invariants after an operation.
* More specifically:
* - We do not reassign the base row ID to the same AddFile.
* - RemoveFiles have the same base row ID as the corresponding AddFile
* with the same file path.
*/
def checkFileActionInvariantBeforeAndAfterOperation(log: DeltaLog)(operation: => Unit): Unit = {
val prevAddFilePathToBaseRowId = getAddFilePathToBaseRowIdMap(log.update())

operation

val snapshot = log.update()
val newAddFileBaseRowIdsMap = getAddFilePathToBaseRowIdMap(snapshot)
val newRemoveFileBaseRowIds = getRemoveFilePathToBaseRowIdMap(snapshot)

prevAddFilePathToBaseRowId.foreach { case (path, prevRowId) =>
if (newAddFileBaseRowIdsMap.contains(path)) {
val currRowId = newAddFileBaseRowIdsMap(path)
assert(currRowId === prevRowId,
"We should not reassign base row IDs if it's the same AddFile")
} else if (newRemoveFileBaseRowIds.contains(path)) {
assert(newRemoveFileBaseRowIds(path) === prevRowId,
"No new base row ID should be assigned to RemoveFiles")
}
}
}

/**
* Checks whether Row tracking is marked as preserved on the [[CommitInfo]] action
* committed during `operation`.
*/
def rowTrackingMarkedAsPreservedForCommit(log: DeltaLog)(operation: => Unit): Boolean = {
val versionPriorToCommit = log.update().version

operation

val versionOfCommit = log.update().version
assert(versionPriorToCommit < versionOfCommit)
val commitInfos = log.getChanges(versionOfCommit).flatMap(_._2).flatMap {
case commitInfo: CommitInfo => Some(commitInfo)
case _ => None
}.toList
assert(commitInfos.size === 1)
commitInfos.forall { commitInfo =>
commitInfo.tags
.getOrElse(Map.empty)
.getOrElse(DeltaCommitTag.PreservedRowTrackingTag.key, "false").toBoolean
}
}

def checkRowTrackingMarkedAsPreservedForCommit(log: DeltaLog)(operation: => Unit): Unit = {
assert(rowTrackingMarkedAsPreservedForCommit(log)(operation))
}

}
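
A hypothetical usage of the new helper in a suite mixing in `RowIdTestUtils` (the test name is illustrative, and `withRowTrackingEnabled` is assumed from the existing `RowTrackingTestUtils` mixin):

```scala
test("DELETE marks row tracking as preserved") {
  withRowTrackingEnabled(enabled = true) { // assumed RowTrackingTestUtils helper
    withTempDir { dir =>
      spark.range(10).write.format("delta").save(dir.getAbsolutePath)
      val log = DeltaLog.forTable(spark, dir.getAbsolutePath)

      // Fails if the DELETE commit's CommitInfo lacks the preservation tag
      // or carries it with the value "false".
      checkRowTrackingMarkedAsPreservedForCommit(log) {
        spark.sql(s"DELETE FROM delta.`${dir.getAbsolutePath}` WHERE id = 1")
      }
    }
  }
}
```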
