Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Dec 16, 2014
1 parent bf800b9 commit 95d13eb
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.input

import scala.collection.JavaConversions._

import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
Expand All @@ -38,18 +37,12 @@ private[spark] class WholeTextFileInputFormat

override protected def isSplitable(context: JobContext, file: Path): Boolean = false

private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf

override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[String, String] = {

val reader = new WholeCombineFileRecordReader(split, context)
reader.setConf(conf)
val reader = new ConfigurableCombineFileRecordReader(split, context)
reader.setConf(getConf)
reader
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.input

import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable}
import com.google.common.io.{ByteStreams, Closeables}

import org.apache.hadoop.io.Text
Expand All @@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecor
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext


/**
 * Mixin implementing Hadoop's [[org.apache.hadoop.conf.Configurable Configurable]]
 * contract (imported under the alias `HConfigurable` to avoid clashing with this
 * trait's name) by caching the supplied Hadoop [[Configuration]].
 */
private[spark] trait Configurable extends HConfigurable {
  // Cached Hadoop configuration; remains null until setConf is invoked.
  private var hadoopConf: Configuration = _

  def setConf(c: Configuration): Unit = {
    hadoopConf = c
  }

  def getConf: Configuration = hadoopConf
}

/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
* out in a key-value pair, where the key is the file path and the value is the entire content of
Expand All @@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader(
index: Integer)
extends RecordReader[String, String] with Configurable {

private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf

private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(context.getConfiguration)

Expand Down Expand Up @@ -90,7 +96,7 @@ private[spark] class WholeTextFileRecordReader(
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]]
* that could pass Hadoop configuration to WholeTextFileRecordReader.
*/
private[spark] class WholeCombineFileRecordReader(
private[spark] class ConfigurableCombineFileRecordReader(
split: InputSplit,
context: TaskAttemptContext)
extends CombineFileRecordReader[String, String](
Expand All @@ -99,16 +105,10 @@ private[spark] class WholeCombineFileRecordReader(
classOf[WholeTextFileRecordReader]
) with Configurable {

private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf

override def initNextRecordReader(): Boolean = {
val r = super.initNextRecordReader()
if (r) {
this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(getConf)
}
r
}
Expand Down

0 comments on commit 95d13eb

Please sign in to comment.