diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 1eeed95b3b560..cbb7d2b77ccfe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -74,13 +74,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { Project(aliasedExpressions, plan) } - private def dedupSubqueryOnSelfJoin(values: Seq[Expression], sub: LogicalPlan): LogicalPlan = { + private def dedupSubqueryOnSelfJoin( + values: Seq[Expression], + outerPlan: LogicalPlan, + sub: LogicalPlan): LogicalPlan = { // SPARK-26078: it may happen that the subquery has conflicting attributes with the outer // values. In this case, the resulting join would contain trivially true conditions (eg. // id#3 = id#3) which cannot be de-duplicated after. In this method, if there are conflicting // attributes in the join condition, the subquery's conflicting attributes are changed using // a projection which aliases them and resolves the problem. - val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references)) + val leftRefs = AttributeSet.fromAttributeSets(values.map(_.references)) ++ outerPlan.outputSet val duplicates = leftRefs.intersect(sub.outputSet) if (duplicates.isEmpty) { sub @@ -115,7 +118,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond)) case (p, InSubquery(values, ListQuery(sub, conditions, _, _))) => // Deduplicate conflicting attributes if any. - val newSub = dedupSubqueryOnSelfJoin(values, sub) + val newSub = dedupSubqueryOnSelfJoin(values, p, sub) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p) Join(outerPlan, newSub, LeftSemi, joinCond) @@ -128,7 +131,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Use EXISTS if performance matters to you. // Deduplicate conflicting attributes if any. - val newSub = dedupSubqueryOnSelfJoin(values, sub) + val newSub = dedupSubqueryOnSelfJoin(values, p, sub) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions, p) // Expand the NOT IN expression with the NULL-aware semantic @@ -172,7 +175,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { case InSubquery(values, ListQuery(sub, conditions, _, _)) => val exists = AttributeReference("exists", BooleanType, nullable = false)() // Deduplicate conflicting attributes if any. - val newSub = dedupSubqueryOnSelfJoin(values, sub) + val newSub = dedupSubqueryOnSelfJoin(values, newPlan, sub) val inConditions = values.zip(newSub.output).map(EqualTo.tupled) val newConditions = (inConditions ++ conditions).reduceLeftOption(And) newPlan = Join(newPlan, newSub, ExistenceJoin(exists), newConditions)