Skip to content

Commit

Permalink
simplify test case and move it to FilterPushdownSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
wzhfy committed Mar 28, 2017
1 parent 4739c0b commit bf7d202
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,6 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
case m: Map[_, _] => m
case d: DataType => d // Avoid unpacking Structs
case seq: Traversable[_] => seq.map(recursiveTransform)
case ExistenceJoin(exists: Attribute) =>
// `exists` must be an Attribute in ExistenceJoin
ExistenceJoin(transformExpression(exists).asInstanceOf[Attribute])
case other: AnyRef => other
case null => null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,26 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, answer)
}

test("SPARK-20094: don't push predicate with IN subquery into join condition") {
val x = testRelation.subquery('x)
val z = testRelation.subquery('z)
val w = testRelation1.subquery('w)

val queryPlan = x
.join(z)
.where(("x.b".attr === "z.b".attr) &&
("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr)))))
.analyze

val expectedPlan = x
.join(z, Inner, Some("x.b".attr === "z.b".attr))
.where("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr))))
.analyze

val optimized = Optimize.execute(queryPlan)
comparePlans(optimized, expectedPlan)
}

test("Window: predicate push down -- basic") {
val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,30 @@ import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ListQuery, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.types.BooleanType

class JoinOptimizationSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Eliminate Subqueries", Once,
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Subquery", Once,
OptimizeSubqueries) ::
Batch("Filter Pushdown", FixedPoint(100),
CombineFilters,
PushDownPredicate,
BooleanSimplification,
ReorderJoin(SimpleCatalystConf(true)),
PushPredicateThroughJoin,
ColumnPruning,
CollapseProject) ::
Batch("RewriteSubquery", Once,
RewritePredicateSubquery) :: Nil
CollapseProject) :: Nil

}

object OptimizeSubqueries extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case s: SubqueryExpression =>
val Subquery(newPlan) = Optimize.execute(Subquery(s.plan))
s.withNewPlan(newPlan)
}
}

val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
val testRelation1 = LocalRelation('d.int)

Expand Down Expand Up @@ -136,31 +123,6 @@ class JoinOptimizationSuite extends PlanTest {
}
}

test("reorder inner joins - don't put predicate with IN subquery into join condition") {
// ReorderJoin collects all predicates and try to put them into join condition when creating
// ordered join. If a predicate with an IN subquery is in a join condition instead of a filter
// condition, `RewritePredicateSubquery.rewriteExistentialExpr` would fail to convert the
// subquery to an ExistenceJoin, and thus result in error.
val x = testRelation.subquery('x)
val y = testRelation1.subquery('y)
val z = testRelation.subquery('z)
val w = testRelation1.subquery('w)
val exists: AttributeReference = AttributeReference("exists", BooleanType, nullable = false)()

val queryPlan = x.join(y).join(z)
.where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr) &&
("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr)))))

val expectedPlan = x.join(z, Inner, Some("x.b".attr === "z.b".attr))
.join(w, ExistenceJoin(exists), Some("z.c".attr === "w.d".attr))
.where("x.a".attr > 1 || exists)
.select("x.a".attr, "x.b".attr, "x.c".attr, "z.a".attr, "z.b".attr, "z.c".attr)
.join(y, Inner, Some("y.d".attr === "z.a".attr))

val optimized = Optimize.execute(queryPlan.analyze)
comparePlans(optimized, analysis.EliminateSubqueryAliases(expectedPlan.analyze))
}

test("broadcasthint sets relation statistics to smallest value") {
val input = LocalRelation('key.int, 'value.string)

Expand Down

0 comments on commit bf7d202

Please sign in to comment.