Skip to content

Commit

Permalink
yin's comment: use external sort if option is enabled, add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-wang committed Apr 15, 2015
1 parent f515cd2 commit f91a2ae
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.util.MutablePair

object Exchange {
/** Returns true when the ordering expressions are a subset of the key. */
/**
* Returns true when the ordering expressions are a subset of the key.
* if true, ShuffledRDD can use `setKeyOrdering(orderingKey)` to sort within [[Exchange]].
*/
def canSortWithShuffle(partitioning: Partitioning, desiredOrdering: Seq[SortOrder]): Boolean = {
desiredOrdering.map(_.child).toSet.subsetOf(partitioning.keyExpressions.toSet)
}
Expand Down Expand Up @@ -224,7 +227,11 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
}

val withSort = if (needSort) {
Sort(rowOrdering, global = false, withShuffle)
if (sqlContext.conf.externalSortEnabled) {
ExternalSort(rowOrdering, global = false, withShuffle)
} else {
Sort(rowOrdering, global = false, withShuffle)
}
} else {
withShuffle
}
Expand Down Expand Up @@ -253,7 +260,11 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
case (UnspecifiedDistribution, Seq(), child) =>
child
case (UnspecifiedDistribution, rowOrdering, child) =>
Sort(rowOrdering, global = false, child)
if (sqlContext.conf.externalSortEnabled) {
ExternalSort(rowOrdering, global = false, child)
} else {
Sort(rowOrdering, global = false, child)
}

case (dist, ordering, _) =>
sys.error(s"Don't know how to ensure $dist with ordering $ordering")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ case class Limit(limit: Int, child: SparkPlan)
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def executeCollect(): Array[Row] = child.executeTake(limit)

override def execute(): RDD[Row] = {
Expand Down

0 comments on commit f91a2ae

Please sign in to comment.