Merge pull request #1 from liancheng/refactor-dp
Refactors dynamic partitioning support
baishuo committed Sep 17, 2014
2 parents 5033928 + 1093c20 commit 096bbbc
Showing 11 changed files with 223 additions and 312 deletions.
@@ -21,28 +21,32 @@ import java.io.IOException
import java.text.NumberFormat
import java.util.Date

import scala.collection.mutable

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
import org.apache.hadoop.mapred._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred._

import org.apache.spark.sql.Row
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}

/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
* It is based on [[SparkHadoopWriter]].
*/
private[hive] class SparkHiveHadoopWriter(
private[hive] class SparkHiveWriterContainer(
@transient jobConf: JobConf,
fileSinkConf: FileSinkDesc)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {

private val now = new Date()
private val conf = new SerializableWritable(jobConf)
protected val conf = new SerializableWritable(jobConf)

private var jobID = 0
private var splitID = 0
@@ -51,152 +55,75 @@ private[hive] class SparkHiveHadoopWriter(
private var taID: SerializableWritable[TaskAttemptID] = null

@transient private var writer: FileSinkOperator.RecordWriter = null
@transient private var format: HiveOutputFormat[AnyRef, Writable] = null
@transient private var committer: OutputCommitter = null
@transient private var jobContext: JobContext = null
@transient private var taskContext: TaskAttemptContext = null
@transient private lazy val committer = conf.value.getOutputCommitter
@transient private lazy val jobContext = newJobContext(conf.value, jID.value)
@transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
@transient private lazy val outputFormat =
conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]

def preSetup() {
def driverSideSetup() {
setIDs(0, 0, 0)
setConfParams()

val jCtxt = getJobContext()
getOutputCommitter().setupJob(jCtxt)
committer.setupJob(jobContext)
}


def setup(jobid: Int, splitid: Int, attemptid: Int) {
setIDs(jobid, splitid, attemptid)
def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
setIDs(jobId, splitId, attemptId)
setConfParams()
}

def open() {
val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)

val extension = Utilities.getFileExtension(
conf.value,
fileSinkConf.getCompressed,
getOutputFormat())

val outputName = "part-" + numfmt.format(splitID) + extension
val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)

getOutputCommitter().setupTask(getTaskContext())
writer = HiveFileFormatUtils.getHiveRecordWriter(
conf.value,
fileSinkConf.getTableInfo,
conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
fileSinkConf,
path,
null)
committer.setupTask(taskContext)
}

/**
* create an HiveRecordWriter. imitate the above function open()
* @param dynamicPartPath the relative path for dynamic partition
*
* since this function is used to create different writer for
* different dynamic partition.So we need a parameter dynamicPartPath
* and use it we can calculate a new path and pass the new path to
* the function HiveFileFormatUtils.getHiveRecordWriter
* Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer
* for writing data to a dynamic partition.
*/
def open(dynamicPartPath: String) {
val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)

val extension = Utilities.getFileExtension(
conf.value,
fileSinkConf.getCompressed,
getOutputFormat())

val outputName = "part-" + numfmt.format(splitID) + extension
val outputPath: Path = FileOutputFormat.getOutputPath(conf.value)
if (outputPath == null) {
throw new IOException("Undefined job output-path")
}
val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) // remove "/"
val path = new Path(workPath, outputName)
getOutputCommitter().setupTask(getTaskContext())
def open() {
writer = HiveFileFormatUtils.getHiveRecordWriter(
conf.value,
fileSinkConf.getTableInfo,
conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
fileSinkConf,
path,
FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
Reporter.NULL)
}

def write(value: Writable) {
if (writer != null) {
writer.write(value)
} else {
throw new IOException("Writer is null, open() has not been called")
}
protected def getOutputName: String = {
val numberFormat = NumberFormat.getInstance()
numberFormat.setMinimumIntegerDigits(5)
numberFormat.setGroupingUsed(false)
val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
"part-" + numberFormat.format(splitID) + extension
}

def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer

def close() {
// Seems the boolean value passed into close does not matter.
writer.close(false)
}

def commit() {
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
if (committer.needsTaskCommit(taskContext)) {
try {
cmtr.commitTask(taCtxt)
committer.commitTask(taskContext)
logInfo (taID + ": Committed")
} catch {
case e: IOException =>
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
committer.abortTask(taskContext)
throw e
}
} else {
logWarning ("No need to commit output of task: " + taID.value)
logInfo("No need to commit output of task: " + taID.value)
}
}

def commitJob() {
// always ? Or if cmtr.needsTaskCommit ?
val cmtr = getOutputCommitter()
cmtr.commitJob(getJobContext())
committer.commitJob(jobContext)
}

// ********* Private Functions *********

private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = {
if (format == null) {
format = conf.value.getOutputFormat()
.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
}
format
}

private def getOutputCommitter(): OutputCommitter = {
if (committer == null) {
committer = conf.value.getOutputCommitter
}
committer
}

private def getJobContext(): JobContext = {
if (jobContext == null) {
jobContext = newJobContext(conf.value, jID.value)
}
jobContext
}

private def getTaskContext(): TaskAttemptContext = {
if (taskContext == null) {
taskContext = newTaskAttemptContext(conf.value, taID.value)
}
taskContext
}

private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
jobID = jobId
splitID = splitId
@@ -216,7 +143,7 @@ private[hive] class SparkHiveHadoopWriter(
}
}

private[hive] object SparkHiveHadoopWriter {
private[hive] object SparkHiveWriterContainer {
def createPathFromString(path: String, conf: JobConf): Path = {
if (path == null) {
throw new IllegalArgumentException("Output path is null")
@@ -226,6 +153,61 @@ private[hive] object SparkHiveHadoopWriter {
if (outputPath == null || fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
outputPath.makeQualified(fs)
outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
}
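
For context, here is a minimal sketch (not part of this commit's diff) of how a caller might drive the refactored container: driverSideSetup() once on the driver, executorSideSetup()/open()/close()/commit() per task, and commitJob() at the end. The sparkContext, rdd, and serializeRow names are hypothetical placeholders standing in for whatever the surrounding query plan provides.

import org.apache.spark.TaskContext
import org.apache.spark.sql.Row

val container = new SparkHiveWriterContainer(jobConf, fileSinkConf)
container.driverSideSetup()                                      // once, on the driver

sparkContext.runJob(rdd, (context: TaskContext, rows: Iterator[Row]) => {
  // Hadoop task attempt IDs are 32-bit, so fold the Spark attempt id into an Int.
  val attemptNumber = (context.attemptId % Int.MaxValue).toInt
  container.executorSideSetup(context.stageId, context.partitionId, attemptNumber)
  container.open()                                               // set up the record writer(s) for this task
  rows.foreach { row =>
    container.getLocalFileWriter(row).write(serializeRow(row))   // serializeRow: Row => Writable (placeholder)
  }
  container.close()
  container.commit()                                             // commit this task's output if needed
})

container.commitJob()                                            // finalize the job output on the driver

Because SparkHiveDynamicPartitionWriterContainer only overrides open(), close(), and getLocalFileWriter(), the same driver/executor loop works unchanged for dynamic partition inserts.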

private[spark] class SparkHiveDynamicPartitionWriterContainer(
@transient jobConf: JobConf,
fileSinkConf: FileSinkDesc,
dynamicPartColNames: Array[String])
extends SparkHiveWriterContainer(jobConf, fileSinkConf) {

private val defaultPartName = jobConf.get(
ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)

@transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _

override def open(): Unit = {
writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
}

override def close(): Unit = {
writers.values.foreach(_.close(false))
}

override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
val dynamicPartPath = dynamicPartColNames
.zip(row.takeRight(dynamicPartColNames.length))
.map { case (col, rawVal) =>
val string = String.valueOf(rawVal)
s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
}
.mkString

val path = {
val outputPath = FileOutputFormat.getOutputPath(conf.value)
assert(outputPath != null, "Undefined job output-path")
val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
new Path(workPath, getOutputName)
}

def newWriter = {
val newFileSinkDesc = new FileSinkDesc(
fileSinkConf.getDirName + dynamicPartPath,
fileSinkConf.getTableInfo,
fileSinkConf.getCompressed)
newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
HiveFileFormatUtils.getHiveRecordWriter(
conf.value,
fileSinkConf.getTableInfo,
conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
newFileSinkDesc,
path,
Reporter.NULL)
}

writers.getOrElseUpdate(dynamicPartPath, newWriter)
}
}
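
As a worked illustration of getLocalFileWriter above: the trailing values of each row (one per dynamic partition column) are zipped with the column names to form a relative partition path, and null or empty values fall back to the default partition name configured by ConfVars.DEFAULTPARTITIONNAME (normally __HIVE_DEFAULT_PARTITION__). A standalone sketch with made-up column names and values:

val dynamicPartColNames = Array("year", "month")
val defaultPartName = "__HIVE_DEFAULT_PARTITION__"   // Hive's usual hive.exec.default.partition.name
val rowTail: Seq[Any] = Seq(2014, null)              // trailing row values, in partition column order

val dynamicPartPath = dynamicPartColNames
  .zip(rowTail)
  .map { case (col, rawVal) =>
    val string = String.valueOf(rawVal)
    s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
  }
  .mkString
// dynamicPartPath == "/year=2014/month=__HIVE_DEFAULT_PARTITION__"

Each distinct relative path gets its own FileSinkOperator.RecordWriter, cached in the writers map, so a single task can write to many partitions while reusing writers across rows.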