From 6cdc32fb33cf717dbc6ad6e674f8c942535683ba Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 17 Aug 2020 13:19:49 -0700 Subject: [PATCH] [SPARK-32622][SQL][TEST] Add case-sensitivity test for ORC predicate pushdown ### What changes were proposed in this pull request? During working on SPARK-25557, we found that ORC predicate pushdown doesn't have case-sensitivity test. This PR proposes to add case-sensitivity test for ORC predicate pushdown. ### Why are the changes needed? Increasing test coverage for ORC predicate pushdown. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass Jenkins tests. Closes #29427 from viirya/SPARK-25557-followup3. Authored-by: Liang-Chi Hsieh Signed-off-by: Dongjoon Hyun (cherry picked from commit b33066f42bd474f5f80b14221f97d09a76e0b398) Signed-off-by: Dongjoon Hyun --- .../datasources/orc/OrcFilterSuite.scala | 97 ++++++++++++++++++- .../datasources/orc/OrcFilterSuite.scala | 97 ++++++++++++++++++- 2 files changed, 190 insertions(+), 4 deletions(-) diff --git a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 88b4b243b543a..beb72322326fe 100644 --- a/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -25,8 +25,8 @@ import scala.collection.JavaConverters._ import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.apache.spark.SparkConf -import org.apache.spark.sql.{AnalysisException, Column, DataFrame} +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -469,5 +469,98 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { ).get.toString } } + + test("SPARK-32622: case sensitivity in predicate pushdown") { + withTempPath { dir => + val count = 10 + val tableName = "spark_32622" + val tableDir1 = dir.getAbsoluteFile + "/table1" + + // Physical ORC files have both `A` and `a` fields. + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + spark.range(count).repartition(count).selectExpr("id - 1 as A", "id as a") + .write.mode("overwrite").orc(tableDir1) + } + + // Metastore table has both `A` and `a` fields too. + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + sql( + s""" + |CREATE TABLE $tableName (A LONG, a LONG) USING ORC LOCATION '$tableDir1' + """.stripMargin) + + checkAnswer(sql(s"select a, A from $tableName"), (0 until count).map(c => Row(c, c - 1))) + + val actual1 = stripSparkFilter(sql(s"select A from $tableName where A < 0")) + assert(actual1.count() == 1) + + val actual2 = stripSparkFilter(sql(s"select A from $tableName where a < 0")) + assert(actual2.count() == 0) + } + + // Exception thrown for ambiguous case. + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val e = intercept[AnalysisException] { + sql(s"select a from $tableName where a < 0").collect() + } + assert(e.getMessage.contains( + "Reference 'a' is ambiguous")) + } + } + + // Metastore table has only `A` field. + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + sql( + s""" + |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir1' + """.stripMargin) + + val e = intercept[SparkException] { + sql(s"select A from $tableName where A < 0").collect() + } + assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( + """Found duplicate field(s) "A": [A, a] in case-insensitive mode""")) + } + } + + // Physical ORC files have only `A` field. + val tableDir2 = dir.getAbsoluteFile + "/table2" + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + spark.range(count).repartition(count).selectExpr("id - 1 as A") + .write.mode("overwrite").orc(tableDir2) + } + + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + sql( + s""" + |CREATE TABLE $tableName (a LONG) USING ORC LOCATION '$tableDir2' + """.stripMargin) + + checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1))) + + val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0")) + // TODO: ORC predicate pushdown should work under case-insensitive analysis. + // assert(actual.count() == 1) + } + } + + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + sql( + s""" + |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir2' + """.stripMargin) + + checkAnswer(sql(s"select A from $tableName"), (0 until count).map(c => Row(c - 1))) + + val actual = stripSparkFilter(sql(s"select A from $tableName where A < 0")) + assert(actual.count() == 1) + } + } + } + } } diff --git a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 2263179515a5f..a3e450c5cd9f5 100644 --- a/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -25,8 +25,8 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.apache.spark.SparkConf -import org.apache.spark.sql.{AnalysisException, Column, DataFrame} +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -470,5 +470,98 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { ).get.toString } } + + test("SPARK-32622: case sensitivity in predicate pushdown") { + withTempPath { dir => + val count = 10 + val tableName = "spark_32622" + val tableDir1 = dir.getAbsoluteFile + "/table1" + + // Physical ORC files have both `A` and `a` fields. + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + spark.range(count).repartition(count).selectExpr("id - 1 as A", "id as a") + .write.mode("overwrite").orc(tableDir1) + } + + // Metastore table has both `A` and `a` fields too. + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + sql( + s""" + |CREATE TABLE $tableName (A LONG, a LONG) USING ORC LOCATION '$tableDir1' + """.stripMargin) + + checkAnswer(sql(s"select a, A from $tableName"), (0 until count).map(c => Row(c, c - 1))) + + val actual1 = stripSparkFilter(sql(s"select A from $tableName where A < 0")) + assert(actual1.count() == 1) + + val actual2 = stripSparkFilter(sql(s"select A from $tableName where a < 0")) + assert(actual2.count() == 0) + } + + // Exception thrown for ambiguous case. + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val e = intercept[AnalysisException] { + sql(s"select a from $tableName where a < 0").collect() + } + assert(e.getMessage.contains( + "Reference 'a' is ambiguous")) + } + } + + // Metastore table has only `A` field. + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + sql( + s""" + |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir1' + """.stripMargin) + + val e = intercept[SparkException] { + sql(s"select A from $tableName where A < 0").collect() + } + assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains( + """Found duplicate field(s) "A": [A, a] in case-insensitive mode""")) + } + } + + // Physical ORC files have only `A` field. + val tableDir2 = dir.getAbsoluteFile + "/table2" + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + spark.range(count).repartition(count).selectExpr("id - 1 as A") + .write.mode("overwrite").orc(tableDir2) + } + + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + sql( + s""" + |CREATE TABLE $tableName (a LONG) USING ORC LOCATION '$tableDir2' + """.stripMargin) + + checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1))) + + val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0")) + // TODO: ORC predicate pushdown should work under case-insensitive analysis. + // assert(actual.count() == 1) + } + } + + withTable(tableName) { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + sql( + s""" + |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir2' + """.stripMargin) + + checkAnswer(sql(s"select A from $tableName"), (0 until count).map(c => Row(c - 1))) + + val actual = stripSparkFilter(sql(s"select A from $tableName where A < 0")) + assert(actual.count() == 1) + } + } + } + } }