[SPARK-2672] support compressed file in wholeTextFile
wholeTextFiles() cannot read compressed files; it should be able to, just like textFile().

Author: Davies Liu <davies@databricks.com>

Closes #3005 from davies/whole and squashes the following commits:

a43fcfb [Davies Liu] remove semicolon
c83571a [Davies Liu] remove = if return type is Unit
83c844f [Davies Liu] Merge branch 'master' of github.com:apache/spark into whole
22e8b3e [Davies Liu] support compressed file in wholeTextFile
Davies Liu authored and JoshRosen committed Nov 12, 2014
1 parent bd86118 commit d7d54a4
Showing 3 changed files with 103 additions and 13 deletions.
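
For context, a minimal usage sketch of what this change enables; the directory path and object name are hypothetical. After the patch, sc.wholeTextFiles() should return the decompressed content of each gzipped file, keyed by its path, just as it already does for plain text files:

import org.apache.spark.SparkContext

object WholeTextFilesGzipExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "wholeTextFiles-gzip-example")
    // Hypothetical directory containing files such as part-00000.gz, part-00001.gz.
    val files = sc.wholeTextFiles("/tmp/compressed-logs")
    // Each record is (file path, entire decompressed file content).
    files.collect().foreach { case (path, content) =>
      println(s"$path -> ${content.length} chars")
    }
    sc.stop()
  }
}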
WholeTextFileInputFormat.scala
@@ -19,32 +19,38 @@ package org.apache.spark.input

import scala.collection.JavaConversions._

import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

/**
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
* reading whole text files. Each file is read as key-value pair, where the key is the file path and
* the value is the entire content of file.
*/

private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
private[spark] class WholeTextFileInputFormat
extends CombineFileInputFormat[String, String] with Configurable {

override protected def isSplitable(context: JobContext, file: Path): Boolean = false

private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf

override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[String, String] = {

new CombineFileRecordReader[String, String](
split.asInstanceOf[CombineFileSplit],
context,
classOf[WholeTextFileRecordReader])
val reader = new WholeCombineFileRecordReader(split, context)
reader.setConf(conf)
reader
}

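The heart of the change above is that the input format now mixes in Hadoop's Configurable, so the job Configuration handed to it can be passed along to the record reader it creates. A minimal, illustrative sketch of that setConf/getConf boilerplate (the trait name is ours, not part of the patch):

import org.apache.hadoop.conf.{Configurable, Configuration}

// Illustrative only: the same setConf/getConf pattern that the patch adds to
// WholeTextFileInputFormat, WholeTextFileRecordReader and WholeCombineFileRecordReader.
// The configuration arrives after construction, via setConf, not through the
// constructor, which is why it has to be forwarded to readers by hand.
trait HadoopConfigured extends Configurable {
  private var conf: Configuration = _
  override def setConf(c: Configuration): Unit = { conf = c }
  override def getConf: Configuration = conf
}
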
WholeTextFileRecordReader.scala
@@ -17,11 +17,13 @@

package org.apache.spark.input

import org.apache.hadoop.conf.{Configuration, Configurable}
import com.google.common.io.{ByteStreams, Closeables}

import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext

@@ -34,7 +36,13 @@ private[spark] class WholeTextFileRecordReader(
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
extends RecordReader[String, String] {
extends RecordReader[String, String] with Configurable {

private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf

private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(context.getConfiguration)
@@ -57,8 +65,16 @@ private[spark] class WholeTextFileRecordReader(

override def nextKeyValue(): Boolean = {
if (!processed) {
val conf = new Configuration
val factory = new CompressionCodecFactory(conf)
val codec = factory.getCodec(path) // infers from file ext.
val fileIn = fs.open(path)
val innerBuffer = ByteStreams.toByteArray(fileIn)
val innerBuffer = if (codec != null) {
ByteStreams.toByteArray(codec.createInputStream(fileIn))
} else {
ByteStreams.toByteArray(fileIn)
}

value = new Text(innerBuffer).toString
Closeables.close(fileIn, false)
processed = true
@@ -68,3 +84,33 @@ private[spark] class WholeTextFileRecordReader(
}
}
}


/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
* out in a key-value pair, where the key is the file path and the value is the entire content of
* the file.
*/
private[spark] class WholeCombineFileRecordReader(
split: InputSplit,
context: TaskAttemptContext)
extends CombineFileRecordReader[String, String](
split.asInstanceOf[CombineFileSplit],
context,
classOf[WholeTextFileRecordReader]
) with Configurable {

private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf

override def initNextRecordReader(): Boolean = {
val r = super.initNextRecordReader()
if (r) {
this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
}
r
}
}
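
A small standalone sketch of the codec lookup that nextKeyValue now performs (the paths and object name are hypothetical): CompressionCodecFactory infers the codec from the file extension and returns null when the extension is not recognized, which is why the reader falls back to reading the stream as-is.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

object CodecLookupSketch {
  def main(args: Array[String]): Unit = {
    val factory = new CompressionCodecFactory(new Configuration())
    // ".gz" resolves to GzipCodec; ".txt" has no matching codec, so getCodec returns null.
    val gz  = Option(factory.getCodec(new Path("/tmp/data/part-00000.gz")))   // hypothetical path
    val txt = Option(factory.getCodec(new Path("/tmp/data/part-00000.txt")))  // hypothetical path
    println(s"part-00000.gz  -> ${gz.map(_.getClass.getSimpleName)}")
    println(s"part-00000.txt -> ${txt.map(_.getClass.getSimpleName)}")
  }
}
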
WholeTextFileRecordReaderSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text

import org.apache.spark.SparkContext
import org.apache.spark.util.Utils
import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodecFactory, GzipCodec}

/**
* Tests the correctness of
@@ -38,20 +39,32 @@ import org.apache.spark.util.Utils
*/
class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
private var sc: SparkContext = _
private var factory: CompressionCodecFactory = _

override def beforeAll() {
sc = new SparkContext("local", "test")

// Set the block size of local file system to test whether files are split right or not.
sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
sc.hadoopConfiguration.set("io.compression.codecs",
"org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec")
factory = new CompressionCodecFactory(sc.hadoopConfiguration)
}

override def afterAll() {
sc.stop()
}

private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
compress: Boolean) = {
val out = if (compress) {
val codec = new GzipCodec
val path = s"${inputDir.toString}/$fileName${codec.getDefaultExtension}"
codec.createOutputStream(new DataOutputStream(new FileOutputStream(path)))
} else {
val path = s"${inputDir.toString}/$fileName"
new DataOutputStream(new FileOutputStream(path))
}
out.write(contents, 0, contents.length)
out.close()
}
@@ -68,7 +81,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
println(s"Local disk address is ${dir.toString}.")

WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
createNativeFile(dir, filename, contents)
createNativeFile(dir, filename, contents, false)
}

val res = sc.wholeTextFiles(dir.toString, 3).collect()
@@ -86,6 +99,31 @@

Utils.deleteRecursively(dir)
}

test("Correctness of WholeTextFileRecordReader with GzipCodec.") {
val dir = Utils.createTempDir()
println(s"Local disk address is ${dir.toString}.")

WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
createNativeFile(dir, filename, contents, true)
}

val res = sc.wholeTextFiles(dir.toString, 3).collect()

assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
"Number of files read out does not fit with the actual value.")

for ((filename, contents) <- res) {
val shortName = filename.split('/').last.split('.')(0)

assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
s"Missing file name $filename.")
assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
s"file $filename contents can not match.")
}

Utils.deleteRecursively(dir)
}
}

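To make the write/read symmetry in the test explicit, here is a self-contained round trip of the same codec streams, assuming Hadoop and Guava are on the classpath (the object name is illustrative): it compresses a byte array the way createNativeFile does and decodes it the way the codec branch of nextKeyValue does.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.google.common.io.ByteStreams
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.util.ReflectionUtils

object GzipRoundTripSketch {
  def main(args: Array[String]): Unit = {
    // ReflectionUtils.newInstance sets the Configuration on the (Configurable) codec.
    val codec = ReflectionUtils.newInstance(classOf[GzipCodec], new Configuration())
    val raw = "Hello, wholeTextFiles".getBytes("UTF-8")

    // Write side, as in createNativeFile: wrap the raw stream in the codec's output stream.
    val buf = new ByteArrayOutputStream()
    val out = codec.createOutputStream(buf)
    out.write(raw)
    out.close()

    // Read side, as in nextKeyValue: decode through the codec's input stream.
    val back = ByteStreams.toByteArray(
      codec.createInputStream(new ByteArrayInputStream(buf.toByteArray)))
    assert(new String(back, "UTF-8") == "Hello, wholeTextFiles")
    println("gzip round trip OK")
  }
}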
