Skip to content

Commit

Permalink
Minor refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Sep 17, 2014
1 parent fae9eff commit 5004542
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.util.Date
import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
Expand Down Expand Up @@ -159,11 +160,13 @@ private[hive] object SparkHiveWriterContainer {
private[spark] class SparkHiveDynamicPartitionWriterContainer(
@transient jobConf: JobConf,
fileSinkConf: FileSinkDesc,
dynamicPartColNames: Array[String],
defaultPartName: String)
dynamicPartColNames: Array[String])
extends SparkHiveWriterContainer(jobConf, fileSinkConf) {

@transient var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
private val defaultPartName = jobConf.get(
ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)

@transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _

override def open(): Unit = {
writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,16 @@ case class InsertIntoHiveTable(
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
conf: SerializableWritable[JobConf],
isCompressed: Boolean,
writerContainer: SparkHiveWriterContainer) {
assert(valueClass != null, "Output value class not set")
conf.value.setOutputValueClass(valueClass)

assert(fileSinkConf.getTableInfo.getOutputFileFormatClassName != null)
// Doesn't work in Scala 2.9 due to what may be a generics bug
// TODO: Should we uncomment this for Scala 2.10?
// conf.setOutputFormat(outputFormatClass)
conf.value.set(
"mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
assert(outputFileFormatClassName != null, "Output format class not set")
conf.value.set("mapred.output.format.class", outputFileFormatClassName)

val isCompressed = conf.value.getBoolean(
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)

if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
Expand Down Expand Up @@ -218,28 +217,14 @@ case class InsertIntoHiveTable(
val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableWritable(jobConf)

val defaultPartName = jobConf.get(
ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
val writerContainer = if (numDynamicPartitions > 0) {
new SparkHiveDynamicPartitionWriterContainer(
jobConf,
fileSinkConf,
partitionColumnNames.takeRight(numDynamicPartitions),
defaultPartName)
val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
} else {
new SparkHiveWriterContainer(jobConf, fileSinkConf)
}

val isCompressed = jobConf.getBoolean(
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)

saveAsHiveFile(
child.execute(),
outputClass,
fileSinkConf,
jobConfSer,
isCompressed,
writerContainer)
saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

val outputPath = FileOutputFormat.getOutputPath(jobConf)
// Have to construct the format of dbname.tablename.
Expand Down

0 comments on commit 5004542

Please sign in to comment.