Spark-2706: hive-0.13.1 support on spark
zhzhan committed Sep 2, 2014
1 parent 87ebf3b commit 94b4fdc
Showing 18 changed files with 481 additions and 84 deletions.
16 changes: 16 additions & 0 deletions assembly/pom.xml
@@ -188,6 +188,22 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive-0.13</id>
<activation>
<property>
<name>hive.version</name>
<value>0.13.1</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
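Note: the new hive-0.13 assembly profile is activated by the hive.version property rather than by an explicit -P flag, so a build such as

    mvn -Phive -Dhive.version=0.13.1 -DskipTests clean package

should pull spark-hive into the assembly for Hive 0.13.1 with no command-line change beyond the property itself. (This assumes standard Maven property-based profile activation; the exact goals and flags above are illustrative, not taken from this commit.)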
28 changes: 25 additions & 3 deletions pom.xml
@@ -127,7 +127,6 @@
<hbase.version>0.94.6</hbase.version>
<flume.version>1.4.0</flume.version>
<zookeeper.version>3.4.5</zookeeper.version>
<hive.version>0.12.0</hive.version>
<parquet.version>1.4.3</parquet.version>
<jblas.version>1.2.3</jblas.version>
<jetty.version>8.1.14.v20131031</jetty.version>
@@ -427,7 +426,7 @@
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.4.2.0</version>
<version>${derby.version}</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
@@ -1225,7 +1224,18 @@
</dependency>
</dependencies>
</profile>

<profile>
<id>hive-default</id>
<activation>
<property>
<name>!hive.version</name>
</property>
</activation>
<properties>
<hive.version>0.12.0</hive.version>
<derby.version>10.4.2.0</derby.version>
</properties>
</profile>
<profile>
<id>hive</id>
<activation>
@@ -1235,6 +1245,18 @@
<module>sql/hive-thriftserver</module>
</modules>
</profile>
<profile>
<id>hive-0.13</id>
<activation>
<property>
<name>hive.version</name>
<value>0.13.1</value>
</property>
</activation>
<properties>
<derby.version>10.10.1.1</derby.version>
</properties>
</profile>

</profiles>
</project>
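Note on the two profiles added to the root pom: hive-default activates on !hive.version, i.e. only when no -Dhive.version is supplied, so builds that do not opt in keep the existing defaults (Hive 0.12.0, Derby 10.4.2.0). Hoisting the previously hard-coded Derby 10.4.2.0 into a derby.version property is what allows the hive-0.13 profile to override it to 10.10.1.1, which appears to be the Derby line that Hive 0.13.1's metastore requires (an inference from this diff, not verified against Hive's own pom).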
180 changes: 144 additions & 36 deletions sql/hive/pom.xml
@@ -36,11 +36,6 @@
</properties>

<dependencies>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hive-bundle</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -51,46 +46,15 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-serde</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- hive-serde already depends on avro, but this brings in customized config of avro deps from parent -->
<dependency>
<groupId>org.apache.avro</groupId>
@@ -109,6 +73,74 @@
</dependencies>

<profiles>
<profile>
<id>hive-default</id>
<activation>
<property>
<name>!hive.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hive-bundle</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-serde</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-default-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>v0.12/src/main/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hive</id>
<build>
@@ -135,6 +167,82 @@
</plugins>
</build>
</profile>
<profile>
<id>hive-0.13</id>
<activation>
<property>
<name>hive.version</name>
<value>0.13.1</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-v13-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>v0.13/src/main/scala</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<build>
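How the version split in sql/hive/pom.xml works: the build-helper-maven-plugin executions give each Hive version its own extra source root (v0.12/src/main/scala under hive-default, v0.13/src/main/scala under hive-0.13), and each root supplies the same org.apache.spark.sql.hive.HiveShim object, so the rest of sql/hive compiles against a single stable surface. Note also the groupId flip: the 0.12 profile keeps Spark's republished org.spark-project.hive artifacts, while the 0.13 profile consumes stock org.apache.hive ones. A minimal sketch of what the 0.12 variant might look like, assuming only the members this commit actually references; everything else about the body is illustrative:

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.io.Text

/* Lives in v0.12/src/main/scala, so it is compiled only when the
 * hive-default profile adds that directory as a source root. */
private[hive] object HiveShim {
  // Hive 0.12 keeps this constant in ql.stats; Hive 0.13 moves it to
  // org.apache.hadoop.hive.common.StatsSetupConst, which is what the
  // 0.13 twin of this object would import instead.
  val getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE

  // Hive 0.12's factory takes a single command name, while 0.13's takes
  // the whole token array -- hence the Array(tokens(0)) at the call site
  // in HiveContext, letting each shim unpack it as its Hive expects.
  def getCommandProcessor(cmd: Array[String], conf: HiveConf): CommandProcessor =
    CommandProcessorFactory.get(cmd(0), conf)

  // Hive 0.12's object inspectors expect strings as hadoop Text.
  def convertCatalystString2Hive(s: String): AnyRef = new Text(s)
}

The 0.13 twin (not shown in this capture) would carry the same member names with 0.13-flavoured bodies, plus the ShimFileSinkDesc class that the writer code below aliases in.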
@@ -24,11 +24,12 @@ import java.util.Date
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
import org.apache.hadoop.mapred._
import org.apache.hadoop.io.Writable

import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.HiveShim._

/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
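The interesting line in this hunk is the renaming import, org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}: the writer keeps compiling against the name FileSinkDesc while the class underneath is swapped per Hive version. The apparent motivation is that Hive 0.13's ql.plan.FileSinkDesc gains a non-serializable member (the destination directory becomes an org.apache.hadoop.fs.Path), which would break Spark's task serialization; a shim-provided Serializable stand-in avoids that. A sketch under those assumptions -- the class name comes from this diff, but its body here is hypothetical:

import org.apache.hadoop.hive.ql.plan.TableDesc

// Hypothetical minimal body; the real shim presumably mirrors whichever
// FileSinkDesc fields and setters the Spark writer path actually touches.
private[hive] class ShimFileSinkDesc(
    var dir: String,                // kept as String so it serializes
    var tableInfo: TableDesc,
    var compressed: Boolean)
  extends Serializable {
  var compressCodec: String = _
  def setCompressed(c: Boolean): Unit = { compressed = c }
  def getDirName: String = dir
}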
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.io.TimestampWritable

import org.apache.spark.SparkContext
@@ -46,6 +45,8 @@ import org.apache.spark.sql.execution.ExtractPythonUdfs
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.{Command => PhysicalCommand}
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.hive.HiveShim._

/**
* DEPRECATED: Use HiveContext instead.
@@ -170,13 +171,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {

val tableParameters = relation.hiveQlTable.getParameters
val oldTotalSize =
Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE)).map(_.toLong).getOrElse(0L)
Option(tableParameters.get(HiveShim.getStatsSetupConstTotalSize)).
map(_.toLong).getOrElse(0L)
val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable)
// Update the Hive metastore if the total size of the table is different than the size
// recorded in the Hive metastore.
// This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
tableParameters.put(StatsSetupConst.TOTAL_SIZE, newTotalSize.toString)
tableParameters.put(HiveShim.getStatsSetupConstTotalSize, newTotalSize.toString)
val hiveTTable = relation.hiveQlTable.getTTable
hiveTTable.setParameters(tableParameters)
val tableFullName =
@@ -286,24 +288,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val cmd_trimmed: String = cmd.trim()
val tokens: Array[String] = cmd_trimmed.split("\\s+")
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hiveconf)

SessionState.start(sessionState)
val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)

proc match {
case driver: Driver =>
driver.init()

val results = new JArrayList[String]
val response: CommandProcessorResponse = driver.run(cmd)
// Throw an exception if there is an error in query processing.
if (response.getResponseCode != 0) {
driver.destroy()
driver.close
throw new QueryExecutionException(response.getErrorMessage)
}
driver.setMaxRows(maxRows)
driver.getResults(results)
driver.destroy()
driver.close
results
case _ =>
sessionState.out.println(tokens(0) + " " + cmd_1)
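Taken together, the HiveContext hunks above route three 0.12/0.13 incompatibilities through the shim; summarizing what the diff itself shows, with one hedged guess:

// 1. The stats constant moved packages, so StatsSetupConst can no longer
//    be imported directly:
//      0.12: org.apache.hadoop.hive.ql.stats.StatsSetupConst.TOTAL_SIZE
//      0.13: org.apache.hadoop.hive.common.StatsSetupConst.TOTAL_SIZE
//    HiveShim.getStatsSetupConstTotalSize hides the difference.
// 2. CommandProcessorFactory.get changed its first parameter from a single
//    command string (0.12) to a token array (0.13); passing Array(tokens(0))
//    to HiveShim.getCommandProcessor lets each version's shim adapt.
// 3. Driver teardown switches from driver.destroy() to driver.close --
//    presumably (an assumption, the commit message does not say) because
//    destroy() releases state the 0.13 Driver still needs.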
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.types._

/* Implicit conversions */
import scala.collection.JavaConversions._
import org.apache.spark.sql.hive.HiveShim

private[hive] trait HiveInspectors {

@@ -137,7 +138,7 @@

/** Converts native catalyst types to the types expected by Hive */
def wrap(a: Any): AnyRef = a match {
case s: String => new hadoopIo.Text(s) // TODO why should be Text?
case s: String => HiveShim.convertCatalystString2Hive(s)
case i: Int => i: java.lang.Integer
case b: Boolean => b: java.lang.Boolean
case f: Float => f: java.lang.Float
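The last visible hunk replaces the hard-coded new hadoopIo.Text(s) in HiveInspectors.wrap with HiveShim.convertCatalystString2Hive(s), answering the old "TODO why should be Text?" comment by making the answer version-dependent. On 0.12 the shim would reasonably keep the old behaviour:

// 0.12 shim entry (consistent with the removed line); the 0.13 body is
// not shown in this capture and may differ, e.g. around varchar handling.
def convertCatalystString2Hive(s: String): AnyRef =
  new org.apache.hadoop.io.Text(s)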