From 5587a4995634af44ceecc9755165eb9a02bc0e5b Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 15 Mar 2016 15:32:38 -0700 Subject: [PATCH] Implement SessionCatalog using ExternalCatalog --- .../sql/catalyst/catalog/SessionCatalog.scala | 236 +++++++++++++++--- .../sql/catalyst/catalog/interface.scala | 19 ++ 2 files changed, 221 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 1b3bf831ce89b..e761b34f85992 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -19,8 +19,11 @@ package org.apache.spark.sql.catalyst.catalog import java.util.concurrent.ConcurrentHashMap +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{SubqueryAlias, LogicalPlan} /** @@ -28,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary * tables and functions of the Spark Session that it belongs to. */ -abstract class SessionCatalog(catalog: ExternalCatalog) { +class SessionCatalog(externalCatalog: ExternalCatalog) { import ExternalCatalog._ private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] @@ -41,19 +44,33 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { // All methods in this category interact directly with the underlying catalog. // ---------------------------------------------------------------------------- - def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit + def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = { + externalCatalog.createDatabase(dbDefinition, ignoreIfExists) + } - def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit + def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = { + externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade) + } - def alterDatabase(dbDefinition: CatalogDatabase): Unit + def alterDatabase(dbDefinition: CatalogDatabase): Unit = { + externalCatalog.alterDatabase(dbDefinition) + } - def getDatabase(db: String): CatalogDatabase + def getDatabase(db: String): CatalogDatabase = { + externalCatalog.getDatabase(db) + } - def databaseExists(db: String): Boolean + def databaseExists(db: String): Boolean = { + externalCatalog.databaseExists(db) + } - def listDatabases(): Seq[String] + def listDatabases(): Seq[String] = { + externalCatalog.listDatabases() + } - def listDatabases(pattern: String): Seq[String] + def listDatabases(pattern: String): Seq[String] = { + externalCatalog.listDatabases(pattern) + } // ---------------------------------------------------------------------------- // Tables @@ -75,7 +92,12 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { def createTable( currentDb: String, tableDefinition: CatalogTable, - ignoreIfExists: Boolean): Unit + ignoreIfExists: Boolean): Unit = { + val db = tableDefinition.name.database.getOrElse(currentDb) + val newTableDefinition = tableDefinition.copy( + name = TableIdentifier(tableDefinition.name.table, Some(db))) + externalCatalog.createTable(db, newTableDefinition, ignoreIfExists) + } /** * Alter the metadata of an existing metastore table identified by `tableDefinition`. @@ -86,13 +108,21 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { * Note: If the underlying implementation does not support altering a certain field, * this becomes a no-op. */ - def alterTable(currentDb: String, tableDefinition: CatalogTable): Unit + def alterTable(currentDb: String, tableDefinition: CatalogTable): Unit = { + val db = tableDefinition.name.database.getOrElse(currentDb) + val newTableDefinition = tableDefinition.copy( + name = TableIdentifier(tableDefinition.name.table, Some(db))) + externalCatalog.alterTable(db, newTableDefinition) + } /** * Retrieve the metadata of an existing metastore table. * If no database is specified, assume the table is in the current database. */ - def getTable(currentDb: String, name: TableIdentifier): CatalogTable + def getTable(currentDb: String, name: TableIdentifier): CatalogTable = { + val db = name.database.getOrElse(currentDb) + externalCatalog.getTable(db, name.table) + } // ------------------------------------------------------------- // | Methods that interact with temporary and metastore tables | @@ -100,9 +130,16 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { /** * Create a temporary table. - * If a temporary table with the same name already exists, this throws an exception. */ - def createTempTable(name: String, tableDefinition: LogicalPlan): Unit + def createTempTable( + name: String, + tableDefinition: LogicalPlan, + ignoreIfExists: Boolean): Unit = { + if (tempTables.contains(name) && !ignoreIfExists) { + throw new AnalysisException(s"Temporary table '$name' already exists.") + } + tempTables.put(name, tableDefinition) + } /** * Rename a table. @@ -116,7 +153,15 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { def renameTable( currentDb: String, oldName: TableIdentifier, - newName: TableIdentifier): Unit + newName: TableIdentifier): Unit = { + val db = oldName.database.getOrElse(currentDb) + if (oldName.database.isDefined || !tempTables.contains(oldName.table)) { + externalCatalog.renameTable(db, oldName.table, newName.table) + } else { + val table = tempTables.remove(oldName.table) + tempTables.put(newName.table, table) + } + } /** * Drop a table. @@ -128,7 +173,14 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { def dropTable( currentDb: String, name: TableIdentifier, - ignoreIfNotExists: Boolean): Unit + ignoreIfNotExists: Boolean): Unit = { + val db = name.database.getOrElse(currentDb) + if (name.database.isDefined || !tempTables.contains(name.table)) { + externalCatalog.dropTable(db, name.table, ignoreIfNotExists) + } else { + tempTables.remove(name.table) + } + } /** * Return a [[LogicalPlan]] that represents the given table. @@ -140,17 +192,42 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { def lookupRelation( currentDb: String, name: TableIdentifier, - alias: Option[String] = None): LogicalPlan + alias: Option[String] = None): LogicalPlan = { + val db = name.database.getOrElse(currentDb) + val relation = + if (name.database.isDefined || !tempTables.contains(name.table)) { + val metadata = externalCatalog.getTable(db, name.table) + CatalogRelation(db, metadata, alias) + } else { + tempTables.get(name.table) + } + val tableWithQualifiers = SubqueryAlias(name.table, relation) + // If an alias was specified by the lookup, wrap the plan in a subquery so that + // attributes are properly qualified with this alias. + alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) + } /** * List all tables in the current database, including temporary tables. */ - def listTables(currentDb: String): Seq[TableIdentifier] + def listTables(currentDb: String): Seq[TableIdentifier] = { + val tablesInCurrentDb = externalCatalog.listTables(currentDb).map { t => + TableIdentifier(t, Some(currentDb)) + } + val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) } + tablesInCurrentDb ++ _tempTables + } /** * List all matching tables in the current database, including temporary tables. */ - def listTables(currentDb: String, pattern: String): Seq[TableIdentifier] + def listTables(currentDb: String, pattern: String): Seq[TableIdentifier] = { + val tablesInCurrentDb = externalCatalog.listTables(currentDb, pattern).map { t => + TableIdentifier(t, Some(currentDb)) + } + val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) } + tablesInCurrentDb ++ _tempTables + } // ---------------------------------------------------------------------------- // Partitions @@ -164,48 +241,86 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { // the metastore. For now, partition values of a data source table will be // automatically discovered when we load the table. + /** + * Create partitions in an existing table, assuming it exists. + * If no database is specified, assume the table is in the current database. + */ def createPartitions( currentDb: String, tableName: TableIdentifier, parts: Seq[CatalogTablePartition], - ignoreIfExists: Boolean): Unit + ignoreIfExists: Boolean): Unit = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists) + } + /** + * Drop partitions from a table, assuming they exist. + * If no database is specified, assume the table is in the current database. + */ def dropPartitions( currentDb: String, tableName: TableIdentifier, parts: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean): Unit + ignoreIfNotExists: Boolean): Unit = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists) + } /** * Override the specs of one or many existing table partitions, assuming they exist. + * * This assumes index i of `specs` corresponds to index i of `newSpecs`. + * If no database is specified, assume the table is in the current database. */ def renamePartitions( currentDb: String, tableName: TableIdentifier, specs: Seq[TablePartitionSpec], - newSpecs: Seq[TablePartitionSpec]): Unit + newSpecs: Seq[TablePartitionSpec]): Unit = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs) + } /** * Alter one or many table partitions whose specs that match those specified in `parts`, * assuming the partitions exist. * + * If no database is specified, assume the table is in the current database. + * * Note: If the underlying implementation does not support altering a certain field, * this becomes a no-op. */ def alterPartitions( currentDb: String, tableName: TableIdentifier, - parts: Seq[CatalogTablePartition]): Unit + parts: Seq[CatalogTablePartition]): Unit = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.alterPartitions(db, tableName.table, parts) + } + /** + * Retrieve the metadata of a table partition, assuming it exists. + * If no database is specified, assume the table is in the current database. + */ def getPartition( currentDb: String, tableName: TableIdentifier, - spec: TablePartitionSpec): CatalogTablePartition + spec: TablePartitionSpec): CatalogTablePartition = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.getPartition(db, tableName.table, spec) + } + /** + * List all partitions in a table, assuming it exists. + * If no database is specified, assume the table is in the current database. + */ def listPartitions( currentDb: String, - tableName: TableIdentifier): Seq[CatalogTablePartition] + tableName: TableIdentifier): Seq[CatalogTablePartition] = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.listPartitions(db, tableName.table) + } // ---------------------------------------------------------------------------- // Functions @@ -224,16 +339,24 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { * Create a metastore function in the database specified in `funcDefinition`. * If no such database is specified, create it in the current database. */ - def createFunction(currentDb: String, funcDefinition: CatalogFunction): Unit + def createFunction(currentDb: String, funcDefinition: CatalogFunction): Unit = { + val db = funcDefinition.name.database.getOrElse(currentDb) + val newFuncDefinition = funcDefinition.copy( + name = FunctionIdentifier(funcDefinition.name.funcName, Some(db))) + externalCatalog.createFunction(db, newFuncDefinition) + } /** * Drop a metastore function. * If no database is specified, assume the function is in the current database. */ - def dropFunction(currentDb: String, funcName: FunctionIdentifier): Unit + def dropFunction(currentDb: String, name: FunctionIdentifier): Unit = { + val db = name.database.getOrElse(currentDb) + externalCatalog.dropFunction(db, name.funcName) + } /** - * Alter a function whose name that matches the one specified in `funcDefinition`. + * Alter a metastore function whose name that matches the one specified in `funcDefinition`. * * If no database is specified in `funcDefinition`, assume the function is in the * current database. @@ -241,7 +364,12 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { * Note: If the underlying implementation does not support altering a certain field, * this becomes a no-op. */ - def alterFunction(currentDb: String, funcDefinition: CatalogFunction): Unit + def alterFunction(currentDb: String, funcDefinition: CatalogFunction): Unit = { + val db = funcDefinition.name.database.getOrElse(currentDb) + val newFuncDefinition = funcDefinition.copy( + name = FunctionIdentifier(funcDefinition.name.funcName, Some(db))) + externalCatalog.alterFunction(db, newFuncDefinition) + } // ---------------------------------------------------------------- // | Methods that interact with temporary and metastore functions | @@ -251,12 +379,31 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { * Create a temporary function. * This assumes no database is specified in `funcDefinition`. */ - def createTempFunction(funcDefinition: CatalogFunction): Unit + def createTempFunction( + funcDefinition: CatalogFunction, + ignoreIfExists: Boolean): Unit = { + require(funcDefinition.name.database.isEmpty, + "attempted to create a temporary function while specifying a database") + val name = funcDefinition.name.funcName + if (tempFunctions.contains(name) && !ignoreIfExists) { + throw new AnalysisException(s"Temporary function '$name' already exists.") + } + tempFunctions.put(name, funcDefinition) + } + /** + * Drop a temporary function. + */ // TODO: The reason that we distinguish dropFunction and dropTempFunction is that // Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to consolidate // dropFunction and dropTempFunction. - def dropTempFunction(name: String): Unit + def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = { + if (!tempFunctions.contains(name) && !ignoreIfNotExists) { + throw new AnalysisException( + s"Temporary function '$name' cannot be dropped because it does not exist!") + } + tempFunctions.remove(name) + } /** * Rename a function. @@ -270,7 +417,16 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { def renameFunction( currentDb: String, oldName: FunctionIdentifier, - newName: FunctionIdentifier): Unit + newName: FunctionIdentifier): Unit = { + val db = oldName.database.getOrElse(currentDb) + if (oldName.database.isDefined || !tempFunctions.contains(oldName.funcName)) { + externalCatalog.renameFunction(db, oldName.funcName, newName.funcName) + } else { + val func = tempFunctions.remove(oldName.funcName) + val newFunc = func.copy(name = func.name.copy(funcName = newName.funcName)) + tempFunctions.put(newName.funcName, newFunc) + } + } /** * Retrieve the metadata of an existing function. @@ -279,11 +435,23 @@ abstract class SessionCatalog(catalog: ExternalCatalog) { * If no database is specified, this will first attempt to return a temporary function with * the same name, then, if that does not exist, return the function in the current database. */ - def getFunction(currentDb: String, name: FunctionIdentifier): CatalogFunction + def getFunction(currentDb: String, name: FunctionIdentifier): CatalogFunction = { + val db = name.database.getOrElse(currentDb) + if (name.database.isDefined || !tempFunctions.contains(name.funcName)) { + externalCatalog.getFunction(db, name.funcName) + } else { + tempFunctions.get(name.funcName) + } + } /** * List all matching functions in the current database, including temporary functions. */ - def listFunctions(currentDb: String, pattern: String): Seq[FunctionIdentifier] + def listFunctions(currentDb: String, pattern: String): Seq[FunctionIdentifier] = { + val functionsInCurrentDb = externalCatalog.listFunctions(currentDb, pattern).map { f => + FunctionIdentifier(f, Some(currentDb)) + } + functionsInCurrentDb ++ tempFunctions.keys.asScala.map { f => FunctionIdentifier(f) } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 2d20c5436c356..5e4460e9a744b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -21,6 +21,8 @@ import javax.annotation.Nullable import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} /** @@ -272,3 +274,20 @@ object ExternalCatalog { */ type TablePartitionSpec = Map[String, String] } + + +/** + * A [[LogicalPlan]] that wraps [[CatalogTable]]. + */ +case class CatalogRelation( + db: String, + metadata: CatalogTable, + alias: Option[String]) + extends LeafNode { + + // TODO: implement this + override def output: Seq[Attribute] = Seq.empty + + require(metadata.name.database == Some(db), + "provided database does not much the one specified in the table definition") +}