making the final corrections suggested by @mateiz and renaming a few functions to make their usage clearer
kmader committed Oct 30, 2014
1 parent 6379be4 commit 359a096
Showing 5 changed files with 17 additions and 52 deletions.
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -518,7 +518,6 @@ class SparkContext(config: SparkConf) extends Logging {
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*
* @note Care must be taken to close the files afterwards
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
*/
@DeveloperApi
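The scaladoc in this hunk treats `minPartitions` as a suggestion only, with many small files as the intended workload. A minimal usage sketch (path, master, app name, and partition count are placeholders, not values from the commit):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark versions of this era

val sc = new SparkContext(new SparkConf().setAppName("binaryFiles-demo").setMaster("local[2]"))

// Each record is (file path, PortableDataStream); minPartitions is only a hint.
val streams = sc.binaryFiles("hdfs://a-hdfs-path", minPartitions = 4)
println(s"planned partitions: ${streams.partitions.length}")

// Materialize each file and report its size in bytes.
streams.mapValues(_.toArray().length)
  .collect()
  .foreach { case (path, len) => println(s"$path -> $len bytes") }
```

`PortableDataStream` is meant to defer opening the underlying file until it is read, so the `collect()` above is where the actual I/O happens.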
@@ -263,39 +263,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
JavaPairRDD[String,PortableDataStream] =
new JavaPairRDD(sc.binaryFiles(path,defaultMinPartitions))

/**
* Read a directory of files as DataInputStream from HDFS,
* a local file system (available on all nodes), or any Hadoop-supported file system URI
* as a byte array. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do
* `JavaPairRDD<String,DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
JavaPairRDD[String, Array[Byte]] =
new JavaPairRDD(sc.binaryFiles(path,minPartitions).mapValues(_.toArray()))
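This `binaryArrays` wrapper is removed by the commit; as its one-line body shows, the same result is available by mapping `binaryFiles` output to byte arrays. A sketch of the equivalent call from Scala (assuming the `sc` and imports from the earlier sketch):

```scala
import org.apache.spark.rdd.RDD

// Equivalent of the removed binaryArrays, expressed against the surviving binaryFiles API.
val byteArrays: RDD[(String, Array[Byte])] =
  sc.binaryFiles("hdfs://a-hdfs-path").mapValues(_.toArray())
```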

/**
* Load data from a flat binary file, assuming each record is a set of numbers
* with the specified numerical format (see ByteBuffer), and the number of
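The truncated comment above introduces the fixed-record-length reader. A sketch of consuming such records from Scala, assuming the reader is exposed as `binaryRecords(path, recordLength)` as in the released API (the 8-byte little-endian layout is an assumption for the example):

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Every element is an Array[Byte] of exactly recordLength bytes.
val records = sc.binaryRecords("hdfs://a-hdfs-path/fixed-len-data", recordLength = 8)

// Decode each record as a little-endian Long (format assumed for illustration).
val longs = records.map(bytes => ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong)
println(longs.take(3).mkString(", "))
```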
@@ -35,9 +35,10 @@ private[spark] abstract class StreamFileInputFormat[T]
extends CombineFileInputFormat[String, T] {
override protected def isSplitable(context: JobContext, file: Path): Boolean = false
/**
* Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
* Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API
* which is set through setMaxSplitSize
*/
def setMaxSplitSize(context: JobContext, minPartitions: Int) {
def setMinPartitions(context: JobContext, minPartitions: Int) {
val files = listStatus(context)
val totalLen = files.map { file =>
if (file.isDir) 0L else file.getLen
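The renamed `setMinPartitions` keeps the old Hadoop `CombineFileInputFormat` contract: the partition hint has to be translated into a maximum split size. The hunk is cut off above, so here is a self-contained sketch of the whole method under the assumption that the split size is roughly `totalLen / minPartitions` (not the verbatim source):

```scala
import scala.collection.JavaConversions._ // listStatus returns a java.util.List

import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat

abstract class StreamFileInputFormatSketch[T] extends CombineFileInputFormat[String, T] {

  // Translate a partition-count hint into the max split size the Hadoop API understands.
  def setMinPartitions(context: JobContext, minPartitions: Int) {
    val totalLen = listStatus(context).map { file =>
      if (file.isDir) 0L else file.getLen
    }.sum
    // Splits of at most ~totalLen / minPartitions bytes give at least minPartitions
    // splits when there is enough data; guard against a zero hint.
    val maxSplitSize = math.ceil(totalLen * 1.0 / math.max(minPartitions, 1)).toLong
    super.setMaxSplitSize(maxSplitSize)
  }
}
```

The rename makes the call site in `BinaryFileRDD` read as intent (`setMinPartitions`) rather than mechanism (`setMaxSplitSize`), which is what the commit message refers to.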
22 changes: 10 additions & 12 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -17,22 +17,21 @@

package org.apache.spark.rdd

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.conf.{ Configurable, Configuration }
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.spark.input.StreamFileInputFormat
import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.{ Partition, SparkContext }

private[spark] class BinaryFileRDD[T](
sc : SparkContext,
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
sc: SparkContext,
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {


override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
inputFormat match {
@@ -41,13 +40,12 @@ private[spark] class BinaryFileRDD[T](
case _ =>
}
val jobContext = newJobContext(conf, jobId)
inputFormat.setMaxSplitSize(jobContext, minPartitions)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
result(i) = new NewHadoopPartition(
id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]
)
id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
}
result
}
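Because `StreamFileInputFormat` extends `CombineFileInputFormat` and is not splittable, the `getPartitions` above can pack many small files into a single partition. A quick way to observe that (path and hint are illustrative, `sc` as before):

```scala
// Count how many files end up in each partition of a binaryFiles RDD.
val filesPerPartition = sc.binaryFiles("hdfs://a-hdfs-path/small-files", minPartitions = 2)
  .mapPartitions(iter => Iterator(iter.size), preservesPartitioning = true)
  .collect()

filesPerPartition.zipWithIndex.foreach { case (n, i) =>
  println(s"partition $i holds $n files")
}
```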
8 changes: 4 additions & 4 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -243,7 +243,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
file.close()

val inRdd = sc.binaryFiles(outFileName)
val (infile: String, indata: PortableDataStream) = inRdd.first
val (infile: String, indata: PortableDataStream) = inRdd.collect.head

// Make sure the name and array match
assert(infile.contains(outFileName)) // a prefix may get added
@@ -274,7 +274,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
curData: (String, PortableDataStream) =>
(curData._2.getPath(),curData._2)
}
val (infile: String, indata: PortableDataStream) = mappedRdd.first
val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head

// Try reading the output back as an object file

@@ -305,7 +305,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
curData: (String, PortableDataStream) =>
(curData._2.getPath(),curData._2)
}
val (infile: String, indata: PortableDataStream) = mappedRdd.first
val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head

// Try reading the output back as an object file

@@ -376,7 +376,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(inRdd.count == testOutputCopies)

// now just compare the first one
val indata: Array[Byte] = inRdd.first
val indata: Array[Byte] = inRdd.collect.head
assert(indata === testOutput)
}
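The assertions above compare only the first record, now fetched with `collect.head` instead of `first`. A condensed round-trip in the same spirit, runnable outside the suite (file name and record contents are illustrative):

```scala
import java.io.{File, FileOutputStream}

val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
val testOutputCopies = 10

// Write the same fixed-length record several times to a temporary file.
val outFile = File.createTempFile("record-test", ".bin")
val out = new FileOutputStream(outFile)
for (_ <- 1 to testOutputCopies) out.write(testOutput)
out.close()

// Read it back as fixed-length records and compare only the first one.
val inRdd = sc.binaryRecords(outFile.getAbsolutePath, testOutput.length)
assert(inRdd.count == testOutputCopies)
assert(inRdd.collect.head.sameElements(testOutput))
```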

