
[SPARK-3349] [SQL] Output partitioning of limit should not be inherited from child #2262

Closed · wants to merge 2 commits
sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,7 @@
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair

@@ -97,6 +97,7 @@ case class Limit(limit: Int, child: SparkPlan)
   // partition local limit -> exchange into one partition -> partition local limit again

   override def output = child.output
+  override def outputPartitioning = SinglePartition
Contributor (Cheng Hao): SinglePartition for LIMIT may cause a performance issue for a large number of records (in multiple partitions). Do we really need to change this?

Contributor: This is not changing the implementation, just correcting a bug that prevents exchange operators from being inserted when we need them.

Contributor: OK, understood, thanks for the explanation.
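To make the point of the fix concrete, here is a minimal, self-contained Scala sketch of the rule at stake. These are simplified stand-ins, not Spark's real classes: the planner joins two inputs without a shuffle when both children claim compatible partitioning, so a Limit that falsely inherits its child's partitioning can suppress a needed Exchange.

```scala
// Simplified model of the planner's decision (illustrative, not Spark code).
sealed trait Partitioning
case object SinglePartition extends Partitioning
case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

// Toy compatibility check: identical claims => zip partitions directly, no shuffle.
def needsExchange(left: Partitioning, right: Partitioning): Boolean = left != right

val tableSide = HashPartitioning(Seq("n"), 200)

// The bug: Limit inherited its child's partitioning claim, although it had
// actually collapsed all rows into one partition.
val buggyLimitClaim  = HashPartitioning(Seq("n"), 200)
val honestLimitClaim = SinglePartition

assert(!needsExchange(buggyLimitClaim, tableSide)) // shuffle skipped on a false claim
assert(needsExchange(honestLimitClaim, tableSide)) // the fix forces the Exchange back in
```

Under this toy model, the one-line change to `outputPartitioning` is what flips the planner from "already compatible" to "insert an Exchange".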


 /**
  * A custom implementation modeled after the take function on RDDs but which never runs any job

@@ -164,6 +165,7 @@ case class Limit(limit: Int, child: SparkPlan)
 case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode {

   override def output = child.output
+  override def outputPartitioning = SinglePartition

   val ordering = new RowOrdering(sortOrder, child.output)
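As an aside on why SinglePartition is also the truthful claim for TakeOrdered: a top-k over ordered data is computed as a per-partition local top-k followed by one global merge, so the final k rows necessarily live in a single place. A small self-contained Scala sketch with toy values:

```scala
// Take-ordered pattern: local top-k per partition, then one global merge.
// The merged result is inherently a single partition.
val partitions = Vector(Vector(5, 1, 9), Vector(7, 3), Vector(8, 2))
val k = 3
val localTopK  = partitions.map(_.sorted.take(k))   // candidates from each partition
val globalTopK = localTopK.flatten.sorted.take(k)   // merged in one place
println(globalTopK)                                  // Vector(1, 2, 3)
```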
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (17 additions, 0 deletions)
@@ -359,6 +359,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
     (null, null, 6, "F") :: Nil)
 }
+  test("SPARK-3349 partitioning after limit") {
+    sql("SELECT DISTINCT n FROM lowerCaseData ORDER BY n DESC")
+      .limit(2)
+      .registerTempTable("subset1")
+    sql("SELECT DISTINCT n FROM lowerCaseData")
+      .limit(2)
+      .registerTempTable("subset2")
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData INNER JOIN subset1 ON subset1.n = lowerCaseData.n"),
+      (3, "c", 3) ::
+      (4, "d", 4) :: Nil)
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData INNER JOIN subset2 ON subset2.n = lowerCaseData.n"),
+      (1, "a", 1) ::
+      (2, "b", 2) :: Nil)
+  }
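The regression this test guards against can be reproduced in miniature without Spark. The sketch below uses toy data echoing the test's lowerCaseData and a hand-rolled per-partition join (all names are illustrative): when the planner trusts the false partitioning claim and skips the Exchange, the n = 3 match is silently dropped.

```scala
// Sketch of how a shuffle skipped on a false partitioning claim loses join matches.
val numPartitions = 2
def part(n: Int): Int = n % numPartitions

// "lowerCaseData", genuinely hash-partitioned by n.
val left: Vector[Vector[(Int, String)]] =
  Vector.tabulate(numPartitions) { p =>
    (1 to 4).filter(part(_) == p).map(n => (n, ('a' + n - 1).toChar.toString)).toVector
  }

// The limited subset really lives in a single partition (all rows in slot 0),
// even though the buggy plan still claims it is hash-partitioned by n.
val subset: Vector[Vector[Int]] = Vector(Vector(3, 4), Vector.empty)

// Pairwise per-partition join, as happens when the planner skips the Exchange.
val joined = left.zip(subset).flatMap { case (l, r) =>
  for ((n, s) <- l; m <- r if n == m) yield (n, s, m)
}

println(joined) // Vector((4,d,4)) -- the (3,c,3) match is lost, since 3 hashes
                // to partition 1 on the left but sits in slot 0 of the subset
```

With the honest SinglePartition claim, the planner would shuffle the subset before the join and both matches would survive.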

test("mixed-case keywords") {
checkAnswer(
sql(