[SPARK-31958][SQL] normalize special floating numbers in subquery
### What changes were proposed in this pull request?

This is a follow-up to #23388.

#23388 has an issue: it doesn't handle subquery expressions, because it assumes they will all be rewritten into joins. That assumption does not hold for non-correlated subquery expressions.

This PR fixes the issue. `NormalizeFloatingNumbers` no longer skips `Subquery`, so subquery expressions are now handled by `OptimizeSubqueries`, which runs the optimizer on the subquery plan.

Note that correlated subquery expressions will be handled twice: once in `OptimizeSubqueries`, and once later when they are rewritten into joins. This is OK because `NormalizeFloatingNumbers` is now idempotent.
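
To illustrate why running the rule twice is harmless, here is a minimal plain-Scala sketch of value-level normalization. It is not Spark's implementation: `NormalizeSketch`, `normalize`, and `bits` are hypothetical names introduced only for this example, and the sketch assumes normalization means mapping `-0.0` to `0.0` and every NaN to one canonical NaN.

```scala
object NormalizeSketch {
  // Hypothetical stand-in for per-value normalization; NOT Spark's actual code.
  def normalize(d: Double): Double =
    if (d.isNaN) Double.NaN   // collapse any NaN to the canonical NaN constant
    else if (d == 0.0d) 0.0d  // -0.0 == 0.0 is true, so both map to +0.0
    else d

  // Raw bit pattern, i.e. what a binary comparison or a hash would see.
  def bits(d: Double): Long = java.lang.Double.doubleToRawLongBits(d)

  def main(args: Array[String]): Unit = {
    // The two zeros compare equal as doubles but differ bit for bit.
    assert(-0.0d == 0.0d)
    assert(bits(-0.0d) != bits(0.0d))

    // After normalization the bit patterns agree.
    assert(bits(normalize(-0.0d)) == bits(normalize(0.0d)))

    // Idempotence: a second pass changes nothing.
    val samples = Seq(-0.0d, 0.0d, Double.NaN, 1.5d, Double.NegativeInfinity)
    assert(samples.forall(x => bits(normalize(normalize(x))) == bits(normalize(x))))
  }
}
```

Running `NormalizeSketch.main` completes silently if the assertions hold. The last assertion is the property the note above relies on: applying the normalization to already normalized values leaves them unchanged.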

### Why are the changes needed?

To fix a bug: special floating-point numbers were not normalized in non-correlated subquery expressions.

### Does this PR introduce _any_ user-facing change?

Yes, see the newly added test.

### How was this patch tested?

A new test in `SQLQuerySuite`.

Closes #28785 from cloud-fan/normalize.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6fb9c80)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Jun 11, 2020
1 parent d1a3fad commit 593b423
Showing 2 changed files with 18 additions and 4 deletions.
@@ -56,10 +56,6 @@ import org.apache.spark.sql.types._
 object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan match {
-    // A subquery will be rewritten into join later, and will go through this rule
-    // eventually. Here we skip subquery, as we only need to run this rule once.
-    case _: Subquery => plan
-
     case _ => plan transform {
       case w: Window if w.partitionSpec.exists(p => needNormalize(p)) =>
         // Although the `windowExpressions` may refer to `partitionSpec` expressions, we don't need
18 changes: 18 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3449,6 +3449,24 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkAnswer(sql("select CAST(-32768 as short) DIV CAST (-1 as short)"),
       Seq(Row(Short.MinValue.toLong * -1)))
   }
+
+  test("normalize special floating numbers in subquery") {
+    withTempView("v1", "v2", "v3") {
+      Seq(-0.0).toDF("d").createTempView("v1")
+      Seq(0.0).toDF("d").createTempView("v2")
+      spark.range(2).createTempView("v3")
+
+      // non-correlated subquery
+      checkAnswer(sql("SELECT (SELECT v1.d FROM v1 JOIN v2 ON v1.d = v2.d)"), Row(-0.0))
+      // correlated subquery
+      checkAnswer(
+        sql(
+          """
+            |SELECT id FROM v3 WHERE EXISTS
+            |  (SELECT v1.d FROM v1 JOIN v2 ON v1.d = v2.d WHERE id > 0)
+            |""".stripMargin), Row(1))
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
