From 4163e38bccca33608fc4a241760e86d4862793b5 Mon Sep 17 00:00:00 2001
From: Kevin Mader
Date: Wed, 1 Oct 2014 13:47:53 +0200
Subject: [PATCH] fixing line length and output from FSDataInputStream to
 DataInputStream to minimize sensitivity to Hadoop API changes

---
 .../org/apache/spark/input/RawFileInput.scala | 59 ++++++++-----------
 1 file changed, 26 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
index 1f3989995373c..9c97f41097e52 100644
--- a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
+++ b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala
@@ -18,25 +18,24 @@ package org.apache.spark.input
 
 import scala.collection.JavaConversions._
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.{ ByteStreams, Closeables }
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.fs.{ FSDataInputStream, Path }
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream}
-
+import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, DataOutputStream, DataInputStream }
 
 /**
  * A general format for reading whole files in as streams, byte arrays,
  * or other functions to be added
  */
 abstract class StreamFileInputFormat[T]
-  extends CombineFileInputFormat[String,T] {
+  extends CombineFileInputFormat[String, T] {
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
   /**
    * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
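The retyping described in the subject line works because Hadoop's FSDataInputStream is a subclass of java.io.DataInputStream: FileSystem.open() can keep returning the Hadoop type while fields and return types are declared against the stable JDK type. A minimal sketch of that relationship (the standalone object and file path are hypothetical, not part of the patch):

```scala
import java.io.DataInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object DataInputStreamCheck {
  def main(args: Array[String]): Unit = {
    val path = new Path("hdfs:///tmp/example.bin") // hypothetical file
    val fs = path.getFileSystem(new Configuration())
    // fs.open() returns FSDataInputStream, which extends DataInputStream,
    // so the widened declaration compiles and hides the Hadoop-specific type.
    val in: DataInputStream = fs.open(path)
    try println(in.readInt()) finally in.close()
  }
}
```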
@@ -51,8 +50,7 @@ abstract class StreamFileInputFormat[T]
     super.setMaxSplitSize(maxSplitSize)
   }
 
-  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
-  RecordReader[String,T]
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T]
 
 }
 
@@ -62,13 +60,14 @@ abstract class StreamFileInputFormat[T]
  * @note TaskAttemptContext is not serializable resulting in the confBytes construct
  * @note CombineFileSplit is not serializable resulting in the splitBytes construct
  */
-class PortableDataStream(@transient isplit: CombineFileSplit, @transient context: TaskAttemptContext, index: Integer)
+class PortableDataStream(@transient isplit: CombineFileSplit,
+  @transient context: TaskAttemptContext, index: Integer)
   extends Serializable {
 
   // transient forces file to be reopened after being serialization
   // it is also used for non-serializable classes
   @transient
-  private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
+  private var fileIn: DataInputStream = null.asInstanceOf[DataInputStream]
   @transient
   private var isOpen = false
 
@@ -111,12 +110,12 @@ class PortableDataStream(@transient isplit: CombineFileSplit, @transient context
   /**
    * create a new DataInputStream from the split and context
    */
-  def open(): FSDataInputStream = {
+  def open(): DataInputStream = {
     if (!isOpen) {
       val pathp = split.getPath(index)
       val fs = pathp.getFileSystem(conf)
       fileIn = fs.open(pathp)
-      isOpen=true
+      isOpen = true
     }
     fileIn
   }
@@ -138,7 +137,7 @@ class PortableDataStream(@transient isplit: CombineFileSplit, @transient context
     if (isOpen) {
       try {
         fileIn.close()
-        isOpen=false
+        isOpen = false
       } catch {
         case ioe: java.io.IOException => // do nothing
       }
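The open()/close() pair above guards the transient stream with the isOpen flag, so a deserialized PortableDataStream can be reopened lazily on the executor and closed idempotently, with close() swallowing IOExceptions. A sketch of how a caller might drain such a lazily opened stream using the Guava helpers this file imports (the readFully helper and enclosing object are hypothetical):

```scala
import java.io.DataInputStream
import com.google.common.io.{ ByteStreams, Closeables }

object StreamDrain {
  // Hypothetical helper mirroring the PortableDataStream pattern:
  // open the stream only when needed, drain it, close it even on failure.
  def readFully(open: () => DataInputStream): Array[Byte] = {
    val in = open()
    try {
      ByteStreams.toByteArray(in) // Guava copies the remaining bytes into an array
    } finally {
      Closeables.close(in, true) // swallow IOException on close, like close() above
    }
  }
}
```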
@@ -152,20 +151,17 @@ class PortableDataStream(@transient isplit: CombineFileSplit, @transient context
  * to reading files out as streams
  */
 abstract class StreamBasedRecordReader[T](
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
+  split: CombineFileSplit,
+  context: TaskAttemptContext,
+  index: Integer)
   extends RecordReader[String, T] {
-
-
 
   // True means the current file has been processed, then skip it.
   private var processed = false
 
   private var key = ""
   private var value: T = null.asInstanceOf[T]
 
-
   override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
 
   override def close() = {}
@@ -175,8 +171,6 @@ abstract class StreamBasedRecordReader[T](
 
   override def getCurrentValue = value
 
-
-
   override def nextKeyValue = {
     if (!processed) {
       val fileIn = new PortableDataStream(split, context, index)
@@ -202,9 +196,9 @@ abstract class StreamBasedRecordReader[T](
  * Reads the record in directly as a stream for other objects to manipulate and handle
 */
 private[spark] class StreamRecordReader(
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
+  split: CombineFileSplit,
+  context: TaskAttemptContext,
+  index: Integer)
   extends StreamBasedRecordReader[PortableDataStream](split, context, index) {
 
   def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
@@ -215,12 +209,11 @@ private[spark] class StreamRecordReader(
  * BinaryRecordReader (as Byte array)
  */
 private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
-  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)=
-  {
-    new CombineFileRecordReader[String,PortableDataStream](
-      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader]
-    )
-  }
+  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) =
+  {
+    new CombineFileRecordReader[String, PortableDataStream](
+      split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader])
+  }
 }
 
 /**
@@ -229,10 +222,10 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
  * the file as a byte array
  */
 abstract class BinaryRecordReader[T](
-    split: CombineFileSplit,
-    context: TaskAttemptContext,
-    index: Integer)
-  extends StreamBasedRecordReader[T](split,context,index) {
+  split: CombineFileSplit,
+  context: TaskAttemptContext,
+  index: Integer)
+  extends StreamBasedRecordReader[T](split, context, index) {
 
   def parseStream(inpStream: PortableDataStream): T = {
     val inStream = inpStream.open()
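The patch is truncated inside BinaryRecordReader.parseStream, so a usage sketch may help place these classes: Hadoop's CombineFileRecordReader instantiates one StreamRecordReader per file in the combined split, and the resulting format can be handed to Spark's generic new-API Hadoop entry point. Hypothetical wiring only (StreamInputFormat is private[spark], hence the package declaration; the input path is made up and nothing here is confirmed by the patch):

```scala
package org.apache.spark.input // needed because StreamInputFormat is private[spark]

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext

object RawFileInputExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "rawFileInputExample")
    // Each record is (file path, lazily opened stream); CombineFileRecordReader
    // builds one StreamRecordReader per file in the combined split.
    val streams = sc.newAPIHadoopFile(
      "/data/blobs", // hypothetical input directory
      classOf[StreamInputFormat],
      classOf[String],
      classOf[PortableDataStream],
      new Configuration())
    val firstBytes = streams.map { case (path, pds) =>
      val in = pds.open()
      try (path, in.readByte()) finally pds.close()
    }
    firstBytes.collect().foreach(println)
  }
}
```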