diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c8d2d1b204568..3398c71367991 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -423,47 +423,31 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
+      case p: LogicalPlan if p.resolved => p
+      case p @ CreateTableAsSelect(table, child, allowExisting) =>
+        val (dbName, tblName) =
+          processDatabaseAndTableName(
+            table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
 
-      case CreateTableAsSelect(desc, child, allowExisting) =>
-        if (hive.convertCTAS && !desc.serde.isDefined) {
-          // Do the conversion when spark.sql.hive.convertCTAS is true and the query
-          // does not specify any storage format (file format and storage handler).
-          if (desc.specifiedDatabase.isDefined) {
-            throw new AnalysisException(
-              "Cannot specify database name in a CTAS statement " +
-                "when spark.sql.hive.convertCTAS is set to true.")
-          }
-
-          val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
-          CreateTableUsingAsSelect(
-            desc.name,
-            hive.conf.defaultDataSourceName,
-            temporary = false,
-            mode,
-            options = Map.empty[String, String],
-            child
-          )
+        val schema = if (table.schema.size > 0) {
+          table.schema
         } else {
-          execution.CreateTableAsSelect(
-            desc.copy(
-              specifiedDatabase = Option(desc.specifiedDatabase.getOrElse(client.currentDatabase))),
-            child,
-            allowExisting)
+          child.output.map {
+            attr => new HiveColumn(attr.name, toMetastoreType(attr.dataType), null)
+          }
         }
 
-      case p: LogicalPlan if p.resolved => p
-
-      case p @ CreateTableAsSelect(desc, child, allowExisting) =>
-        val (dbName, tblName) = processDatabaseAndTableName(desc.database, desc.name)
+        val desc = table.copy(
+          specifiedDatabase = Some(dbName),
+          name = tblName,
+          schema = schema)
 
-        if (hive.convertCTAS) {
-          if (desc.specifiedDatabase.isDefined) {
-            throw new AnalysisException(
-              "Cannot specify database name in a CTAS statement " +
-                "when spark.sql.hive.convertCTAS is set to true.")
-          }
+        // Check if the query specifies file format or storage handler.
+        val hasStorageSpec = desc.serde.isDefined
+        if (hive.convertCTAS && !hasStorageSpec) {
 
           val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
+          // TODO how to handle the database name?
           CreateTableUsingAsSelect(
             tblName,
             hive.conf.defaultDataSourceName,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 88c5546cff155..b2bd597d0063a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -22,12 +22,16 @@ import java.sql.Date
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.TableType
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
 import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse._
-import org.apache.hadoop.hive.ql.plan.PlanUtils
+import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, PlanUtils}
 
 import org.apache.spark.sql.{AnalysisException, SparkSQLParser}
 import org.apache.spark.sql.catalyst.analysis._
@@ -62,7 +66,8 @@ case class CreateTableAsSelect(
     allowExisting: Boolean) extends UnaryNode with Command {
 
   override def output: Seq[Attribute] = Seq.empty[Attribute]
-  override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && childrenResolved
+  override lazy val resolved: Boolean =
+    tableDesc.specifiedDatabase.isDefined && tableDesc.schema.size > 0 && childrenResolved
 }
 
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
@@ -238,17 +243,24 @@ private[hive] object HiveQl {
    * Returns the AST for the given SQL string.
    */
   def getAst(sql: String): ASTNode = {
-    /*
-     * Context has to be passed in hive0.13.1.
-     * Otherwise, there will be Null pointer exception,
-     * when retrieving properties form HiveConf.
-     */
-    val hContext = new Context(new HiveConf())
+    val hContext = hiveContext()
     val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
     hContext.clear()
     node
   }
 
+  /**
+   * Returns the HiveConf.
+   * TODO get it from HiveContext?
+   */
+  private[this] def newHiveConf(): HiveConf = new HiveConf()
+
+  /**
+   * Context has to be passed in hive0.13.1.
+   * Otherwise, a NullPointerException is thrown
+   * when retrieving properties from HiveConf.
+   */
+  private[this] def hiveContext(hconf: HiveConf = newHiveConf()): Context = new Context(hconf)
 
   /** Returns a LogicalPlan for a given HiveQL string. */
   def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
@@ -579,19 +591,134 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           children)
       val (db, tableName) = extractDbNameTableName(tableNameParts)
 
-      var tableDesc =
+      def extractTableDesc(crtTbl: CreateTableDesc) = {
+        val tbl = new Table(db.getOrElse(null), tableName)
+        tbl.setFields(crtTbl.getCols)
+
+        // Most of the code below is similar to DDLTask.createTable() in Hive.
+        if (crtTbl != null && crtTbl.getTblProps() != null) {
+          tbl.getTTable().getParameters().putAll(crtTbl.getTblProps())
+        }
+
+        if (crtTbl != null && crtTbl.getPartCols() != null) {
+          tbl.setPartCols(crtTbl.getPartCols())
+        }
+
+        if (crtTbl != null && crtTbl.getStorageHandler() != null) {
+          tbl.setProperty(
+            org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
+            crtTbl.getStorageHandler())
+        }
+
+        /*
+         * We use LazySimpleSerDe by default.
+         *
+         * If the user didn't specify a SerDe, and any of the columns are not simple
+         * types, we will have to use DynamicSerDe instead.
+         */
+        if (crtTbl == null || crtTbl.getSerName() == null) {
+          val storageHandler = tbl.getStorageHandler()
+          if (storageHandler == null) {
+            tbl.setSerializationLib(classOf[LazySimpleSerDe].getName())
+
+            import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+            import org.apache.hadoop.io.Text
+            import org.apache.hadoop.mapred.TextInputFormat
+
+            tbl.setInputFormatClass(classOf[TextInputFormat])
+            tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]])
+            tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+          } else {
+            val serDeClassName = storageHandler.getSerDeClass().getName()
+            tbl.setSerializationLib(serDeClassName)
+          }
+        } else {
+          // TODO validate the serde existence
+          val serdeName = crtTbl.getSerName()
+          tbl.setSerializationLib(serdeName)
+        }
+
+        val serdeParams = new java.util.HashMap[String, String]()
+        if (crtTbl != null && crtTbl.getFieldDelim() != null) {
+          serdeParams.put(serdeConstants.FIELD_DELIM, crtTbl.getFieldDelim())
+          serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim())
+        }
+        if (crtTbl != null && crtTbl.getFieldEscape() != null) {
+          serdeParams.put(serdeConstants.ESCAPE_CHAR, crtTbl.getFieldEscape())
+        }
+
+        if (crtTbl != null && crtTbl.getCollItemDelim() != null) {
+          serdeParams.put(serdeConstants.COLLECTION_DELIM, crtTbl.getCollItemDelim())
+        }
+        if (crtTbl != null && crtTbl.getMapKeyDelim() != null) {
+          serdeParams.put(serdeConstants.MAPKEY_DELIM, crtTbl.getMapKeyDelim())
+        }
+        if (crtTbl != null && crtTbl.getLineDelim() != null) {
+          serdeParams.put(serdeConstants.LINE_DELIM, crtTbl.getLineDelim())
+        }
+
+        if (crtTbl != null && crtTbl.getSerdeProps() != null) {
+          val iter = crtTbl.getSerdeProps().entrySet().iterator()
+          while (iter.hasNext()) {
+            val m = iter.next()
+            serdeParams.put(m.getKey(), m.getValue())
+          }
+        }
+
+        if (crtTbl != null && crtTbl.getComment() != null) {
+          tbl.setProperty("comment", crtTbl.getComment())
+        }
+
+        if (crtTbl != null && crtTbl.getLocation() != null) {
+          HiveShim.setLocation(tbl, crtTbl)
+        }
+
+        if (crtTbl != null) {
+          tbl.setStoredAsSubDirectories(crtTbl.isStoredAsSubDirectories())
+          tbl.setInputFormatClass(crtTbl.getInputFormat())
+          tbl.setOutputFormatClass(crtTbl.getOutputFormat())
+        }
+
+        tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName())
+        tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName())
+
+        if (crtTbl != null && crtTbl.isExternal()) {
+          tbl.setProperty("EXTERNAL", "TRUE")
+          tbl.setTableType(TableType.EXTERNAL_TABLE)
+        }
+
+        // TODO add bucket support
+        // TODO support storage handler
         HiveTable(
           specifiedDatabase = db,
           name = tableName,
-          schema = Seq.empty,
-          partitionColumns = Seq.empty,
-          properties = Map.empty,
-          serdeProperties = Map.empty,
-          tableType = ManagedTable,
-          location = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None)
+          schema = tbl.getAllCols.map(field =>
+            HiveColumn(field.getName, field.getType, field.getComment)),
+          partitionColumns = tbl.getPartCols.map(part =>
+            HiveColumn(part.getName, part.getType, part.getComment)),
+          properties = tbl.getParameters.toMap,
+          serdeProperties = serdeParams.toMap,
+          tableType = if (crtTbl.isExternal()) ExternalTable else ManagedTable,
+          location = Some(crtTbl.getLocation()),
+          inputFormat = Some(crtTbl.getInputFormat()),
+          outputFormat = Some(crtTbl.getOutputFormat()),
+          serde = Some(crtTbl.getSerName()))
+      }
+
+      // Get the CreateTableDesc from Hive SemanticAnalyzer
+      val hconf = newHiveConf()
+      val sa = new SemanticAnalyzer(hconf) {
+        override def analyzeInternal(ast: ASTNode) {
+          // A hack to intercept the SemanticAnalyzer.analyzeInternal,
+          // to ignore the SELECT clause of the CTAS
+          val method = classOf[SemanticAnalyzer].getDeclaredMethod(
+            "analyzeCreateTable", classOf[ASTNode], classOf[QB])
+          method.setAccessible(true)
+          method.invoke(this, ast, this.getQB)
+        }
+      }
+      sa.analyze(node.asInstanceOf[ASTNode], new Context(hconf))
+      var tableDesc = extractTableDesc(sa.getQB().getTableDesc)
 
       // TODO: Handle all the cases here...
       children.foreach {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index d39e5450c8d36..848204e0836f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -205,6 +205,12 @@ class ClientWrapper(
       table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
     table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
     table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
+
+    // set owner
+    qlTable.setOwner(conf.getUser)
+    // set create time
+    qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
     version match {
       case hive.v12 =>
         table.location.map(new URI(_)).foreach(u => qlTable.call[URI, Unit]("setDataLocation", u))
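
Usage sketch (not part of the patch): the two CTAS paths this change distinguishes, written against a Spark 1.x HiveContext. The context variable `hiveCtx`, the table names, and the query are illustrative assumptions, not taken from the diff.

import org.apache.spark.sql.hive.HiveContext

def ctasSketch(hiveCtx: HiveContext): Unit = {
  // With spark.sql.hive.convertCTAS=true and no storage format in the statement,
  // the CTAS is planned as CreateTableUsingAsSelect over the default data source.
  hiveCtx.setConf("spark.sql.hive.convertCTAS", "true")
  hiveCtx.sql("CREATE TABLE ctas_converted AS SELECT 1 AS id, 'a' AS name")

  // A CTAS that specifies a SerDe or file format keeps the Hive execution path
  // (execution.CreateTableAsSelect); with this patch the table schema is filled in
  // from the child plan's output, and owner/create time are set on the Hive table.
  hiveCtx.sql(
    """CREATE TABLE ctas_hive
      |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
      |STORED AS RCFILE
      |AS SELECT 1 AS id, 'a' AS name""".stripMargin)
}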