[SPARK-14123] [SPARK-14384] [SQL] Handle CreateFunction/DropFunction #12117

Closed
49 commits
7d00184
init import.
viirya Mar 27, 2016
9af70af
Merge remote-tracking branch 'upstream/master' into native-ddl-function
viirya Mar 29, 2016
35ad7ae
Fix hive temp function feature.
viirya Mar 29, 2016
cb29f0f
Fix scala style.
viirya Mar 29, 2016
77848c9
Use HiveFunctionRegistry to find Generator function in HiveSqlParser.
viirya Mar 29, 2016
b8dda84
Fix describe function problem.
viirya Mar 29, 2016
ee957db
Fix scala style.
viirya Mar 29, 2016
133ce1a
Fix test.
viirya Mar 30, 2016
6b76980
Merge remote-tracking branch 'upstream/master' into native-ddl-function
viirya Mar 30, 2016
e05b108
Address some comments.
viirya Mar 30, 2016
314c4db
Add resources to CatalogFunction.
viirya Mar 30, 2016
c370c47
Let Spark load Hive permanent function and create expression.
viirya Mar 30, 2016
acf9299
A little refactoring.
viirya Mar 30, 2016
2cab41c
Refactoring.
viirya Mar 31, 2016
65d9dbd
Fix compilation error.
viirya Mar 31, 2016
05709f0
Merge remote-tracking branch 'upstream/master' into native-ddl-function
viirya Mar 31, 2016
67df04f
Rename alias to className
yhuai Apr 1, 2016
b67c444
Make SessionCatalog handle makeFunctionBuilder and remove HiveFunctio…
yhuai Apr 1, 2016
1753cac
functionClassName => className
yhuai Apr 1, 2016
a2d588f
Put loadFunctionResources to SessionCatalog
yhuai Apr 1, 2016
51b72dd
SessionCatalog loads permanent function instead of letting function r…
yhuai Apr 1, 2016
66c5261
Take care UnresolvedGenerator and cleanup.
yhuai Apr 1, 2016
2aa3725
If we cannot find a function in Spark's FunctionRegistry. We will als…
yhuai Apr 1, 2016
e343087
Merge remote-tracking branch 'upstream/master' into function
yhuai Apr 1, 2016
8154359
Fix tests and pre-load two Hive's builtin functions.
yhuai Apr 1, 2016
e0570cd
Merge remote-tracking branch 'upstream/master' into function
yhuai Apr 1, 2016
04a5926
Cleanup
yhuai Apr 1, 2016
979b03e
Fix test.
yhuai Apr 1, 2016
323cea6
Remove unnecessary import
yhuai Apr 2, 2016
aa41b20
Merge remote-tracking branch 'upstream/master' into function
yhuai Apr 2, 2016
f40a85e
Merge remote-tracking branch 'upstream/master' into function
yhuai Apr 2, 2016
3718e61
Cleanup #1
yhuai Apr 2, 2016
ac6dfc5
Remove withGenerator.
yhuai Apr 3, 2016
64e2ee3
Cleanup
yhuai Apr 3, 2016
ae359fb
Merge remote-tracking branch 'upstream/master' into function
yhuai Apr 3, 2016
776c09a
style
yhuai Apr 3, 2016
18d6042
Update tests.
yhuai Apr 3, 2016
f72e6a9
Show functions should use SessionCatalog to get all functions.
yhuai Apr 4, 2016
5fcf6bc
Add tests.
yhuai Apr 4, 2016
9d39a83
Merge remote-tracking branch 'upstream/master' into function
yhuai Apr 4, 2016
0572acc
Move CreateFunction and DropFunction to functions.scala
yhuai Apr 4, 2016
a16395a
Fix test
yhuai Apr 4, 2016
1f77973
Update test and comments
yhuai Apr 4, 2016
611fe17
Merge remote-tracking branch 'upstream/master' into function
yhuai Apr 5, 2016
88fd93c
Address comments.
yhuai Apr 5, 2016
8a41f6d
Make resource loader a transient lazy val.
yhuai Apr 5, 2016
361421c
Review comments
yhuai Apr 5, 2016
21ffafc
Merge remote-tracking branch 'upstream/master' into function
yhuai Apr 5, 2016
3938766
Remove unnecessary changes.
yhuai Apr 5, 2016
@@ -47,10 +47,7 @@ object SimpleAnalyzer
new SimpleCatalystConf(caseSensitiveAnalysis = true))

class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
- extends Analyzer(
- new SessionCatalog(new InMemoryCatalog, functionRegistry, conf),
- functionRegistry,
- conf)
+ extends Analyzer(new SessionCatalog(new InMemoryCatalog, functionRegistry, conf), conf)

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -59,7 +56,6 @@ class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
*/
class Analyzer(
catalog: SessionCatalog,
- registry: FunctionRegistry,
conf: CatalystConf,
maxIterations: Int = 100)
extends RuleExecutor[LogicalPlan] with CheckAnalysis {
@@ -793,9 +789,18 @@ class Analyzer(
case q: LogicalPlan =>
q transformExpressions {
case u if !u.childrenResolved => u // Skip until children are resolved.
+ case u @ UnresolvedGenerator(name, children) =>
+ withPosition(u) {
+ catalog.lookupFunction(name, children) match {
+ case generator: Generator => generator
+ case other =>
+ failAnalysis(s"$name is expected to be a generator. However, " +
+ s"its class is ${other.getClass.getCanonicalName}, which is not a generator.")
+ }
+ }
case u @ UnresolvedFunction(name, children, isDistinct) =>
withPosition(u) {
- registry.lookupFunction(name, children) match {
+ catalog.lookupFunction(name, children) match {
// DISTINCT is not meaningful for a Max or a Min.
case max: Max if isDistinct =>
AggregateExpression(max, Complete, isDistinct = false)
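
As a rough illustration of the generator branch in the hunk above, the following self-contained Scala sketch looks a function up by name and accepts the result only if it is a generator. `Expr`, `Generator`, `Explode`, `Upper`, `lookup` and `resolveGenerator` are hypothetical stand-ins invented for this example, not Catalyst classes, and the sketch returns an `Either` where the real code calls `failAnalysis`.

```scala
object GeneratorResolutionSketch {
  // Hypothetical miniature expression hierarchy (not Catalyst's).
  sealed trait Expr
  trait Generator extends Expr
  final case class Explode(child: String) extends Generator
  final case class Upper(child: String) extends Expr

  // Hypothetical stand-in for catalog.lookupFunction(name, children).
  def lookup(name: String, arg: String): Expr = name.toLowerCase match {
    case "explode" => Explode(arg)
    case "upper"   => Upper(arg)
    case other     => throw new IllegalArgumentException(s"Undefined function: $other")
  }

  // Accept the looked-up expression only when it is a Generator;
  // otherwise report an error that names the offending class.
  def resolveGenerator(name: String, arg: String): Either[String, Generator] =
    lookup(name, arg) match {
      case g: Generator => Right(g)
      case other =>
        Left(s"$name is expected to be a generator. However, " +
          s"its class is ${other.getClass.getName}, which is not a generator.")
    }
}
```

Under these assumptions, `resolveGenerator("explode", "xs")` yields a `Right`, while `resolveGenerator("upper", "x")` yields a `Left` carrying the error text.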
@@ -52,6 +52,8 @@ trait FunctionRegistry {
/** Drop a function and return whether the function existed. */
def dropFunction(name: String): Boolean

+ /** Checks if a function with a given name exists. */
+ def functionExists(name: String): Boolean = lookupFunction(name).isDefined
}

class SimpleFunctionRegistry extends FunctionRegistry {
@@ -18,9 +18,9 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
- import org.apache.spark.sql.catalyst.{errors, TableIdentifier}
+ import org.apache.spark.sql.catalyst.{errors, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions._
- import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -133,6 +133,33 @@ object UnresolvedAttribute {
}
}

+ /**
+ * Represents an unresolved generator, which will be created by the parser for
+ * the [[org.apache.spark.sql.catalyst.plans.logical.Generate]] operator.
+ * The analyzer will resolve this generator.
+ */
+ case class UnresolvedGenerator(name: String, children: Seq[Expression]) extends Generator {
Contributor

Yay!


+ override def elementTypes: Seq[(DataType, Boolean, String)] =
+ throw new UnresolvedException(this, "elementTypes")
+ override def dataType: DataType = throw new UnresolvedException(this, "dataType")
+ override def foldable: Boolean = throw new UnresolvedException(this, "foldable")
+ override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
+ override lazy val resolved = false
+
+ override def prettyName: String = name
+ override def toString: String = s"'$name(${children.mkString(", ")})"
+
+ override def eval(input: InternalRow = null): TraversableOnce[InternalRow] =
+ throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
+
+ override protected def genCode(ctx: CodegenContext, ev: ExprCode): String =
+ throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
+
+ override def terminate(): TraversableOnce[InternalRow] =
+ throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
+ }

case class UnresolvedFunction(
name: String,
children: Seq[Expression],
@@ -315,11 +315,6 @@ class InMemoryCatalog extends ExternalCatalog {
catalog(db).functions.put(newName, newFunc)
}

- override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized {
- requireFunctionExists(db, funcDefinition.identifier.funcName)
- catalog(db).functions.put(funcDefinition.identifier.funcName, funcDefinition)
- }
-
override def getFunction(db: String, funcName: String): CatalogFunction = synchronized {
requireFunctionExists(db, funcName)
catalog(db).functions(funcName)
@@ -24,7 +24,7 @@ import scala.collection.mutable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
- import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, SimpleFunctionRegistry}
+ import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, SimpleFunctionRegistry}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -39,17 +39,21 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
*/
class SessionCatalog(
externalCatalog: ExternalCatalog,
+ functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: CatalystConf) {
import ExternalCatalog._

- def this(externalCatalog: ExternalCatalog, functionRegistry: FunctionRegistry) {
- this(externalCatalog, functionRegistry, new SimpleCatalystConf(true))
+ def this(
+ externalCatalog: ExternalCatalog,
+ functionRegistry: FunctionRegistry,
+ conf: CatalystConf) {
+ this(externalCatalog, DummyFunctionResourceLoader, functionRegistry, conf)
}

// For testing only.
def this(externalCatalog: ExternalCatalog) {
- this(externalCatalog, new SimpleFunctionRegistry)
+ this(externalCatalog, new SimpleFunctionRegistry, new SimpleCatalystConf(true))
}

protected[this] val tempTables = new mutable.HashMap[String, LogicalPlan]
@@ -439,53 +443,88 @@ class SessionCatalog(
*/
def dropFunction(name: FunctionIdentifier): Unit = {
val db = name.database.getOrElse(currentDb)
+ val qualified = name.copy(database = Some(db)).unquotedString
+ if (functionRegistry.functionExists(qualified)) {
+ // If we have loaded this function into the FunctionRegistry,
+ // also drop it from there.
+ // For a permanent function, because we loaded it to the FunctionRegistry
+ // when it's first used, we also need to drop it from the FunctionRegistry.
+ functionRegistry.dropFunction(qualified)
Contributor

I don't understand why we have to do this. Isn't the function registry only for temporary functions? We only put things in it in createTempFunction, so we just only remove things from it in dropTempFunction. This method here is for persistent functions only.

Contributor Author

With this PR, Function registry is not just for temp functions. Builders of permanent functions will be lazily loaded.

Contributor

I see... it's because in lookupFunction we actually load a persistent function into the registry as a temporary function. That needs to be documented.

Contributor Author

Done

+ }
externalCatalog.dropFunction(db, name.funcName)
}

- /**
- * Alter a metastore function whose name that matches the one specified in `funcDefinition`.
- *
- * If no database is specified in `funcDefinition`, assume the function is in the
- * current database.
- *
- * Note: If the underlying implementation does not support altering a certain field,
- * this becomes a no-op.
- */
- def alterFunction(funcDefinition: CatalogFunction): Unit = {
- val db = funcDefinition.identifier.database.getOrElse(currentDb)
- val newFuncDefinition = funcDefinition.copy(
- identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
- externalCatalog.alterFunction(db, newFuncDefinition)
- }
-
/**
* Retrieve the metadata of a metastore function.
*
Contributor Author

Users cannot alter functions (no API exposed). So, I just delete it.

* If a database is specified in `name`, this will return the function in that database.
* If no database is specified, this will return the function in the current database.
*/
+ // TODO: have a better name. This method is actually for fetching the metadata of a function.
Contributor

fetchFunctionMetadata?

Contributor

I'd just call it getFunctionMetadata

Contributor

if we change this (which we should do separately) then we should rename getTable too.

Contributor Author

OK. Will do it in another PR.

def getFunction(name: FunctionIdentifier): CatalogFunction = {
val db = name.database.getOrElse(currentDb)
externalCatalog.getFunction(db, name.funcName)
}

+ /**
+ * Check if the specified function exists.
+ */
+ def functionExists(name: FunctionIdentifier): Boolean = {
+ if (functionRegistry.functionExists(name.unquotedString)) {
+ // This function exists in the FunctionRegistry.
+ true
+ } else {
+ // Need to check if this function exists in the metastore.
+ try {
+ // TODO: It's better to ask external catalog if this function exists.
+ // So, we can avoid of having this hacky try/catch block.
+ getFunction(name) != null
+ } catch {
+ case _: NoSuchFunctionException => false
+ case _: AnalysisException => false // HiveExternalCatalog wraps all exceptions with it.
Contributor

This use of try is kind of hacky. The right thing to do here is to implement functionExists in the external catalog as well. That means we also need to implement it in HiveClientImpl. We can do that later if you prefer.

Member

Yeah, I think it is inconsistent that getFunction can either return null or throw an exception when it can't find the function. I did it this way earlier just to make things work first. Better to refactor it.

Contributor Author

Added a TODO.

+ }
+ }
+ }

// ----------------------------------------------------------------
// | Methods that interact with temporary and metastore functions |
// ----------------------------------------------------------------

+ /**
+ * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+ *
+ * This performs reflection to decide what type of [[Expression]] to return in the builder.
+ */
+ private[sql] def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = {
+ // TODO: at least support UDAFs here
+ throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.")
+ }
+
+ /**
+ * Loads resources such as JARs and Files for a function. Every resource is represented
+ * by a tuple (resource type, resource uri).
+ */
+ def loadFunctionResources(resources: Seq[(String, String)]): Unit = {
Contributor

need to document in the javadoc what the keys and values are.

Contributor Author

Done

+ resources.foreach { case (resourceType, uri) =>
+ val functionResource =
+ FunctionResource(FunctionResourceType.fromString(resourceType.toLowerCase), uri)
+ functionResourceLoader.loadResource(functionResource)
+ }
+ }
Contributor Author

Users cannot rename a function (no API exposed). So, I just delete it.


/**
* Create a temporary function.
* This assumes no database is specified in `funcDefinition`.
*/
def createTempFunction(
name: String,
+ info: ExpressionInfo,
funcDefinition: FunctionBuilder,
ignoreIfExists: Boolean): Unit = {
if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
throw new AnalysisException(s"Temporary function '$name' already exists.")
}
- functionRegistry.registerFunction(name, funcDefinition)
+ functionRegistry.registerFunction(name, info, funcDefinition)
}

/**
@@ -501,41 +540,59 @@ class SessionCatalog(
}
}

- /**
- * Rename a function.
- *
- * If a database is specified in `oldName`, this will rename the function in that database.
- * If no database is specified, this will first attempt to rename a temporary function with
- * the same name, then, if that does not exist, rename the function in the current database.
- *
- * This assumes the database specified in `oldName` matches the one specified in `newName`.
- */
- def renameFunction(oldName: FunctionIdentifier, newName: FunctionIdentifier): Unit = {
- if (oldName.database != newName.database) {
- throw new AnalysisException("rename does not support moving functions across databases")
- }
- val db = oldName.database.getOrElse(currentDb)
- val oldBuilder = functionRegistry.lookupFunctionBuilder(oldName.funcName)
- if (oldName.database.isDefined || oldBuilder.isEmpty) {
- externalCatalog.renameFunction(db, oldName.funcName, newName.funcName)
- } else {
- val oldExpressionInfo = functionRegistry.lookupFunction(oldName.funcName).get
- val newExpressionInfo = new ExpressionInfo(
- oldExpressionInfo.getClassName,
- newName.funcName,
- oldExpressionInfo.getUsage,
- oldExpressionInfo.getExtended)
- functionRegistry.dropFunction(oldName.funcName)
- functionRegistry.registerFunction(newName.funcName, newExpressionInfo, oldBuilder.get)
- }
+ protected def failFunctionLookup(name: String): Nothing = {
+ throw new AnalysisException(s"Undefined function: $name. This function is " +
+ s"neither a registered temporary function nor " +
+ s"a permanent function registered in the database $currentDb.")
+ }

/**
* Return an [[Expression]] that represents the specified function, assuming it exists.
- * Note: This is currently only used for temporary functions.
+ *
+ * For a temporary function or a permanent function that has been loaded,
+ * this method will simply lookup the function through the
+ * FunctionRegistry and create an expression based on the builder.
+ *
+ * For a permanent function that has not been loaded, we will first fetch its metadata
+ * from the underlying external catalog. Then, we will load all resources associated
+ * with this function (i.e. jars and files). Finally, we create a function builder
+ * based on the function class and put the builder into the FunctionRegistry.
+ * The name of this function in the FunctionRegistry will be `databaseName.functionName`.
*/
def lookupFunction(name: String, children: Seq[Expression]): Expression = {
- functionRegistry.lookupFunction(name, children)
+ // TODO: Right now, the name can be qualified or not qualified.
+ // It will be better to get a FunctionIdentifier.
Contributor

Contributor Author

Thanks!

+ // TODO: Right now, we assume that name is not qualified!
Contributor Author

I will add a test for this (the test will be ignored right now).

Contributor Author

Added in org.apache.spark.sql.hive.UDFSuite.

+ val qualifiedName = FunctionIdentifier(name, Some(currentDb)).unquotedString
+ if (functionRegistry.functionExists(name)) {
+ // This function has been already loaded into the function registry.
+ functionRegistry.lookupFunction(name, children)
+ } else if (functionRegistry.functionExists(qualifiedName)) {
+ // This function has been already loaded into the function registry.
+ // Unlike the above block, we find this function by using the qualified name.
+ functionRegistry.lookupFunction(qualifiedName, children)
+ } else {
+ // The function has not been loaded to the function registry, which means
+ // that the function is a permanent function (if it actually has been registered
+ // in the metastore). We need to first put the function in the FunctionRegistry.
+ val catalogFunction = try {
+ externalCatalog.getFunction(currentDb, name)
+ } catch {
+ case e: AnalysisException => failFunctionLookup(name)
+ case e: NoSuchFunctionException => failFunctionLookup(name)
+ }
+ loadFunctionResources(catalogFunction.resources)
+ // Please note that qualifiedName is provided by the user. However,
+ // catalogFunction.identifier.unquotedString is returned by the underlying
+ // catalog. So, it is possible that qualifiedName is not exactly the same as
+ // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
+ // At here, we preserve the input from the user.
+ val info = new ExpressionInfo(catalogFunction.className, qualifiedName)
+ val builder = makeFunctionBuilder(qualifiedName, catalogFunction.className)
+ createTempFunction(qualifiedName, info, builder, ignoreIfExists = false)
Contributor

the javadoc says nothing about this. We should add a sentence there to say we cache it in the registry.

Contributor Author

Updated the doc.

+ // Now, we need to create the Expression.
+ functionRegistry.lookupFunction(qualifiedName, children)
+ }
}

/**
@@ -545,17 +602,11 @@ class SessionCatalog(
val dbFunctions =
externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) }
val regex = pattern.replaceAll("\\*", ".*").r
Contributor

Please include my fix for that, or create a utility for all the occurrences.

Contributor

let's do that separately so we can get this patch in first.

Contributor

I may create another PR to put all the pattern handling in one place. Right now, it is scattered all over with replaceAll(). Please keep your code as it is for now; I will do it later. Thanks.

Contributor Author

Will keep this part as is.

- val _tempFunctions = functionRegistry.listFunction()
+ val loadedFunctions = functionRegistry.listFunction()
.filter { f => regex.pattern.matcher(f).matches() }
.map { f => FunctionIdentifier(f) }
- dbFunctions ++ _tempFunctions
- }
-
- /**
- * Return a temporary function. For testing only.
- */
- private[catalog] def getTempFunction(name: String): Option[FunctionBuilder] = {
- functionRegistry.lookupFunctionBuilder(name)
+ // TODO: Actually, there will be dbFunctions that have been loaded into the FunctionRegistry.
+ // So, the returned list may have two entries for the same function.
+ dbFunctions ++ loadedFunctions
}

}
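
The lookup path described in the `lookupFunction` doc comment, together with the `dropFunction` change, could be sketched roughly as follows. This is a toy model under stated assumptions, not Spark's implementation: `ToySessionCatalog`, `FunctionMeta`, `Builder`, `isLoaded` and `metastoreReads` are hypothetical names, a mutable map stands in for the external catalog, and a "function" simply renders its arguments as a string.

```scala
import scala.collection.mutable

object LazyFunctionLookupSketch {
  // Hypothetical stand-in for CatalogFunction metadata.
  final case class FunctionMeta(db: String, name: String, className: String)
  // Hypothetical stand-in for FunctionBuilder.
  type Builder = Seq[String] => String

  final class ToySessionCatalog(currentDb: String, initial: Seq[FunctionMeta]) {
    // Plays the role of the external catalog (metastore).
    private val metastore = mutable.HashMap.empty[(String, String), FunctionMeta]
    initial.foreach(f => metastore.put((f.db, f.name), f))
    // Plays the role of the FunctionRegistry: temp functions plus cached permanent ones.
    private val registry = mutable.HashMap.empty[String, Builder]
    // Counts how often the metastore had to be consulted.
    var metastoreReads = 0

    private def qualify(name: String): String = s"$currentDb.$name"

    def isLoaded(name: String): Boolean = registry.contains(qualify(name))

    def lookupFunction(name: String, args: Seq[String]): String = {
      // Try the plain name, then the qualified name, then fall back to the metastore.
      val builder = registry.get(name).orElse(registry.get(qualify(name))).getOrElse {
        metastoreReads += 1
        val meta = metastore.getOrElse((currentDb, name),
          throw new NoSuchElementException(s"Undefined function: $name"))
        val loaded: Builder = as => s"${meta.className}(${as.mkString(", ")})"
        // Cache the builder under `databaseName.functionName`.
        registry.put(qualify(name), loaded)
        loaded
      }
      builder(args)
    }

    def dropFunction(name: String): Unit = {
      // A permanent function may have been cached on first use, so drop it from both places.
      registry.remove(qualify(name))
      metastore.remove((currentDb, name))
    }
  }
}
```

In this sketch a second lookup of the same permanent function is served from the registry without touching the metastore, which is why `dropFunction` has to clear the cached entry as well.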