[SPARK-2672] support compressed file in wholeTextFile #3005

Closed · wants to merge 4 commits
Changes from all commits

WholeTextFileInputFormat.scala
@@ -19,32 +19,38 @@ package org.apache.spark.input

 import scala.collection.JavaConversions._

+import org.apache.hadoop.conf.{Configuration, Configurable}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

 /**
  * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
  * reading whole text files. Each file is read as key-value pair, where the key is the file path and
  * the value is the entire content of file.
  */

-private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+private[spark] class WholeTextFileInputFormat
+  extends CombineFileInputFormat[String, String] with Configurable {

   override protected def isSplitable(context: JobContext, file: Path): Boolean = false

+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
   override def createRecordReader(
       split: InputSplit,
       context: TaskAttemptContext): RecordReader[String, String] = {

-    new CombineFileRecordReader[String, String](
-      split.asInstanceOf[CombineFileSplit],
-      context,
-      classOf[WholeTextFileRecordReader])
+    val reader = new WholeCombineFileRecordReader(split, context)
+    reader.setConf(conf)
+    reader
   }

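Note on the Configurable mix-in above: Hadoop instantiates input formats and record readers reflectively, and org.apache.hadoop.util.ReflectionUtils.newInstance calls setConf on anything that implements Configurable, which is presumably how the job configuration reaches this format before createRecordReader runs. A minimal, self-contained sketch of that mechanism (the demo class and the codec value are illustrative, not part of this PR):

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.util.ReflectionUtils

// Toy stand-in for WholeTextFileInputFormat: stores whatever conf Hadoop hands it.
class DemoConfigurable extends Configurable {
  private var conf: Configuration = _
  override def setConf(c: Configuration): Unit = { conf = c }
  override def getConf: Configuration = conf
}

object ConfigurableDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec")

    // newInstance sees the Configurable interface and injects conf via setConf.
    val demo = ReflectionUtils.newInstance(classOf[DemoConfigurable], conf)
    println(demo.getConf.get("io.compression.codecs"))
  }
}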

WholeTextFileRecordReader.scala
@@ -17,11 +17,13 @@

 package org.apache.spark.input

+import org.apache.hadoop.conf.{Configuration, Configurable}
 import com.google.common.io.{ByteStreams, Closeables}

 import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext

@@ -34,7 +36,13 @@ private[spark] class WholeTextFileRecordReader(
     split: CombineFileSplit,
     context: TaskAttemptContext,
     index: Integer)
-  extends RecordReader[String, String] {
+  extends RecordReader[String, String] with Configurable {

+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
   private[this] val path = split.getPath(index)
   private[this] val fs = path.getFileSystem(context.getConfiguration)
@@ -57,8 +65,16 @@

   override def nextKeyValue(): Boolean = {
     if (!processed) {
+      val conf = new Configuration
+      val factory = new CompressionCodecFactory(conf)
+      val codec = factory.getCodec(path)  // infers from file ext.
       val fileIn = fs.open(path)
-      val innerBuffer = ByteStreams.toByteArray(fileIn)
+      val innerBuffer = if (codec != null) {
+        ByteStreams.toByteArray(codec.createInputStream(fileIn))
+      } else {
+        ByteStreams.toByteArray(fileIn)
+      }
+
       value = new Text(innerBuffer).toString
       Closeables.close(fileIn, false)
       processed = true
@@ -68,3 +84,33 @@ private[spark] class WholeTextFileRecordReader(
     }
   }
 }
+
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
+ */
+private[spark] class WholeCombineFileRecordReader(
+    split: InputSplit,
+    context: TaskAttemptContext)
+  extends CombineFileRecordReader[String, String](
+    split.asInstanceOf[CombineFileSplit],
+    context,
+    classOf[WholeTextFileRecordReader]
+  ) with Configurable {
+
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
+  override def initNextRecordReader(): Boolean = {
+    val r = super.initNextRecordReader()
+    if (r) {
+      this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
+    }
+    r
+  }
+}
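The essential new behavior in nextKeyValue above is extension-based codec lookup: CompressionCodecFactory.getCodec returns a codec for a path such as foo.txt.gz and null for an uncompressed file, and the raw stream is wrapped accordingly before being read whole. A small stand-alone sketch of that pattern (the path is hypothetical; in the record reader the Configuration and FileSystem come from the split and the task context):

import java.io.InputStream

import com.google.common.io.ByteStreams
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.CompressionCodecFactory

object CodecSniffDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path("/tmp/example.txt.gz") // hypothetical input file
    val fs = FileSystem.getLocal(conf)

    // getCodec matches on the file suffix (.gz -> GzipCodec) and returns null
    // when no registered codec claims the extension.
    val codec = new CompressionCodecFactory(conf).getCodec(path)

    val raw = fs.open(path)
    val in: InputStream = if (codec != null) codec.createInputStream(raw) else raw

    val bytes = ByteStreams.toByteArray(in) // decompressed if a codec was found
    println(s"read ${bytes.length} bytes from $path")
    raw.close()
  }
}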

WholeTextFileRecordReaderSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text

 import org.apache.spark.SparkContext
 import org.apache.spark.util.Utils
+import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodecFactory, GzipCodec}

 /**
  * Tests the correctness of
@@ -38,20 +39,32 @@ import org.apache.spark.util.Utils
  */
 class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
   private var sc: SparkContext = _
+  private var factory: CompressionCodecFactory = _

   override def beforeAll() {
     sc = new SparkContext("local", "test")

     // Set the block size of local file system to test whether files are split right or not.
     sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
+    sc.hadoopConfiguration.set("io.compression.codecs",
+      "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec")
+    factory = new CompressionCodecFactory(sc.hadoopConfiguration)
   }

   override def afterAll() {
     sc.stop()
   }

-  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
-    val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
+  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
+      compress: Boolean) = {
+    val out = if (compress) {
+      val codec = new GzipCodec
+      val path = s"${inputDir.toString}/$fileName${codec.getDefaultExtension}"
+      codec.createOutputStream(new DataOutputStream(new FileOutputStream(path)))
+    } else {
+      val path = s"${inputDir.toString}/$fileName"
+      new DataOutputStream(new FileOutputStream(path))
+    }
     out.write(contents, 0, contents.length)
     out.close()
   }
@@ -68,7 +81,7 @@
     println(s"Local disk address is ${dir.toString}.")

     WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
-      createNativeFile(dir, filename, contents)
+      createNativeFile(dir, filename, contents, false)
     }

     val res = sc.wholeTextFiles(dir.toString, 3).collect()
@@ -86,6 +99,31 @@

     Utils.deleteRecursively(dir)
   }
+
+  test("Correctness of WholeTextFileRecordReader with GzipCodec.") {
+    val dir = Utils.createTempDir()
+    println(s"Local disk address is ${dir.toString}.")
+
+    WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+      createNativeFile(dir, filename, contents, true)
+    }
+
+    val res = sc.wholeTextFiles(dir.toString, 3).collect()
+
+    assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+      "Number of files read out does not fit with the actual value.")
+
+    for ((filename, contents) <- res) {
+      val shortName = filename.split('/').last.split('.')(0)
+
+      assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+        s"Missing file name $filename.")
+      assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+        s"file $filename contents can not match.")
+    }
+
+    Utils.deleteRecursively(dir)
+  }
 }

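End-to-end, this change makes SparkContext.wholeTextFiles transparent to compression: when a file's extension maps to a registered codec, the returned (path, content) pair carries the already decompressed text. A hedged usage sketch (the directory and file names are illustrative only):

import org.apache.spark.SparkContext

object WholeTextFilesGzipExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "wholeTextFiles-gzip-example")

    // Directory assumed to contain files such as part-00000.gz, part-00001.gz, ...
    val pairs = sc.wholeTextFiles("/tmp/compressed-input", minPartitions = 3)

    // Key is the file path, value is the whole (decompressed) file content.
    pairs.collect().foreach { case (path, content) =>
      println(s"$path -> ${content.length} characters")
    }

    sc.stop()
  }
}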