
Commit 69f1763
wip
AngersZhuuuu committed Jun 12, 2020
1 parent 603660b
Showing 6 changed files with 52 additions and 22 deletions.
Optimizer.scala
@@ -207,7 +207,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       CollapseProject,
       RemoveNoopOperators) :+
     // This batch must be executed after the `RewriteSubquery` batch, which creates joins.
-    Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
+    Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) :+
+    Batch("Final Filter Convert CNF", Once, finalScanFilterConvertRules: _*)

     // remove any batches with no rules. this may happen when subclasses do not add optional rules.
     batches.filter(_.rules.nonEmpty)
@@ -273,6 +274,11 @@ abstract class Optimizer(catalogManager: CatalogManager)
    */
   def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil

+  /**
+   * Override to provide additional rules for converting the final scan filters to CNF.
+   */
+  def finalScanFilterConvertRules: Seq[Rule[LogicalPlan]] = Nil
+
   /**
    * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that
    * eventually run in the Optimizer.
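
Note: the new `finalScanFilterConvertRules` hook mirrors `earlyScanPushDownRules`, but its batch ("Final Filter Convert CNF") runs once, after the `NormalizeFloatingNumbers` batch at the very end of `defaultBatches`. A minimal sketch of how a subclass could feed a rule into the hook; `MyOptimizer` and `MyCnfRule` are hypothetical names and the no-op rule body is a placeholder, not the real conversion:

import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager

// Placeholder rule: stands in for a real CNF-conversion rule.
object MyCnfRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical Optimizer subclass wiring the rule into the new final batch.
class MyOptimizer(catalogManager: CatalogManager) extends Optimizer(catalogManager) {
  override def finalScanFilterConvertRules: Seq[Rule[LogicalPlan]] =
    MyCnfRule :: Nil
}
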
PushCNFPredicateThroughFileScan.scala
@@ -32,14 +32,18 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
  */
 object PushCNFPredicateThroughFileScan extends Rule[LogicalPlan] with PredicateHelper {

-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case ScanOperation(projectList, conditions, relation: LogicalRelation)
-        if conditions.nonEmpty =>
-      val predicates = conjunctiveNormalFormAndGroupExpsByReference(conditions.reduceLeft(And))
-      if (predicates.isEmpty) {
-        plan
-      } else {
-        Project(projectList, Filter(predicates.reduceLeft(And), relation))
-      }
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    var resolved = false
+    plan resolveOperatorsDown {
+      case ScanOperation(projectList, conditions, relation: LogicalRelation)
+          if conditions.nonEmpty && !resolved =>
+        resolved = true
+        val predicates = conjunctiveNormalFormAndGroupExpsByReference(conditions.reduceLeft(And))
+        if (predicates.isEmpty) {
+          plan
+        } else {
+          Project(projectList, Filter(predicates.reduceLeft(And), relation))
+        }
+    }
+  }
 }
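
Note: besides moving to the new batch, the rule changes traversal strategy. The previous `transformUp` version rewrote every matching scan on every run, while the new version walks the plan with `resolveOperatorsDown` and a local `resolved` flag, so at most one matching `ScanOperation` is rewritten per invocation (presumably to avoid re-converting filters that are already in CNF). A stripped-down sketch of that fire-once pattern, using plain `transformDown` and a no-op rewrite purely for illustration:

import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative only: touches at most one Filter per run by tracking a local
// flag, the same guard the updated rule applies inside resolveOperatorsDown.
object RewriteFirstFilterOnly extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    var done = false
    plan transformDown {
      case f: Filter if !done =>
        done = true
        f // a real rule would return the rewritten Filter here
    }
  }
}
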
SparkOptimizer.scala
@@ -37,8 +37,10 @@ class SparkOptimizer(

   override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
     // TODO: move SchemaPruning into catalyst
-    SchemaPruning :: V2ScanRelationPushDown :: PushCNFPredicateThroughFileScan ::
-      PruneFileSourcePartitions :: Nil
+    SchemaPruning :: V2ScanRelationPushDown :: PruneFileSourcePartitions :: Nil

+  override def finalScanFilterConvertRules: Seq[Rule[LogicalPlan]] =
+    PushCNFPredicateThroughFileScan :: Nil
+
   override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
BaseSessionStateBuilder.scala
@@ -235,6 +235,9 @@ abstract class BaseSessionStateBuilder(
     override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
       super.earlyScanPushDownRules ++ customEarlyScanPushDownRules

+    override def finalScanFilterConvertRules: Seq[Rule[LogicalPlan]] =
+      super.finalScanFilterConvertRules ++ customFinalScanFilterConvertRules
+
     override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
       super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules
   }
@@ -258,6 +261,14 @@ abstract class BaseSessionStateBuilder(
    */
   protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil

+  /**
+   * Custom rules for converting the final scan filters to CNF, to add to the Optimizer.
+   * Prefer overriding this instead of creating your own Optimizer.
+   *
+   * Note that this may NOT depend on the `optimizer` function.
+   */
+  protected def customFinalScanFilterConvertRules: Seq[Rule[LogicalPlan]] = Nil
+
   /**
    * Planner that converts optimized logical plans to physical plans.
    *
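
Note: downstream of the Optimizer hook, `BaseSessionStateBuilder` exposes the same extension point to session extensions via `customFinalScanFilterConvertRules`, just as `HiveSessionStateBuilder` uses it below. A hypothetical builder wiring in a custom rule; `MySessionStateBuilder` and `MyCnfRule` are illustrative names and the rule body is a no-op placeholder:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}

// Placeholder rule standing in for a real CNF-conversion rule.
object MyCnfRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Hypothetical session state builder plugging the rule into the new hook.
class MySessionStateBuilder(session: SparkSession, parentState: Option[SessionState])
  extends BaseSessionStateBuilder(session, parentState) {

  override protected def customFinalScanFilterConvertRules: Seq[Rule[LogicalPlan]] =
    Seq(MyCnfRule)

  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}
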
HiveSessionStateBuilder.scala
@@ -99,7 +99,10 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[SessionState])
   }

   override def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
-    Seq(PushCNFPredicateThroughHiveTableScan, new PruneHiveTablePartitions(session))
+    Seq(new PruneHiveTablePartitions(session))

+  override def customFinalScanFilterConvertRules: Seq[Rule[LogicalPlan]] =
+    Seq(PushCNFPredicateThroughHiveTableScan)
+
   /**
    * Planner that takes into account Hive-specific strategies.
PushCNFPredicateThroughHiveTableScan.scala
@@ -31,14 +31,18 @@ import org.apache.spark.sql.catalyst.rules.Rule
  */
 object PushCNFPredicateThroughHiveTableScan extends Rule[LogicalPlan] with PredicateHelper {

-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case ScanOperation(projectList, conditions, relation: HiveTableRelation)
-        if conditions.nonEmpty =>
-      val predicates = conjunctiveNormalFormAndGroupExpsByReference(conditions.reduceLeft(And))
-      if (predicates.isEmpty) {
-        plan
-      } else {
-        Project(projectList, Filter(predicates.reduceLeft(And), relation))
-      }
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    var resolved = false
+    plan resolveOperatorsDown {
+      case ScanOperation(projectList, conditions, relation: HiveTableRelation)
+          if conditions.nonEmpty && !resolved =>
+        resolved = true
+        val predicates = conjunctiveNormalFormAndGroupExpsByReference(conditions.reduceLeft(And))
+        if (predicates.isEmpty) {
+          plan
+        } else {
+          Project(projectList, Filter(predicates.reduceLeft(And), relation))
+        }
+    }
+  }
 }
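
Note: both rules delegate the actual rewrite to `conjunctiveNormalFormAndGroupExpsByReference` from `PredicateHelper`, which converts the combined filter condition to conjunctive normal form (and, per its name, groups the resulting conjuncts by the references they touch); when it returns an empty sequence, the rules keep the plan unchanged. The core of any CNF conversion is distributing OR over AND, e.g. (a AND b) OR c becomes (a OR c) AND (b OR c). A self-contained sketch of that step on a toy predicate type (not Spark's helper, just the underlying idea):

// Toy predicate AST for illustration; the real rules work on Catalyst Expressions.
sealed trait Pred
case class Atom(name: String) extends Pred
case class AndP(left: Pred, right: Pred) extends Pred
case class OrP(left: Pred, right: Pred) extends Pred

// Naive CNF conversion: recursively distribute OR over AND. Worst-case
// output size is exponential, one reason a production helper needs a way
// to bail out rather than always emit the converted predicate.
def toCnf(p: Pred): Pred = p match {
  case AndP(l, r) => AndP(toCnf(l), toCnf(r))
  case OrP(l, r) => (toCnf(l), toCnf(r)) match {
    case (AndP(a, b), c) => AndP(toCnf(OrP(a, c)), toCnf(OrP(b, c)))
    case (a, AndP(b, c)) => AndP(toCnf(OrP(a, b)), toCnf(OrP(a, c)))
    case (a, b) => OrP(a, b)
  }
  case atom => atom
}

// Example: toCnf(OrP(AndP(Atom("a"), Atom("b")), Atom("c")))
//   == AndP(OrP(Atom("a"), Atom("c")), OrP(Atom("b"), Atom("c")))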
