From 81bb36628f647433a9bd2203cb4f30bfbda02b9e Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Thu, 7 May 2015 22:51:52 +0000
Subject: [PATCH] comments from vanzin

---
 .../hive/thriftserver/SparkSQLCLIDriver.scala |  7 +--
 .../apache/spark/sql/hive/HiveContext.scala   | 50 +++++++++----------
 .../hive/execution/CreateTableAsSelect.scala  | 12 ++---
 .../apache/spark/sql/hive/test/TestHive.scala | 12 ++---
 4 files changed, 37 insertions(+), 44 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 7c4d50dd56cb0..ab69d72e80439 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -81,8 +81,9 @@ private[hive] object SparkSQLCLIDriver {
     }
     val cliConf = new HiveConf(classOf[SessionState])
     // Override the location of the metastore since this is only used for local execution.
-    cliConf.set(
-      "javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$localMetastore;create=true")
+    HiveContext.newTemporaryConfiguration().foreach {
+      case (key, value) => cliConf.set(key, value)
+    }
     val sessionState = new CliSessionState(cliConf)

     sessionState.in = System.in
@@ -103,7 +104,7 @@ private[hive] object SparkSQLCLIDriver {
     sessionState.cmdProperties.entrySet().foreach { item =>
       val key = item.getKey.asInstanceOf[String]
       val value = item.getValue.asInstanceOf[String]
-      // We do not propogate metastore options to the execution copy of hive.
+      // We do not propagate metastore options to the execution copy of hive.
       if (key != "javax.jdo.option.ConnectionURL") {
         conf.set(key, value)
         sessionState.getOverriddenConfigurations.put(key, value)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index d433f9de0a994..b1dc74f45869d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.hive

-import java.io.{BufferedReader, InputStreamReader, PrintStream}
+import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
 import java.sql.Timestamp
 import java.util.{ArrayList => JArrayList}

@@ -36,8 +36,9 @@ import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}

-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, OverrideCatalog, OverrideFunctionRegistry}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -105,12 +106,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * Spark SQL for execution.
    */
  protected[hive] def hiveMetastoreVersion: String =
-    getConf(HIVE_METASTORE_VERSION, "0.13.1")
+    getConf(HIVE_METASTORE_VERSION, hiveExecutionVersion)

   /**
    * The location of the jars that should be used to instantiate the HiveMetastoreClient. This
    * property can be one of three options:
-   *  - a colon-separated list of jar files or directories for hive and hadoop.
+   *  - a classpath in the standard format for both hive and hadoop.
    *  - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
    *              option is only valid when using the execution version of Hive.
    *  - maven - download the correct version of hive on demand from maven.
@@ -121,22 +122,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
-
-  /** A local instance of hive that is only used for execution. */
-  protected[hive] lazy val localMetastore = {
-    val temp = Utils.createTempDir()
-    temp.delete()
-    temp
-  }
-
-  @transient
-  protected[hive] lazy val executionConf = new HiveConf()
-  executionConf.set(
-    "javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$localMetastore;create=true")
-
-  /** The version of hive used internally by Spark SQL. */
-  lazy val hiveExecutionVersion: String = "0.13.1"
-
   /**
    * The copy of the hive client that is used for execution. Currently this must always be
    * Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the
@@ -149,9 +134,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     logInfo(s"Initilizing execution hive, version $hiveExecutionVersion")
     new ClientWrapper(
       version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
-      config = Map(
-        "javax.jdo.option.ConnectionURL" ->
-          s"jdbc:derby:;databaseName=$localMetastore;create=true"))
+      config = newTemporaryConfiguration())
   }

   SessionState.setCurrentSessionState(executionHive.state)
@@ -203,11 +186,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       // Convert to files and expand any directories.
       val jars =
         hiveMetastoreJars
-          .split(":")
-          .map(new java.io.File(_))
+          .split(File.pathSeparator)
           .flatMap {
-            case f if f.isDirectory => f.listFiles()
-            case f => f :: Nil
+            case path if path.endsWith("*") =>
+              val directory = new File(path.dropRight(1))
+              directory.listFiles.filter(_.getName.endsWith("jar"))
+            case path =>
+              new File(path) :: Nil
           }
           .map(_.toURI.toURL)
@@ -471,9 +456,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

 private[hive] object HiveContext {
+  /** The version of hive used internally by Spark SQL. */
+  val hiveExecutionVersion: String = "0.13.1"
+
   val HIVE_METASTORE_VERSION: String = "spark.sql.hive.metastore.version"
   val HIVE_METASTORE_JARS: String = "spark.sql.hive.metastore.jars"

+  /** Constructs a configuration for hive, where the metastore is located in a temp directory. */
+  def newTemporaryConfiguration(): Map[String, String] = {
+    val tempDir = Utils.createTempDir()
+    val localMetastore = new File(tempDir, "metastore")
+    Map(
+      "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$localMetastore;create=true")
+  }
+
   protected val primitiveTypes =
     Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
       ShortType, DateType, TimestampType, BinaryType)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 4b260feb5368a..91e6ac4032204 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -60,12 +60,12 @@ case class CreateTableAsSelect(
         schema =
           query.output.map(c =>
             HiveColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType), null)),
-          inputFormat =
-            tableDesc.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
-          outputFormat =
-            tableDesc.outputFormat
-              .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
-          serde = tableDesc.serde.orElse(Some(classOf[LazySimpleSerDe].getName())))
+        inputFormat =
+          tableDesc.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
+        outputFormat =
+          tableDesc.outputFormat
+            .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
+        serde = tableDesc.serde.orElse(Some(classOf[LazySimpleSerDe].getName())))
       hiveContext.catalog.client.createTable(withSchema)

       // Get the Metastore Relation
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 47b2594c9dff1..61fb61685a213 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -63,6 +63,8 @@ object TestHive
 class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   self =>

+  import HiveContext._
+
   // By clearing the port we force Spark to pick a new one. This allows us to rerun tests
   // without restarting the JVM.
   System.clearProperty("spark.hostPort")
@@ -71,16 +73,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   hiveconf.set("hive.plan.serialization.format", "javaXML")

   lazy val warehousePath = Utils.createTempDir()
-  lazy val metastorePath = {
-    val temp = Utils.createTempDir()
-    temp.delete()
-    temp
-  }

   /** Sets up the system initially or after a RESET command */
-  protected override def configure(): Map[String, String] = Map(
-    "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
-    "hive.metastore.warehouse.dir" -> warehousePath.toString)
+  protected override def configure(): Map[String, String] =
+    newTemporaryConfiguration() ++ Map("hive.metastore.warehouse.dir" -> warehousePath.toString)

   val testTempDir = Utils.createTempDir()
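
Note on the jar-resolution hunk in HiveContext.scala, which is the subtle part of this patch: spark.sql.hive.metastore.jars is now split on File.pathSeparator rather than a hard-coded ":", and an entry ending in "*" is expanded into the jars that directory contains, matching the standard classpath format the revised doc comment describes. The following is a minimal standalone sketch of that logic, for illustration only; the object name MetastoreJarsSketch, the resolveJars helper, and the example paths are invented here, and unlike the patched code the sketch guards against the null that File.listFiles returns for a missing directory.

import java.io.File
import java.net.URL

object MetastoreJarsSketch {
  // Split a classpath-style string on the platform separator and expand any
  // entry ending in "*" into the jar files inside that directory.
  def resolveJars(metastoreJars: String): Seq[URL] =
    metastoreJars
      .split(File.pathSeparator)
      .toSeq
      .flatMap {
        case path if path.endsWith("*") =>
          val directory = new File(path.dropRight(1))
          // File.listFiles returns null for a missing directory; treat that
          // as an empty listing instead of failing.
          Option(directory.listFiles).toSeq.flatten.filter(_.getName.endsWith("jar"))
        case path =>
          Seq(new File(path))
      }
      .map(_.toURI.toURL)

  def main(args: Array[String]): Unit = {
    // Hypothetical value for spark.sql.hive.metastore.jars; on Windows the
    // separator would be ';' rather than ':'.
    resolveJars("/opt/hive/lib/*:/opt/hadoop/hadoop-common.jar").foreach(println)
  }
}

Using File.pathSeparator instead of ":" is what makes the setting portable to Windows, and the "*" convention mirrors how the JVM itself expands wildcard classpath entries.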
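Likewise, a self-contained sketch of what the new HiveContext.newTemporaryConfiguration() helper produces: a JDO connection URL pointing Derby at a "metastore" database directory under a fresh temporary directory, so the CLI driver, the execution Hive client, and TestHiveContext each get an isolated, throwaway metastore. Spark's Utils.createTempDir() is replaced here with java.nio.file.Files.createTempDirectory, an assumption made to keep the sketch dependency-free.

import java.io.File
import java.nio.file.Files

object TempMetastoreSketch {
  // Build a configuration whose JDO connection URL points Derby at a
  // "metastore" database directory under a fresh temp dir. Derby creates
  // the database itself on first connect thanks to ";create=true", so the
  // directory is named but not created here.
  def newTemporaryConfiguration(): Map[String, String] = {
    val tempDir = Files.createTempDirectory("spark-hive").toFile
    val localMetastore = new File(tempDir, "metastore")
    Map(
      "javax.jdo.option.ConnectionURL" ->
        s"jdbc:derby:;databaseName=$localMetastore;create=true")
  }

  def main(args: Array[String]): Unit = {
    // Two calls yield two distinct URLs, so concurrent contexts or repeated
    // test runs never share (or lock) the same Derby metastore.
    println(newTemporaryConfiguration())
    println(newTemporaryConfiguration())
  }
}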