diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index d3601cca832b2..c21509d57847d 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -19,7 +19,6 @@ package org.apache.spark.input import scala.collection.JavaConversions._ -import org.apache.hadoop.conf.{Configuration, Configurable} import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.InputSplit import org.apache.hadoop.mapreduce.JobContext @@ -38,18 +37,12 @@ private[spark] class WholeTextFileInputFormat override protected def isSplitable(context: JobContext, file: Path): Boolean = false - private var conf: Configuration = _ - def setConf(c: Configuration) { - conf = c - } - def getConf: Configuration = conf - override def createRecordReader( split: InputSplit, context: TaskAttemptContext): RecordReader[String, String] = { - val reader = new WholeCombineFileRecordReader(split, context) - reader.setConf(conf) + val reader = new ConfigurableCombineFileRecordReader(split, context) + reader.setConf(getConf) reader } diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index 40aaeac9d05b5..89e85abb60d13 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -17,7 +17,7 @@ package org.apache.spark.input -import org.apache.hadoop.conf.{Configuration, Configurable} +import org.apache.hadoop.conf.{Configuration, Configurable => HConfigurable} import com.google.common.io.{ByteStreams, Closeables} import org.apache.hadoop.io.Text @@ -27,6 +27,18 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecor import org.apache.hadoop.mapreduce.RecordReader import org.apache.hadoop.mapreduce.TaskAttemptContext + +/** + * A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface. + */ +private[spark] trait Configurable extends HConfigurable { + private var conf: Configuration = _ + def setConf(c: Configuration) { + conf = c + } + def getConf: Configuration = conf +} + /** * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file * out in a key-value pair, where the key is the file path and the value is the entire content of @@ -38,12 +50,6 @@ private[spark] class WholeTextFileRecordReader( index: Integer) extends RecordReader[String, String] with Configurable { - private var conf: Configuration = _ - def setConf(c: Configuration) { - conf = c - } - def getConf: Configuration = conf - private[this] val path = split.getPath(index) private[this] val fs = path.getFileSystem(context.getConfiguration) @@ -90,7 +96,7 @@ private[spark] class WholeTextFileRecordReader( * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader CombineFileRecordReader]] * that could pass Hadoop configuration to WholeTextFileRecordReader. */ -private[spark] class WholeCombineFileRecordReader( +private[spark] class ConfigurableCombineFileRecordReader( split: InputSplit, context: TaskAttemptContext) extends CombineFileRecordReader[String, String]( @@ -99,16 +105,10 @@ private[spark] class WholeCombineFileRecordReader( classOf[WholeTextFileRecordReader] ) with Configurable { - private var conf: Configuration = _ - def setConf(c: Configuration) { - conf = c - } - def getConf: Configuration = conf - override def initNextRecordReader(): Boolean = { val r = super.initNextRecordReader() if (r) { - this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf) + this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(getConf) } r }