Skip to content

Commit

Permalink
Limit/TakeOrdered:
Browse files Browse the repository at this point in the history
1. Renamed StopAfter to Limit to be more consistent with naming in other relational databases.
2. Renamed TopK to TakeOrdered to be more consistent with Spark RDD API.
3. Avoid breaking lineage in Limit.
4. Added a bunch of override's to execution/basicOperators.scala.
  • Loading branch information
rxin committed Mar 26, 2014
1 parent 8237df8 commit 231af3a
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class SqlParser extends StandardTokenParsers {
val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection)
val withHaving = h.map(h => Filter(h, withDistinct)).getOrElse(withDistinct)
val withOrder = o.map(o => Sort(o, withHaving)).getOrElse(withHaving)
val withLimit = l.map { l => StopAfter(l, withOrder) }.getOrElse(withOrder)
val withLimit = l.map { l => Limit(l, withOrder) }.getOrElse(withOrder)
withLimit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ case class Aggregate(
def references = child.references
}

case class StopAfter(limit: Expression, child: LogicalPlan) extends UnaryNode {
case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNode {
def output = child.output
def references = limit.references
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
val sparkContext = self.sparkContext

val strategies: Seq[Strategy] =
TopK ::
TakeOrdered ::
PartialAggregation ::
SparkEquiInnerJoin ::
ParquetOperations ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case other => other
}

object TopK extends Strategy {
object TakeOrdered extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.StopAfter(IntegerLiteral(limit), logical.Sort(order, child)) =>
execution.TopK(limit, order, planLater(child))(sparkContext) :: Nil
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>
execution.TakeOrdered(limit, order, planLater(child))(sparkContext) :: Nil
case _ => Nil
}
}
Expand Down Expand Up @@ -213,8 +213,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
sparkContext.parallelize(data.map(r =>
new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row))
execution.ExistingRdd(output, dataAsRdd) :: Nil
case logical.StopAfter(IntegerLiteral(limit), child) =>
execution.StopAfter(limit, planLater(child))(sparkContext) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.Limit(limit, planLater(child))(sparkContext) :: Nil
case Unions(unionChildren) =>
execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil
case logical.Generate(generator, join, outer, _, child) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,56 +29,69 @@ import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, Unspec
import org.apache.spark.sql.catalyst.ScalaReflection

case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
def output = projectList.map(_.toAttribute)
override def output = projectList.map(_.toAttribute)

def execute() = child.execute().mapPartitions { iter =>
override def execute() = child.execute().mapPartitions { iter =>
@transient val reusableProjection = new MutableProjection(projectList)
iter.map(reusableProjection)
}
}

case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
def output = child.output
override def output = child.output

def execute() = child.execute().mapPartitions { iter =>
override def execute() = child.execute().mapPartitions { iter =>
iter.filter(condition.apply(_).asInstanceOf[Boolean])
}
}

case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: SparkPlan)
extends UnaryNode {

def output = child.output
override def output = child.output

// TODO: How to pick seed?
def execute() = child.execute().sample(withReplacement, fraction, seed)
override def execute() = child.execute().sample(withReplacement, fraction, seed)
}

case class Union(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability purposes
def output = children.head.output
def execute() = sc.union(children.map(_.execute()))
override def output = children.head.output
override def execute() = sc.union(children.map(_.execute()))

override def otherCopyArgs = sc :: Nil
}

case class StopAfter(limit: Int, child: SparkPlan)(@transient sc: SparkContext) extends UnaryNode {
/**
* Take the first limit elements.
*/
case class Limit(limit: Int, child: SparkPlan)(@transient sc: SparkContext) extends UnaryNode {
override def otherCopyArgs = sc :: Nil
// Note that the implementation is different depending on
// whether this is a terminal operator or not.

def output = child.output
override def output = child.output

override def executeCollect() = child.execute().map(_.copy()).take(limit)

// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
def execute() = sc.makeRDD(executeCollect(), 1)
override def execute() = {
child.execute()
.mapPartitions(_.take(limit))
.coalesce(1, shuffle = true)
.mapPartitions(_.take(limit))
}
}

case class TopK(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
(@transient sc: SparkContext) extends UnaryNode {
/**
* Take the first limit elements as defined by the sortOrder. This is logically equivalent to
* having a [[Limit]] operator after a [[Sort]] operator. This could have been named TopK, but
* Spark's top operator does the opposite in ordering so we name it TakeOrdered to avoid confusion.
*/
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
(@transient sc: SparkContext) extends UnaryNode {
override def otherCopyArgs = sc :: Nil

def output = child.output
override def output = child.output

@transient
lazy val ordering = new RowOrdering(sortOrder)
Expand All @@ -87,7 +100,7 @@ case class TopK(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)

// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
def execute() = sc.makeRDD(executeCollect(), 1)
override def execute() = sc.makeRDD(executeCollect(), 1)
}


Expand All @@ -102,15 +115,15 @@ case class Sort(
@transient
lazy val ordering = new RowOrdering(sortOrder)

def execute() = attachTree(this, "sort") {
override def execute() = attachTree(this, "sort") {
// TODO: Optimize sorting operation?
child.execute()
.mapPartitions(
iterator => iterator.map(_.copy()).toArray.sorted(ordering).iterator,
preservesPartitioning = true)
}

def output = child.output
override def output = child.output
}

object ExistingRdd {
Expand All @@ -131,6 +144,6 @@ object ExistingRdd {
}

case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
def execute() = rdd
override def execute() = rdd
}

Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val hiveContext = self

override val strategies: Seq[Strategy] = Seq(
TopK,
TakeOrdered,
ParquetOperations,
HiveTableScans,
DataSinks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ object HiveQl {

val withLimit =
limitClause.map(l => nodeToExpr(l.getChildren.head))
.map(StopAfter(_, withSort))
.map(Limit(_, withSort))
.getOrElse(withSort)

// TOK_INSERT_INTO means to add files to the table.
Expand Down Expand Up @@ -603,7 +603,7 @@ object HiveQl {
case Token("TOK_TABLESPLITSAMPLE",
Token("TOK_ROWCOUNT", Nil) ::
Token(count, Nil) :: Nil) =>
StopAfter(Literal(count.toInt), relation)
Limit(Literal(count.toInt), relation)
case Token("TOK_TABLESPLITSAMPLE",
Token("TOK_PERCENT", Nil) ::
Token(fraction, Nil) :: Nil) =>
Expand Down

0 comments on commit 231af3a

Please sign in to comment.