Automatic type widening in MERGE #2764

Merged: 7 commits, Mar 22, 2024
@@ -234,7 +234,7 @@ object ResolveDeltaMergeInto {
// schema before merging it with the target schema. We don't consider NOT MATCHED BY SOURCE
// clauses since these can't by definition reference source columns and thus can't introduce
// new columns in the target schema.
- val actions = (matchedClauses ++ notMatchedClauses).flatMap(_.actions)
+ val actions = (resolvedMatchedClauses ++ resolvedNotMatchedClauses).flatMap(_.actions)
val assignments = actions.collect { case a: DeltaMergeAction => a.targetColNameParts }
val containsStarAction = actions.exists {
case _: UnresolvedStar => true
@@ -278,14 +278,22 @@ object ResolveDeltaMergeInto {
})

val migrationSchema = filterSchema(source.schema, Seq.empty)
val allowTypeWidening = target.exists {
case DeltaTable(fileIndex) =>
TypeWidening.isEnabled(fileIndex.protocol, fileIndex.metadata)
case _ => false
}

Collaborator:

What will happen if type widening is disabled after this statement, but before the main transaction starts?

Collaborator Author:

The two calls to SchemaMergingUtils.mergeSchemas (here and in ImplicitMetadataOperation) will return different schemas: the first one with the wider type, the second one with the original type.

This currently works because the second call to mergeSchemas doesn't allow implicit casts and will fail, but it's quite brittle. I added a check at the start of MERGE to properly fail when type widening is enabled/disabled concurrently.
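To make the scenario discussed above concrete, here is a rough sketch of the divergence between the two mergeSchemas calls, assuming a SHORT target column and an INT source column; the package import and the stated outcomes follow the description in this thread rather than a verified run:

```scala
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._

// Illustrative schemas: target column is SHORT, source column is INT.
val targetSchema = StructType(Seq(StructField("a", ShortType)))
val sourceSchema = StructType(Seq(StructField("a", IntegerType)))

// First call (during resolution, widening enabled): `a` is widened to IntegerType.
val analysisSchema = SchemaMergingUtils.mergeSchemas(
  targetSchema, sourceSchema,
  allowImplicitConversions = true,
  allowTypeWidening = true)

// Second call (ImplicitMetadataOperation, after the property was disabled
// concurrently): widening and implicit casts are off, so this call fails rather
// than returning the original, non-widened schema. The new check in
// MergeIntoCommand fails the whole MERGE up front instead of relying on this.
val txnSchema = SchemaMergingUtils.mergeSchemas(
  targetSchema, sourceSchema,
  allowTypeWidening = false)
```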

// The implicit conversions flag allows any type to be merged from source to target if Spark
// SQL considers the source type implicitly castable to the target. Normally, mergeSchemas
// enforces Parquet-level write compatibility, which would mean an INT source can't be merged
// into a LONG target.
SchemaMergingUtils.mergeSchemas(
target.schema,
migrationSchema,
- allowImplicitConversions = true)
+ allowImplicitConversions = true,
+ allowTypeWidening = allowTypeWidening
+ )
} else {
target.schema
}
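A small sketch of the implicit-conversions behaviour described in the comment above (schemas and import are illustrative assumptions):

```scala
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types._

// LONG target column, INT source column.
val longTarget = StructType(Seq(StructField("a", LongType)))
val intSource = StructType(Seq(StructField("a", IntegerType)))

// Parquet-level write compatibility alone rejects merging an INT source into a
// LONG target, so the call without the flag is expected to fail:
// SchemaMergingUtils.mergeSchemas(longTarget, intSource)

// With implicit conversions allowed, Spark SQL can cast INT to LONG, so the
// merge succeeds and the merged schema keeps the target's LongType.
SchemaMergingUtils.mergeSchemas(longTarget, intSource, allowImplicitConversions = true)
```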
spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala (27 additions, 0 deletions)
@@ -47,6 +47,20 @@ object TypeWidening {
isEnabled
}

/**
* Checks that the type widening table property wasn't disabled or enabled between the two given
* states, and throws an error if it was.
*/
def ensureFeatureConsistentlyEnabled(
protocol: Protocol,
metadata: Metadata,
otherProtocol: Protocol,
otherMetadata: Metadata): Unit = {
if (isEnabled(protocol, metadata) != isEnabled(otherProtocol, otherMetadata)) {
throw DeltaErrors.metadataChangedException(None)
}
}

/**
* Returns whether the given type change is eligible for widening. This only checks atomic types.
* It is the responsibility of the caller to recurse into structs, maps and arrays.
@@ -62,6 +76,19 @@
case _ => false
}

/**
* Returns whether the given type change can be applied during schema evolution. Only a
* subset of supported type changes are considered for schema evolution.
*/
def isTypeChangeSupportedForSchemaEvolution(fromType: AtomicType, toType: AtomicType): Boolean =
(fromType, toType) match {
case (from, to) if from == to => true
case (from, to) if !isTypeChangeSupported(from, to) => false
case (ByteType, ShortType) => true
case (ByteType | ShortType, IntegerType) => true
case _ => false
}
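Based only on the cases listed above, a quick illustration of what the new helper accepts during schema evolution (the example values are assumptions for illustration):

```scala
import org.apache.spark.sql.delta.TypeWidening
import org.apache.spark.sql.types._

// Changes listed explicitly above are applied automatically during schema evolution.
TypeWidening.isTypeChangeSupportedForSchemaEvolution(ByteType, ShortType)    // true
TypeWidening.isTypeChangeSupportedForSchemaEvolution(ShortType, IntegerType) // true

// Anything else falls through to the default case and is not applied
// automatically, even if it is otherwise a supported type change.
TypeWidening.isTypeChangeSupportedForSchemaEvolution(IntegerType, LongType)  // false
```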

/**
* Filter the given list of files to only keep files that were written before the latest type
* change, if any. These older files contain a column or field with a type that is different than
@@ -91,6 +91,15 @@ case class MergeIntoCommand(
atAnalysis = target.schema, latestSchema = deltaTxn.metadata.schema)
}

// Check that type widening wasn't enabled/disabled between analysis and the start of the
// transaction.
TypeWidening.ensureFeatureConsistentlyEnabled(
protocol = targetFileIndex.protocol,
metadata = targetFileIndex.metadata,
otherProtocol = deltaTxn.protocol,
otherMetadata = deltaTxn.metadata
)

if (canMergeSchema) {
updateMetadata(
spark, deltaTxn, migratedSchema.getOrElse(target.schema),
@@ -122,7 +122,14 @@ trait ImplicitMetadataOperation extends DeltaLogging {
if (rearrangeOnly) {
throw DeltaErrors.unexpectedDataChangeException("Change the Delta table schema")
}
- txn.updateMetadata(txn.metadata.copy(schemaString = mergedSchema.json

+ val schemaWithTypeWideningMetadata = TypeWideningMetadata.addTypeWideningMetadata(
+   txn,
+   schema = mergedSchema,
+   oldSchema = txn.metadata.schema
+ )

+ txn.updateMetadata(txn.metadata.copy(schemaString = schemaWithTypeWideningMetadata.json
))
} else if (isNewSchema || isNewPartitioning
) {
@@ -201,7 +208,8 @@ object ImplicitMetadataOperation {
SchemaMergingUtils.mergeSchemas(
txn.metadata.schema,
dataSchema,
- fixedTypeColumns = fixedTypeColumns)
+ fixedTypeColumns = fixedTypeColumns,
+ allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata))
}
}

@@ -20,14 +20,13 @@ import java.util.Locale

import scala.util.control.NonFatal

- import org.apache.spark.sql.delta.DeltaAnalysisException
+ import org.apache.spark.sql.delta.{DeltaAnalysisException, TypeWidening}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
- import org.apache.spark.sql.types.{ArrayType, ByteType, DataType, DecimalType, IntegerType, MapType, NullType, ShortType, StructField, StructType}
+ import org.apache.spark.sql.types._

/**
* Utils to merge table schema with data schema.
@@ -168,6 +167,7 @@ object SchemaMergingUtils {
dataSchema: StructType,
allowImplicitConversions: Boolean = false,
keepExistingType: Boolean = false,
allowTypeWidening: Boolean = false,
fixedTypeColumns: Set[String] = Set.empty,
caseSensitive: Boolean = false): StructType = {
checkColumnNameDuplication(dataSchema, "in the data to save", caseSensitive)
@@ -232,6 +232,9 @@
// Simply keeps the existing type for primitive types
case (current, update) if keepExistingType => current

case (current: AtomicType, update: AtomicType) if allowTypeWidening &&
TypeWidening.isTypeChangeSupportedForSchemaEvolution(current, update) => update

// If implicit conversions are allowed, that means we can use any valid implicit cast to
// perform the merge.
case (current, update)
@@ -1432,7 +1432,12 @@ trait DeltaErrorsSuiteBase
val e = intercept[DeltaAnalysisException] {
val s1 = StructType(Seq(StructField("c0", IntegerType, true)))
val s2 = StructType(Seq(StructField("c0", StringType, false)))
- SchemaMergingUtils.mergeSchemas(s1, s2, false, false, Set("c0"))
+ SchemaMergingUtils.mergeSchemas(s1, s2,
+   allowImplicitConversions = false,
+   keepExistingType = false,
+   allowTypeWidening = false,
+   Set("c0")
+ )
}
checkErrorMessage(e, Some("DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH"), Some("42K09"),
Some("Column c0 is a generated column or a column used by a generated " +
@@ -42,10 +42,11 @@ import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
- import org.apache.spark.sql.catalyst.util.quietly
+ import org.apache.spark.sql.catalyst.util.{quietly, FailFastMode}
import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, RDDScanExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.util.Utils

@@ -475,6 +476,8 @@ trait DeltaDMLTestUtils
with BeforeAndAfterEach {
self: SharedSparkSession =>

import testImplicits._

protected var tempDir: File = _

protected var deltaLog: DeltaLog = _
@@ -523,6 +526,23 @@
}
}

/**
* Parses the input JSON data into a DataFrame, one row per input element.
* Throws an exception on malformed inputs or records that don't comply with the provided schema.
*/
protected def readFromJSON(data: Seq[String], schema: StructType = null): DataFrame = {
if (schema != null) {
spark.read
.schema(schema)
.option("mode", FailFastMode.name)
.json(data.toDS)
} else {
spark.read
.option("mode", FailFastMode.name)
.json(data.toDS)
}
}
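A possible usage of this helper from a suite mixing in DeltaDMLTestUtils (the rows and schema are made up for illustration):

```scala
import org.apache.spark.sql.types._

// FailFast mode surfaces malformed records or schema mismatches as exceptions
// instead of silently turning them into nulls.
val schema = new StructType()
  .add("key", IntegerType)
  .add("value", StringType)

val df = readFromJSON(Seq(
  """{"key": 1, "value": "a"}""",
  """{"key": 2, "value": "b"}"""), schema)
```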

protected def readDeltaTable(path: String): DataFrame = {
spark.read.format("delta").load(path)
}