Skip to content

Commit

Permalink
Address @rxin 's comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
marmbrus committed Apr 25, 2014
1 parent 366b6d9 commit d5cc79b
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ trait PredicateHelper {
}
}

private def combineConjunctivePredicates(predicates: Seq[Expression]) =
predicates.reduceLeft(And)

/** Returns true if `expr` can be evaluated using only the output of `plan`. */
/**
* Returns true if `expr` can be evaluated using only the output of `plan`. This method
* can be used to determine when it is acceptable to move expression evaluation within a query
* plan.
*
* For example, consider a join between two relations R(a, b) and S(c, d).
*
* `canEvaluate(Equals(a,b), R)` returns `true` whereas `canEvaluate(Equals(a,c), R)` returns
* `false`.
*/
protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
expr.references subsetOf plan.outputSet
expr.references.subsetOf(plan.outputSet)
}

abstract class BinaryPredicate extends BinaryExpression with Predicate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ object PhysicalOperation extends PredicateHelper {
* techniques. For inner joins, any filters on top of the join operator are also matched.
*/
object HashFilteredJoin extends Logging with PredicateHelper {
/** (joinType, rightKeys, leftKeys, condition, left, right) */
/** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
type ReturnType =
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

Expand All @@ -136,8 +136,8 @@ object HashFilteredJoin extends Logging with PredicateHelper {
}

val joinKeys = joinPredicates.map {
case Equals(l,r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case Equals(l,r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
}

// Do not consider this strategy if there are no join keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object HashJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// Find inner joins where at least some predicates can be evaluated by matching hash keys
// using the HashFilteredJoin pattern.
case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
val hashJoin =
execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
Expand Down

0 comments on commit d5cc79b

Please sign in to comment.