[SPARK-49975][SQL] Disallow LEGACY storeAssignmentPolicy with DataFrameWriterV2 API
manuzhang committed Oct 15, 2024
1 parent 488f680 commit f6625f4
Showing 2 changed files with 29 additions and 12 deletions.
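What the change means in practice (a minimal sketch, not part of the commit page): with spark.sql.storeAssignmentPolicy set to LEGACY, any DataFrameWriterV2 write is now rejected during analysis, which is exactly what the new regression test in this commit exercises. The session setup and table name below are illustrative assumptions.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.storeAssignmentPolicy", "LEGACY")
  .getOrCreate()

// After this commit, analysis fails up front with an AnalysisException
// that points at spark.sql.storeAssignmentPolicy.
spark.range(10)
  .writeTo("table_name")
  .append()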
Analyzer.scala
@@ -3661,20 +3661,23 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
   object ResolveOutputRelation extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
       _.containsPattern(COMMAND), ruleId) {
-      case v2Write: V2WriteCommand
-          if v2Write.table.resolved && v2Write.query.resolved && !v2Write.outputResolved =>
+      case v2Write: V2WriteCommand =>
         validateStoreAssignmentPolicy()
-        TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
-          expected = v2Write.table.output, queryOutput = v2Write.query.output)
-        val projection = TableOutputResolver.resolveOutputColumns(
-          v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf)
-        if (projection != v2Write.query) {
-          val cleanedTable = v2Write.table match {
-            case r: DataSourceV2Relation =>
-              r.copy(output = r.output.map(CharVarcharUtils.cleanAttrMetadata))
-            case other => other
+        if (v2Write.table.resolved && v2Write.query.resolved && !v2Write.outputResolved) {
+          TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
+            expected = v2Write.table.output, queryOutput = v2Write.query.output)
+          val projection = TableOutputResolver.resolveOutputColumns(
+            v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf)
+          if (projection != v2Write.query) {
+            val cleanedTable = v2Write.table match {
+              case r: DataSourceV2Relation =>
+                r.copy(output = r.output.map(CharVarcharUtils.cleanAttrMetadata))
+              case other => other
+            }
+            v2Write.withNewQuery(projection).withNewTable(cleanedTable)
+          } else {
+            v2Write
           }
-          v2Write.withNewQuery(projection).withNewTable(cleanedTable)
         } else {
           v2Write
         }
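The helper now called unconditionally for every V2WriteCommand, validateStoreAssignmentPolicy(), is defined elsewhere in Analyzer.scala and is not shown in this hunk. A rough sketch of what it presumably does, inferred from the call site and from the error the new test expects (the error factory name and exact body are assumptions, and conf is the rule's SQLConf as elsewhere in the analyzer):

import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf

private def validateStoreAssignmentPolicy(): Unit = {
  // LEGACY keeps pre-3.0 store assignment casting semantics, which data source v2
  // writes do not support, so fail analysis early with a pointer to the config key.
  if (conf.storeAssignmentPolicy == SQLConf.StoreAssignmentPolicy.LEGACY) {
    throw QueryCompilationErrors.legacyStoreAssignmentPolicyError()
  }
}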
DataFrameWriterV2Suite.scala
@@ -839,4 +839,18 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       condition = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
       parameters = Map("methodName" -> "`writeTo`"))
   }
+
+  test("SPARK-49975: write should fail with legacy store assignment policy") {
+    withSQLConf((SQLConf.STORE_ASSIGNMENT_POLICY.key, "LEGACY")) {
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.range(10)
+            .writeTo("table_name")
+            .append()
+        },
+        condition = "_LEGACY_ERROR_TEMP_1000",
+        parameters = Map("configKey" -> "spark.sql.storeAssignmentPolicy")
+      )
+    }
+  }
 }
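A hedged usage note, not taken from the commit: the same write goes through normal output resolution once the policy is set to one of the supported values, ANSI (the default) or STRICT. The catalog and table identifier below are illustrative and assume a configured V2 catalog.

spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")  // or "STRICT"
spark.range(10)
  .writeTo("testcat.ns.tbl")
  .createOrReplace()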
