[SPARK-2759][CORE] Generic Binary File Support in Spark
This change adds the abstract ```BinaryFileInputFormat``` and ```BinaryRecordReader``` classes for reading data in as a byte stream and converting it to another format using the ```def parseByteArray(inArray: Array[Byte]): T``` function.
As a trivial example, ```ByteInputFormat``` and ```ByteRecordReader``` are included, which simply return the ```Array[Byte]``` from a given file.
Finally, an RDD for ```BinaryFileInputFormat``` (to allow for easier partitioning changes, as was done for WholeFileInput) was added, along with the corresponding byteFiles methods on the ```SparkContext``` so the functions can be easily used by others.
A common use case might be to read in a folder:
```
sc.byteFiles("s3://mydrive/tif/*.tif").map(rawData => ReadTiffFromByteArray(rawData))
```
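
For reference, a minimal sketch of the same pattern against the `binaryFiles` API as it was finally merged (the S3 path and the `readTiffFromByteArray` decoder are placeholders, not part of Spark):
```
// binaryFiles returns (path, PortableDataStream) pairs; toArray() pulls the
// whole file into memory before handing it to a user-supplied decoder.
val images = sc.binaryFiles("s3://mydrive/tif/*.tif").map { case (path, stream) =>
  (path, readTiffFromByteArray(stream.toArray()))  // hypothetical TIFF decoder
}
```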

Author: Kevin Mader <kevinmader@gmail.com>
Author: Kevin Mader <kmader@users.noreply.github.com>

Closes apache#1658 from kmader/master and squashes the following commits:

3c49a30 [Kevin Mader] fixing wholetextfileinput so it has the same setMinPartitions function as in BinaryData files
359a096 [Kevin Mader] making the final corrections suggested by @mateiz and renaming a few functions to make their usage clearer
6379be4 [Kevin Mader] reorganizing code
7b9d181 [Kevin Mader] removing developer API, cleaning up imports
8ac288b [Kevin Mader] fixed a single slightly over 100 character line
92bda0d [Kevin Mader] added new tests, renamed files, fixed several of the javaapi functions, formatted code more nicely
a32fef7 [Kevin Mader] removed unneeded classes added DeveloperApi note to portabledatastreams since the implementation might change
49174d9 [Kevin Mader] removed unneeded classes added DeveloperApi note to portabledatastreams since the implementation might change
c27a8f1 [Kevin Mader] jenkins crashed before running anything last time, so making minor change
b348ce1 [Kevin Mader] fixed order in check (prefix only appears on jenkins not when I run unit tests locally)
0588737 [Kevin Mader] filename check in "binary file input as byte array" test now ignores prefixes and suffixes which might get added by Hadoop
4163e38 [Kevin Mader] fixing line length and output from FSDataInputStream to DataInputStream to minimize sensitivity to Hadoop API changes
19812a8 [Kevin Mader] Fixed the serialization issue with PortableDataStream since neither CombineFileSplit nor TaskAttemptContext implement the Serializable interface, by using ByteArrays for storing both and then recreating the objects from these bytearrays as needed.
238c83c [Kevin Mader] fixed several scala-style issues, changed structure of binaryFiles, removed excessive classes added new tests. The caching tests still have a serialization issue, but that should be easily fixed as well.
932a206 [Kevin Mader] Update RawFileInput.scala
a01c9cf [Kevin Mader] Update RawFileInput.scala
441f79a [Kevin Mader] fixed a few small comments and dependency
12e7be1 [Kevin Mader] removing imglib from maven (definitely not ready yet)
5deb79e [Kevin Mader] added new portabledatastream to code so that it can be serialized correctly
f032bc0 [Kevin Mader] fixed bug in path name, renamed tests
bc5c0b9 [Kevin Mader] made minor stylistic adjustments from mateiz
df8e528 [Kevin Mader] fixed line lengths and changed java test
9a313d5 [Kevin Mader] making classes that needn't be public private, adding automatic file closure, adding new tests
edf5829 [Kevin Mader] fixing line lengths, adding new lines
f4841dc [Kevin Mader] un-optimizing imports, silly intellij
eacfaa6 [Kevin Mader] Added FixedLengthBinaryInputFormat and RecordReader from freeman-lab and added them to both the JavaSparkContext and the SparkContext as fixedLengthBinaryFile
1622935 [Kevin Mader] changing the line lengths to make jenkins happy
1cfa38a [Kevin Mader] added apache headers, added datainputstream directly as an output option for more complicated readers (HDF5 perhaps), and renamed several of the functions and files to be more consistent. Also added parallel functions to the java api
84035f1 [Kevin Mader] adding binary and byte file support spark
81c5f12 [Kevin Mader] Merge pull request #1 from apache/master
kmader authored and mateiz committed Nov 1, 2014
1 parent ee29ef3 commit 7136719
Showing 10 changed files with 892 additions and 5 deletions.
65 changes: 64 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -41,7 +41,7 @@ import akka.actor.Props
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
@@ -533,6 +533,69 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
minPartitions).setName(path)
}


/**
* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
* For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do
* `val rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
*
* then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, PortableDataStream)] = {
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
classOf[String],
classOf[PortableDataStream],
updateConf,
minPartitions).setName(path)
}
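
A brief usage sketch for this method (the path mirrors the example above; reading a 4-byte magic number is purely illustrative). Each value is a `PortableDataStream`, so the file contents are only pulled when the task opens it:
```
val files = sc.binaryFiles("hdfs://a-hdfs-path", minPartitions = 8)

// Each record is (path, PortableDataStream); open() gives a DataInputStream
// positioned at the start of the file, and the caller closes it when done.
val headers = files.map { case (path, stream) =>
  val in = stream.open()
  try {
    (path, in.readInt())  // e.g. read a 4-byte magic number from each file
  } finally {
    in.close()
  }
}
```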

/**
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
* @param recordLength The length at which to split the records
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
: RDD[Array[Byte]] = {
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
classOf[FixedLengthBinaryInputFormat],
classOf[LongWritable],
classOf[BytesWritable],
conf=conf)
val data = br.map{ case (k, v) => v.getBytes}
data
}
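
A sketch of reading fixed-length records with this method; the file name and the 8-byte big-endian double layout are assumptions for illustration:
```
import java.nio.ByteBuffer

// Every record is exactly 8 bytes: one big-endian IEEE-754 double.
val records = sc.binaryRecords("hdfs://a-hdfs-path/doubles.bin", recordLength = 8)
val doubles = records.map(bytes => ByteBuffer.wrap(bytes).getDouble)
```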

/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -21,6 +21,11 @@ import java.io.Closeable
import java.util
import java.util.{Map => JMap}

import java.io.DataInputStream

import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.input.{PortableDataStream, FixedLengthBinaryInputFormat}

import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.language.implicitConversions
@@ -32,7 +37,8 @@ import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark._
import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
@@ -202,6 +208,8 @@ class JavaSparkContext(val sc: SparkContext)
def textFile(path: String, minPartitions: Int): JavaRDD[String] =
sc.textFile(path, minPartitions)



/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
@@ -245,6 +253,78 @@ class JavaSparkContext(val sc: SparkContext)
def wholeTextFiles(path: String): JavaPairRDD[String, String] =
new JavaPairRDD(sc.wholeTextFiles(path))

/**
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a PortableDataStream. Each file is read as a
* single record and returned in a key-value pair, where the key is the path of each file
* and the value is the content of each file.
*
* For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do
* `JavaPairRDD<String, PortableDataStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
*
* then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; very large files may cause bad performance.
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
def binaryFiles(path: String, minPartitions: Int): JavaPairRDD[String, PortableDataStream] =
new JavaPairRDD(sc.binaryFiles(path, minPartitions))

/**
* Read a directory of binary files from HDFS, a local file system (available on all nodes),
* or any Hadoop-supported file system URI as a PortableDataStream. Each file is read as a
* single record and returned in a key-value pair, where the key is the path of each file
* and the value is the content of each file.
*
* For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do
* `JavaPairRDD<String, PortableDataStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
*
* then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred; very large files may cause bad performance.
*/
def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))

/**
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
* @return An RDD of data with values, represented as byte arrays
*/
def binaryRecords(path: String, recordLength: Int): JavaRDD[Array[Byte]] = {
new JavaRDD(sc.binaryRecords(path, recordLength))
}

/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.input

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}

/**
* Custom Input Format for reading and splitting flat binary files that contain records,
* each of which are a fixed size in bytes. The fixed record size is specified through
* a parameter recordLength in the Hadoop configuration.
*/
private[spark] object FixedLengthBinaryInputFormat {
/** Property name to set in Hadoop JobConfs for record length */
val RECORD_LENGTH_PROPERTY = "org.apache.spark.input.FixedLengthBinaryInputFormat.recordLength"

/** Retrieves the record length property from a Hadoop configuration */
def getRecordLength(context: JobContext): Int = {
context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
}
}

private[spark] class FixedLengthBinaryInputFormat
extends FileInputFormat[LongWritable, BytesWritable] {

private var recordLength = -1

/**
* Override of isSplitable to ensure initial computation of the record length
*/
override def isSplitable(context: JobContext, filename: Path): Boolean = {
if (recordLength == -1) {
recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
}
if (recordLength <= 0) {
println("record length is less than 0, file cannot be split")
false
} else {
true
}
}

/**
* This input format overrides computeSplitSize() to make sure that each split
* only contains full records. Each InputSplit passed to FixedLengthBinaryRecordReader
* will start at the first byte of a record, and the last byte will be the last byte of a record.
*/
override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = {
val defaultSize = super.computeSplitSize(blockSize, minSize, maxSize)
// If the default size is less than the length of a record, make it equal to it
// Otherwise, make sure the split size is as close as possible to the default size,
// but still contains a complete set of records, with the first record
// starting at the first byte in the split and the last record ending with the last byte
if (defaultSize < recordLength) {
recordLength.toLong
} else {
(Math.floor(defaultSize / recordLength) * recordLength).toLong
}
}

/**
* Create a FixedLengthBinaryRecordReader
*/
override def createRecordReader(split: InputSplit, context: TaskAttemptContext)
: RecordReader[LongWritable, BytesWritable] = {
new FixedLengthBinaryRecordReader
}
}
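
To make the rounding above concrete, a small standalone sketch with made-up numbers (a 128 MB default split and 10,000-byte records); it mirrors the rule in `computeSplitSize` rather than calling the private class directly:
```
// A split is either one whole record (when the default split is smaller than
// a record) or the largest multiple of recordLength that fits in the default size.
def splitSize(defaultSize: Long, recordLength: Long): Long =
  if (defaultSize < recordLength) recordLength
  else (defaultSize / recordLength) * recordLength

splitSize(128L * 1024 * 1024, 10000L)  // 134210000 bytes = 13421 whole records
splitSize(4096L, 10000L)               // 10000 bytes = exactly one record
```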