From 79cb385461e9070294136c6d8f257562d778c801 Mon Sep 17 00:00:00 2001
From: Tom van Bussel
Date: Thu, 25 May 2023 20:40:02 +0200
Subject: [PATCH] Add Default Row Commit Version to AddFile and RemoveFile

This PR implements part of the changes proposed in
https://github.com/delta-io/delta/pull/1747. It adds the
`defaultRowCommitVersion` field to `AddFile` and `RemoveFile`, and ensures
that the field is populated during commits and read back during log replay.
It **does not** handle any transaction conflicts yet.

Closes delta-io/delta#1781

GitOrigin-RevId: 781617fd33b3be2f39ac8ab36aa0b741ba99c97e
---
 .../apache/spark/sql/delta/Checkpoints.scala  |   3 +-
 .../sql/delta/DefaultRowCommitVersion.scala   |  36 ++++
 .../sql/delta/OptimisticTransaction.scala     |  12 +-
 .../org/apache/spark/sql/delta/Snapshot.scala |   3 +-
 .../spark/sql/delta/actions/actions.scala     |  11 +-
 .../sql/delta/commands/CloneTableBase.scala   |   5 +-
 .../spark/sql/delta/CheckpointsSuite.scala    |   3 +-
 .../spark/sql/delta/DeltaLogSuite.scala       |   2 +-
 .../DefaultRowCommitVersionSuite.scala        | 155 ++++++++++++++++++
 9 files changed, 220 insertions(+), 10 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala
 create mode 100644 core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index 5887a1bf4f..5bb8c090cd 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -626,7 +626,8 @@ object Checkpoints extends DeltaLogging {
           col("add.dataChange"), // actually not really useful here
           col("add.tags"),
           col("add.deletionVector"),
-          col("add.baseRowId")) ++
+          col("add.baseRowId"),
+          col("add.defaultRowCommitVersion")) ++
           additionalCols: _*
         ))
       )
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala b/core/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala
new file mode 100644
index 0000000000..a1b3eeb04b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.delta.actions.{Action, AddFile, Protocol}
+
+object DefaultRowCommitVersion {
+  def assignIfMissing(
+      protocol: Protocol,
+      actions: Iterator[Action],
+      version: Long): Iterator[Action] = {
+    if (!RowTracking.isSupported(protocol)) {
+      return actions
+    }
+    actions.map {
+      case a: AddFile if a.defaultRowCommitVersion.isEmpty =>
+        a.copy(defaultRowCommitVersion = Some(version))
+      case a =>
+        a
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index d3f4a94e47..2d9629f97e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1044,7 +1044,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       }
 
       val (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo) =
-        doCommitRetryIteratively(snapshot.version + 1, currentTransactionInfo, isolationLevelToUse)
+        doCommitRetryIteratively(
+          getFirstAttemptVersion, currentTransactionInfo, isolationLevelToUse)
       logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
       (commitVersion, postCommitSnapshot, updatedCurrentTransactionInfo.actions)
     } catch {
@@ -1091,7 +1092,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       context: Map[String, String],
       metrics: Map[String, String]): (Long, Snapshot) = {
     commitStartNano = System.nanoTime()
-    val attemptVersion = readVersion + 1
+    val attemptVersion = getFirstAttemptVersion
     try {
       val commitInfo = CommitInfo(
         time = clock.getTimeMillis(),
@@ -1141,6 +1142,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       }
 
       allActions = RowId.assignFreshRowIds(spark, protocol, snapshot, allActions)
+      allActions = DefaultRowCommitVersion
+        .assignIfMissing(protocol, allActions, getFirstAttemptVersion)
 
       if (readVersion < 0) {
         deltaLog.createLogDirectory()
@@ -1361,6 +1364,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
 
     finalActions = RowId.assignFreshRowIds(
       spark, protocol, snapshot, finalActions.toIterator).toList
+    finalActions = DefaultRowCommitVersion
+      .assignIfMissing(protocol, finalActions.toIterator, getFirstAttemptVersion).toList
 
     // We make sure that this isn't an appendOnly table as we check if we need to delete
     // files.
@@ -1721,6 +1726,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     conflictChecker.checkConflicts()
   }
 
+  /** Returns the version that the first attempt will try to commit at. */
+  protected def getFirstAttemptVersion: Long = readVersion + 1L
+
   /** Returns the next attempt version given the last attempted version */
   protected def getNextAttemptVersion(previousAttemptVersion: Long): Long = {
     val latestSnapshot = deltaLog.update()
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
index 76b280fa88..9e03e11cec 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -253,7 +253,8 @@ class Snapshot(
           col(ADD_STATS_TO_USE_COL_NAME).as("stats"),
           col("add.tags"),
           col("add.deletionVector"),
-          col("add.baseRowId")
+          col("add.baseRowId"),
+          col("add.defaultRowCommitVersion")
         )))
       .withColumn("remove", when(
         col("remove.path").isNotNull,
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index 7284e7ae45..b392baaf51 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -578,7 +578,9 @@ case class AddFile(
     override val tags: Map[String, String] = null,
     deletionVector: DeletionVectorDescriptor = null,
     @JsonDeserialize(contentAs = classOf[java.lang.Long])
-    baseRowId: Option[Long] = None
+    baseRowId: Option[Long] = None,
+    @JsonDeserialize(contentAs = classOf[java.lang.Long])
+    defaultRowCommitVersion: Option[Long] = None
 ) extends FileAction {
   require(path.nonEmpty)
 
@@ -596,7 +598,8 @@ case class AddFile(
       path, Some(timestamp), dataChange,
       extendedFileMetadata = Some(true), partitionValues, Some(size), newTags,
       deletionVector = deletionVector,
-      baseRowId = baseRowId
+      baseRowId = baseRowId,
+      defaultRowCommitVersion = defaultRowCommitVersion
     )
     removedFile.numLogicalRecords = numLogicalRecords
     removedFile.estLogicalFileSize = estLogicalFileSize
@@ -818,7 +821,9 @@ case class RemoveFile(
     override val tags: Map[String, String] = null,
     deletionVector: DeletionVectorDescriptor = null,
     @JsonDeserialize(contentAs = classOf[java.lang.Long])
-    baseRowId: Option[Long] = None
+    baseRowId: Option[Long] = None,
+    @JsonDeserialize(contentAs = classOf[java.lang.Long])
+    defaultRowCommitVersion: Option[Long] = None
 ) extends FileAction {
 
   override def wrap: SingleAction = SingleAction(remove = this)
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
index 614c649daa..7d3ba53056 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala
@@ -278,8 +278,11 @@ abstract class CloneTableBase(
       val copiedFile = fileToCopy.copy(dataChange = true)
       opName match {
         case CloneTableCommand.OP_NAME =>
+          // CLONE does not preserve Row IDs and Commit Versions
+          copiedFile.copy(baseRowId = None, defaultRowCommitVersion = None)
+        case RestoreTableCommand.OP_NAME =>
+          // RESTORE preserves Row IDs and Commit Versions
           copiedFile
-        case RestoreTableCommand.OP_NAME => copiedFile
       }
     }
     val sourceName = sourceTable.name
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
index 93f3569752..bf69efc4a5 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
@@ -233,7 +233,8 @@ class CheckpointsSuite extends QueryTest
       "partitionValues",
       "size",
       "deletionVector",
-      "baseRowId")
+      "baseRowId",
+      "defaultRowCommitVersion")
 
     val tablePath = tempDir.getAbsolutePath
     // Append rows [0, 9] to table and merge tablePath.
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 26a4a6e9cd..54d6eed566 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -313,7 +313,7 @@ class DeltaLogSuite extends QueryTest
 
       assert(log.update().allFiles.collect().find(_.path == "foo")
         // `dataChange` is set to `false` after replaying logs.
-        === Some(add2.copy(dataChange = false)))
+        === Some(add2.copy(dataChange = false, defaultRowCommitVersion = Some(2))))
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala
new file mode 100644
index 0000000000..af13072ae8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/sql/delta/rowtracking/DefaultRowCommitVersionSuite.scala
@@ -0,0 +1,155 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.rowtracking
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog}
+import org.apache.spark.sql.delta.actions.{AddFile, RemoveFile}
+import org.apache.spark.sql.delta.rowid.RowIdTestUtils
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.test.SharedSparkSession
+
+class DefaultRowCommitVersionSuite extends QueryTest with SharedSparkSession with RowIdTestUtils {
+  def expectedCommitVersionsForAllFiles(deltaLog: DeltaLog): Map[String, Long] = {
+    val commitVersionForFiles = mutable.Map.empty[String, Long]
+    deltaLog.getChanges(startVersion = 0).foreach { case (commitVersion, actions) =>
+      actions.foreach {
+        case a: AddFile if !commitVersionForFiles.contains(a.path) =>
+          commitVersionForFiles += a.path -> commitVersion
+        case r: RemoveFile if commitVersionForFiles.contains(r.path) =>
+          assert(r.defaultRowCommitVersion.contains(commitVersionForFiles(r.path)))
+        case _ =>
+          // Do nothing
+      }
+    }
+    commitVersionForFiles.toMap
+  }
+
+  test("defaultRowCommitVersion is not set when feature is disabled") {
+    withRowTrackingEnabled(enabled = false) {
+      withTempDir { tempDir =>
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 1)
+          .write.format("delta").mode("overwrite").save(tempDir.getAbsolutePath)
+        spark.range(start = 100, end = 200, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val deltaLog = DeltaLog.forTable(spark, tempDir)
+        deltaLog.update().allFiles.collect().foreach { f =>
+          assert(f.defaultRowCommitVersion.isEmpty)
+        }
+      }
+    }
+  }
+
+  test("checkpoint preserves defaultRowCommitVersion") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 100, end = 200, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 200, end = 300, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val deltaLog = DeltaLog.forTable(spark, tempDir)
+        val commitVersionForFiles = expectedCommitVersionsForAllFiles(deltaLog)
+
+        deltaLog.update().allFiles.collect().foreach { f =>
+          assert(f.defaultRowCommitVersion.contains(commitVersionForFiles(f.path)))
+        }
+
+        deltaLog.checkpoint(deltaLog.update())
+
+        deltaLog.update().allFiles.collect().foreach { f =>
+          assert(f.defaultRowCommitVersion.contains(commitVersionForFiles(f.path)))
+        }
+      }
+    }
+  }
+
+  test("data skipping reads defaultRowCommitVersion") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 100, end = 200, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 200, end = 300, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val deltaLog = DeltaLog.forTable(spark, tempDir)
+        val commitVersionForFiles = expectedCommitVersionsForAllFiles(deltaLog)
+
+        val filters = Seq(col("id = 150").expr)
+        val scan = deltaLog.update().filesForScan(filters)
+
+        scan.files.foreach { f =>
+          assert(f.defaultRowCommitVersion.contains(commitVersionForFiles(f.path)))
+        }
+      }
+    }
+  }
+
+  test("clone does not preserve default row commit versions") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { sourceDir =>
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(sourceDir.getAbsolutePath)
+        spark.range(start = 100, end = 200, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(sourceDir.getAbsolutePath)
+        spark.range(start = 200, end = 300, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(sourceDir.getAbsolutePath)
+
+        withTable("target") {
+          spark.sql(s"CREATE TABLE target SHALLOW CLONE delta.`${sourceDir.getAbsolutePath}` " +
+            s"TBLPROPERTIES ('${DeltaConfigs.ROW_TRACKING_ENABLED.key}' = 'true')")
+
+          val targetLog = DeltaLog.forTable(spark, TableIdentifier("target"))
+          targetLog.update().allFiles.collect().foreach { f =>
+            assert(f.defaultRowCommitVersion.contains(0L))
+          }
+        }
+      }
+    }
+  }
+
+  test("restore does preserve default row commit versions") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 100, end = 200, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 200, end = 300, step = 1, numPartitions = 1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val deltaLog = DeltaLog.forTable(spark, tempDir)
+        val commitVersionForFiles = expectedCommitVersionsForAllFiles(deltaLog)
+
+        spark.sql(s"RESTORE delta.`${tempDir.getAbsolutePath}` TO VERSION AS OF 1")
+
+        deltaLog.update().allFiles.collect().foreach { f =>
+          assert(f.defaultRowCommitVersion.contains(commitVersionForFiles(f.path)))
+        }
+      }
+    }
+  }
+}