Skip to content

Commit

Permalink
follow comment
Browse files Browse the repository at this point in the history
  • Loading branch information
AngersZhuuuu committed Jul 1, 2020
1 parent 1b8466e commit e2777c9
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ trait PredicateHelper extends Logging {
* @return the CNF result as sequence of disjunctive expressions. If the number of expressions
* exceeds threshold on converting `Or`, `Seq.empty` is returned.
*/
def conjunctiveNormalFormAndGroupExpsByQualifier(condition: Expression): Seq[Expression] = {
def CNFWithGroupExpressionsByQualifier(condition: Expression): Seq[Expression] = {
conjunctiveNormalForm(condition, (expressions: Seq[Expression]) =>
expressions.groupBy(_.references.map(_.qualifier)).map(_._2.reduceLeft(And)).toSeq)
}
Expand All @@ -278,7 +278,7 @@ trait PredicateHelper extends Logging {
* @return the CNF result as sequence of disjunctive expressions. If the number of expressions
* exceeds threshold on converting `Or`, `Seq.empty` is returned.
*/
def conjunctiveNormalFormAndGroupExpsByReference(condition: Expression): Seq[Expression] = {
def CNFWithGroupExpressionsByReference(condition: Expression): Seq[Expression] = {
conjunctiveNormalForm(condition, (expressions: Seq[Expression]) =>
expressions.groupBy(e => AttributeSet(e.references)).map(_._2.reduceLeft(And)).toSeq)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object PushCNFPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelpe
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case j @ Join(left, right, joinType, Some(joinCondition), hint)
if canPushThrough(joinType) =>
val predicates = conjunctiveNormalFormAndGroupExpsByQualifier(joinCondition)
val predicates = CNFWithGroupExpressionsByQualifier(joinCondition)
if (predicates.isEmpty) {
j
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ConjunctiveNormalFormPredicateSuite extends SparkFunSuite with PredicateHe

// Check CNF conversion with expected expression, assuming the input has non-empty result.
private def checkCondition(input: Expression, expected: Expression): Unit = {
val cnf = conjunctiveNormalFormAndGroupExpsByQualifier(input)
val cnf = CNFWithGroupExpressionsByQualifier(input)
assert(cnf.nonEmpty)
val result = cnf.reduceLeft(And)
assert(result.semanticEquals(expected))
Expand Down Expand Up @@ -113,14 +113,14 @@ class ConjunctiveNormalFormPredicateSuite extends SparkFunSuite with PredicateHe
Seq(8, 9, 10, 35, 36, 37).foreach { maxCount =>
withSQLConf(SQLConf.MAX_CNF_NODE_COUNT.key -> maxCount.toString) {
if (maxCount < 36) {
assert(conjunctiveNormalFormAndGroupExpsByQualifier(input).isEmpty)
assert(CNFWithGroupExpressionsByQualifier(input).isEmpty)
} else {
assert(conjunctiveNormalFormAndGroupExpsByQualifier(input).nonEmpty)
assert(CNFWithGroupExpressionsByQualifier(input).nonEmpty)
}
if (maxCount < 9) {
assert(conjunctiveNormalFormAndGroupExpsByQualifier(input2).isEmpty)
assert(CNFWithGroupExpressionsByQualifier(input2).isEmpty)
} else {
assert(conjunctiveNormalFormAndGroupExpsByQualifier(input2).nonEmpty)
assert(CNFWithGroupExpressionsByQualifier(input2).nonEmpty)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private[sql] object PruneFileSourcePartitions
_,
_))
if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
val predicates = conjunctiveNormalFormAndGroupExpsByReference(filters.reduceLeft(And))
val predicates = CNFWithGroupExpressionsByReference(filters.reduceLeft(And))
val finalPredicates = if (predicates.nonEmpty) predicates else filters
val (partitionKeyFilters, _) = getPartitionKeyFiltersAndDataFilters(
fsRelation.sparkSession, logicalRelation, partitionSchema, finalPredicates,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession)
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
val predicates = conjunctiveNormalFormAndGroupExpsByReference(filters.reduceLeft(And))
val predicates = CNFWithGroupExpressionsByReference(filters.reduceLeft(And))
val finalPredicates = if (predicates.nonEmpty) predicates else filters
val partitionKeyFilters = getPartitionKeyFilters(finalPredicates, relation)
if (partitionKeyFilters.nonEmpty) {
Expand Down

0 comments on commit e2777c9

Please sign in to comment.