From 50045422630711cf92768b30e305d0dde4ee04c8 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 16 Sep 2014 22:41:56 -0700
Subject: [PATCH] Minor refactoring

---
 .../spark/sql/hive/SparkHadoopWriter.scala    |  9 +++--
 .../hive/execution/InsertIntoHiveTable.scala  | 33 +++++--------------
 2 files changed, 15 insertions(+), 27 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala
index 6e07b51f49230..5fdca4fcdabe5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala
@@ -24,6 +24,7 @@ import java.util.Date
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc
@@ -159,11 +160,13 @@ private[hive] object SparkHiveWriterContainer {
 private[spark] class SparkHiveDynamicPartitionWriterContainer(
     @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
-    dynamicPartColNames: Array[String],
-    defaultPartName: String)
+    dynamicPartColNames: Array[String])
   extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
 
-  @transient var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
+  private val defaultPartName = jobConf.get(
+    ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
+
+  @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
 
   override def open(): Unit = {
     writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 8e9f7e5aa7374..ea884dc4ffa24 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -103,17 +103,16 @@ case class InsertIntoHiveTable(
       valueClass: Class[_],
       fileSinkConf: FileSinkDesc,
       conf: SerializableWritable[JobConf],
-      isCompressed: Boolean,
       writerContainer: SparkHiveWriterContainer) {
     assert(valueClass != null, "Output value class not set")
     conf.value.setOutputValueClass(valueClass)
 
-    assert(fileSinkConf.getTableInfo.getOutputFileFormatClassName != null)
-    // Doesn't work in Scala 2.9 due to what may be a generics bug
-    // TODO: Should we uncomment this for Scala 2.10?
-    // conf.setOutputFormat(outputFormatClass)
-    conf.value.set(
-      "mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+    val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
+    assert(outputFileFormatClassName != null, "Output format class not set")
+    conf.value.set("mapred.output.format.class", outputFileFormatClassName)
+
+    val isCompressed = conf.value.getBoolean(
+      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
 
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
@@ -218,28 +217,14 @@ case class InsertIntoHiveTable(
     val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableWritable(jobConf)
 
-    val defaultPartName = jobConf.get(
-      ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
-
     val writerContainer = if (numDynamicPartitions > 0) {
-      new SparkHiveDynamicPartitionWriterContainer(
-        jobConf,
-        fileSinkConf,
-        partitionColumnNames.takeRight(numDynamicPartitions),
-        defaultPartName)
+      val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
+      new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
     } else {
       new SparkHiveWriterContainer(jobConf, fileSinkConf)
    }
 
-    val isCompressed = jobConf.getBoolean(
-      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
-    saveAsHiveFile(
-      child.execute(),
-      outputClass,
-      fileSinkConf,
-      jobConfSer,
-      isCompressed,
-      writerContainer)
+    saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
 
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // Have to construct the format of dbname.tablename.
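
A minimal standalone sketch (not from the patch) of the ConfVars-based lookups this refactoring moves next to their consumers, assuming the Hive and Hadoop jars are on the classpath; the object name is invented for illustration.

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.mapred.JobConf

// Illustrative only: reads the same two Hive settings the patch resolves from a JobConf.
object ConfVarsLookupExample {
  def main(args: Array[String]): Unit = {
    val jobConf = new JobConf()

    // Name Hive substitutes for null/empty dynamic partition values,
    // falling back to the ConfVars built-in default when unset.
    val defaultPartName = jobConf.get(
      ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)

    // Whether Hive should compress query output, again with the built-in default.
    val isCompressed = jobConf.getBoolean(
      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)

    println(s"defaultPartName = $defaultPartName, isCompressed = $isCompressed")
  }
}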