SHALLOW CLONE support for Delta tables with deletion vectors.
This PR is part of the feature: Support Delta tables with deletion vectors (more details at #1485)

This PR adds support for SHALLOW CLONE of a Delta table with deletion vectors. The main change is to convert the relative path of the DV file in each `AddFile` to an absolute path when cloning the table.
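
For context, a minimal sketch of the end-to-end scenario (paths, table layout, and values are illustrative, not taken from this PR):

```scala
// Hypothetical source table with deletion vectors enabled.
spark.range(0, 90).write.format("delta").save("/tmp/dv_source")
spark.sql("ALTER TABLE delta.`/tmp/dv_source` " +
  "SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
// The DELETE writes deletion vector (DV) files next to the source data files.
spark.sql("DELETE FROM delta.`/tmp/dv_source` WHERE id IN (24, 42)")
// A shallow clone's log keeps pointing at the source's parquet and DV files, so any
// DV path that is relative to the source table root is rewritten to an absolute path.
spark.sql("CREATE TABLE delta.`/tmp/dv_target` SHALLOW CLONE delta.`/tmp/dv_source`")
```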

Added tests

Closes #1733

GitOrigin-RevId: b634496b57b93fc4b7a7cc16e33c200e3a83ba64
vkorukanti authored and allisonport-db committed May 10, 2023
1 parent 7c352e9 commit 6556d6f
Showing 4 changed files with 295 additions and 0 deletions.
@@ -450,7 +450,12 @@ object DeltaFileOperations extends DeltaLogging {
files.mapPartitions { fileList =>
fileList.map { addFile =>
val fileSource = DeltaFileOperations.absolutePath(qualifiedTablePath, addFile.path)
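// DV descriptors in the source log may use paths relative to the source table root;
// rewrite them to absolute paths so the cloned table's log can still resolve them.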
if (addFile.deletionVector != null) {
val absoluteDV = addFile.deletionVector.copyWithAbsolutePath(new Path(qualifiedTablePath))
addFile.copy(path = fileSource.toUri.toString, deletionVector = absoluteDV)
} else {
addFile.copy(path = fileSource.toUri.toString)
}
}
}
}
@@ -16,6 +16,13 @@

package org.apache.spark.sql.delta

import scala.collection.immutable.NumericRange

import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
import org.apache.spark.sql.delta.test.{DeltaExcludedTestMixin, DeltaSQLCommandTest}
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
@@ -276,3 +283,183 @@ object CloneTableSQLTestUtils {
}
// scalastyle:on argcount
}

class CloneTableScalaDeletionVectorSuite
extends CloneTableSQLSuite
with DeltaSQLCommandTest
with DeltaExcludedTestMixin
with DeletionVectorsTestUtils {

override def excluded: Seq[String] = super.excluded ++
Seq(
// These require the initial table protocol version to be low to work properly.
"Cloning a table with new table properties that force protocol version upgrade -" +
" delta.enableChangeDataFeed"
, "Cloning a table with new table properties that force protocol version upgrade -" +
" delta.enableDeletionVectors"
, "Cloning a table without DV property should not upgrade protocol version"
, "CLONE respects table features set by table property override, targetExists=true"
, "CLONE ignores reader/writer session defaults")

override def beforeAll(): Unit = {
super.beforeAll()
enableDeletionVectors(spark.conf)
}

override protected def uniqueFileActionGroupBy(action: FileAction): String = {
val filePath = action.pathAsUri.toString
val dvId = action match {
case add: AddFile => Option(add.deletionVector).map(_.uniqueId).getOrElse("")
case remove: RemoveFile => Option(remove.deletionVector).map(_.uniqueId).getOrElse("")
case _ => ""
}
filePath + dvId
}

testAllClones("Cloning table with persistent DVs") { (source, target, isShallow) =>
// Create source table
writeMultiFileSourceTable(
source,
fileRanges = Seq(0L until 30L, 30L until 60L, 60L until 90L))
// Add DVs to 2 files, leave 1 file without DVs.
spark.sql(s"DELETE FROM delta.`$source` WHERE id IN (24, 42)")
runAndValidateCloneWithDVs(
source,
target,
expectedNumFilesWithDVs = 2)
}

testAllClones("Cloning table with persistent DVs and absolute parquet paths"
) { (source, target, isShallow) =>
withTempDir { originalSourceDir =>
val originalSource = originalSourceDir.getCanonicalPath
// Create the source table by writing to an upstream table and then shallow cloning it
// before adding DVs.
writeMultiFileSourceTable(
source = originalSource,
fileRanges = Seq(0L until 30L, 30L until 60L, 60L until 90L))
spark.sql(s"CREATE OR REPLACE TABLE delta.`$source` SHALLOW CLONE delta.`$originalSource`")
// Add DVs to 2 files, leave 1 file without DVs.
spark.sql(s"DELETE FROM delta.`$source` WHERE id IN (24, 42)")
runAndValidateCloneWithDVs(
source,
target,
expectedNumFilesWithDVs = 2)
}
}

testAllClones("Cloning table with persistent DVs and absolute DV file paths"
) { (source, target, isShallow) =>
withTempDir { originalSourceDir =>
val originalSource = originalSourceDir.getCanonicalPath
// Create the source table by writing to an upstream table, adding DVs, and then shallow cloning it.
writeMultiFileSourceTable(
source = originalSource,
fileRanges = Seq(0L until 30L, 30L until 60L, 60L until 90L))
// Add DVs to 2 files, leave 1 file without DVs.
spark.sql(s"DELETE FROM delta.`$originalSource` WHERE id IN (24, 42)")
val originalSourceTable = io.delta.tables.DeltaTable.forPath(spark, originalSource)
spark.sql(s"CREATE OR REPLACE TABLE delta.`$source` SHALLOW CLONE delta.`$originalSource`")
// Double-check that this clone was correct.
checkAnswer(
spark.read.format("delta").load(source), expectedAnswer = originalSourceTable.toDF)
runAndValidateCloneWithDVs(
source,
target,
expectedNumFilesWithDVs = 2)
}
}

cloneTest("Shallow clone round-trip with DVs") { (source, target) =>
// Create source table.
writeMultiFileSourceTable(
source = source,
fileRanges = Seq(
0L until 30L, // file 1
30L until 60L, // file 2
60L until 90L, // file 3
90L until 120L)) // file 4
// Add DVs to files 1 and 2 and then shallow clone.
spark.sql(s"DELETE FROM delta.`$source` WHERE id IN (24, 42)")
runAndValidateCloneWithDVs(
source = source,
target = target,
expectedNumFilesWithDVs = 2)

// Add a new DV to file 3 and update the DV of file 2,
// leaving file 4 without a DV and file 1 with its existing DV.
// Then shallow clone back into the source.
spark.sql(s"DELETE FROM delta.`$target` WHERE id IN (43, 69)")
runAndValidateCloneWithDVs(
source = target,
target = source,
expectedNumFilesWithDVs = 3,
isReplaceOperation = true)
}

/** Write one file per range in `fileRanges`. */
private def writeMultiFileSourceTable(
source: String,
fileRanges: Seq[NumericRange.Exclusive[Long]]): Unit = {
for (range <- fileRanges) {
spark.range(start = range.start, end = range.end, step = 1L, numPartitions = 1).toDF("id")
.write.format("delta").mode("append").save(source)
}
}

private def runAndValidateCloneWithDVs(
source: String,
target: String,
expectedNumFilesWithDVs: Int,
isReplaceOperation: Boolean = false): Unit = {
val sourceDeltaLog = DeltaLog.forTable(spark, source)
val targetDeltaLog = DeltaLog.forTable(spark, target)
val filesWithDVsInSource = getFilesWithDeletionVectors(sourceDeltaLog)
assert(filesWithDVsInSource.size === expectedNumFilesWithDVs)
val numberOfUniqueDVFilesInSource = filesWithDVsInSource
.map(_.deletionVector.pathOrInlineDv)
.toSet
.size

runAndValidateClone(
source,
target,
isReplaceOperation = isReplaceOperation)()
val filesWithDVsInTarget = getFilesWithDeletionVectors(targetDeltaLog)
val numberOfUniqueDVFilesInTarget = filesWithDVsInTarget
.map(_.deletionVector.pathOrInlineDv)
.toSet
.size
// Make sure we didn't accidentally copy any DV file multiple times.
assert(numberOfUniqueDVFilesInSource === numberOfUniqueDVFilesInTarget)
// Check contents of the copied DV files.
val filesWithDVsInTargetByPath = filesWithDVsInTarget
.map(addFile => addFile.path -> addFile)
.toMap
// scalastyle:off deltahadoopconfiguration
val hadoopConf = spark.sessionState.newHadoopConf()
// scalastyle:on deltahadoopconfiguration
for (sourceFile <- filesWithDVsInSource) {
val targetFile = filesWithDVsInTargetByPath(sourceFile.path)
if (sourceFile.deletionVector.isInline) {
assert(targetFile.deletionVector.isInline)
assert(sourceFile.deletionVector.inlineData === targetFile.deletionVector.inlineData)
} else {
def readDVData(path: Path): Array[Byte] = {
val fs = path.getFileSystem(hadoopConf)
val size = fs.getFileStatus(path).getLen
val data = new Array[Byte](size.toInt)
Utils.tryWithResource(fs.open(path)) { reader =>
reader.readFully(data)
}
data
}
val sourceDVPath = sourceFile.deletionVector.absolutePath(sourceDeltaLog.dataPath)
val targetDVPath = targetFile.deletionVector.absolutePath(targetDeltaLog.dataPath)
val sourceData = readDVData(sourceDVPath)
val targetData = readDVData(targetDVPath)
assert(sourceData === targetData)
}
}
}
}
@@ -944,6 +944,77 @@ trait CloneTableSuiteBase extends QueryTest
val targetLog = DeltaLog.forTable(spark, target)
assert(targetLog.update().protocol.isFeatureSupported(TestWriterFeature))
}

// Delta properties that automatically cause a version upgrade when enabled via ALTER TABLE.
final val featuresWithAutomaticProtocolUpgrade: Seq[DeltaConfig[Boolean]] = Seq(
DeltaConfigs.CHANGE_DATA_FEED,
DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION
)
// This test ensures this upgrade also happens when enabled during a CLONE.
for (feature <- featuresWithAutomaticProtocolUpgrade)
testAllClones("Cloning a table with new table properties" +
s" that force protocol version upgrade - ${feature.key}"
) { (source, target, isShallow) =>
import DeltaTestUtils.StrictProtocolOrdering

spark.range(5).write.format("delta").save(source)
val sourceDeltaLog = DeltaLog.forTable(spark, source)
val sourceSnapshot = sourceDeltaLog.update()
// This only works if the feature is not enabled by default.
assert(!feature.fromMetaData(sourceSnapshot.metadata))
// Check that the original version is not already sufficient for the feature.
assert(!StrictProtocolOrdering.fulfillsVersionRequirements(
actual = sourceSnapshot.protocol,
requirement = feature.minimumProtocolVersion.get
))

// Clone the table, enabling the feature in an override.
val tblProperties = Map(feature.key -> "true")
cloneTable(
source,
target,
isReplace = true,
tableProperties = tblProperties)

val targetDeltaLog = DeltaLog.forTable(spark, target)
val targetSnapshot = targetDeltaLog.update()
assert(targetSnapshot.metadata.configuration ===
tblProperties ++ sourceSnapshot.metadata.configuration)
// Check that the protocol has been upgraded.
assert(StrictProtocolOrdering.fulfillsVersionRequirements(
actual = targetSnapshot.protocol,
requirement = feature.minimumProtocolVersion.get
))
}

testAllClones("Cloning a table without DV property should not upgrade protocol version"
) { (source, target, isShallow) =>
import DeltaTestUtils.StrictProtocolOrdering

spark.range(5).write.format("delta").save(source)
withSQLConf(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> "true") {
val sourceDeltaLog = DeltaLog.forTable(spark, source)
val sourceSnapshot = sourceDeltaLog.update()
// Should not be enabled just because it's allowed by the session default.
assert(!DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(sourceSnapshot.metadata))
// Check that the original version is not already sufficient for the feature.
assert(!StrictProtocolOrdering.fulfillsVersionRequirements(
actual = sourceSnapshot.protocol,
requirement = DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.minimumProtocolVersion.get
))

// Clone the table.
cloneTable(
source,
target,
isReplace = true)

val targetDeltaLog = DeltaLog.forTable(spark, target)
val targetSnapshot = targetDeltaLog.update()
// Protocol should not have been upgraded.
assert(sourceSnapshot.protocol === targetSnapshot.protocol)
}
}
}


@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.util.matching.Regex

import org.apache.spark.sql.delta.DeltaTestUtils.Plans
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.SparkContext
@@ -236,6 +237,37 @@ object DeltaTestUtils extends DeltaTestUtilsBase {
case tableName => tableName -> None
}
}

/**
* Implements an ordering where `x < y` iff both reader and writer versions of
* `x` are strictly less than those of `y`.
*
* Can be used to conveniently check that this relationship holds in tests/assertions
* without having to write out the conjunction of the two subconditions every time.
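* For example (hypothetical versions), `tryCompare(Protocol(1, 2), Protocol(3, 7))` returns `Some(-1)`,
* while `tryCompare(Protocol(1, 7), Protocol(3, 2))` returns `None` because the protocols are incomparable.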
*/
case object StrictProtocolOrdering extends PartialOrdering[Protocol] {
override def tryCompare(x: Protocol, y: Protocol): Option[Int] = {
if (x.minReaderVersion == y.minReaderVersion &&
x.minWriterVersion == y.minWriterVersion) {
Some(0)
} else if (x.minReaderVersion < y.minReaderVersion &&
x.minWriterVersion < y.minWriterVersion) {
Some(-1)
} else if (x.minReaderVersion > y.minReaderVersion &&
x.minWriterVersion > y.minWriterVersion) {
Some(1)
} else {
None
}
}

override def lteq(x: Protocol, y: Protocol): Boolean =
x.minReaderVersion <= y.minReaderVersion && x.minWriterVersion <= y.minWriterVersion

// Just a more readable version of `lteq`.
def fulfillsVersionRequirements(actual: Protocol, requirement: Protocol): Boolean =
lteq(requirement, actual)
}
}

trait DeltaTestUtilsForTempViews
