
[SPARK-2873] [SQL] using ExternalAppendOnlyMap to resolve OOM when aggregating #2029

Closed
wants to merge 1 commit

Conversation

guowei2
Contributor

guowei2 commented Aug 19, 2014

A new PR, cloned from PR 1822.

It fixes a number of problems.

It reuses CompactBuffer from Spark core to save memory and pointer dereferences, as PR 1993 does (a short sketch of the idea follows this description).

Hive UDAFs do not support external aggregation, because Hive's AggregationBuffer would need to be serializable and Hive's GenericUDAFEvaluator provides no method for merging two evaluators.
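For context on the CompactBuffer point above, here is a minimal sketch (not this PR's code) of how one group's values are typically accumulated: CompactBuffer keeps its first two elements in fields, so small groups avoid the extra array allocation and pointer dereferences of an ArrayBuffer. CompactBuffer is Spark-internal, so the package name and the collectGroup helper below are purely illustrative.

package org.apache.spark.sketch  // hypothetical package, only to reach the Spark-internal CompactBuffer

import scala.reflect.ClassTag

import org.apache.spark.util.collection.CompactBuffer

object CompactBufferSketch {
  // Accumulates all values of one group; elements 0 and 1 are stored inline,
  // later elements go into a growable array.
  def collectGroup[T: ClassTag](values: Iterator[T]): CompactBuffer[T] = {
    val buffer = new CompactBuffer[T]()
    values.foreach(buffer += _)  // += appends; ++= would merge another group's buffer
    buffer
  }
}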

@AmplabJenkins

Can one of the admins verify this patch?

@guowei2
Contributor Author

guowei2 commented Aug 19, 2014

@marmbrus
What output should I provide for the benchmarks?

@marmbrus
Contributor

People usually just summarize the benchmark itself and the results in the description of the PR. For example: #1439

@guowei2
Contributor Author

guowei2 commented Aug 26, 2014

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{DataType, IntegerType}
import org.apache.spark.sql.execution._


// Micro-benchmark comparing the in-memory OnHeapAggregate against the
// spilling ExternalAggregate added by this PR.
object AggregateBenchMark extends App {

  val sc = new SparkContext(
    new SparkConf().setMaster("local").setAppName("agg-benchmark"))

  val dataType: DataType = IntegerType
  // SUM over column 1, grouped by column 0.
  val aggExps = Seq(Alias(sum(BoundReference(1, dataType, true)), "sum")())
  val groupExps = Seq(BoundReference(0, dataType, true))
  val attributes = aggExps.map(_.toAttribute)
  val childPlan = RowsPlan(sc, attributes)

  // Wall-clock time (ms) of one full pass over the aggregate's output.
  def benchmarkOnHeap: Long = {
    val begin = System.currentTimeMillis()
    OnHeapAggregate(false, groupExps, aggExps, childPlan).execute().foreach(_ => {})
    val end = System.currentTimeMillis()
    end - begin
  }

  def benchmarkExternal: Long = {
    val begin = System.currentTimeMillis()
    ExternalAggregate(false, groupExps, aggExps, childPlan).execute().foreach(_ => {})
    val end = System.currentTimeMillis()
    end - begin
  }

  // Run each variant five times so the later runs show steady-state numbers.
  (1 to 5).foreach(_ => println("OnHeapAggregate time: " + benchmarkOnHeap))
  (1 to 5).foreach(_ => println("ExternalAggregate time: " + benchmarkExternal))
}

// Generates 50,000 two-column rows per partition: ~1,500 distinct keys in
// column 0 and a small value to sum in column 1.
class TestRDD(
    sc: SparkContext,
    numPartitions: Int) extends RDD[Row](sc, Nil) with Serializable {

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    new Iterator[Row] {
      var lines = 0
      override final def hasNext: Boolean = lines < 50000
      override final def next(): Row = {
        lines += 1
        val row = new GenericMutableRow(2)
        row(0) = (math.random * 1500).toInt // grouping key
        row(1) = (math.random * 50).toInt   // value to aggregate
        row
      }
    }
  }

  override def getPartitions = (0 until numPartitions).map(i => new Partition {
    override def index = i
  }).toArray

  override def getPreferredLocations(split: Partition): Seq[String] = Nil

  override def toString: String = "TestRDD " + id
}


// A leaf physical plan that exposes the TestRDD to the aggregate operators.
case class RowsPlan(@transient val sc: SparkContext, attributes: Seq[Attribute]) extends LeafNode {

  override def output = attributes

  override def execute() = new TestRDD(sc, 1)
}

@guowei2
Contributor Author

guowei2 commented Aug 26, 2014

@marmbrus

The benchmark results above are disappointing.
Once one spill happens, a batch of further spills usually follows, one after another.

The size of the AppendOnlyMap depends on the number of distinct keys, since values with the same key are merged in place.

I don't think ExternalAppendOnlyMap is a good fit here, because it is too expensive when records with the same key are spilled to disk over and over again (see the sketch after this comment for the code path in question).

Besides, users can easily avoid the OOM by raising spark.sql.shuffle.partitions, which reduces the number of keys each task has to hold.

I think the merge logic of ExternalAppendOnlyMap should be optimized.

Joins seem to have a similar problem; putting both the left and right tables into an ExternalAppendOnlyMap is expensive as well.
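For reference, here is a minimal sketch (not the PR's code) of the path being discussed: values are combined per key in memory, and once the map grows too large it spills, after which every spill file has to be merge-sorted back together per key. The object name ExternalAggregateSketch, the method sumByKey, and the package are hypothetical; ExternalAppendOnlyMap is Spark-internal, so the sketch assumes it is compiled under the org.apache.spark package and run with a live SparkEnv.

package org.apache.spark.sketch  // hypothetical package, only to reach Spark-internal classes

import org.apache.spark.util.collection.ExternalAppendOnlyMap

object ExternalAggregateSketch {
  // Sums the values for each key, spilling the partial map to disk when it grows too large.
  def sumByKey(records: Iterator[(Int, Long)]): Iterator[(Int, Long)] = {
    val map = new ExternalAppendOnlyMap[Int, Long, Long](
      (v: Long) => v,                   // createCombiner: the first value starts the running sum
      (sum: Long, v: Long) => sum + v,  // mergeValue: fold later values into the in-memory sum
      (s1: Long, s2: Long) => s1 + s2)  // mergeCombiners: re-merge partial sums read from spill files
    map.insertAll(records)
    map.iterator  // if spills happened, this merge-sorts all spill files plus the in-memory map
  }
}

The workaround mentioned above is just a configuration change, for example something like sqlContext.setConf("spark.sql.shuffle.partitions", "400") (or the equivalent SET command), so that each reduce task sees fewer distinct keys.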

@marmbrus
Contributor

marmbrus commented Sep 4, 2014

What were the actual results of the benchmark? It is acceptable for there to be some performance hit here. In cases where there are too many keys, it's much better to spill to disk than to OOM, though you have a good point about just adding more partitions.

@SparkQA

SparkQA commented Sep 5, 2014

Can one of the admins verify this patch?

@guowei2
Contributor Author

guowei2 commented Sep 17, 2014

I've run a micro-benchmark locally with 50,000 records and 1,500 keys.

Run              OnHeapAggregate   ExternalAggregate (10 spills)
First run        876 ms            16.9 s
Stabilized runs  150 ms            15.0 s

@marmbrus
Contributor

marmbrus commented Dec 2, 2014

Thanks for working on this, but we are trying to clean up the PR queue (in order to make it easier for us to review). Thus, I think we should close this issue for now and reopen it when it's ready for review.

asfgit closed this in b0a46d8 on Dec 2, 2014