Create CreateTableDesc in HiveQl
chenghao-intel committed May 5, 2015
1 parent 4d8bf02 commit b3180b4
Showing 3 changed files with 169 additions and 52 deletions.
@@ -423,47 +423,31 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p
case p @ CreateTableAsSelect(table, child, allowExisting) =>
val (dbName, tblName) =
processDatabaseAndTableName(
table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)

case CreateTableAsSelect(desc, child, allowExisting) =>
if (hive.convertCTAS && !desc.serde.isDefined) {
// Do the conversion when spark.sql.hive.convertCTAS is true and the query
// does not specify any storage format (file format and storage handler).
if (desc.specifiedDatabase.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
}

val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
desc.name,
hive.conf.defaultDataSourceName,
temporary = false,
mode,
options = Map.empty[String, String],
child
)
val schema = if (table.schema.size > 0) {
table.schema
} else {
execution.CreateTableAsSelect(
desc.copy(
specifiedDatabase = Option(desc.specifiedDatabase.getOrElse(client.currentDatabase))),
child,
allowExisting)
child.output.map {
attr => new HiveColumn(attr.name, toMetastoreType(attr.dataType), null)
}
}

case p: LogicalPlan if p.resolved => p

case p @ CreateTableAsSelect(desc, child, allowExisting) =>
val (dbName, tblName) = processDatabaseAndTableName(desc.database, desc.name)
val desc = table.copy(
specifiedDatabase = Some(dbName),
name = tblName,
schema = schema)

if (hive.convertCTAS) {
if (desc.specifiedDatabase.isDefined) {
throw new AnalysisException(
"Cannot specify database name in a CTAS statement " +
"when spark.sql.hive.convertCTAS is set to true.")
}
// Check if the query specifies file format or storage handler.
val hasStorageSpec = desc.serde.isDefined

if (hive.convertCTAS && !hasStorageSpec) {
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
// TODO how to handle the database name?
CreateTableUsingAsSelect(
tblName,
hive.conf.defaultDataSourceName,
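For orientation, a minimal sketch (not part of the diff) of how the convertCTAS branch above maps the CTAS IF NOT EXISTS flag onto a data source SaveMode; the helper name is made up:

import org.apache.spark.sql.SaveMode

// Illustrative helper: allowExisting comes from "CREATE TABLE [IF NOT EXISTS] ... AS SELECT ...".
def ctasSaveMode(allowExisting: Boolean): SaveMode =
  if (allowExisting) SaveMode.Ignore   // IF NOT EXISTS: silently skip if the table exists
  else SaveMode.ErrorIfExists          // otherwise: fail if the table already exists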
163 changes: 145 additions & 18 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -22,12 +22,16 @@ import java.sql.Date
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.TableType
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
import org.apache.hadoop.hive.ql.lib.Node
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse._
import org.apache.hadoop.hive.ql.plan.PlanUtils
import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, PlanUtils}
import org.apache.spark.sql.{AnalysisException, SparkSQLParser}

import org.apache.spark.sql.catalyst.analysis._
@@ -62,7 +66,8 @@ case class CreateTableAsSelect(
allowExisting: Boolean) extends UnaryNode with Command {

override def output: Seq[Attribute] = Seq.empty[Attribute]
override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && childrenResolved
override lazy val resolved: Boolean =
tableDesc.specifiedDatabase.isDefined && tableDesc.schema.size > 0 && childrenResolved
}

/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
@@ -238,17 +243,24 @@ private[hive] object HiveQl {
* Returns the AST for the given SQL string.
*/
def getAst(sql: String): ASTNode = {
/*
* Context has to be passed in hive0.13.1.
* Otherwise, there will be a NullPointerException
* when retrieving properties from HiveConf.
*/
val hContext = new Context(new HiveConf())
val hContext = hiveContext()
val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
hContext.clear()
node
}

/**
* Returns a new HiveConf.
* TODO: get it from HiveContext?
*/
private[this] def newHiveConf(): HiveConf = new HiveConf()

/**
* A Context has to be passed in for Hive 0.13.1.
* Otherwise, there will be a NullPointerException
* when retrieving properties from HiveConf.
*/
private[this] def hiveContext(hconf: HiveConf = newHiveConf()): Context = new Context(hconf)

/** Returns a LogicalPlan for a given HiveQL string. */
def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
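A hypothetical usage sketch of the refactored parsing path (the statement and table names are made up; only getAst above is from the diff):

// Illustrative only: turn a CTAS statement into the Hive ASTNode that the
// CTAS handling below starts from.
val ast = HiveQl.getAst("CREATE TABLE t AS SELECT key, value FROM src")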
@@ -579,19 +591,134 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
children)
val (db, tableName) = extractDbNameTableName(tableNameParts)

var tableDesc =
def extractTableDesc(crtTbl: CreateTableDesc) = {
val tbl = new Table(db.getOrElse(null), tableName)
tbl.setFields(crtTbl.getCols)

// Most of this code is similar to DDLTask.createTable() in Hive.
if (crtTbl != null && crtTbl.getTblProps() != null) {
tbl.getTTable().getParameters().putAll(crtTbl.getTblProps())
}

if (crtTbl != null && crtTbl.getPartCols() != null) {
tbl.setPartCols(crtTbl.getPartCols())
}

if (crtTbl != null && crtTbl.getStorageHandler() != null) {
tbl.setProperty(
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
crtTbl.getStorageHandler())
}

/*
* We use LazySimpleSerDe by default.
*
* If the user didn't specify a SerDe, and any of the columns are not simple
* types, we will have to use DynamicSerDe instead.
*/
if (crtTbl == null || crtTbl.getSerName() == null) {
val storageHandler = tbl.getStorageHandler()
if (storageHandler == null) {
tbl.setSerializationLib(classOf[LazySimpleSerDe].getName())

import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextInputFormat

tbl.setInputFormatClass(classOf[TextInputFormat])
tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]])
tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
} else {
val serDeClassName = storageHandler.getSerDeClass().getName()
tbl.setSerializationLib(serDeClassName)
}
} else {
// TODO validate the serde existence
val serdeName = crtTbl.getSerName()
tbl.setSerializationLib(serdeName)
}

val serdeParams = new java.util.HashMap[String, String]()
if (crtTbl != null && crtTbl.getFieldDelim() != null) {
serdeParams.put(serdeConstants.FIELD_DELIM, crtTbl.getFieldDelim())
serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim())
}
if (crtTbl != null && crtTbl.getFieldEscape() != null) {
serdeParams.put(serdeConstants.ESCAPE_CHAR, crtTbl.getFieldEscape())
}

if (crtTbl != null && crtTbl.getCollItemDelim() != null) {
serdeParams.put(serdeConstants.COLLECTION_DELIM, crtTbl.getCollItemDelim())
}
if (crtTbl != null && crtTbl.getMapKeyDelim() != null) {
serdeParams.put(serdeConstants.MAPKEY_DELIM, crtTbl.getMapKeyDelim())
}
if (crtTbl != null && crtTbl.getLineDelim() != null) {
serdeParams.put(serdeConstants.LINE_DELIM, crtTbl.getLineDelim())
}

if (crtTbl != null && crtTbl.getSerdeProps() != null) {
val iter = crtTbl.getSerdeProps().entrySet().iterator()
while (iter.hasNext()) {
val m = iter.next()
serdeParams.put(m.getKey(), m.getValue())
}
}

if (crtTbl != null && crtTbl.getComment() != null) {
tbl.setProperty("comment", crtTbl.getComment())
}

if (crtTbl != null && crtTbl.getLocation() != null) {
HiveShim.setLocation(tbl, crtTbl)
}

if (crtTbl != null) {
tbl.setStoredAsSubDirectories(crtTbl.isStoredAsSubDirectories())
tbl.setInputFormatClass(crtTbl.getInputFormat())
tbl.setOutputFormatClass(crtTbl.getOutputFormat())
}

tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName())
tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName())

if (crtTbl != null && crtTbl.isExternal()) {
tbl.setProperty("EXTERNAL", "TRUE")
tbl.setTableType(TableType.EXTERNAL_TABLE)
}

// TODO add bucket support
// TODO support storage handler
HiveTable(
specifiedDatabase = db,
name = tableName,
schema = Seq.empty,
partitionColumns = Seq.empty,
properties = Map.empty,
serdeProperties = Map.empty,
tableType = ManagedTable,
location = None,
inputFormat = None,
outputFormat = None,
serde = None)
schema = tbl.getAllCols.map(field =>
HiveColumn(field.getName, field.getType, field.getComment)),
partitionColumns = tbl.getPartCols.map(part =>
HiveColumn(part.getName, part.getType, part.getComment)),
properties = tbl.getParameters.toMap,
serdeProperties = serdeParams.toMap,
tableType = if (crtTbl.isExternal()) ExternalTable else ManagedTable,
location = Some(crtTbl.getLocation()),
inputFormat = Some(crtTbl.getInputFormat()),
outputFormat = Some(crtTbl.getOutputFormat()),
serde = Some(crtTbl.getSerName()))
}

// Get the CreateTableDesc from Hive SemanticAnalyzer
val hconf = newHiveConf()
val sa = new SemanticAnalyzer(hconf) {
override def analyzeInternal(ast: ASTNode) {
// A hack to intercept the SemanticAnalyzer.analyzeInternal,
// to ignore the SELECT clause of the CTAS
val method = classOf[SemanticAnalyzer].getDeclaredMethod(
"analyzeCreateTable", classOf[ASTNode], classOf[QB])
method.setAccessible(true)
method.invoke(this, ast, this.getQB)
}
}
sa.analyze(node.asInstanceOf[ASTNode], new Context(hconf))
var tableDesc = extractTableDesc(sa.getQB().getTableDesc)

// TODO: Handle all the cases here...
children.foreach {
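As an illustration only (not from the commit), the rough shape of the HiveTable that a CTAS such as CREATE TABLE t AS SELECT key, value FROM src would end up with when no storage clause is given; the concrete formats and serde depend on the Hive defaults applied in extractTableDesc, so the string values below are assumptions:

// Illustrative HiveTable; field names match the constructor call above.
val illustrativeDesc = HiveTable(
  specifiedDatabase = None,   // resolved to the current database later, in HiveMetastoreCatalog
  name = "t",
  schema = Seq.empty,         // for CTAS the analyzer fills this in from the child plan's output
  partitionColumns = Seq.empty,
  properties = Map.empty,
  serdeProperties = Map.empty,
  tableType = ManagedTable,
  location = None,
  inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),                      // assumed default
  outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"),   // assumed default
  serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))                  // assumed default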
@@ -205,6 +205,12 @@ class ClientWrapper(
table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }

// set owner
qlTable.setOwner(conf.getUser)
// set create time
qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])

version match {
case hive.v12 =>
table.location.map(new URI(_)).foreach(u => qlTable.call[URI, Unit]("setDataLocation", u))
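A small restatement (not part of the diff) of the create-time conversion added above: the metastore stores the table create time as whole seconds in an Int, hence the millis-to-seconds division before the narrowing cast.

val createTimeSeconds: Int = (System.currentTimeMillis() / 1000).toInt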
