[SPARK-21247][SQL] Type comparison should respect case-sensitive SQL conf #18460

Status: Closed · wants to merge 9 commits
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import java.util.Locale
import javax.annotation.Nullable

import scala.annotation.tailrec
@@ -100,6 +101,17 @@ object TypeCoercion {
case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) =>
Some(TimestampType)

case (t1 @ StructType(fields1), t2 @ StructType(fields2)) if t1.sameType(t2) =>
Some(StructType(fields1.zip(fields2).map { case (f1, f2) =>
// Since `t1.sameType(t2)` is true, two StructTypes have the same DataType
// except `name` (in case of `spark.sql.caseSensitive=false`) and `nullable`.
// - Different names: use a lower case name because findTightestCommonType is commutative.
// - Different nullabilities: `nullable` is true iff one of them is nullable.
val name = if (f1.name == f2.name) f1.name else f1.name.toLowerCase(Locale.ROOT)
Member:

Why is the output nested column name lower case? Is Hive behaving like this?

In addition, could you add one more test to check whether we also respect the case-sensitivity conf when resolving queries that contain nested columns in the references?

Member Author:

Yes. Hive behaves like the following:

hive> CREATE TABLE S AS SELECT named_struct('A',1);
hive> DESCRIBE S;
OK
_c0                 	struct<a:int>

Member Author:

For a test case, does "nested column in the references" mean the WHERE clause?

Member:

Do both tab.Col1.a and tab.Col1.A work well when case sensitivity is off?

Member Author:

When case sensitivity is off, Spark considers them in lower case.
For example, in the test case we need the table names struct1 and struct2; comparing just a = A raises an ambiguous-column exception.

checkAnswer(sql("SELECT * FROM struct1, struct2 WHERE struct1.a = struct2.A"), Seq.empty)

Member:

I mean, does the case-sensitivity conf work in nested column name resolution?

Member Author:

For Hive, your example works like the following:

hive> CREATE TABLE S1 AS SELECT named_struct('A',1) Col1;
hive> SELECT S1.Col1.a, S1.Col1.A FROM S1;
OK
1	1

Member Author:

Oh, I see. I thought so. Let me check that again.

I mean, does the case-sensitivity conf work in nested column name resolution?

Member Author:

Yes. It works. I updated the test cases.

val dataType = findTightestCommonType(f1.dataType, f2.dataType).get
StructField(name, dataType, nullable = f1.nullable || f2.nullable)
Member:

Should we follow what we are doing for union/except/intersect? Always pick the name of the head one?

Member:

See the example:

      sql("SELECT 1 as a UNION ALL (SELECT 1 as A)").show()
      sql("SELECT 1 as A UNION ALL (SELECT 1 as a)").show()

Member Author (@dongjoon-hyun, Oct 11, 2017):

This PR works as you want. This function is used only to compare equality. BTW, for this function, it should use either lower or upper case consistently, because it should be commutative.

scala> sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))").printSchema
root
 |-- named_struct(a, 1 AS `a`): struct (nullable = false)
 |    |-- a: integer (nullable = false)

scala> sql("SELECT struct(1 A) UNION ALL (SELECT struct(2 a))").printSchema
root
 |-- named_struct(A, 1 AS `A`): struct (nullable = false)
 |    |-- A: integer (nullable = false)

Member:

val name = if (f1.name == f2.name) f1.name else f1.name.toLowerCase(Locale.ROOT)

The above code changes the case, right?

Member Author:

Sure, right. It's for commutativity.

Member Author:

Please see TypeCoercionSuite.checkWidenType.

In order to use the first type's name, we would need to loosen this test helper function and break the existing commutativity assumption. I'm OK with that if you want.
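The commutativity assumption mentioned here can be made concrete with a small helper. This is an illustrative sketch only — `checkCommutative` is a made-up name, not Spark's actual `checkWidenType` signature:

```scala
// Illustrative sketch (not Spark's actual test helper): assert that a
// widening function returns the same result for both argument orders.
object CommutativityCheck {
  def checkCommutative[A, B](widen: (A, A) => B)(t1: A, t2: A): B = {
    val r12 = widen(t1, t2)
    val r21 = widen(t2, t1)
    assert(r12 == r21, s"not commutative: $r12 != $r21")
    r12
  }

  def main(args: Array[String]): Unit = {
    // Toy example: math.max over Int stands in for type widening.
    val r = checkCommutative[Int, Int](math.max)(3, 7)
    println(r)  // 7
  }
}
```

A helper shaped like this would fail as soon as the merge rule picked the first argument's name, which is why the lower-case rule is used instead.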

}))

case _ => None
}
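The merge rule in the hunk above can be sketched standalone. The following uses a toy type ADT — `DT`, `Field`, `StructT` are illustrative stand-ins, not Spark's actual classes:

```scala
import java.util.Locale

// Toy stand-ins for Spark's DataType/StructField, for illustration only.
sealed trait DT
case object IntT extends DT
final case class Field(name: String, dataType: DT, nullable: Boolean)
final case class StructT(fields: Seq[Field]) extends DT

// Merge two same-shaped structs: lower-case the name when the two names
// differ (so the result is commutative), and OR the nullabilities.
def mergeStructs(t1: StructT, t2: StructT): StructT =
  StructT(t1.fields.zip(t2.fields).map { case (f1, f2) =>
    val name =
      if (f1.name == f2.name) f1.name else f1.name.toLowerCase(Locale.ROOT)
    Field(name, f1.dataType, nullable = f1.nullable || f2.nullable)
  })

val s1 = StructT(Seq(Field("a", IntT, nullable = false)))
val s2 = StructT(Seq(Field("A", IntT, nullable = true)))
assert(mergeStructs(s1, s2) == mergeStructs(s2, s1)) // same result either way
```

Because lower-casing is idempotent and order-independent, swapping the arguments cannot change the merged field name.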

@@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
@@ -80,7 +81,11 @@ abstract class DataType extends AbstractDataType {
* (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
*/
private[spark] def sameType(other: DataType): Boolean =
DataType.equalsIgnoreNullability(this, other)
if (SQLConf.get.caseSensitiveAnalysis) {
DataType.equalsIgnoreNullability(this, other)
} else {
DataType.equalsIgnoreCaseAndNullability(this, other)
Member Author:

Since we already have DataType.equalsIgnoreCaseAndNullability, we can use it according to the SQL configuration.

}
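As a rough sketch of what a comparison like equalsIgnoreCaseAndNullability does for nested types — again with an illustrative toy ADT, not Spark's implementation:

```scala
import java.util.Locale

// Toy stand-ins for Spark's types, for illustration only.
sealed trait DT
case object IntT extends DT
final case class Field(name: String, dataType: DT, nullable: Boolean)
final case class StructT(fields: Seq[Field]) extends DT

// Compare types ignoring field-name case and nullability, recursing into
// nested structs — a toy analogue of the case-insensitive comparison.
def sameTypeIgnoreCase(a: DT, b: DT): Boolean = (a, b) match {
  case (StructT(fs1), StructT(fs2)) =>
    fs1.length == fs2.length && fs1.zip(fs2).forall { case (f1, f2) =>
      f1.name.toLowerCase(Locale.ROOT) == f2.name.toLowerCase(Locale.ROOT) &&
        sameTypeIgnoreCase(f1.dataType, f2.dataType) // `nullable` is ignored
    }
  case _ => a == b
}

// struct<a:int> and struct<A:int> compare equal under this relation.
assert(sameTypeIgnoreCase(
  StructT(Seq(Field("a", IntT, nullable = false))),
  StructT(Seq(Field("A", IntT, nullable = true)))))
```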

/**
* Returns the same data type but set all nullability fields are true
@@ -385,6 +385,45 @@ class TypeCoercionSuite extends AnalysisTest {
widenTest(NullType, StructType(Seq()), Some(StructType(Seq())))
widenTest(StringType, MapType(IntegerType, StringType, true), None)
widenTest(ArrayType(IntegerType), StructType(Seq()), None)

widenTest(
StructType(Seq(StructField("a", IntegerType))),
StructType(Seq(StructField("b", IntegerType))),
None)
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", DoubleType, nullable = false))),
None)

widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", IntegerType, nullable = false))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = false)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = false))),
StructType(Seq(StructField("a", IntegerType, nullable = true))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = true))),
StructType(Seq(StructField("a", IntegerType, nullable = false))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))
widenTest(
StructType(Seq(StructField("a", IntegerType, nullable = true))),
StructType(Seq(StructField("a", IntegerType, nullable = true))),
Some(StructType(Seq(StructField("a", IntegerType, nullable = true)))))

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
widenTest(
StructType(Seq(StructField("a", IntegerType))),
StructType(Seq(StructField("A", IntegerType))),
None)
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
widenTest(
StructType(Seq(StructField("a", IntegerType), StructField("B", IntegerType))),
StructType(Seq(StructField("A", IntegerType), StructField("b", IntegerType))),
Some(StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType)))))
}
}

test("wider common type for decimal and array") {
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (38 additions, 0 deletions)
@@ -2646,6 +2646,44 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("SPARK-21247: Allow case-insensitive type equality in Set operation") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")

withTable("t", "S") {
sql("CREATE TABLE t(c struct<f:int>) USING parquet")
sql("CREATE TABLE S(C struct<F:int>) USING parquet")
Seq(("c", "C"), ("C", "c"), ("c.f", "C.F"), ("C.F", "c.f")).foreach {
case (left, right) =>
checkAnswer(sql(s"SELECT * FROM t, S WHERE t.$left = S.$right"), Seq.empty)
}
}
}

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val m1 = intercept[AnalysisException] {
sql("SELECT struct(1 a) UNION ALL (SELECT struct(2 A))")
}.message
assert(m1.contains("Union can only be performed on tables with the compatible column types"))

val m2 = intercept[AnalysisException] {
sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))")
}.message
assert(m2.contains("Except can only be performed on tables with the compatible column types"))

withTable("t", "S") {
sql("CREATE TABLE t(c struct<f:int>) USING parquet")
sql("CREATE TABLE S(C struct<F:int>) USING parquet")
checkAnswer(sql("SELECT * FROM t, S WHERE t.c.f = S.C.F"), Seq.empty)
val m = intercept[AnalysisException] {
sql("SELECT * FROM t, S WHERE c = C")
}.message
assert(m.contains("cannot resolve '(t.`c` = S.`C`)' due to data type mismatch"))
}
}
}

test("SPARK-21335: support un-aliased subquery") {
withTempView("v") {
Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("v")