[Spark] Drop Type Widening Table Feature

This PR includes changes from #2708, which hasn't been merged yet.
The changes related only to dropping the table feature are in commit e2601a6.


## Description
This change is part of the type widening table feature.
Type widening feature request:
#2622
Type Widening protocol RFC: #2624

It adds the ability to remove the type widening table feature by running
the `ALTER TABLE DROP FEATURE` command.
Before dropping the table feature, traces of it are removed from the
current version of the table:
- Files that were written before the latest type change and thus contain
types that differ from the current table schema are rewritten using an
internal `REORG TABLE` operation.
- Metadata in the table schema recording previous type changes is
removed.
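
The two kinds of traces listed above can be detected with helpers introduced in this PR. Below is a minimal sketch, illustrative only: it assumes a `SparkSession` named `spark`, and since `TypeWideningMetadata` is `private[delta]`, such code would only compile from within the `org.apache.spark.sql.delta` package.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.{DeltaLog, TypeWidening, TypeWideningMetadata}

// Hypothetical helper, not part of this PR: report whether the current table
// version still carries traces of the type widening feature.
def hasTypeWideningTraces(spark: SparkSession, tablePath: String): Boolean = {
  val snapshot = DeltaLog.forTable(spark, tablePath).update()
  // Files written before the latest type change still use the old types and
  // would be rewritten by the internal REORG TABLE operation.
  val filesToRewrite = TypeWidening.numFilesRequiringRewrite(snapshot)
  // Fields in the current schema may still record previous type changes.
  val metadataPresent = TypeWideningMetadata.containsTypeWideningMetadata(snapshot.schema)
  filesToRewrite > 0 || metadataPresent
}
```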

## How was this patch tested?
- A new set of tests is added to `DeltaTypeWideningSuite` to cover dropping the table feature with tables in various states: with/without files to rewrite or metadata to remove.

## Does this PR introduce _any_ user-facing changes?
The table feature is available in testing only; there are no user-facing changes for now.

When the feature is available, this change enables the following user
action:
- Drop the type widening table feature:
```
ALTER TABLE t DROP FEATURE typeWidening
```
This succeeds immediately if no version of the table contains traces of the table feature (i.e. no type changes were applied in the available history of the table).
Otherwise, if the current version contains traces of the feature, these are removed: files are rewritten if needed and type widening metadata is removed from the table schema. Then an error
`DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD` is thrown, telling the user to retry once the retention period expires.

If only previous versions contain traces of the feature, no action is applied to the table, and an error
`DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST` is thrown, telling the user to retry once the retention period expires.
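
Put together, a typical sequence looks roughly as follows. This is a sketch, not output from the PR: it assumes a `SparkSession` named `spark` and a Delta table `t` on which a widening type change (e.g. `byte` to `int`) was previously applied.

```scala
// First attempt: traces in the current version are cleaned up (old files are
// rewritten and type widening metadata is removed from the schema), then the
// command fails with DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD.
try {
  spark.sql("ALTER TABLE t DROP FEATURE typeWidening")
} catch {
  case e: Exception => println(s"Expected failure on first attempt: ${e.getMessage}")
}

// Second attempt, once the retention period has expired and no reachable
// historical version uses the feature anymore: the feature is dropped from
// the table protocol.
spark.sql("ALTER TABLE t DROP FEATURE typeWidening")
```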
johanl-db authored Mar 15, 2024
1 parent 9a59c0a commit ec8ab16
Showing 10 changed files with 512 additions and 37 deletions.
@@ -560,7 +560,9 @@ private[delta] class ConflictChecker(
* to handle the row tracking feature being enabled by the winning transaction.
*/
private def reassignRowCommitVersions(): Unit = {
if (!RowTracking.isSupported(currentTransactionInfo.protocol)) {
if (!RowTracking.isSupported(currentTransactionInfo.protocol) &&
// Type widening relies on default row commit versions to be set.
!TypeWidening.isSupported(currentTransactionInfo.protocol)) {
return
}

@@ -24,7 +24,8 @@ object DefaultRowCommitVersion {
protocol: Protocol,
actions: Iterator[Action],
version: Long): Iterator[Action] = {
if (!RowTracking.isSupported(protocol)) {
// Type Widening relies on default row commit versions to be set.
if (!RowTracking.isSupported(protocol) && !TypeWidening.isSupported(protocol)) {
return actions
}
actions.map {

@@ -18,10 +18,12 @@ package org.apache.spark.sql.delta
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand}
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}

import org.apache.spark.sql.catalyst.analysis.ResolvedTable

/**
* A base class for implementing a preparation command for removing table features.
* Must implement a run method. Note, the run method must be implemented in a way that when
@@ -126,3 +128,88 @@ case class V2CheckpointPreDowngradeCommand(table: DeltaTableV2)
true
}
}

case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand
with DeltaLogging {

/**
* Unsets the type widening table property to prevent new type changes from being applied to the
* table, then removes traces of the feature:
* - Rewrite files that have columns or fields with a type that differs from the current table
* schema. These are all files not added or modified after the last type change.
* - Remove the type widening metadata attached to fields in the current table schema.
*
* @return true if files were rewritten or metadata was removed, false otherwise.
*/
override def removeFeatureTracesIfNeeded(): Boolean = {
if (TypeWideningTableFeature.validateRemoval(table.initialSnapshot)) return false

val startTimeNs = System.nanoTime()
val properties = Seq(DeltaConfigs.ENABLE_TYPE_WIDENING.key)
AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
val numFilesRewritten = rewriteFilesIfNeeded()
val metadataRemoved = removeMetadataIfNeeded()

recordDeltaEvent(
table.deltaLog,
opType = "delta.typeWideningFeatureRemovalMetrics",
data = Map(
"downgradeTimeMs" -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs),
"numFilesRewritten" -> numFilesRewritten,
"metadataRemoved" -> metadataRemoved
)
)
numFilesRewritten > 0 || metadataRemoved
}

/**
* Rewrite files that have columns or fields with a different type than in the current table
* schema. These are all files not added or modified after the last type change.
* @return The number of files rewritten.
*/
private def rewriteFilesIfNeeded(): Long = {
val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot)
if (numFilesToRewrite == 0L) return 0L

// Get the table Id and catalog from the delta table to build a ResolvedTable plan for the reorg
// command.
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val tableId = table.spark
.sessionState
.sqlParser
.parseTableIdentifier(table.name).nameParts.asIdentifier
val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog

val reorg = DeltaReorgTableCommand(
ResolvedTable.create(
catalog,
tableId,
table
),
DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None)
)(Nil)

reorg.run(table.spark)
numFilesToRewrite
}

/**
* Remove the type widening metadata attached to fields in the current table schema.
* @return true if any metadata was removed, false otherwise.
*/
private def removeMetadataIfNeeded(): Boolean = {
if (!TypeWideningMetadata.containsTypeWideningMetadata(table.initialSnapshot.schema)) {
return false
}

val txn = table.startTransaction()
val metadata = txn.metadata
val (cleanedSchema, changes) =
TypeWideningMetadata.removeTypeWideningMetadata(metadata.schema)
txn.commit(
metadata.copy(schemaString = cleanedSchema.json) :: Nil,
DeltaOperations.UpdateColumnMetadata("DROP FEATURE", changes))
true
}
}

@@ -629,7 +629,8 @@ object ManagedCommitTableFeature
}

object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening-dev")
with FeatureAutomaticallyEnabledByMetadata {
with FeatureAutomaticallyEnabledByMetadata
with RemovableFeature {
override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

private def isTypeWideningSupportNeededByMetadata(metadata: Metadata): Boolean =
@@ -638,6 +639,19 @@ object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = isTypeWideningSupportNeededByMetadata(metadata)

override def validateRemoval(snapshot: Snapshot): Boolean =
!isTypeWideningSupportNeededByMetadata(snapshot.metadata) &&
!TypeWideningMetadata.containsTypeWideningMetadata(snapshot.metadata.schema)

override def actionUsesFeature(action: Action): Boolean =
action match {
case m: Metadata => TypeWideningMetadata.containsTypeWideningMetadata(m.schema)
case _ => false
}

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
TypeWideningPreDowngradeCommand(table)
}
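
The hunk above wires type widening into the removable-feature framework. For context, here is a simplified, hypothetical sketch of how such a pre-downgrade hook is expected to be driven when a feature is dropped; the actual logic lives in the `ALTER TABLE ... DROP FEATURE` command path, not in this PR, and these are internal APIs.

```scala
import org.apache.spark.sql.delta.RemovableFeature
import org.apache.spark.sql.delta.catalog.DeltaTableV2

// Hypothetical driver, for illustration only: how the pieces added in this PR
// fit together when dropping a removable feature.
def removeTracesBeforeDowngrade(table: DeltaTableV2, feature: RemovableFeature): Unit = {
  // Remove traces from the current version: for type widening this rewrites
  // old files via the internal REORG and strips metadata from the schema.
  val tracesRemoved = feature.preDowngradeCommand(table).removeFeatureTracesIfNeeded()
  if (tracesRemoved) {
    // Traces were only just removed, so earlier versions still reference the
    // feature; the user must retry after the retention period.
    throw new IllegalStateException("DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD")
  }
  // Otherwise the feature can be removed from the table protocol.
}
```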

/**

@@ -16,9 +16,10 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, TableFeatureProtocolUtils}

import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types._

object TypeWidening {
@@ -60,4 +61,33 @@ object TypeWidening {
case (ByteType | ShortType, IntegerType) => true
case _ => false
}

/**
* Filter the given list of files to only keep files that were written before the latest type
* change, if any. These older files contain a column or field with a type that is different than
* in the current table schema and must be rewritten when dropping the type widening table feature
* to make the table readable by readers that don't support the feature.
*/
def filterFilesRequiringRewrite(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
TypeWideningMetadata.getLatestTypeChangeVersion(snapshot.metadata.schema) match {
case Some(latestVersion) =>
files.filter(_.defaultRowCommitVersion match {
case Some(version) => version < latestVersion
// Files written before the type widening table feature was added to the table don't
// have a defaultRowCommitVersion. That means they were written before the latest
// type change.
case None => true
})
case None =>
Seq.empty
}


/**
* Return the number of files that were written before the latest type change and that therefore
* contain a column or field with a type that is different from the current table schema.
*/
def numFilesRequiringRewrite(snapshot: Snapshot): Long = {
filterFilesRequiringRewrite(snapshot, snapshot.allFiles.collect()).size
}
}

@@ -16,7 +16,9 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import scala.collection.mutable

import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.spark.sql.types._
@@ -156,12 +158,12 @@ private[delta] object TypeWideningMetadata {
typeChange.copy(fieldPath = "element" +: typeChange.fieldPath)
}
case (fromType: AtomicType, toType: AtomicType) if fromType != toType =>
Seq(TypeChange(
version,
fromType,
toType,
fieldPath = Seq.empty
))
Seq(TypeChange(
version,
fromType,
toType,
fieldPath = Seq.empty
))
case (_: AtomicType, _: AtomicType) => Seq.empty
// Don't recurse inside structs; `collectTypeChanges` should be called directly on each struct
// field instead, to only collect type changes inside these fields.
@@ -192,4 +194,49 @@
case None => field
}
}

/**
* Remove the type widening metadata from all the fields in the given schema.
* Return the cleaned schema and the list of fields that had type widening metadata, along with
* their paths.
*/
def removeTypeWideningMetadata(schema: StructType)
: (StructType, Seq[(Seq[String], StructField)]) = {
if (!containsTypeWideningMetadata(schema)) return (schema, Seq.empty)

val changes = mutable.Buffer.empty[(Seq[String], StructField)]
val newSchema = SchemaMergingUtils.transformColumns(schema) {
case (fieldPath: Seq[String], field: StructField, _)
if field.metadata.contains(TYPE_CHANGES_METADATA_KEY) =>
changes.append((fieldPath, field))
val cleanMetadata = new MetadataBuilder()
.withMetadata(field.metadata)
.remove(TYPE_CHANGES_METADATA_KEY)
.build()
field.copy(metadata = cleanMetadata)
case (_, field: StructField, _) => field
}
newSchema -> changes.toSeq
}

/** Recursively checks whether any struct field in the schema contains type widening metadata. */
def containsTypeWideningMetadata(schema: StructType): Boolean =
schema.existsRecursively {
case s: StructType => s.exists(_.metadata.contains(TYPE_CHANGES_METADATA_KEY))
case _ => false
}

/** Return the version of the latest type change recorded in the schema metadata. */
def getLatestTypeChangeVersion(schema: StructType): Option[Long] = {
val allStructFields = SchemaUtils.filterRecursively(schema, checkComplexTypes = true) {
_ => true
}.map(_._2)

// Collect all type change versions from all struct fields.
val versions = allStructFields
.flatMap(TypeWideningMetadata.fromField)
.flatMap(_.typeChanges)
.map(_.version)

if (versions.nonEmpty) Some(versions.max) else None
}
}

@@ -16,13 +16,14 @@

package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta.{Snapshot, TypeWidening}
import org.apache.spark.sql.delta.actions.AddFile

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LeafCommand, LogicalPlan, UnaryCommand}

object DeltaReorgTableMode extends Enumeration {
val PURGE, UNIFORM_ICEBERG = Value
val PURGE, UNIFORM_ICEBERG, REWRITE_TYPE_WIDENING = Value
}

case class DeltaReorgTableSpec(
@@ -70,7 +71,8 @@ case class DeltaReorgTableCommand(
}

override def run(sparkSession: SparkSession): Seq[Row] = reorgTableSpec match {
case DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) =>
case DeltaReorgTableSpec(
DeltaReorgTableMode.PURGE | DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) =>
optimizeByReorg(sparkSession)
case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) =>
val table = getDeltaTable(target, "REORG")
@@ -82,6 +84,8 @@ case class DeltaReorgTableCommand(
new DeltaPurgeOperation()
case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) =>
new DeltaUpgradeUniformOperation(icebergCompatVersion)
case DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) =>
new DeltaRewriteTypeWideningOperation()
}
}

@@ -93,14 +97,14 @@ sealed trait DeltaReorgOperation {
* Collects files that need to be processed by the reorg operation from the list of candidate
* files.
*/
def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile]
def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile]
}

/**
* Reorg operation to purge files with soft deleted rows.
*/
class DeltaPurgeOperation extends DeltaReorgOperation {
override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] =
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
files.filter { file =>
(file.deletionVector != null && file.numPhysicalRecords.isEmpty) ||
file.numDeletedRecords > 0L
@@ -111,7 +115,7 @@ class DeltaPurgeOperation extends DeltaReorgOperation {
* Reorg operation to upgrade the iceberg compatibility version of a table.
*/
class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorgOperation {
override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] = {
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = {
def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = {
if (file.tags == null) return true
val icebergCompatVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0")
@@ -120,3 +124,12 @@ class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorg
files.filter(shouldRewriteToBeIcebergCompatible)
}
}

/**
* Internal reorg operation to rewrite files to conform to the current table schema when dropping
* the type widening table feature.
*/
class DeltaRewriteTypeWideningOperation extends DeltaReorgOperation {
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
TypeWidening.filterFilesRequiringRewrite(snapshot, files)
}

@@ -264,7 +264,7 @@ class OptimizeExecutor(
val partitionSchema = txn.metadata.partitionSchema

val filesToProcess = optimizeContext.reorg match {
case Some(reorgOperation) => reorgOperation.filterFilesToReorg(candidateFiles)
case Some(reorgOperation) => reorgOperation.filterFilesToReorg(txn.snapshot, candidateFiles)
case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
}
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq