Address review comments; turn config name from string to field in SQLConf.
concretevitamin committed Jul 29, 2014
1 parent 8663e84 commit 329071d
Showing 5 changed files with 30 additions and 29 deletions.
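The substance of the change: configuration keys that were previously repeated as raw string literals at every call site now live as named constants on the `SQLConf` companion object, and `spark.sql.auto.convert.join.size` becomes `spark.sql.autoBroadcastJoinThreshold`. A minimal, self-contained sketch of why the field-based style helps (the `ConfKeys`/`Demo` objects and the mutable `Map` are illustrative stand-ins, not Spark API):

object ConfKeys {
  // One definition per key; a misspelled reference now fails to compile
  // instead of silently reading a default at runtime.
  val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
}

object Demo extends App {
  val settings = scala.collection.mutable.Map[String, String]()
  settings(ConfKeys.AUTO_BROADCASTJOIN_THRESHOLD) = "20000"

  // Every reader and writer goes through the same constant.
  val threshold = settings.getOrElse(ConfKeys.AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
  println(threshold) // 20000
}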
27 changes: 14 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -21,6 +21,16 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+object SQLConf {
+  val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
+  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
+  val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
+
+  object Deprecated {
+    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+  }
+}
+
 /**
  * A trait that enables the setting and getting of mutable config parameters/hints.
  *
@@ -49,16 +59,16 @@ trait SQLConf {
    *
    * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
    */
-  private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
+  private[spark] def autoBroadcastJoinThreshold: Int =
+    get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
 
   /**
    * The default size in bytes to assign to a logical operator's estimation statistics. By default,
    * it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a
    * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins.
    */
-  private[spark] def statsDefaultSizeInBytes: Long =
-    getOption("spark.sql.catalyst.stats.sizeInBytes").map(_.toLong)
-      .getOrElse(autoConvertJoinSize + 1)
+  private[spark] def defaultSizeInBytes: Long =
+    getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1)
 
   /** ********************** SQLConf functionality methods ************ */

@@ -99,12 +109,3 @@ trait SQLConf {
   }
 
 }
-
-object SQLConf {
-  val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
-  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
-
-  object Deprecated {
-    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
-  }
-}
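Note the interplay between the two renamed settings above: `defaultSizeInBytes` falls back to `autoBroadcastJoinThreshold + 1`, so a logical operator that never implements a real size estimate defaults to a value just over the broadcast threshold and can never be broadcast by accident. A runnable sketch of just that fallback logic (the mutable `Map` is a stand-in for `SQLConf`'s internal settings, not the real implementation):

object SizeDefaults extends App {
  val settings = scala.collection.mutable.Map[String, String]()

  def autoBroadcastJoinThreshold: Int =
    settings.getOrElse("spark.sql.autoBroadcastJoinThreshold", "10000").toInt

  // Unset => threshold + 1: deliberately just too big to broadcast.
  def defaultSizeInBytes: Long =
    settings.get("spark.sql.defaultSizeInBytes").map(_.toLong)
      .getOrElse(autoBroadcastJoinThreshold + 1)

  println(defaultSizeInBytes) // 10001
  settings("spark.sql.defaultSizeInBytes") = "1"
  println(defaultSizeInBytes) // 1: an explicit setting wins
}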
@@ -84,7 +84,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)
   @transient override lazy val statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
-    sizeInBytes = BigInt(sqlContext.statsDefaultSizeInBytes)
+    sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
   )
 
 }
@@ -53,11 +53,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * This strategy applies a simple optimization based on the estimates of the physical sizes of
    * the two join sides. When planning a [[execution.BroadcastHashJoin]], if one side has an
    * estimated physical size smaller than the user-settable threshold
-   * `spark.sql.auto.convert.join.size`, the planner would mark it as the ''build'' relation and
-   * mark the other relation as the ''stream'' side. The build table will be ''broadcasted'' to
-   * all of the executors involved in the join, as a [[org.apache.spark.broadcast.Broadcast]]
-   * object. If both estimates exceed the threshold, they will instead be used to decide the build
-   * side in a [[execution.ShuffledHashJoin]].
+   * [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], the planner would mark it as the
+   * ''build'' relation and mark the other relation as the ''stream'' side. The build table will be
+   * ''broadcasted'' to all of the executors involved in the join, as a
+   * [[org.apache.spark.broadcast.Broadcast]] object. If both estimates exceed the threshold, they
+   * will instead be used to decide the build side in a [[execution.ShuffledHashJoin]].
    */
   object HashJoin extends Strategy with PredicateHelper {
     private[this] def makeBroadcastHashJoin(
@@ -74,13 +74,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if Try(sqlContext.autoConvertJoinSize > 0 &&
-          right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) =>
+        if Try(sqlContext.autoBroadcastJoinThreshold > 0 &&
+          right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) =>
         makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if Try(sqlContext.autoConvertJoinSize > 0 &&
-          left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) =>
+        if Try(sqlContext.autoBroadcastJoinThreshold > 0 &&
+          left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) =>
         makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
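The guard being renamed in `HashJoin.apply` is the whole broadcast decision: a side is broadcast only if the threshold is positive and that side's estimated size fits under it, with `Try` absorbing statistics implementations that throw rather than estimate. A standalone sketch of that rule, assuming plain `BigInt` sizes (`BroadcastDecision` and `chooseBuildSide` are hypothetical names, not Spark API):

import scala.util.Try

sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

object BroadcastDecision extends App {
  val threshold = 10000 // spark.sql.autoBroadcastJoinThreshold; <= 0 disables broadcast joins

  // By-name size parameters mirror lazily computed statistics, which may throw;
  // Try(...).getOrElse(false) turns any failure into "do not broadcast".
  def chooseBuildSide(leftSize: => BigInt, rightSize: => BigInt): Option[BuildSide] =
    if (Try(threshold > 0 && rightSize <= threshold).getOrElse(false)) Some(BuildRight)
    else if (Try(threshold > 0 && leftSize <= threshold).getOrElse(false)) Some(BuildLeft)
    else None // fall through to a ShuffledHashJoin

  println(chooseBuildSide(BigInt(500000), BigInt(800)))    // Some(BuildRight)
  println(chooseBuildSide(BigInt(500000), BigInt(900000))) // None
}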
@@ -279,7 +279,7 @@ private[hive] case class MetastoreRelation
       BigInt(
         Option(hiveQlTable.getParameters.get("totalSize"))
           .map(_.toLong)
-          .getOrElse(sqlContext.statsDefaultSizeInBytes))
+          .getOrElse(sqlContext.defaultSizeInBytes))
     }
   )
 
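For Hive tables, the estimate prefers the metastore's `totalSize` parameter and only falls back to the conservative default when it is absent. A small sketch of that lookup, with a plain `java.util.HashMap` standing in for the Hive table's parameter map (assumption: `totalSize` is stored as a decimal string):

import java.util.{HashMap => JHashMap}

object TotalSizeFallback extends App {
  val defaultSizeInBytes = 10001L // i.e. autoBroadcastJoinThreshold + 1

  def estimate(parameters: JHashMap[String, String]): BigInt =
    BigInt(
      Option(parameters.get("totalSize")) // Option(...) absorbs the Java map's null
        .map(_.toLong)
        .getOrElse(defaultSizeInBytes))

  val withStats = new JHashMap[String, String]()
  withStats.put("totalSize", "123456")
  println(estimate(withStats))      // 123456
  println(estimate(new JHashMap())) // 10001: never accidentally broadcastable
}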
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{SQLConf, QueryTest}
 import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
@@ -51,7 +51,7 @@ class StatisticsSuite extends QueryTest {
     val sizes = rdd.queryExecution.analyzed.collect {
       case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
     }
-    assert(sizes.size === 2 && sizes(0) <= autoConvertJoinSize,
+    assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold,
       s"query should contain two relations, each of which has size smaller than autoConvertSize")
 
     // Using `sparkPlan` because for relevant patterns in HashJoin to be
@@ -63,9 +63,9 @@ class StatisticsSuite extends QueryTest {
     checkAnswer(rdd, expectedAnswer) // check correctness of output
 
     TestHive.settings.synchronized {
-      val tmp = autoConvertJoinSize
+      val tmp = autoBroadcastJoinThreshold
 
-      hql("""SET spark.sql.auto.convert.join.size=-1""")
+      hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
       rdd = hql(query)
       bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
       assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
@@ -74,7 +74,7 @@ class StatisticsSuite extends QueryTest {
       assert(shj.size === 1,
         "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
 
-      hql(s"""SET spark.sql.auto.convert.join.size=$tmp""")
+      hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""")
     }
 
     after()
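The test toggles the threshold with raw SET commands and restores the stashed value by hand inside the synchronized block. A hypothetical `withSetting` helper (illustrative only; not part of this commit or of TestHive) sketches how that save/override/restore pattern generalizes, again with a mutable `Map` standing in for the real settings store:

object WithSetting extends App {
  val settings = scala.collection.mutable.Map("spark.sql.autoBroadcastJoinThreshold" -> "10000")

  // Override one key for the duration of `body`, restoring the previous
  // value (or its absence) even if `body` throws.
  def withSetting[T](key: String, value: String)(body: => T): T = {
    val previous = settings.get(key)
    settings(key) = value
    try body
    finally previous match {
      case Some(v) => settings(key) = v
      case None    => settings -= key
    }
  }

  withSetting("spark.sql.autoBroadcastJoinThreshold", "-1") {
    println(settings("spark.sql.autoBroadcastJoinThreshold")) // -1 inside the block
  }
  println(settings("spark.sql.autoBroadcastJoinThreshold"))   // 10000 restored
}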
