Skip to content

Commit

Permalink
SPARK-26138: pushdown limit through InnerLike when condition is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyum committed Feb 15, 2021
1 parent 1fbd576 commit a0a0685
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,14 @@ object LimitPushDown extends Rule[LogicalPlan] {
// on both sides if it is applied multiple times. Therefore:
// - If one side is already limited, stack another limit on top if the new limit is smaller.
// The redundant limit will be collapsed by the CombineLimits rule.
case LocalLimit(exp, join @ Join(left, right, joinType, _, _)) =>
case LocalLimit(exp, join @ Join(left, right, joinType, conditionOpt, _)) =>
val newJoin = joinType match {
case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))
case _: InnerLike if conditionOpt.isEmpty =>
join.copy(
left = maybePushLocalLimit(exp, left),
right = maybePushLocalLimit(exp, right))
case _ => join
}
LocalLimit(exp, newJoin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Add
import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, PlanTest, RightOuter}
import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, PlanTest, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._

Expand Down Expand Up @@ -194,4 +194,22 @@ class LimitPushdownSuite extends PlanTest {
LocalLimit(1, y.groupBy(Symbol("b"))(count(1))))).analyze
comparePlans(expected2, optimized2)
}

test("SPARK-26138: pushdown limit through InnerLike when condition is empty") {
Seq(Cross, Inner).foreach { joinType =>
val originalQuery = x.join(y, joinType).limit(1)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Limit(1, LocalLimit(1, x).join(LocalLimit(1, y), joinType)).analyze
comparePlans(optimized, correctAnswer)
}
}

test("SPARK-26138: Should not pushdown limit through InnerLike when condition is not empty") {
Seq(Cross, Inner).foreach { joinType =>
val originalQuery = x.join(y, joinType, Some("x.a".attr === "y.b".attr)).limit(1)
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = Limit(1, x.join(y, joinType, Some("x.a".attr === "y.b".attr))).analyze
comparePlans(optimized, correctAnswer)
}
}
}

0 comments on commit a0a0685

Please sign in to comment.