Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-21247][SQL] Type comparison should respect case-sensitive SQL conf #18460

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ object TypeCoercion {
case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
Some(TimestampType)

case (t1 @ StructType(fields1), t2 @ StructType(fields2)) if t1.sameType(t2) =>
Some(StructType(fields1.zip(fields2).map { case (f1, f2) =>
// Since `t1.sameType(t2)` is true, two StructTypes have the same DataType
// except `name` (in case of `spark.sql.caseSensitive=false`) and `nullable`.
// - Different names: use f1.name
// - Different nullabilities: `nullable` is true iff one of them is nullable.
val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
StructField(f1.name, dataType, nullable = f1.nullable || f2.nullable)
}))

case _ => None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -80,7 +81,11 @@ abstract class DataType extends AbstractDataType {
* (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
*/
private[spark] def sameType(other: DataType): Boolean =
DataType.equalsIgnoreNullability(this, other)
if (SQLConf.get.caseSensitiveAnalysis) {
DataType.equalsIgnoreNullability(this, other)
} else {
DataType.equalsIgnoreCaseAndNullability(this, other)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we already have DataType.equalsIgnoreCaseAndNullability, we can use this according to the SQL configuration.

}

/**
* Returns the same data type but set all nullability fields are true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,17 @@ class TypeCoercionSuite extends AnalysisTest {
widenFunc: (DataType, DataType) => Option[DataType],
t1: DataType,
t2: DataType,
expected: Option[DataType]): Unit = {
expected: Option[DataType],
isSymmetric: Boolean = true): Unit = {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile . I extended this function for using non-symmetric tests and addressed your comments.

var found = widenFunc(t1, t2)
assert(found == expected,
s"Expected $expected as wider common type for $t1 and $t2, found $found")
// Test both directions to make sure the widening is symmetric.
found = widenFunc(t2, t1)
assert(found == expected,
s"Expected $expected as wider common type for $t2 and $t1, found $found")
if (isSymmetric) {
found = widenFunc(t2, t1)
assert(found == expected,
s"Expected $expected as wider common type for $t2 and $t1, found $found")
}
}

test("implicit type cast - ByteType") {
Expand Down Expand Up @@ -385,6 +388,47 @@ class TypeCoercionSuite extends AnalysisTest {
widenTest(NullType, StructType(Seq()), Some(StructType(Seq())))
widenTest(StringType, MapType(IntegerType, StringType, true), None)
widenTest(ArrayType(IntegerType), StructType(Seq()), None)

widenTest(
StructType(Seq(StructField("a", IntegerType))),
StructType(Seq(StructField("b", IntegerType))),
None)
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", DoubleType, nullable = false))),
None)

widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", IntegerType, nullable = false))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = false)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", IntegerType, nullable = true))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = true))),
StructType(Seq(StructField("a", IntegerType, nullable = false))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = true))),
StructType(Seq(StructField("a", IntegerType, nullable = true))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
widenTest(
StructType(Seq(StructField("a", IntegerType))),
StructType(Seq(StructField("A", IntegerType))),
None)
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
checkWidenType(
TypeCoercion.findTightestCommonType,
StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType))),
StructType(Seq(StructField("A", IntegerType), StructField("b", IntegerType))),
Some(StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType)))),
isSymmetric = false)
}
}

test("wider common type for decimal and array") {
Expand Down
38 changes: 38 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2646,6 +2646,44 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("SPARK-21247: Allow case-insensitive type equality in Set operation") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")

withTable("t", "S") {
sql("CREATE TABLE t(c struct<f:int>) USING parquet")
sql("CREATE TABLE S(C struct<F:int>) USING parquet")
Seq(("c", "C"), ("C", "c"), ("c.f", "C.F"), ("C.F", "c.f")).foreach {
case (left, right) =>
checkAnswer(sql(s"SELECT * FROM t, S WHERE t.$left = S.$right"), Seq.empty)
}
}
}

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val m1 = intercept[AnalysisException] {
sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
}.message
assert(m1.contains("Union can only be performed on tables with the compatible column types"))

val m2 = intercept[AnalysisException] {
sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")
}.message
assert(m2.contains("Except can only be performed on tables with the compatible column types"))

withTable("t", "S") {
sql("CREATE TABLE t(c struct<f:int>) USING parquet")
sql("CREATE TABLE S(C struct<F:int>) USING parquet")
checkAnswer(sql("SELECT * FROM t, S WHERE t.c.f = S.C.F"), Seq.empty)
val m = intercept[AnalysisException] {
sql("SELECT * FROM t, S WHERE c = C")
}.message
assert(m.contains("cannot resolve '(t.`c` = S.`C`)' due to data type mismatch"))
}
}
}

test("SPARK-21335: support un-aliased subquery") {
withTempView("v") {
Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("v")
Expand Down