Skip to content

Commit

Permalink
Support insert-into-by-name for generated columns
Browse files Browse the repository at this point in the history
## Description

Spark 3.4 no longer requires users to provide _all_ columns in insert-by-name queries. This means Delta can now support omitting generated columns from the column list in such queries.

This test adds support for this and adds some additional tests related to the changed by-name support.

Resolves #1215

Adds unit tests.

## Does this PR introduce _any_ user-facing changes?

Yes. Users will be able to omit generated columns from the column list when inserting by name.

Closes #1743

GitOrigin-RevId: 8694fab3d93b71b4230bf6f5dd0f2a21be6f3634
  • Loading branch information
allisonport-db committed May 11, 2023
1 parent 9fac2e6 commit 422a670
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -830,12 +830,6 @@ class DeltaAnalysis(session: SparkSession)
*/
private def needsSchemaAdjustmentByName(query: LogicalPlan, targetAttrs: Seq[Attribute],
deltaTable: DeltaTableV2): Boolean = {
// TODO: update this to allow columns with default expressions to not be
// specified (i.e. generated columns)
if (targetAttrs.length != query.output.length) {
throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
targetAttrs.length, query.output.length, query)
}
insertIntoByNameMissingColumn(query, targetAttrs, deltaTable)
val userSpecifiedNames = if (session.sessionState.conf.caseSensitiveAnalysis) {
query.output.map(a => (a.name, a)).toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,12 @@ class DeltaDataFrameWriterV2Suite
checkAnswer(
spark.table(s"delta.`$location`"),
Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c")))

// allows missing columns
Seq(4L).toDF("id").writeTo(s"delta.`$location`").append()
checkAnswer(
spark.table(s"delta.`$location`"),
Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, null)))
}

test("Create: basic behavior by path") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,105 @@ class DeltaInsertIntoSQLSuite
}
}

test("insertInto: append by name") {
import testImplicits._
val t1 = "tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
sql(s"INSERT INTO $t1(id, data) VALUES(1L, 'a')")
// Can be in a different order
sql(s"INSERT INTO $t1(data, id) VALUES('b', 2L)")
// Can be casted automatically
sql(s"INSERT INTO $t1(data, id) VALUES('c', 3)")
verifyTable(t1, df)
withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
// Missing columns
assert(intercept[AnalysisException] {
sql(s"INSERT INTO $t1(data) VALUES(4)")
}.getMessage.contains("Column id is not specified in INSERT"))
// Missing columns with matching dataType
assert(intercept[AnalysisException] {
sql(s"INSERT INTO $t1(data) VALUES('b')")
}.getMessage.contains("Column id is not specified in INSERT"))
}
// Duplicate columns
assert(intercept[AnalysisException](
sql(s"INSERT INTO $t1(data, data) VALUES(5)")).getMessage.nonEmpty)
}
}

test("insertInto: overwrite by name") {
import testImplicits._
val t1 = "tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
sql(s"INSERT OVERWRITE $t1(id, data) VALUES(1L, 'a')")
verifyTable(t1, Seq((1L, "a")).toDF("id", "data"))
// Can be in a different order
sql(s"INSERT OVERWRITE $t1(data, id) VALUES('b', 2L)")
verifyTable(t1, Seq((2L, "b")).toDF("id", "data"))
// Can be casted automatically
sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 3)")
verifyTable(t1, Seq((3L, "c")).toDF("id", "data"))
withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
// Missing columns
assert(intercept[AnalysisException] {
sql(s"INSERT OVERWRITE $t1(data) VALUES(4)")
}.getMessage.contains("Column id is not specified in INSERT"))
// Missing columns with matching datatype
assert(intercept[AnalysisException] {
sql(s"INSERT OVERWRITE $t1(data) VALUES(4L)")
}.getMessage.contains("Column id is not specified in INSERT"))
}
// Duplicate columns
assert(intercept[AnalysisException](
sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)")).getMessage.nonEmpty)
}
}

dynamicOverwriteTest("insertInto: dynamic overwrite by name") {
import testImplicits._
val t1 = "tbl"
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string, data2 string) " +
s"USING $v2Format PARTITIONED BY (id)")
sql(s"INSERT OVERWRITE $t1(id, data, data2) VALUES(1L, 'a', 'b')")
verifyTable(t1, Seq((1L, "a", "b")).toDF("id", "data", "data2"))
// Can be in a different order
sql(s"INSERT OVERWRITE $t1(data, data2, id) VALUES('b', 'd', 2L)")
verifyTable(t1, Seq((1L, "a", "b"), (2L, "b", "d")).toDF("id", "data", "data2"))
// Can be casted automatically
sql(s"INSERT OVERWRITE $t1(data, data2, id) VALUES('c', 'e', 1)")
verifyTable(t1, Seq((1L, "c", "e"), (2L, "b", "d")).toDF("id", "data", "data2"))
withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
// Missing columns
assert(intercept[AnalysisException] {
sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 1)")
}.getMessage.contains("Column data2 is not specified in INSERT"))
// Missing columns with matching datatype
assert(intercept[AnalysisException] {
sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 1L)")
}.getMessage.contains("Column data2 is not specified in INSERT"))
}
// Duplicate columns
assert(intercept[AnalysisException](
sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)")).getMessage.nonEmpty)
}
}

test("insertInto: static partition column name should not be used in the column list") {
withTable("t") {
sql(s"CREATE TABLE t(i STRING, c string) USING $v2Format PARTITIONED BY (c)")
checkError(
exception = intercept[AnalysisException] {
sql("INSERT OVERWRITE t PARTITION (c='1') (c) VALUES ('2')")
},
errorClass = "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST",
parameters = Map("staticName" -> "c"))
}
}


Seq(("ordinal", ""), ("name", "(id, col2, col)")).foreach { case (testName, values) =>
test(s"INSERT OVERWRITE schema evolution works for array struct types - $testName") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ trait GeneratedColumnSuiteBase extends GeneratedColumnTest {
assert(errMsg.contains(str))
}

protected def testTableUpdateDPO(
testName: String)(updateFunc: (String, String) => Seq[Row]): Unit = {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
SQLConf.PartitionOverwriteMode.DYNAMIC.toString) {
testTableUpdate("dpo_" + testName)(updateFunc)
}
}

testTableUpdate("append_data") { (table, path) =>
Seq(
Tuple5(1L, "foo", "2020-10-11 12:30:30", 100, "2020-11-12")
Expand Down Expand Up @@ -269,6 +277,26 @@ trait GeneratedColumnSuiteBase extends GeneratedColumnTest {
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdate("insert_into_by_name_provide_all_columns") { (table, _) =>
sql(s"INSERT INTO $table (c5, c6, c7_g_p, c8, c1, c2_g, c3_p, c4_g_p) VALUES" +
s"('2020-10-11 12:30:30', 100, 1000, '2020-11-12', 1, 11, 'foo', '2020-10-11')")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdate("insert_into_by_name_not_provide_generated_columns") { (table, _) =>
sql(s"INSERT INTO $table (c6, c8, c1, c3_p, c5) VALUES" +
s"(100, '2020-11-12', 1L, 'foo', '2020-10-11 12:30:30')")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdate("insert_into_by_name_with_some_generated_columns") { (table, _) =>
sql(s"INSERT INTO $table (c5, c6, c8, c1, c3_p, c4_g_p) VALUES" +
s"('2020-10-11 12:30:30', 100, '2020-11-12', 1L, 'foo', '2020-10-11')")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdate("insert_into_select_provide_all_columns") { (table, path) =>
sql(s"INSERT INTO $table SELECT " +
Expand All @@ -277,6 +305,17 @@ trait GeneratedColumnSuiteBase extends GeneratedColumnTest {
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdate("insert_into_by_name_not_provide_normal_columns") { (table, _) =>
val e = intercept[AnalysisException] {
withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
sql(s"INSERT INTO $table (c6, c8, c1, c3_p) VALUES" +
s"(100, '2020-11-12', 1L, 'foo')")
}
}
errorContains(e.getMessage, "Column c5 is not specified in INSERT")
Nil
}

testTableUpdate("insert_overwrite_values_provide_all_columns") { (table, path) =>
sql(s"INSERT OVERWRITE TABLE $table VALUES" +
s"(1, 11, 'foo', '2020-10-11', '2020-10-11 12:30:30', 100, 1000, '2020-11-12')")
Expand All @@ -291,27 +330,84 @@ trait GeneratedColumnSuiteBase extends GeneratedColumnTest {
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdate("insert_overwrite_by_name_provide_all_columns") { (table, _) =>
sql(s"INSERT OVERWRITE $table (c5, c6, c7_g_p, c8, c1, c2_g, c3_p, c4_g_p) VALUES" +
s"('2020-10-11 12:30:30', 100, 1000, '2020-11-12', 1, 11, 'foo', '2020-10-11')")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdate("dpo_insert_overwrite_values_provide_all_columns") { (table, path) =>
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
SQLConf.PartitionOverwriteMode.DYNAMIC.toString) {
sql(s"INSERT OVERWRITE TABLE $table VALUES" +
s"(1, 11, 'foo', '2020-10-11', '2020-10-11 12:30:30', 100, 1000, '2020-11-12')")
}
testTableUpdate("insert_overwrite_by_name_not_provide_generated_columns") { (table, _) =>
sql(s"INSERT OVERWRITE $table (c6, c8, c1, c3_p, c5) VALUES" +
s"(100, '2020-11-12', 1L, 'foo', '2020-10-11 12:30:30')")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdate("dpo_insert_overwrite_select_provide_all_columns") { (table, path) =>
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
SQLConf.PartitionOverwriteMode.DYNAMIC.toString) {
sql(s"INSERT OVERWRITE TABLE $table SELECT " +
s"1, 11, 'foo', '2020-10-11', '2020-10-11 12:30:30', 100, 1000, '2020-11-12'")
testTableUpdate("insert_overwrite_by_name_with_some_generated_columns") { (table, _) =>
sql(s"INSERT OVERWRITE $table (c5, c6, c8, c1, c3_p, c4_g_p) VALUES" +
s"('2020-10-11 12:30:30', 100, '2020-11-12', 1L, 'foo', '2020-10-11')")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdate("insert_overwrite_by_name_not_provide_normal_columns") { (table, _) =>
val e = intercept[AnalysisException] {
withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
sql(s"INSERT OVERWRITE $table (c6, c8, c1, c3_p) VALUES" +
s"(100, '2020-11-12', 1L, 'foo')")
}
}
errorContains(e.getMessage, "Column c5 is not specified in INSERT")
Nil
}

testTableUpdateDPO("insert_overwrite_values_provide_all_columns") { (table, path) =>
sql(s"INSERT OVERWRITE TABLE $table VALUES" +
s"(1, 11, 'foo', '2020-10-11', '2020-10-11 12:30:30', 100, 1000, '2020-11-12')")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdateDPO("insert_overwrite_select_provide_all_columns") { (table, path) =>
sql(s"INSERT OVERWRITE TABLE $table SELECT " +
s"1, 11, 'foo', '2020-10-11', '2020-10-11 12:30:30', 100, 1000, '2020-11-12'")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdateDPO("insert_overwrite_by_name_values_provide_all_columns") { (table, _) =>
sql(s"INSERT OVERWRITE $table (c5, c6, c7_g_p, c8, c1, c2_g, c3_p, c4_g_p) VALUES" +
s"(CAST('2020-10-11 12:30:30' AS TIMESTAMP), 100, 1000, CAST('2020-11-12' AS DATE), " +
s"1L, 11L, 'foo', CAST('2020-10-11' AS DATE))")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdateDPO(
"insert_overwrite_by_name_not_provide_generated_columns") { (table, _) =>
sql(s"INSERT OVERWRITE $table (c6, c8, c1, c3_p, c5) VALUES" +
s"(100, CAST('2020-11-12' AS DATE), 1L, 'foo', CAST('2020-10-11 12:30:30' AS TIMESTAMP))")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdateDPO("insert_overwrite_by_name_with_some_generated_columns") { (table, _) =>
sql(s"INSERT OVERWRITE $table (c5, c6, c8, c1, c3_p, c4_g_p) VALUES" +
s"(CAST('2020-10-11 12:30:30' AS TIMESTAMP), 100, CAST('2020-11-12' AS DATE), 1L, " +
s"'foo', CAST('2020-10-11' AS DATE))")
Row(1L, 11L, "foo", sqlDate("2020-10-11"), sqlTimestamp("2020-10-11 12:30:30"),
100, 1000, sqlDate("2020-11-12")) :: Nil
}

testTableUpdateDPO("insert_overwrite_by_name_not_provide_normal_columns") { (table, _) =>
val e = intercept[AnalysisException] {
sql(s"INSERT OVERWRITE $table (c6, c8, c1, c3_p) VALUES" +
s"(100, '2020-11-12', 1L, 'foo')")
}
assert(e.getMessage.contains("Column c5 is not specified in INSERT"))
Nil
}

testTableUpdate("delete") { (table, path) =>
Seq(
Expand Down

0 comments on commit 422a670

Please sign in to comment.