-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-3414][SQL] Replace LowerCaseSchema with Resolver #2382
Changes from all commits
5b93711
219805a
2de881e
d4320f1
c21171e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,15 @@ | |
|
||
package org.apache.spark.sql.catalyst.plans.logical | ||
|
||
import org.apache.spark.Logging | ||
import org.apache.spark.sql.catalyst.analysis.Resolver | ||
import org.apache.spark.sql.catalyst.errors.TreeNodeException | ||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.catalyst.plans.QueryPlan | ||
import org.apache.spark.sql.catalyst.types.StructType | ||
import org.apache.spark.sql.catalyst.trees | ||
|
||
abstract class LogicalPlan extends QueryPlan[LogicalPlan] { | ||
abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { | ||
self: Product => | ||
|
||
/** | ||
|
@@ -75,42 +77,95 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] { | |
* nodes of this LogicalPlan. The attribute is expressed as | ||
* as string in the following form: `[scope].AttributeName.[nested].[fields]...`. | ||
*/ | ||
def resolveChildren(name: String): Option[NamedExpression] = | ||
resolve(name, children.flatMap(_.output)) | ||
def resolveChildren(name: String, resolver: Resolver): Option[NamedExpression] = | ||
resolve(name, children.flatMap(_.output), resolver) | ||
|
||
/** | ||
* Optionally resolves the given string to a [[NamedExpression]] based on the output of this | ||
* LogicalPlan. The attribute is expressed as string in the following form: | ||
* `[scope].AttributeName.[nested].[fields]...`. | ||
*/ | ||
def resolve(name: String): Option[NamedExpression] = | ||
resolve(name, output) | ||
def resolve(name: String, resolver: Resolver): Option[NamedExpression] = | ||
resolve(name, output, resolver) | ||
|
||
/** Performs attribute resolution given a name and a sequence of possible attributes. */ | ||
protected def resolve(name: String, input: Seq[Attribute]): Option[NamedExpression] = { | ||
protected def resolve( | ||
name: String, | ||
input: Seq[Attribute], | ||
resolver: Resolver): Option[NamedExpression] = { | ||
|
||
val parts = name.split("\\.") | ||
|
||
// Collect all attributes that are output by this nodes children where either the first part | ||
// matches the name or where the first part matches the scope and the second part matches the | ||
// name. Return these matches along with any remaining parts, which represent dotted access to | ||
// struct fields. | ||
val options = input.flatMap { option => | ||
// If the first part of the desired name matches a qualifier for this possible match, drop it. | ||
val remainingParts = | ||
if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1) else parts | ||
if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil | ||
if (option.qualifiers.find(resolver(_, parts.head)).nonEmpty && parts.size > 1) { | ||
parts.drop(1) | ||
} else { | ||
parts | ||
} | ||
|
||
if (resolver(option.name, remainingParts.head)) { | ||
// Preserve the case of the user's attribute reference. | ||
(option.withName(remainingParts.head), remainingParts.tail.toList) :: Nil | ||
} else { | ||
Nil | ||
} | ||
} | ||
|
||
options.distinct match { | ||
case Seq((a, Nil)) => Some(a) // One match, no nested fields, use it. | ||
// One match, no nested fields, use it. | ||
case Seq((a, Nil)) => Some(a) | ||
|
||
// One match, but we also need to extract the requested nested field. | ||
case Seq((a, nestedFields)) => | ||
Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)()) | ||
case Seq() => None // No matches. | ||
val aliased = | ||
Alias( | ||
resolveNesting(nestedFields, a, resolver), | ||
nestedFields.last)() // Preserve the case of the user's field access. | ||
Some(aliased) | ||
|
||
// No matches. | ||
case Seq() => | ||
logTrace(s"Could not find $name in ${input.mkString(", ")}") | ||
None | ||
|
||
// More than one match. | ||
case ambiguousReferences => | ||
throw new TreeNodeException( | ||
this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}") | ||
} | ||
} | ||
|
||
/** | ||
* Given a list of successive nested field accesses, and a based expression, attempt to resolve | ||
* the actual field lookups on this expression. | ||
*/ | ||
private def resolveNesting( | ||
nestedFields: List[String], | ||
expression: Expression, | ||
resolver: Resolver): Expression = { | ||
|
||
(nestedFields, expression.dataType) match { | ||
case (Nil, _) => expression | ||
case (requestedField :: rest, StructType(fields)) => | ||
val actualField = fields.filter(f => resolver(f.name, requestedField)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a problem here. Currently There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, good point. Right now you can't make a SQLContext case insensitive, but when you can this will be problem. Maybe you should note this on SPARK-3617 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh wait, sorry... Is that how the HiveQL parser will do it too? I'm not a huge fan of moving resolution logic into the expressions. What about a rule that only ran in case insensitive mode that fixes unresolved GetFields? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this bug exists in HiveQL. I have opened a PR to fix this(adding a rule to fix unresolved |
||
actualField match { | ||
case Seq() => | ||
sys.error( | ||
s"No such struct field $requestedField in ${fields.map(_.name).mkString(", ")}") | ||
case Seq(singleMatch) => | ||
resolveNesting(rest, GetField(expression, singleMatch.name), resolver) | ||
case multipleMatches => | ||
sys.error(s"Ambiguous reference to fields ${multipleMatches.mkString(", ")}") | ||
} | ||
case (_, dt) => sys.error(s"Can't access nested field in type $dt") | ||
} | ||
} | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolver
probably a general name, can we use a more precise name for this?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will actually end up providing more general resolution functionality in the long term. I've added some scala doc for clarity though.