Skip to content

Commit

Permalink
support with
Browse files Browse the repository at this point in the history
  • Loading branch information
haiyang committed Mar 3, 2015
1 parent c3fa4c2 commit a99ecd2
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ class Analyzer(catalog: Catalog,

lazy val batches: Seq[Batch] = Seq(
Batch("Resolution", fixedPoint,
ResolveWith ::
ResolveRelations ::
ResolveReferences ::
ResolveGroupingAnalytics ::
Expand Down Expand Up @@ -167,34 +166,37 @@ class Analyzer(catalog: Catalog,
}
}

object ResolveWith extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case origin @ With(child, subQueries)
if subQueries.forall(query => !catalog.tableExists(Seq(query.alias))) =>
subQueries.foreach(subQuery =>
catalog.registerTable(Seq(subQuery.alias), subQuery.child))
origin
}
}
/**
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
def getTable(u: UnresolvedRelation) = {
def getTable(u: UnresolvedRelation, extraRelations: Option[Seq[Subquery]]) = {
try {
catalog.lookupRelation(u.tableIdentifier, u.alias)
extraRelations.fold(catalog.lookupRelation(u.tableIdentifier, u.alias)) { cteRelations =>
cteRelations.find(_.alias == u.tableIdentifier)
.map(relation => u.alias.map(Subquery(_, relation.child)).getOrElse(relation.child))
.getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
}
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableIdentifier}")
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
i.copy(
table = EliminateSubQueries(getTable(u)))
case u: UnresolvedRelation =>
getTable(u)
def apply(plan: LogicalPlan): LogicalPlan = {
val cteRelations = plan match {
case With(child, subQueries) =>
Some(subQueries)
case _ => None
}

plan transform {
case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
i.copy(
table = EliminateSubQueries(getTable(u, cteRelations)))
case u: UnresolvedRelation =>
getTable(u, cteRelations)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ object DefaultOptimizer extends Optimizer {
PushPredicateThroughGenerate,
ColumnPruning) ::
Batch("LocalRelation", FixedPoint(100),
ConvertToLocalRelation) :: Nil
ConvertToLocalRelation) ::
Batch("Drop With", Once, DropWith) :: Nil
}

object DropWith extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case With(child, _) => child
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case e @ EvaluatePython(udf, child, _) =>
BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
case logical.With(child, subQueries) =>
execution.With(planLater(child), subQueries.map(_.alias)) :: Nil
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,6 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
}
}

case class With(child: SparkPlan, subQueries: Seq[String]) extends UnaryNode {
override def output = child.output

override def execute() = {
val result = child.execute()
subQueries.foreach(name => sqlContext.catalog.unregisterTable(Seq(name)))
result
}
}

/**
* :: DeveloperApi ::
* A plan node that does nothing but lie about the output of its child. Used to spice a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,17 +572,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
val (fromClause: Option[ASTNode], insertClauses, cteSubqueries) =
queryArgs match {
case Token("TOK_FROM", args: Seq[ASTNode]) :: insertClauses =>
val (newInsertClauses, cteSubqueries) = insertClauses.last match {
insertClauses.last match {
case Token("TOK_CTE", cteClauses) =>
val subQueries = cteClauses.map(node => {
val relation = nodeToRelation(node)
relation.asInstanceOf[Subquery]
}).toSeq
(insertClauses.init, Some(subQueries))
(Some(args.head), insertClauses.init, Some(subQueries))

case _ => (insertClauses, None)
case _ => (Some(args.head), insertClauses, None)
}
(Some(args.head), newInsertClauses, cteSubqueries)
case Token("TOK_INSERT", _) :: Nil => (None, queryArgs, None)
}

Expand Down

0 comments on commit a99ecd2

Please sign in to comment.