From 329071d196342ee67482b87a5745bf0a135d8296 Mon Sep 17 00:00:00 2001
From: Zongheng Yang
Date: Tue, 29 Jul 2014 11:58:16 -0700
Subject: [PATCH] Address review comments; turn config name from string to
 field in SQLConf.

---
 .../scala/org/apache/spark/sql/SQLConf.scala   | 27 ++++++++++---------
 .../spark/sql/execution/SparkPlan.scala        |  2 +-
 .../spark/sql/execution/SparkStrategies.scala  | 18 ++++++-------
 .../spark/sql/hive/HiveMetastoreCatalog.scala  |  2 +-
 .../spark/sql/hive/StatisticsSuite.scala       | 10 +++----
 5 files changed, 30 insertions(+), 29 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 825525de7804f..be8d4e15ec4b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -21,6 +21,16 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+object SQLConf {
+  val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
+  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
+  val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
+
+  object Deprecated {
+    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+  }
+}
+
 /**
  * A trait that enables the setting and getting of mutable config parameters/hints.
  *
@@ -49,16 +59,16 @@ trait SQLConf {
    *
    * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
    */
-  private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
+  private[spark] def autoBroadcastJoinThreshold: Int =
+    get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
 
   /**
    * The default size in bytes to assign to a logical operator's estimation statistics. By default,
    * it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a
    * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins.
    */
-  private[spark] def statsDefaultSizeInBytes: Long =
-    getOption("spark.sql.catalyst.stats.sizeInBytes").map(_.toLong)
-      .getOrElse(autoConvertJoinSize + 1)
+  private[spark] def defaultSizeInBytes: Long =
+    getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1)
 
   /** ********************** SQLConf functionality methods ************ */
 
@@ -99,12 +109,3 @@ trait SQLConf {
     }
   }
 }
-
-object SQLConf {
-  val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
-  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
-
-  object Deprecated {
-    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
-  }
-}

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 61dc1b54bb629..77c874d0315ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -84,7 +84,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ
   @transient override lazy val statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
-    sizeInBytes = BigInt(sqlContext.statsDefaultSizeInBytes)
+    sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
   )
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 649ff82383de3..404d48ae05b45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -53,11 +53,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * This strategy applies a simple optimization based on the estimates of the physical sizes of
    * the two join sides. When planning a [[execution.BroadcastHashJoin]], if one side has an
    * estimated physical size smaller than the user-settable threshold
-   * `spark.sql.auto.convert.join.size`, the planner would mark it as the ''build'' relation and
-   * mark the other relation as the ''stream'' side. The build table will be ''broadcasted'' to
-   * all of the executors involved in the join, as a [[org.apache.spark.broadcast.Broadcast]]
-   * object. If both estimates exceed the threshold, they will instead be used to decide the build
-   * side in a [[execution.ShuffledHashJoin]].
+   * [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], the planner would mark it as the
+   * ''build'' relation and mark the other relation as the ''stream'' side. The build table will be
+   * ''broadcasted'' to all of the executors involved in the join, as a
+   * [[org.apache.spark.broadcast.Broadcast]] object. If both estimates exceed the threshold, they
+   * will instead be used to decide the build side in a [[execution.ShuffledHashJoin]].
    */
   object HashJoin extends Strategy with PredicateHelper {
     private[this] def makeBroadcastHashJoin(
@@ -74,13 +74,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if Try(sqlContext.autoConvertJoinSize > 0 &&
-          right.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) =>
+        if Try(sqlContext.autoBroadcastJoinThreshold > 0 &&
+          right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) =>
         makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if Try(sqlContext.autoConvertJoinSize > 0 &&
-          left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize).getOrElse(false) =>
+        if Try(sqlContext.autoBroadcastJoinThreshold > 0 &&
+          left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold).getOrElse(false) =>
         makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5508c2da34de4..dff1d6a4b93bb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -279,7 +279,7 @@ private[hive] case class MetastoreRelation
       BigInt(
         Option(hiveQlTable.getParameters.get("totalSize"))
           .map(_.toLong)
-          .getOrElse(sqlContext.statsDefaultSizeInBytes))
+          .getOrElse(sqlContext.defaultSizeInBytes))
     }
   )
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 21eb2d95bed3f..a61fd9df95c94 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{SQLConf, QueryTest}
 import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
@@ -51,7 +51,7 @@ class StatisticsSuite extends QueryTest {
       val sizes = rdd.queryExecution.analyzed.collect {
         case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
       }
-      assert(sizes.size === 2 && sizes(0) <= autoConvertJoinSize,
+      assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold,
         s"query should contain two relations, each of which has size smaller than autoConvertSize")
 
       // Using `sparkPlan` because for relevant patterns in HashJoin to be
@@ -63,9 +63,9 @@ class StatisticsSuite extends QueryTest {
       checkAnswer(rdd, expectedAnswer) // check correctness of output
 
       TestHive.settings.synchronized {
-        val tmp = autoConvertJoinSize
+        val tmp = autoBroadcastJoinThreshold
 
-        hql("""SET spark.sql.auto.convert.join.size=-1""")
+        hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
         rdd = hql(query)
         bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
         assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
@@ -74,7 +74,7 @@ class StatisticsSuite extends QueryTest {
         assert(shj.size === 1,
           "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
 
-        hql(s"""SET spark.sql.auto.convert.join.size=$tmp""")
+        hql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""")
       }
 
       after()
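Note on the renamed settings (illustrative, not part of the patch): the diff above renames `autoConvertJoinSize` to `autoBroadcastJoinThreshold` and keys it to the `SQLConf.AUTO_BROADCASTJOIN_THRESHOLD` field instead of a bare string. The standalone Scala sketch below condenses how the settings interact, per the semantics visible in the patch: the threshold defaults to 10000 bytes, `defaultSizeInBytes` defaults to one byte above it so that operators without a real size estimate never qualify for broadcast, and setting the threshold to -1 (as the test does) disables broadcast joins entirely. `SQLConfSketch` is a hypothetical stand-in for the real `SQLConf` trait, not Spark API.

// Illustrative sketch only -- SQLConfSketch is a hypothetical stand-in for
// the real SQLConf trait; it mirrors the semantics introduced by the patch.
object SQLConfSketch {
  val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"

  private val settings = scala.collection.mutable.Map.empty[String, String]

  def set(key: String, value: String): Unit = settings(key) = value

  // Mirrors SQLConf.autoBroadcastJoinThreshold: defaults to 10000 bytes.
  def autoBroadcastJoinThreshold: Int =
    settings.getOrElse(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt

  // Mirrors SQLConf.defaultSizeInBytes: one byte above the threshold, so an
  // operator with no properly implemented size estimate is never broadcast.
  def defaultSizeInBytes: Long = autoBroadcastJoinThreshold + 1L

  // Mirrors the guard in the HashJoin strategy: broadcast only when the
  // feature is enabled (threshold > 0) and the estimated size fits under it.
  def shouldBroadcast(sizeInBytes: BigInt): Boolean =
    autoBroadcastJoinThreshold > 0 && sizeInBytes <= BigInt(autoBroadcastJoinThreshold)

  def main(args: Array[String]): Unit = {
    assert(shouldBroadcast(BigInt(5000)))                // fits under the 10000-byte default
    assert(!shouldBroadcast(BigInt(defaultSizeInBytes))) // unknown sizes never qualify
    set(AUTO_BROADCASTJOIN_THRESHOLD, "-1")              // same effect as the test's SET ...=-1
    assert(!shouldBroadcast(BigInt(1)))                  // broadcast joins now disabled
  }
}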