Skip to content

Commit

Permalink
added Substitution bastch
Browse files Browse the repository at this point in the history
  • Loading branch information
scwf committed Apr 29, 2015
1 parent f49284b commit 1c9a092
Showing 1 changed file with 48 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,21 @@ class Analyzer(

val fixedPoint = FixedPoint(maxIterations)

/**
* Override to provide additional rules for the "Substitution" batch.
*/
val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = Nil

/**
* Override to provide additional rules for the "Resolution" batch.
*/
val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil

lazy val batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
CTESubstitution ::
Nil ++
extendedSubstitutionRules : _*),
Batch("Resolution", fixedPoint,
ResolveRelations ::
ResolveReferences ::
Expand All @@ -68,6 +77,38 @@ class Analyzer(
extendedResolutionRules : _*)
)

/**
* Substitute CTE definitions
*/
object CTESubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
val (realPlan, cteRelations) = plan match {
case With(child, relations) =>
(child, relations)
case other => (other, Map.empty[String, LogicalPlan])
}
substituteCTE(realPlan, cteRelations)
}

def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
plan transform {
case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
// In hive, if there is same table name in database and CTE definition,
// hive will use the table in database, not the CTE one.
// Taking into account the reasonableness and the implementation complexity,
// here use the CTE definition first, check table name only and ignore database name
val relation = cteRelations.get(u.tableIdentifier.last)
.map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
.getOrElse(u)
i.copy(table = relation)
case u : UnresolvedRelation =>
cteRelations.get(u.tableIdentifier.last)
.map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
.getOrElse(u)
}
}
}

/**
* Removes no-op Alias expressions from the plan.
*/
Expand Down Expand Up @@ -169,36 +210,20 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
def getTable(u: UnresolvedRelation): LogicalPlan = {
try {
// In hive, if there is same table name in database and CTE definition,
// hive will use the table in database, not the CTE one.
// Taking into account the reasonableness and the implementation complexity,
// here use the CTE definition first, check table name only and ignore database name
cteRelations.get(u.tableIdentifier.last)
.map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
.getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
}
}

def apply(plan: LogicalPlan): LogicalPlan = {
val (realPlan, cteRelations) = plan match {
// TODO allow subquery to define CTE
// Add cte table to a temp relation map,drop `with` plan and keep its child
case With(child, relations) => (child, relations)
case other => (other, Map.empty[String, LogicalPlan])
}

realPlan transform {
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(
table = EliminateSubQueries(getTable(u, cteRelations)))
case u: UnresolvedRelation =>
getTable(u, cteRelations)
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(table = EliminateSubQueries(getTable(u)))
case u: UnresolvedRelation =>
getTable(u, cteRelations)
}
}

Expand Down

0 comments on commit 1c9a092

Please sign in to comment.