# Assign fresh Row IDs during commit
## Description
This change assigns fresh, unique Row IDs when committing files to tables that support Row IDs (a toy sketch of the scheme follows the lists below):
- Set the `baseRowId` field on every `add` and `remove` action in a commit.
- Generate a `highWaterMark` action to update the Row ID high watermark.
- Gracefully resolve conflicts between transactions by reassigning overlapping Row IDs before committing.

- Add tests to RowIdSuite covering fresh Row ID assignment.
- Add tests to CheckpointSuite ensuring that `baseRowId` and `highWaterMark` information survives checkpointing.
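
For intuition, a minimal sketch of the assignment scheme. `FileStub` and all values here are made up for illustration; the real logic lives in `RowId.assignFreshRowIds` in the diff below. Each new file receives a `baseRowId` one past the current high watermark, and the watermark then advances by the file's record count:

```scala
// Sketch only: fresh Row ID assignment, assuming each file knows its record count.
case class FileStub(path: String, numRecords: Long, baseRowId: Option[Long] = None)

var highWatermark = -1L // -1 means "no Row IDs assigned yet"
val committed = Seq(FileStub("a.parquet", 10), FileStub("b.parquet", 5)).map { f =>
  val withId = f.copy(baseRowId = Some(highWatermark + 1))
  highWatermark += f.numRecords // rows get IDs baseRowId .. baseRowId + numRecords - 1
  withId
}
// a.parquet -> baseRowId 0, b.parquet -> baseRowId 10; highWatermark is now 14
```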

Closes #1723

GitOrigin-RevId: 3ab7cba5b66585baf17de10b1d5fbe6e320e7665
johanl-db authored and vkorukanti committed May 16, 2023
1 parent c8b5acb commit ba10d15
Showing 16 changed files with 508 additions and 29 deletions.
16 changes: 16 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -428,6 +428,22 @@
],
"sqlState" : "0AKDC"
},
"DELTA_CONVERT_TO_DELTA_ROW_TRACKING_WITHOUT_STATS" : {
"message" : [
"Cannot enable row tracking without collecting statistics.",
"If you want to enable row tracking, do the following:",
" 1. Enable statistics collection by running the command",
" SET <statisticsCollectionPropertyKey> = true",
" 2. Run CONVERT TO DELTA without the NO STATISTICS option.",
"",
"If you do not want to collect statistics, disable row tracking:",
" 1. Deactivate enabling the table feature by default by running the command:",
" RESET <rowTrackingTableFeatureDefaultKey>",
" 2. Deactivate the table property by default by running:",
" SET <rowTrackingDefaultPropertyKey> = false"
],
"sqlState" : "22000"
},
"DELTA_CREATE_EXTERNAL_TABLE_WITHOUT_SCHEMA" : {
"message" : [
"",
@@ -821,7 +821,8 @@ object Checkpoints extends DeltaLogging {
col("add.modificationTime"),
col("add.dataChange"), // actually not really useful here
col("add.tags"),
col("add.deletionVector")) ++
col("add.deletionVector"),
col("add.baseRowId")) ++
additionalCols: _*
))
)
@@ -54,6 +54,7 @@ case class VersionChecksum(
numMetadata: Long,
numProtocol: Long,
setTransactions: Option[Seq[SetTransaction]],
rowIdHighWaterMark: Option[RowIdHighWaterMark] = None,
metadata: Metadata,
protocol: Protocol,
histogramOpt: Option[FileSizeHistogram],
@@ -40,16 +40,18 @@ import org.apache.spark.sql.types.StructType
* @param readSnapshot read [[Snapshot]] used for the transaction
* @param commitInfo [[CommitInfo]] for the commit
*/
private[delta] class CurrentTransactionInfo(
private[delta] case class CurrentTransactionInfo(
val txnId: String,
val readPredicates: Seq[DeltaTableReadPredicate],
val readFiles: Set[AddFile],
val readWholeTable: Boolean,
val readAppIds: Set[String],
val metadata: Metadata,
val protocol: Protocol,
val actions: Seq[Action],
val readSnapshot: Snapshot,
val commitInfo: Option[CommitInfo]) {
val commitInfo: Option[CommitInfo],
val readRowIdHighWatermark: RowIdHighWaterMark) {

/**
* Final actions to commit - including the [[CommitInfo]] which should always come first so we can
@@ -120,7 +122,8 @@ private[delta] class ConflictChecker(
protected val timingStats = mutable.HashMap[String, Long]()
protected val deltaLog = initialCurrentTransactionInfo.readSnapshot.deltaLog

def currentTransactionInfo: CurrentTransactionInfo = initialCurrentTransactionInfo
protected var currentTransactionInfo: CurrentTransactionInfo = initialCurrentTransactionInfo

protected val winningCommitSummary: WinningCommitSummary = createWinningCommitSummary()

/**
@@ -135,6 +138,7 @@
checkForDeletedFilesAgainstCurrentTxnReadFiles()
checkForDeletedFilesAgainstCurrentTxnDeletedFiles()
checkForUpdatedApplicationTransactionIdsThatCurrentTxnDependsOn()
reassignOverlappingRowIds()
logMetrics()
currentTransactionInfo
}
@@ -311,6 +315,44 @@
}
}

/**
* Checks whether the Row IDs assigned by the current transaction overlap with the Row IDs
* assigned by the winning transaction, i.e. whether both the winning and the current
* transaction assigned new Row IDs. If this is the case, this check assigns new Row IDs to
* the new files added by the current transaction so that they no longer overlap.
*/
private def reassignOverlappingRowIds(): Unit = {
// The current transaction should only assign Row IDs if they are supported.
if (!RowId.rowIdsSupported(currentTransactionInfo.protocol)) return

winningCommitSummary.actions.collectFirst {
case RowIdHighWaterMark(winningHighWaterMark) =>
// The winning transaction assigned conflicting Row IDs. Adjust the Row IDs assigned by the
// current transaction as if it had read the result of the winning transaction.
val readHighWaterMark = currentTransactionInfo.readRowIdHighWatermark.highWaterMark
assert(winningHighWaterMark >= readHighWaterMark)
val watermarkDiff = winningHighWaterMark - readHighWaterMark

val actionsWithReassignedRowIds = currentTransactionInfo.actions.map {
// We should only update the row IDs that were assigned by this transaction, and not the
// row IDs that were assigned by an earlier transaction and merely copied over to a new
// AddFile as part of this transaction. I.e., we should only update the base row IDs
// that are larger than the read high watermark.
case a: AddFile if a.baseRowId.exists(_ > readHighWaterMark) =>
val newBaseRowId = a.baseRowId.map(_ + watermarkDiff)
a.copy(baseRowId = newBaseRowId)

case waterMark @ RowIdHighWaterMark(v) =>
waterMark.copy(highWaterMark = v + watermarkDiff)

case a => a
}
currentTransactionInfo = currentTransactionInfo.copy(
actions = actionsWithReassignedRowIds,
readRowIdHighWatermark = RowIdHighWaterMark(winningHighWaterMark))
}
}
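
// Worked example with made-up numbers for the reassignment above: suppose the current
// transaction read a high watermark of 99 and assigned Row IDs 100..149, while the winning
// transaction also assigned Row IDs 100..149, raising the watermark to 149. Then
//   watermarkDiff = 149 - 99 = 50
// so an AddFile this transaction created with baseRowId = 100 (> 99) shifts to
//   newBaseRowId = 100 + 50 = 150
// past every Row ID the winning transaction assigned, and this transaction's own
// RowIdHighWaterMark moves from 149 to 199. A file with baseRowId = 40 (<= 99) was merely
// copied over from an earlier commit and keeps its Row IDs.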

/** A helper function for pretty printing a specific partition directory. */
protected def getPrettyPartitionMessage(partitionValues: Map[String, String]): String = {
val partitionColumns = currentTransactionInfo.partitionSchemaAtReadTime
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2282,6 +2282,19 @@ trait DeltaErrorsBase
)
}

def convertToDeltaRowTrackingEnabledWithoutStatsCollection: Throwable = {
val statisticsCollectionPropertyKey = DeltaSQLConf.DELTA_COLLECT_STATS.key
val rowTrackingTableFeatureDefaultKey =
TableFeatureProtocolUtils.defaultPropertyKey(RowIdFeature)
val rowTrackingDefaultPropertyKey = DeltaConfigs.ROW_IDS_ENABLED.defaultTablePropertyKey
new DeltaIllegalStateException(
errorClass = "DELTA_CONVERT_TO_DELTA_ROW_TRACKING_WITHOUT_STATS",
messageParameters = Array(
statisticsCollectionPropertyKey,
rowTrackingTableFeatureDefaultKey,
rowTrackingDefaultPropertyKey))
}

/** This is a method only used for testing Py4J exception handling. */
def throwDeltaIllegalArgumentException(): Throwable = {
new DeltaIllegalArgumentException(errorClass = "DELTA_UNRECOGNIZED_INVARIANT")
@@ -992,6 +992,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
onlyAddFiles && !dependsOnFiles
}

val readRowIdHighWatermark =
RowId.extractHighWatermark(spark, snapshot).getOrElse(RowId.MISSING_HIGH_WATER_MARK)

commitInfo = CommitInfo(
clock.getTimeMillis(),
op.name,
@@ -1005,16 +1008,18 @@
tags = if (tags.nonEmpty) Some(tags) else None,
txnId = Some(txnId))

val currentTransactionInfo = new CurrentTransactionInfo(
val currentTransactionInfo = CurrentTransactionInfo(
txnId = txnId,
readPredicates = readPredicates.toSeq,
readFiles = readFiles.toSet,
readWholeTable = readTheWholeTable,
readAppIds = readTxn.toSet,
metadata = metadata,
protocol = protocol,
actions = preparedActions,
readSnapshot = snapshot,
commitInfo = Option(commitInfo))
commitInfo = Option(commitInfo),
readRowIdHighWatermark = readRowIdHighWatermark)

// Register post-commit hooks if any
lazy val hasFileActions = preparedActions.exists {
@@ -1125,6 +1130,9 @@
}
action
}

allActions = RowId.assignFreshRowIds(spark, protocol, snapshot, allActions)

if (readVersion < 0) {
deltaLog.createLogDirectory()
}
@@ -1341,6 +1349,9 @@

deltaLog.protocolWrite(snapshot.protocol)

finalActions =
RowId.assignFreshRowIds(spark, protocol, snapshot, finalActions.toIterator).toList

// We make sure that this isn't an appendOnly table as we check if we need to delete
// files.
val removes = actions.collect { case r: RemoveFile => r }
@@ -1451,8 +1462,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
protected def doCommitRetryIteratively(
attemptVersion: Long,
currentTransactionInfo: CurrentTransactionInfo,
isolationLevel: IsolationLevel
): (Long, Snapshot, CurrentTransactionInfo) = lockCommitIfEnabled {
isolationLevel: IsolationLevel)
: (Long, Snapshot, CurrentTransactionInfo) = lockCommitIfEnabled {

var commitVersion = attemptVersion
var updatedCurrentTransactionInfo = currentTransactionInfo
@@ -1623,7 +1634,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
checkVersion: Long,
currentTransactionInfo: CurrentTransactionInfo,
attemptNumber: Int,
commitIsolationLevel: IsolationLevel): (Long, CurrentTransactionInfo) = recordDeltaOperation(
commitIsolationLevel: IsolationLevel)
: (Long, CurrentTransactionInfo) = recordDeltaOperation(
deltaLog,
"delta.commit.retry.conflictCheck",
tags = Map(TAG_LOG_STORE_CLASS -> deltaLog.store.getClass.getName)) {
84 changes: 82 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/delta/RowId.scala
@@ -16,7 +16,7 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, Protocol, RowIdHighWaterMark}
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.propertyKey
import org.apache.spark.sql.delta.sources.DeltaSQLConf

@@ -27,6 +27,8 @@ import org.apache.spark.sql.SparkSession
*/
object RowId {

val MISSING_HIGH_WATER_MARK: RowIdHighWaterMark = RowIdHighWaterMark(highWaterMark = -1L)

/**
* Returns whether Row IDs can be written to Delta tables and read from Delta tables. This acts as
* a feature flag during development: every Row ID code path should be hidden behind this flag and
@@ -53,7 +55,7 @@ object RowId {
def rowIdsEnabled(protocol: Protocol, metadata: Metadata): Boolean = {
val isEnabled = DeltaConfigs.ROW_IDS_ENABLED.fromMetaData(metadata)
if (isEnabled && !rowIdsSupported(protocol)) {
throw new IllegalStateException(s"Table property '${DeltaConfigs.ROW_IDS_ENABLED.key}' is" +
throw new IllegalStateException(s"Table property '${DeltaConfigs.ROW_IDS_ENABLED.key}' is " +
s"set on the table but this table version doesn't support table feature " +
s"'${propertyKey(RowIdFeature)}'.")
}
@@ -87,4 +89,82 @@ object RowId {
}
latestMetadata
}

/**
* Assigns fresh row IDs to all AddFiles inside `actions` that do not have row IDs yet and emits
* a [[RowIdHighWaterMark]] action with the new high-water mark.
*/
private[delta] def assignFreshRowIds(
spark: SparkSession,
protocol: Protocol,
snapshot: Snapshot,
actions: Iterator[Action]): Iterator[Action] = {
if (!rowIdsAllowed(spark) || !rowIdsSupported(protocol)) return actions

val oldHighWatermark = extractHighWatermark(spark, snapshot)
.getOrElse(MISSING_HIGH_WATER_MARK)
.highWaterMark
var newHighWatermark = oldHighWatermark

val actionsWithFreshRowIds = actions.map {
case a: AddFile if a.baseRowId.isEmpty =>
val baseRowId = newHighWatermark + 1L
newHighWatermark += a.numPhysicalRecords.getOrElse {
throw new UnsupportedOperationException(
"Cannot assign row IDs without row count statistics.")
}
a.copy(baseRowId = Some(baseRowId))
case _: RowIdHighWaterMark =>
throw new IllegalStateException(
"Manually setting the Row ID high water mark is not allowed")
case other => other
}

val newHighWatermarkAction: Iterator[Action] = new Iterator[Action] {
  // Iterators are lazy, so the first call to `hasNext` only happens once the mapped
  // actions iterator above is exhausted, at which point `newHighWatermark` holds its
  // final value. `hasNext` stays true whenever the watermark changed, so `take(1)`
  // below bounds this iterator to at most one action.
  override def hasNext: Boolean = newHighWatermark != oldHighWatermark
  override def next(): Action = RowIdHighWaterMark(newHighWatermark)
}
actionsWithFreshRowIds ++ newHighWatermarkAction.take(1)
}
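
// The trailing-watermark pattern above generalizes: appending a lazily evaluated iterator
// lets a single pass both transform elements and emit a summary computed during that pass.
// A minimal standalone sketch (standard library only; unlike the code above it always
// emits the summary, even when nothing changed):
//   def withMax(xs: Iterator[Int]): Iterator[Int] = {
//     var max = Int.MinValue
//     val mapped = xs.map { x => max = math.max(max, x); x * 2 }
//     // `hasNext` is not consulted until `mapped` is exhausted, so `max` is final here.
//     mapped ++ Iterator.continually(max).take(1)
//   }
//   withMax(Iterator(3, 1, 4)).toList == List(6, 2, 8, 4)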

/**
* Extracts the high watermark of row IDs from a snapshot.
*/
private[delta] def extractHighWatermark(
spark: SparkSession, snapshot: Snapshot): Option[RowIdHighWaterMark] = {
if (rowIdsAllowed(spark)) {
snapshot.rowIdHighWaterMarkOpt
} else {
None
}
}

private[delta] def extractHighWatermark(
spark: SparkSession, actions: Seq[Action]): Option[RowIdHighWaterMark] = {
if (rowIdsAllowed(spark)) {
actions.collectFirst { case r: RowIdHighWaterMark => r }
} else {
None
}
}

/**
* Checks whether CONVERT TO DELTA collects statistics if row tracking is supported. If it does
* not collect statistics, we cannot assign fresh row IDs, so we throw an error instructing the
* user either to rerun the command without enabling the row tracking table feature, or to
* enable the flags necessary to collect statistics.
*/
private[delta] def checkStatsCollectedIfRowTrackingSupported(
spark: SparkSession,
protocol: Protocol,
convertToDeltaShouldCollectStats: Boolean,
statsCollectionEnabled: Boolean): Unit = {
if (!rowIdsAllowed(spark) || !rowIdsSupported(protocol)) return
if (!convertToDeltaShouldCollectStats || !statsCollectionEnabled) {
throw DeltaErrors.convertToDeltaRowTrackingEnabledWithoutStatsCollection
}
}
}
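
// Usage sketch with hypothetical flag values: CONVERT TO DELTA is expected to run the check
// above before writing any files; with stats collection turned off it surfaces
// DELTA_CONVERT_TO_DELTA_ROW_TRACKING_WITHOUT_STATS:
//   RowId.checkStatsCollectedIfRowTrackingSupported(
//     spark,
//     protocol, // a protocol that supports RowIdFeature
//     convertToDeltaShouldCollectStats = false, // e.g. NO STATISTICS was specified
//     statsCollectionEnabled = true)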
@@ -257,7 +257,8 @@ class Snapshot(
col("add.dataChange"),
col(ADD_STATS_TO_USE_COL_NAME).as("stats"),
col("add.tags"),
col("add.deletionVector")
col("add.deletionVector"),
col("add.baseRowId")
)))
.withColumn("remove", when(
col("remove.path").isNotNull,
@@ -324,6 +325,7 @@
numMetadata = numOfMetadata,
numProtocol = numOfProtocol,
setTransactions = checksumOpt.flatMap(_.setTransactions),
rowIdHighWaterMark = rowIdHighWaterMarkOpt,
metadata = metadata,
protocol = protocol,
histogramOpt = fileSizeHistogram,
@@ -17,7 +17,7 @@
package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.actions.{Metadata, Protocol, SetTransaction}
import org.apache.spark.sql.delta.actions.{Metadata, Protocol, RowIdHighWaterMark, SetTransaction}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.FileSizeHistogram
@@ -50,6 +50,7 @@ case class SnapshotState(
numOfMetadata: Long,
numOfProtocol: Long,
setTransactions: Seq[SetTransaction],
rowIdHighWaterMark: RowIdHighWaterMark,
metadata: Metadata,
protocol: Protocol,
fileSizeHistogram: Option[FileSizeHistogram] = None
@@ -142,6 +143,7 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot =>
"numOfMetadata" -> count(col("metaData")),
"numOfProtocol" -> count(col("protocol")),
"setTransactions" -> collect_set(col("txn")),
"rowIdHighWaterMark" -> last(col("rowIdHighWaterMark"), ignoreNulls = true),
"metadata" -> last(col("metaData"), ignoreNulls = true),
"protocol" -> last(col("protocol"), ignoreNulls = true),
"fileSizeHistogram" -> lit(null).cast(FileSizeHistogram.schema)
@@ -162,6 +164,8 @@
protected[delta] def sizeInBytesIfKnown: Option[Long] = Some(sizeInBytes)
protected[delta] def setTransactionsIfKnown: Option[Seq[SetTransaction]] = Some(setTransactions)
protected[delta] def numOfFilesIfKnown: Option[Long] = Some(numOfFiles)
protected[delta] def rowIdHighWaterMarkOpt: Option[RowIdHighWaterMark] =
Option(computedState.rowIdHighWaterMark)
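
// Why `last(col("rowIdHighWaterMark"), ignoreNulls = true)` in the aggregation above
// recovers the current watermark: state reconstruction replays log actions in commit
// order, so the last non-null watermark action is the newest. A toy sketch of the idea,
// not the real log-replay schema:
//   Seq((0L, Some(14L)), (1L, Option.empty[Long]), (2L, Some(29L)))
//     .toDF("version", "rowIdHighWaterMark")
//     .agg(last($"rowIdHighWaterMark", ignoreNulls = true)) // -> 29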

/** Generate a default SnapshotState of a new table, given the table metadata */
protected def initialState(metadata: Metadata): SnapshotState = {
@@ -175,6 +179,7 @@
numOfMetadata = 1L,
numOfProtocol = 1L,
setTransactions = Nil,
rowIdHighWaterMark = null,
metadata = metadata,
protocol = protocol
)