From 2fa219881e670bf27d0b96f0f35758a31a704a67 Mon Sep 17 00:00:00 2001
From: allisonwang-db
Date: Tue, 17 Oct 2023 20:59:35 -0700
Subject: [PATCH 1/2] fix

---
 .../apache/spark/sql/execution/limit.scala    |  7 ++++++
 .../org/apache/spark/sql/SubquerySuite.scala  | 24 +++++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 877f6508d963f..a6758ff609367 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -283,6 +283,8 @@ case class TakeOrderedAndProjectExec(
   }
 
   override def executeCollect(): Array[InternalRow] = {
+    // SPARK-45584: Wait for subquery execution to finish before executing collect.
+    prepareAndWaitForSubqueries()
     val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder)
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val limited = if (orderingSatisfies) {
@@ -299,6 +301,11 @@ case class TakeOrderedAndProjectExec(
     }
   }
 
+  private def prepareAndWaitForSubqueries(): Unit = {
+    prepare()
+    waitForSubqueries()
+  }
+
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
   private lazy val writeMetrics =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 37fea85fcd2b9..4fab10c3d0c3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2711,4 +2711,28 @@ class SubquerySuite extends QueryTest
       checkAnswer(df, Row(1, "foo", 1, "foo"))
     }
   }
+
+  test("SPARK-45584: subquery execution should not fail with ORDER BY and LIMIT") {
+    withTable("t1") {
+      sql(
+        """
+          |CREATE TABLE t1 USING PARQUET
+          |AS SELECT * FROM VALUES
+          |(1, "a"),
+          |(2, "a"),
+          |(3, "a") t(id, value)
+          |""".stripMargin)
+      val df = sql(
+        """
+          |WITH t2 AS (
+          |  SELECT * FROM t1 ORDER BY id
+          |)
+          |SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
+          |""".stripMargin)
+      // This should not fail with IllegalArgumentException.
+      checkAnswer(
+        df,
+        Row(1, "a", 3) :: Row(2, "a", 3) :: Row(3, "a", 3) :: Nil)
+    }
+  }
 }

From 592ff6807c7aca3ac45392f1437c5476c12613fb Mon Sep 17 00:00:00 2001
From: allisonwang-db
Date: Tue, 17 Oct 2023 22:28:05 -0700
Subject: [PATCH 2/2] address comments

---
 .../scala/org/apache/spark/sql/execution/limit.scala | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index a6758ff609367..77135d21a26ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -282,9 +282,7 @@ case class TakeOrderedAndProjectExec(
     projectList.map(_.toAttribute)
   }
 
-  override def executeCollect(): Array[InternalRow] = {
-    // SPARK-45584: Wait for subquery execution to finish before executing collect.
-    prepareAndWaitForSubqueries()
+  override def executeCollect(): Array[InternalRow] = executeQuery {
     val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder)
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val limited = if (orderingSatisfies) {
@@ -301,11 +299,6 @@ case class TakeOrderedAndProjectExec(
     }
   }
 
-  private def prepareAndWaitForSubqueries(): Unit = {
-    prepare()
-    waitForSubqueries()
-  }
-
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
   private lazy val writeMetrics =
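
Note on the final approach in PATCH 2/2: rather than keeping the hand-rolled
prepareAndWaitForSubqueries() helper from the first commit, executeCollect is
wrapped in SparkPlan's existing executeQuery, the same wrapper that execute()
already applies around doExecute(). A simplified sketch of that wrapper
(abridged from SparkPlan, not part of this patch; the real method also runs
the body inside an RDD operation scope):

    // Sketch (simplified) of SparkPlan.executeQuery. prepare() and
    // waitForSubqueries() are existing SparkPlan methods.
    protected final def executeQuery[T](query: => T): T = {
      prepare()            // prepare this node and its children, kicking off subquery jobs
      waitForSubqueries()  // block until the submitted subquery futures have completed
      query                // then evaluate the wrapped body (here, the collect logic)
    }

Because the scalar subquery such as (SELECT COUNT(*) FROM t2) in the new test
is fully evaluated before TakeOrderedAndProjectExec collects rows, the
IllegalArgumentException that the test guards against no longer occurs, and
executeCollect now follows the same contract as execute().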