Skip to content

Commit

Permalink
Address the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dongjoon-hyun committed Apr 21, 2016
1 parent 64989b4 commit c7a6d9b
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trait CatalystConf {
def groupByOrdinal: Boolean

def optimizerMaxIterations: Int
def optimizerMinSetSize: Int
def optimizerInSetConversionThreshold: Int
def maxCaseBranchesForCodegen: Int

/**
Expand All @@ -48,7 +48,7 @@ case class SimpleCatalystConf(
orderByOrdinal: Boolean = true,
groupByOrdinal: Boolean = true,
optimizerMaxIterations: Int = 100,
optimizerMinSetSize: Int = 10,
optimizerInSetConversionThreshold: Int = 10,
maxCaseBranchesForCodegen: Int = 20)
extends CatalystConf {
}
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan => q transformExpressionsDown {
case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) &&
list.size > conf.optimizerMinSetSize =>
list.size > conf.optimizerInSetConversionThreshold =>
val hSet = list.map(e => e.eval(EmptyRow))
InSet(v, HashSet() ++ hSet)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.catalyst.optimizer

import scala.collection.immutable.HashSet

import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.dsl.expressions._
Expand All @@ -39,7 +37,7 @@ class OptimizeInSuite extends PlanTest {
NullPropagation,
ConstantFolding,
BooleanSimplification,
OptimizeIn(SimpleCatalystConf(true))) :: Nil
OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
Expand Down Expand Up @@ -131,16 +129,18 @@ class OptimizeInSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("OptimizedIn test: Use configuration.") {
test("OptimizedIn test: Setting the threshold for turning Set into InSet.") {
val plan =
testRelation
.where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), Literal(3))))
.analyze

val notOptimizedPlan = OptimizeIn(SimpleCatalystConf(true))(plan)
val notOptimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))(plan)
comparePlans(notOptimizedPlan, plan)

val optimizedPlan = OptimizeIn(SimpleCatalystConf(true, optimizerMinSetSize = 2))(plan)
// Lower the threshold so that the three-literal In list is converted into InSet.
val optimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true,
optimizerInSetConversionThreshold = 2))(plan)
optimizedPlan match {
case Filter(cond, _)
if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ object SQLConf {
.intConf
.createWithDefault(100)

val OPTIMIZER_MIN_SET_SIZE = SQLConfigBuilder("spark.sql.optimizer.minSetSize")
.internal()
.doc("The minimum threshold of set size to be optimized.")
.intConf
.createWithDefault(10)
val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
SQLConfigBuilder("spark.sql.optimizer.inSetConversionThreshold")
.internal()
.doc("The threshold of set size for InSet conversion.")
.intConf
.createWithDefault(10)

val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
.doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
Expand Down Expand Up @@ -535,7 +536,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)

def optimizerMinSetSize: Int = getConf(OPTIMIZER_MIN_SET_SIZE)
def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)

def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)

Expand Down

0 comments on commit c7a6d9b

Please sign in to comment.