Fixed coding style issues in sql/hive
liancheng committed Mar 23, 2014
1 parent 0b56f77 commit b531273
Showing 18 changed files with 119 additions and 125 deletions.
@@ -22,15 +22,14 @@ import java.text.NumberFormat
import java.util.Date

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
import org.apache.hadoop.io.Writable

import org.apache.spark.Logging
import org.apache.spark.SerializableWritable

import org.apache.hadoop.hive.ql.exec.{Utilities, FileSinkOperator}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.FileSinkDesc

/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
* It is based on [[SparkHadoopWriter]].
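Note: the reordered imports in this hunk appear to follow the grouping convention used across the Spark codebase: java/javax first, then scala, then other third-party packages such as org.apache.hadoop, then org.apache.spark, each group alphabetized and separated by a blank line. A minimal sketch of that layout (the scala import is illustrative only, and the Hadoop/Spark artifacts are assumed to be on the classpath as they are for the patched file):

    import java.text.NumberFormat
    import java.util.Date

    import scala.collection.mutable.ArrayBuffer   // illustrative member of the scala group

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
    import org.apache.hadoop.io.Writable

    import org.apache.spark.Logging
    import org.apache.spark.SerializableWritable

    // Empty placeholder so the sketch compiles as a standalone source file.
    object ImportOrderSketch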
25 changes: 13 additions & 12 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -18,25 +18,26 @@
package org.apache.spark.sql
package hive

import java.io.{PrintStream, InputStreamReader, BufferedReader, File}
import java.util.{ArrayList => JArrayList}
import scala.language.implicitConversions

import org.apache.spark.SparkContext
import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
import java.util.{ArrayList => JArrayList}

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.processors.{CommandProcessorResponse, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.processors.CommandProcessor
import org.apache.hadoop.hive.ql.Driver
import org.apache.spark.rdd.RDD

import catalyst.analysis.{Analyzer, OverrideCatalog}
import catalyst.expressions.GenericRow
import catalyst.plans.logical.{BaseRelation, LogicalPlan, NativeCommand, ExplainCommand}
import catalyst.types._
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution._

/* Implicit conversions */
import scala.collection.JavaConversions._

/**
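For context: HiveContext.scala opens with chained package clauses (package org.apache.spark.sql followed by package hive), which is why the old relative imports such as catalyst.analysis.{Analyzer, OverrideCatalog} resolved at all: catalyst is a member of the enclosing org.apache.spark.sql package. The commit replaces them with fully qualified paths. A self-contained sketch of the same mechanism, using hypothetical packages a.b.c and a.b.c.util:

    package a.b.c {
      package util {
        object Helper { val answer = 42 }
      }

      package impl {
        // Relative import: `util` resolves against the enclosing package a.b.c,
        // just as `catalyst.analysis` resolved against org.apache.spark.sql.
        import util.Helper
        // Fully qualified form of the same import, the style this commit prefers.
        import a.b.c.util.{Helper => HelperByFullPath}

        object Demo {
          val sameObject: Boolean = Helper eq HelperByFullPath // both names refer to one object
        }
      }
    }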
@@ -27,12 +27,12 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.Deserializer

import catalyst.analysis.Catalog
import catalyst.expressions._
import catalyst.plans.logical
import catalyst.plans.logical._
import catalyst.rules._
import catalyst.types._
import org.apache.spark.sql.catalyst.analysis.Catalog
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._

import scala.collection.JavaConversions._

@@ -45,7 +45,7 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
db: Option[String],
tableName: String,
alias: Option[String]): LogicalPlan = {
val databaseName = db.getOrElse(hive.sessionState.getCurrentDatabase())
val databaseName = db.getOrElse(hive.sessionState.getCurrentDatabase)
val table = client.getTable(databaseName, tableName)
val partitions: Seq[Partition] =
if (table.isPartitioned) {
@@ -91,7 +91,7 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
object CreateTables extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case InsertIntoCreatedTable(db, tableName, child) =>
val databaseName = db.getOrElse(SessionState.get.getCurrentDatabase())
val databaseName = db.getOrElse(SessionState.get.getCurrentDatabase)

createTable(databaseName, tableName, child.output)

@@ -123,8 +123,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
} else {
// Only do the casting when child output data types differ from table output data types.
val castedChildOutput = child.output.zip(table.output).map {
case (input, table) if input.dataType != table.dataType =>
Alias(Cast(input, table.dataType), input.name)()
case (input, output) if input.dataType != output.dataType =>
Alias(Cast(input, output.dataType), input.name)()
case (input, _) => input
}

@@ -135,7 +135,7 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {

/**
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
* For now, if this functionallity is desired mix in the in-memory [[OverrideCatalog]].
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
*/
override def registerTable(
databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ???
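Note: in the casting hunk above, the pattern variable bound to the table-side attribute was renamed from table to output. That reads as a shadowing fix: the surrounding code already has a value named table (the metastore table whose output is zipped against child.output), and a pattern variable of the same name would hide it. A self-contained sketch of the pattern, with hypothetical stand-in types:

    object ShadowingSketch {
      // Hypothetical stand-ins for the attribute and table types used by the real code.
      case class Attr(name: String, dataType: String)
      case class Table(output: Seq[Attr])

      def alignTypes(childOutput: Seq[Attr], table: Table): Seq[Attr] =
        childOutput.zip(table.output).map {
          // Naming the second element `output` (not `table`) keeps the enclosing
          // `table` parameter reachable inside the partial function.
          case (input, output) if input.dataType != output.dataType =>
            input.copy(dataType = output.dataType)
          case (input, _) => input
        }
    }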
25 changes: 13 additions & 12 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -18,18 +18,19 @@
package org.apache.spark.sql
package hive

import scala.collection.JavaConversions._

import org.apache.hadoop.hive.ql.lib.Node
import org.apache.hadoop.hive.ql.parse._
import org.apache.hadoop.hive.ql.plan.PlanUtils

import catalyst.analysis._
import catalyst.expressions._
import catalyst.plans._
import catalyst.plans.logical
import catalyst.plans.logical._
import catalyst.types._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._

/* Implicit conversions */
import scala.collection.JavaConversions._

/**
* Used when we need to start parsing the AST before deciding that we are going to pass the command
@@ -48,7 +49,7 @@ case class AddJar(jarPath: String) extends Command

case class AddFile(filePath: String) extends Command

/** Provides a mapping from HiveQL statments to catalyst logical plans and expression trees. */
/** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
object HiveQl {
protected val nativeCommands = Seq(
"TOK_DESCFUNCTION",
@@ -150,13 +151,13 @@ object HiveQl {
}

/**
* Returns a scala.Seq equivilent to [s] or Nil if [s] is null.
* Returns a scala.Seq equivalent to [s] or Nil if [s] is null.
*/
private def nilIfEmpty[A](s: java.util.List[A]): Seq[A] =
Option(s).map(_.toSeq).getOrElse(Nil)

/**
* Returns this ASTNode with the text changed to `newText``.
* Returns this ASTNode with the text changed to `newText`.
*/
def withText(newText: String): ASTNode = {
n.token.asInstanceOf[org.antlr.runtime.CommonToken].setText(newText)
@@ -667,7 +668,7 @@ object HiveQl {
case Token(allJoinTokens(joinToken),
relation1 ::
relation2 :: other) =>
assert(other.size <= 1, s"Unhandled join child ${other}")
assert(other.size <= 1, s"Unhandled join child $other")
val joinType = joinToken match {
case "TOK_JOIN" => Inner
case "TOK_RIGHTOUTERJOIN" => RightOuter
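Aside: the ${other} to $other change in the join hunk is purely cosmetic; in Scala string interpolation the braces are only required when the interpolated part is an expression rather than a bare identifier. A tiny sketch:

    object InterpolationSketch extends App {
      val other = List("unexpected-join-child")
      println(s"Unhandled join child $other")                  // bare identifier: no braces needed
      println(s"Unhandled join child ${other.mkString(", ")}") // expression: braces required
    }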
@@ -18,13 +18,12 @@
package org.apache.spark.sql
package hive

import catalyst.expressions._
import catalyst.planning._
import catalyst.plans._
import catalyst.plans.logical.{BaseRelation, LogicalPlan}

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.parquet.{ParquetRelation, InsertIntoParquetTable, ParquetTableScan}
import org.apache.spark.sql.parquet.{InsertIntoParquetTable, ParquetRelation, ParquetTableScan}

trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
@@ -18,11 +18,12 @@
package org.apache.spark.sql
package hive

import java.io.{InputStreamReader, BufferedReader}
import java.io.{BufferedReader, InputStreamReader}

import catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution._

/* Implicit conversions */
import scala.collection.JavaConversions._

/**
26 changes: 12 additions & 14 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -19,19 +19,18 @@ package org.apache.spark.sql
package hive

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.io.Writable
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, InputFormat}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}

import org.apache.spark.SerializableWritable
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{HadoopRDD, UnionRDD, EmptyRDD, RDD}

import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}

/**
* A trait for subclasses that handle table scans.
@@ -40,7 +39,6 @@ private[hive] sealed trait TableReader {
def makeRDDForTable(hiveTable: HiveTable): RDD[_]

def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_]

}


@@ -57,7 +55,6 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
private val _minSplitsPerRDD = math.max(
sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinSplits)


// TODO: set aws s3 credentials.

private val _broadcastedHiveConf =
@@ -85,8 +82,8 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
def makeRDDForTable(
hiveTable: HiveTable,
deserializerClass: Class[_ <: Deserializer],
filterOpt: Option[PathFilter]): RDD[_] =
{
filterOpt: Option[PathFilter]): RDD[_] = {

assert(!hiveTable.isPartitioned, """makeRDDForTable() cannot be called on a partitioned table,
since input formats may differ across partitions. Use makeRDDForTablePartitions() instead.""")

@@ -115,6 +112,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
sys.error(s"Unable to deserialize non-Writable: $value of ${value.getClass.getName}")
}
}

deserializedHadoopRDD
}

@@ -136,8 +134,8 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
*/
def makeRDDForPartitionedTable(
partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
filterOpt: Option[PathFilter]): RDD[_] =
{
filterOpt: Option[PathFilter]): RDD[_] = {

val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
val partDesc = Utilities.getPartitionDesc(partition)
val partPath = partition.getPartitionPath
@@ -178,6 +176,7 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
}
}
}.toSeq

// Even if we don't use any partitions, we still need an empty RDD
if (hivePartitionRDDs.size == 0) {
new EmptyRDD[Object](sc.sparkContext)
@@ -207,8 +206,8 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
private def createHadoopRdd(
tableDesc: TableDesc,
path: String,
inputFormatClass: Class[InputFormat[Writable, Writable]])
: RDD[Writable] = {
inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {

val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

val rdd = new HadoopRDD(
Expand All @@ -227,7 +226,6 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
}

private[hive] object HadoopTableReader {

/**
* Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to
* instantiate a HadoopRDD.
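For context: the comment above describes the curried helper this file relies on; HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _ supplies the first parameter list and leaves a JobConf => Unit closure for the HadoopRDD to apply later. A self-contained sketch of that shape, with a hypothetical FakeJobConf standing in for Hadoop's JobConf and a single first argument instead of (path, tableDesc):

    object CurryingSketch {
      // Hypothetical stand-in for org.apache.hadoop.mapred.JobConf.
      final class FakeJobConf {
        var inputDir: String = ""
      }

      // Two parameter lists: applying only the first one (plus a trailing `_`)
      // yields a FakeJobConf => Unit closure.
      def initializeLocalJobConfFunc(path: String)(jobConf: FakeJobConf): Unit = {
        jobConf.inputDir = path
      }

      def main(args: Array[String]): Unit = {
        val configure: FakeJobConf => Unit = initializeLocalJobConfFunc("/warehouse/src") _
        val conf = new FakeJobConf
        configure(conf)          // the captured path is applied when the closure runs
        println(conf.inputDir)   // prints /warehouse/src
      }
    }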
21 changes: 10 additions & 11 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -22,23 +22,22 @@ import java.io.File
import java.util.{Set => JavaSet}

import scala.collection.mutable
import scala.collection.JavaConversions._
import scala.language.implicitConversions

import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.ql.io.avro.{AvroContainerOutputFormat, AvroContainerInputFormat}
import org.apache.hadoop.hive.ql.io.avro.{AvroContainerInputFormat, AvroContainerOutputFormat}
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.serde2.avro.AvroSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.serde2.RegexSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.serde2.avro.AvroSerDe

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand}
import org.apache.spark.sql.catalyst.util._

import catalyst.analysis._
import catalyst.plans.logical.{LogicalPlan, NativeCommand}
import catalyst.util._
/* Implicit conversions */
import scala.collection.JavaConversions._

object TestHive
extends TestHiveContext(new SparkContext("local", "TestSQLContext", new SparkConf()))
@@ -52,7 +51,7 @@ object TestHive
*
* TestHive is singleton object version of this class because instantiating multiple copies of the
* hive metastore seems to lead to weird non-deterministic failures. Therefore, the execution of
* testcases that rely on TestHive must be serialized.
* test cases that rely on TestHive must be serialized.
*/
class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
self =>
