Skip to content

Commit

Permalink
fix failures
Browse files Browse the repository at this point in the history
  • Loading branch information
mgaido91 committed Dec 12, 2018
1 parent 1beb40c commit 0312558
Showing 1 changed file with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
Project(aliasedExpressions, plan)
}

private def dedupSubqueryOnSelfJoin(values: Seq[Expression], sub: LogicalPlan): LogicalPlan = {
private def dedupSubqueryOnSelfJoin(
values: Seq[Expression],
outerPlan: LogicalPlan,
sub: LogicalPlan): LogicalPlan = {
// SPARK-26078: it may happen that the subquery has conflicting attributes with the outer
// values. In this case, the resulting join would contain trivially true conditions (e.g.
// id#3 = id#3) which cannot be de-duplicated afterwards. In this method, if there are
// conflicting attributes in the join condition, the subquery's conflicting attributes are
// changed using a projection which aliases them and resolves the problem.
val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references))
val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references)) ++ outerPlan.outputSet
val duplicates = leftRefs.intersect(sub.outputSet)
if (duplicates.isEmpty) {
sub
Expand Down Expand Up @@ -115,7 +118,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond))
case (p, InSubquery(values, ListQuery(sub, conditions, _, _))) =>
// Deduplicate conflicting attributes if any.
val newSub = dedupSubqueryOnSelfJoin(values, sub)
val newSub = dedupSubqueryOnSelfJoin(values, p, sub)
val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
Join(outerPlan, newSub, LeftSemi, joinCond)
Expand All @@ -128,7 +131,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
// Use EXISTS if performance matters to you.

// Deduplicate conflicting attributes if any.
val newSub = dedupSubqueryOnSelfJoin(values, sub)
val newSub = dedupSubqueryOnSelfJoin(values, p, sub)
val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions, p)
// Expand the NOT IN expression with the NULL-aware semantic
Expand Down Expand Up @@ -172,7 +175,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
case InSubquery(values, ListQuery(sub, conditions, _, _)) =>
val exists = AttributeReference("exists", BooleanType, nullable = false)()
// Deduplicate conflicting attributes if any.
val newSub = dedupSubqueryOnSelfJoin(values, sub)
val newSub = dedupSubqueryOnSelfJoin(values, newPlan, sub)
val inConditions = values.zip(newSub.output).map(EqualTo.tupled)
val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions)
Expand Down

0 comments on commit 0312558

Please sign in to comment.