Skip to content

Commit

Permalink
[SQL] SPARK-1371 Hash Aggregation Improvements
Browse files Browse the repository at this point in the history
Given:
```scala
case class Data(a: Int, b: Int)
val rdd =
  sparkContext
    .parallelize(1 to 200)
    .flatMap(_ => (1 to 50000).map(i => Data(i % 100, i)))
rdd.registerAsTable("data")
cacheTable("data")
```
Before:
```
SELECT COUNT(*) FROM data:[10000000]
16795.567ms
SELECT a, SUM(b) FROM data GROUP BY a
7536.436ms
SELECT SUM(b) FROM data
10954.1ms
```

After:
```
SELECT COUNT(*) FROM data:[10000000]
1372.175ms
SELECT a, SUM(b) FROM data GROUP BY a
2070.446ms
SELECT SUM(b) FROM data
958.969ms
```

Author: Michael Armbrust <michael@databricks.com>

Closes apache#295 from marmbrus/hashAgg and squashes the following commits:

ec63575 [Michael Armbrust] Add comment.
d0495a9 [Michael Armbrust] Use scaladoc instead.
b4a6887 [Michael Armbrust] Address review comments.
a2d90ba [Michael Armbrust] Capture child output statically to avoid issues with generators and serialization.
7c13112 [Michael Armbrust] Rewrite Aggregate operator to stream input and use projections.  Remove unused local RDD functions implicits.
5096f99 [Michael Armbrust] Make HiveUDAF fields transient since object inspectors are not serializable.
6a4b671 [Michael Armbrust] Add option to avoid binding operators expressions automatically.
92cca08 [Michael Armbrust] Always include serialization debug info when running tests.
1279df2 [Michael Armbrust] Increase default number of partitions.
  • Loading branch information
marmbrus authored and rxin committed Apr 7, 2014
1 parent 87d0928 commit accd099
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 160 deletions.
1 change: 1 addition & 0 deletions project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ object SparkBuild extends Build {
fork := true,
javaOptions in Test += "-Dspark.home=" + sparkHome,
javaOptions in Test += "-Dspark.testing=1",
javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark").map { case (k,v) => s"-D$k=$v" }.toSeq,
javaOptions += "-Xmx3g",
// Show full stack trace and duration in test cases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,17 @@ case class BoundReference(ordinal: Int, baseReference: Attribute)
override def apply(input: Row): Any = input(ordinal)
}

/**
* Used to denote operators that do their own binding of attributes internally.
*/
trait NoBind { self: trees.TreeNode[_] => }

class BindReferences[TreeNode <: QueryPlan[TreeNode]] extends Rule[TreeNode] {
import BindReferences._

def apply(plan: TreeNode): TreeNode = {
plan.transform {
case n: NoBind => n.asInstanceOf[TreeNode]
case leafNode if leafNode.children.isEmpty => leafNode
case unaryNode if unaryNode.children.size == 1 => unaryNode.transformExpressions { case e =>
bindReference(e, unaryNode.children.head.output)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ class Projection(expressions: Seq[Expression]) extends (Row => Row) {

protected val exprArray = expressions.toArray
def apply(input: Row): Row = {
val outputArray = new Array[Any](exprArray.size)
val outputArray = new Array[Any](exprArray.length)
var i = 0
while (i < exprArray.size) {
while (i < exprArray.length) {
outputArray(i) = exprArray(i).apply(input)
i += 1
}
Expand All @@ -57,7 +57,7 @@ case class MutableProjection(expressions: Seq[Expression]) extends (Row => Row)

def apply(input: Row): Row = {
var i = 0
while (i < exprArray.size) {
while (i < exprArray.length) {
mutableRow(i) = exprArray(i).apply(input)
i += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ abstract class AggregateExpression extends Expression {
* Creates a new instance that can be used to compute this aggregate expression for a group
* of input rows/
*/
def newInstance: AggregateFunction
def newInstance(): AggregateFunction
}

/**
Expand Down Expand Up @@ -75,7 +75,7 @@ abstract class AggregateFunction
override def apply(input: Row): Any

// Do we really need this?
def newInstance = makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
def newInstance() = makeCopy(productIterator.map { case a: AnyRef => a }.toArray)
}

case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
Expand All @@ -89,7 +89,7 @@ case class Count(child: Expression) extends PartialAggregate with trees.UnaryNod
SplitEvaluation(Sum(partialCount.toAttribute), partialCount :: Nil)
}

override def newInstance = new CountFunction(child, this)
override def newInstance()= new CountFunction(child, this)
}

case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpression {
Expand All @@ -98,7 +98,7 @@ case class CountDistinct(expressions: Seq[Expression]) extends AggregateExpressi
def nullable = false
def dataType = IntegerType
override def toString = s"COUNT(DISTINCT ${expressions.mkString(",")}})"
override def newInstance = new CountDistinctFunction(expressions, this)
override def newInstance()= new CountDistinctFunction(expressions, this)
}

case class Average(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
Expand All @@ -118,7 +118,7 @@ case class Average(child: Expression) extends PartialAggregate with trees.UnaryN
partialCount :: partialSum :: Nil)
}

override def newInstance = new AverageFunction(child, this)
override def newInstance()= new AverageFunction(child, this)
}

case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
Expand All @@ -134,7 +134,7 @@ case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[
partialSum :: Nil)
}

override def newInstance = new SumFunction(child, this)
override def newInstance()= new SumFunction(child, this)
}

case class SumDistinct(child: Expression)
Expand All @@ -145,7 +145,7 @@ case class SumDistinct(child: Expression)
def dataType = child.dataType
override def toString = s"SUM(DISTINCT $child)"

override def newInstance = new SumDistinctFunction(child, this)
override def newInstance()= new SumDistinctFunction(child, this)
}

case class First(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
Expand All @@ -160,7 +160,7 @@ case class First(child: Expression) extends PartialAggregate with trees.UnaryNod
First(partialFirst.toAttribute),
partialFirst :: Nil)
}
override def newInstance = new FirstFunction(child, this)
override def newInstance()= new FirstFunction(child, this)
}

case class AverageFunction(expr: Expression, base: AggregateExpression)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
*/
object AddExchange extends Rule[SparkPlan] {
// TODO: Determine the number of partitions.
val numPartitions = 8
val numPartitions = 150

def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case operator: SparkPlan =>
Expand Down
Loading

0 comments on commit accd099

Please sign in to comment.