Skip to content

Commit

Permalink
change default outputordering
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-wang committed Apr 11, 2015
1 parent 47455c9 commit 171001f
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ case class Aggregate(

// One output attribute per aggregate expression, in the same order, obtained by calling
// toAttribute on each entry of aggregateExpressions.
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)

// Reports an empty ordering (Nil) instead of any inherited value.
// NOTE(review): presumably because aggregation does not keep the child's row order — confirm.
override def outputOrdering: Seq[SortOrder] = Nil

/**
* An aggregate that needs to be computed for each row in a group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPl
// Returns `child` unchanged when its outputPartitioning already equals `partitioning`;
// otherwise wraps it in an Exchange built for the requested partitioning.
def addExchangeIfNecessary(partitioning: Partitioning, child: SparkPlan): SparkPlan =
if (child.outputPartitioning != partitioning) Exchange(partitioning, child) else child

// Check if the partitioning we want to ensure is the same as the child's output
// partitioning. If so, we do not need to add the Exchange operator.
// Check if the ordering we want to ensure is the same as the child's output
// ordering. If so, we do not need to add the Sort operator.
// Otherwise the child is wrapped in a Sort over `ordering` with global = false.
// NOTE(review): the check is plain equality of the two Seq[SortOrder] values, so a child
// whose ordering merely extends the requested one still gets a new Sort — confirm intended.
def addSortIfNecessary(ordering: Seq[SortOrder], child: SparkPlan): SparkPlan =
if (child.outputOrdering != ordering) Sort(ordering, global = false, child) else child

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {
/**
 * A [[SparkPlan]] that also mixes in `trees.UnaryNode[SparkPlan]` and exposes a single
 * `child`.
 *
 * Unless overridden, both the output partitioning and the output ordering are taken
 * directly from that child.
 */
private[sql] trait UnaryNode extends SparkPlan with trees.UnaryNode[SparkPlan] {
// Self-type: anything mixing in this trait must also be a Product.
self: Product =>
// Default: forward the child's partitioning.
override def outputPartitioning: Partitioning = child.outputPartitioning
// Default: forward the child's ordering.
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
}

private[sql] trait BinaryNode extends SparkPlan with trees.BinaryNode[SparkPlan] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child:
// Executes the child, copies every row it produces, and then samples the copied rows
// using the operator's withReplacement, fraction and seed arguments.
// NOTE(review): the copy is presumably needed because upstream rows may be reused
// mutable objects — confirm.
override def execute(): RDD[Row] = {
child.execute().map(_.copy()).sample(withReplacement, fraction, seed)
}

// Reports an empty ordering (Nil) for the sampled output instead of any inherited value.
override def outputOrdering: Seq[SortOrder] = Nil
}

/**
Expand Down Expand Up @@ -146,6 +148,8 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
// Builds the result RDD from the rows returned by collectData(), passing 1 as the
// slice count to makeRDD.
override def execute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)

// The output is reported as ordered by the `sortOrder` constructor argument.
override def outputOrdering: Seq[SortOrder] = sortOrder
}

/**
Expand All @@ -171,6 +175,8 @@ case class Sort(
}

// The output attributes are exactly the child's output attributes.
override def output: Seq[Attribute] = child.output

// Reports `sortOrder` as this operator's output ordering.
// NOTE(review): this holds regardless of the operator's global/local setting — confirm
// that callers only rely on it as a per-partition guarantee.
override def outputOrdering: Seq[SortOrder] = sortOrder
}

/**
Expand Down Expand Up @@ -201,6 +207,8 @@ case class ExternalSort(
}

// The output attributes are exactly the child's output attributes.
override def output: Seq[Attribute] = child.output

// Reports `sortOrder` as this operator's output ordering.
override def outputOrdering: Seq[SortOrder] = sortOrder
}

/**
Expand Down
17 changes: 10 additions & 7 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,16 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
classOf[BroadcastNestedLoopJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
conf.setConf("spark.sql.autoSortMergeJoin", "true")
Seq(
("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
conf.setConf("spark.sql.autoSortMergeJoin", AUTO_SORTMERGEJOIN.toString)
try {
conf.setConf("spark.sql.autoSortMergeJoin", "true")
Seq(
("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
} finally {
conf.setConf("spark.sql.autoSortMergeJoin", AUTO_SORTMERGEJOIN.toString)
}
}

test("broadcasted hash join operator selection") {
Expand Down

0 comments on commit 171001f

Please sign in to comment.